Flink关系型API的公共部分

文章目录
  1. 1. 关系型程序的公共部分
    1. 1.1. TableEnvironment
    2. 1.2. catalog
    3. 1.3. source
    4. 1.4. sink

关系型程序的公共部分

下面的代码段展示了Table&SQL API所编写流式程序的程序模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建TableEnvironment对象
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注册表
tableEnv.registerTable("table1", ...) //或者
tableEnv.registerTableSource("table2", ...) //或者
tableEnv.registerExternalCatalog("extCat", ...)
//基于Table API的查询创建Table对象
val tapiResult = tableEnv.scan("table1").select(...)
//从SQL查询创建Table
val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
//将Table API的查询到的结果表输出到TableSink,SQL查询到的结果表同样如此
tapiResult.writeToSink(...)
//触发执行
env.execute()

通过分步解读以上代码段,我们可以发现一个关系型的Flink程序大致分为如下几步:

  • 构建环境对象
  • 注册表、catalog相关的信息(source部分)
  • 调用Table&SQL API创建表、对表进行查询
  • 得到结果表的数据并输出(sink部分)
  • 调用环境对象的execute方法触发程序执行

这几步中,跟关系型API有关的2 ~ 4步,我们发现在一个关系型的程序中,用户既可以混合使用Table&SQL API,并且除了后端环境对象的不同,TableEnvironment相关的部分在API层面上具有相同的抽象,也就是说,一套程序主体既可以适用于batch模式也可以适用于Streaming模式,这对用户而言也许更具吸引力。接下来,我们将对关系型API相关的几步进行解读。

TableEnvironment

跟streaming和batch程序一样,关系型程序也会要求先构建一个环境对象。因为Flink致力于为streaming和batch提供统一的关系型API,因此关系型程序只有唯一的环境对象TableEnvironment。

但具体到一些内部实现上,streaming跟batch还是有着较大的差异。所以,TableEnvironment针对两者又扩展了StreamTableEnvironment和BatchTableEnvironment这两个抽象类。这两个类主要提供streaming和batch的特定语义,比如提供DataSet、DataStream跟Table之间的转换。

最终的关系型程序中,原先streaming跟batch的环境对象和TableEnvironment对象都是必须的,它们承担着不同的职责:

  • streaming/batch 环境对象:辅助构建Table环境对象、触发程序执行调用、构建DataStream、DataSet;
  • TTableEnvironment对象:构建关系型程序的主体逻辑;

catalog

在Calcite中存在多个概念,其中一个概念就是“catalog”。从关系型的观点上来看,catalog处于所有的schema(外部的、概念上的、内部的)以及mapping(外部与概念以及概念与内部之间)之上的[1]。从SQL标准的角度来看,catalog在一个SQL环境中被称之为schame的集合,一个SQL环境包含零个或多个catalog,而一个catalog包含一个或多个schema(总是会包含一个名为“INFORMATION_SCHEMA”的schema)[2]。

在Calcite中,catalog定义了可在SQL查询中被访问的元数据跟命名空间。其中包含了如下几个概念:

  • Schema:一个定义了模式与表的集合,可被任意地嵌套
  • Table:表示一个单独的数据集,字段通过RelDataType来定义
  • RelDataType:表示在数据集中的字段,支持所有的SQL数据类型,包括结构体与数组
  • Statistic:提供用于优化的表统计信息

以一个SQL查询为例,来认识一下catalog中包含的那些概念:

在接下来讲source的这一小节,我们将看到被注册进Table&SQL API中当作“表”使用的对象,最终都会被转换为Calcite所识别的Table对象并加入其Schema中。

另外,Flink允许用户注册外部的catalog以提供如何访问外部数据库的相关信息,通过TableEnvironment对象的registerExternalCatalog方法即可注入。外部的catalog必须继承ExternalCatalog这一trait,它相当于外部数据库跟Table&SQL API的一个连接器。而Table&SQL API某种程度上又充当了外部catalog跟Calcite的连接器,整个桥接模式如下图所示:

对应到代码实现上来,Flink会通过一个ExternalCatalogSchema类来完成跟Calcite的catalog API的对接,包括注册跟获取catalog以及内部的子schema等。示例代码如下:

1
2
3
4
5
6
7
8
//获得TableEnvironment对象
val tableEnv = TableEnvironment.getTableEnvironment(env)
//创建一个外部的catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
//注册
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦外部的catalog被注册到环境对象,Table&SQL API都可以以类似于“catalog.database.table”这样的全限定名来访问表等信息。当前Flink提供了一个基于内存的ExternalCatalogSchema的实现:InMemoryExternalCatalog,它内部维护了两个映射:

  • 数据库映射:数据库名对应ExternalCatalog实例;
  • 表映射:表名对应ExternalCatalogTable实例;

source

source作为Table&SQL API的数据源,同时也是程序的入口。当前Flink的Table&SQL API整体而言支持三种source:Table source、DataSet以及DataStream,它们都通过特定的API注册到Table环境对象。

我们先来看Table source,它直接以表对象作为source。这里的表对象可细分为:

  • Flink以Table类定义的关系表对象,通过TableEnvironment的registerTable方法注册;
  • 外部source经过桥接而成的表对象,基础抽象为TableSource,通过具体环境对象的registerTableSource;

下图展示了,Table source被注册时,对应的内部转化图(虚线表示对应关系):

由上图可见,不管是直接注册Table对象还是注册外部source,在内部都直接对应了特定的XXXTable对象。

TableSource trait针对Streaming和Batch分部扩展有两个trait,它们是StreamTableSource和BatchTableSource,它们各自都提供了从数据源转换为核心对象(DataStream跟DataSource)的方法。

除了这三个基本的trait之外,还有一些特定对source的需求以独立的trait提供以方便实现者自行组合,比如ProjectableTableSource这一trait,它支持将Projection下推(push-down)到TableSource。Flink内置实现的CsvTableSource就继承了这一trait。

当前Flink所支持的TableSource大致上分为两类:

  • CsvTableSouce:同时可用于Batch跟Streaming模式;
  • kafka系列TableSource:包含Kafka的各个版本(0.8,0.9,0.10)以及各种不同的格式(Json、Avro),基本上它们只支持Streaming模式,它们都依赖于各种kafka的connector;

使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
// specify JSON field names and types
val typeInfo = Types.ROW(
Array("id", "name", "score"),
Array(Types.INT, Types.STRING, Types.DOUBLE)
)
val kafkaTableSource = new Kafka08JsonTableSource(
kafkaTopic,
kafkaProperties,
typeInfo)
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);

CsvTableSource的构建方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
val csvTableSource = CsvTableSource
.builder
.path("/path/to/your/file.csv")
.field("name", Types.STRING)
.field("id", Types.INT)
.field("score", Types.DOUBLE)
.field("comments", Types.STRING)
.fieldDelimiter("#")
.lineDelimiter("$")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
.build

除了以TableSource作为Table&SQL的source,还支持通过特定的环境对象直接注册DataStream、DataSet。注册DataStream的示例如下:

1
2
3
4
5
6
7
8
9
10
11
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val cust = env.fromElements(...)
val ord = env.fromElements(...)
// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)
// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

注册DataSet的示例如下:

1
2
3
4
5
6
7
8
9
10
11
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val cust = env.fromElements(...)
val ord = env.fromElements(...)
// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)
// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

以上,通过调用环境对象的register[DataStream/DataSet]方法是一种显式注册的方式,除此之外,还有隐式注册方式。隐式注册方式,通过对DataStream跟DataSet对象增加的toTable方法来实现,使用方式示例如下:

1
2
3
4
5
6
7
8
9
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sql(
s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

我们知道DataStream跟DataSet原先是没有toTable API的,如何为它们增加该API的呢?答案是利用了Scala的包对象(package object),该特性主要用于兼容旧版本的库或对某些类型的API进行增强。具体而言,toTable API其实是实现在DataSetConversions和DataStreamConversions两个类中,然后在包对象中对他们进行实例化。而定位到toTable的实现时,会看到它们其实是间接调用了特定环境对象的fromDataStream/fromDataSet方法并将当前的DataStream跟DataSet传递给这两个方法并通过方法返回得到Table对象。fromDataStream/fromDataSet方法对在实现时会调用跟registerDataStream/registerDataSet方法对相同的内部注册方法。

fromDataStream/fromDataSet方法通常主要的场景在于为DataStream/DataSet转换为Table对象提供便利,它本身也进行了隐式注册。然而,你也可以对通过这对方法得到的Table对象,再次调用registerTable来进行显式注册,不过通常没有必要。

因此,综合而言,注册DataStream跟DataSet的对应关系如下:

以上我们已经分析了所有的Table source的注册方式,有多种register系列方法并最终对应了内部各种XXXTable对象。稍显混乱,其实这些XXXTable对象是有联系的,并且所有的register系列方法最终都调用了TableEnvironment的registerTableInternal方法。因此其实注册Table source的内部原理是一致的,我们来分析一下。

TableEnvironment内部会以一个SchemaPlus类型的数据结构,它是Calcite中的数据结构,用来存储被注册的表、函数等在内的一系列对象(这些对象统称为Calcite中的Schema)。由此可见它无法直接接受Flink自定义的类似于TableSouce这样的对象,那么这里存在一个问题就是两个框架的衔接问题。这就是Flink定义那么多内部XXXTable类型的原因之一,我们来梳理一下它们之间的关系如下:

上图中的XXXTable对象同时以括号标明了在注册时它是由什么对象转化而来。

sink

sink其实跟source是反向的,一个是将数据源接入进来,另一个是将数据写到外部。因此,我们对比着source来看sink,当你实现一个Table&SQL程序并希望将处理之后的结果输出到外部。通常有以下几种方式:

  • 在Table对象上调用writeToSink API,它接收一个TableSink的实例;
  • 将Table再次转换为DataSet/DataStream,然后像输出DataSet/DataStream一样的方式来处理;

TableSink根据后端模式的差别,提供了两种实现:针对batch的BatchTableSink以及针对streaming的多种sink,它们拥有不同的特征,列举如下:

  • AppendStreamTableSink:它只支持插入变更,如果Table对象同时有更新和删除的变更,那么将会抛出TableException;
  • RetractStreamTableSink:它支持输出一个streaming模式的表,该表上可以有插入、更新和删除变更;
  • UpsertStreamTableSink:它支持输出一个streaming模式的表,该表上可以有插入、更新和删除变更,且还要求表要么有唯一的键字段要么是append-only模式的,如果两者都不满足,将抛出TableException;

跟source一样,内置的CsvTableSink同时兼具streaming跟batch的语义。

TableSink主要通过Table 的writeToSink API对外提供能力,然而最终的实现主要还是在特定的环境对象上。对BatchTableSink而言,BatchTableEnvironment会将具体的Table对象转换为DataSet,然后输出:

1
2
3
4
//将Table翻译为DataSet
val result: DataSet[T] = translate(table)(outputType)
//将DataSet给TableSink以使其输出
batchSink.emitDataSet(result)

针对streaming的各种sink则会在StreamTableEnvironment中挨个枚举不同的sink类型进行处理。但步骤跟BatchTableSink类似:先翻译为DataStream然后输出。

在source中可以直接从DataSet/DataStream转换为Table对象一样,同样从Table对象也可以转换为DataSet/DataStream对象。它们的实现手段都是类似的,通过package object对Table API进行增强,以使得Table 具备toDataSet/toXXXStream的API,最终由特定环境对象的toDataSet/toXXXStream方法完整具体的任务。

我们以CsvTableSink来分析一下,具体的emit是如何实现的,概况来讲有两步:

  1. 对数据集或数据流应用map运算符以CsvFormatter格式化器进行格式化;
  2. 再调用DataSet、DataStream的writeAsText sink到文件系统;

真正复杂的是各个Table环境对象中的translate方法,它们用于将Table翻译为DataSet/DataStream,这其中包含将相关的Table API调用以及SQL查询所对应的关系型的表达式树转换成DataSet/DataStream特定的运算符。这并不是本节的重点,我们将在后续对此进行介绍。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group