Flink-Table-SQL系列之source

文章目录

source作为Table&SQL API的数据源,同时也是程序的入口。当前Flink的Table&SQL API整体而言支持三种source:Table source、DataSet以及DataStream,它们都通过特定的API注册到Table环境对象。

我们先来看Table source,它直接以表对象作为source。这里的表对象可细分为:

  • Flink以Table类定义的关系表对象,通过TableEnvironment的registerTable方法注册;
  • 外部source经过桥接而成的表对象,基础抽象为TableSource,通过具体环境对象的registerTableSource;

下图展示了,Table source被注册时,对应的内部转化图(虚线表示对应关系):

Table-source-inner-map

由上图可见,不管是直接注册Table对象还是注册外部source,在内部都直接对应了特定的XXXTable对象。

TableSource trait针对Streaming和Batch分部扩展有两个trait,它们是StreamTableSource和BatchTableSource,它们各自都提供了从数据源转换为核心对象(DataStream跟DataSource)的方法。

除了这三个基本的trait之外,还有一些特定对source的需求以独立的trait提供以方便实现者自行组合,比如ProjectableTableSource这一trait,它支持将Projection下推(push-down)到TableSource。Flink内置实现的CsvTableSource就继承了这一trait。

当前Flink所支持的TableSource大致上分为两类:

  • CsvTableSouce:同时可用于Batch跟Streaming模式;
  • kafka系列TableSource:包含Kafka的各个版本(0.8,0.9,0.10)以及各种不同的格式(Json、Avro),基本上它们只支持Streaming模式,它们都依赖于各种kafka的connector;

使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
// specify JSON field names and types
val typeInfo = Types.ROW(
Array("id", "name", "score"),
Array(Types.INT, Types.STRING, Types.DOUBLE)
)
val kafkaTableSource = new Kafka08JsonTableSource(
kafkaTopic,
kafkaProperties,
typeInfo)
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);

CsvTableSource的构建方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
val csvTableSource = CsvTableSource
.builder
.path("/path/to/your/file.csv")
.field("name", Types.STRING)
.field("id", Types.INT)
.field("score", Types.DOUBLE)
.field("comments", Types.STRING)
.fieldDelimiter("#")
.lineDelimiter("$")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")

除了以TableSource作为Table&SQL的source,还支持通过特定的环境对象直接注册DataStream、DataSet。注册DataStream的示例如下:

1
2
3
4
5
6
7
8
9
10
11
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val cust = env.fromElements(...)
val ord = env.fromElements(...)
// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)
// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

注册DataSet的示例如下:

1
2
3
4
5
6
7
8
9
10
11
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val cust = env.fromElements(...)
val ord = env.fromElements(...)
// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)
// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

以上,通过调用环境对象的registerXXX方法是一种显式注册的方式,除此之外,还有隐式注册方式。隐式注册方式,通过对DataStream跟DataSet对象增加的toTable方法来实现,使用方式示例如下:

1
2
3
4
5
6
7
8
9
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sql(
s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

我们知道DataStream跟DataSet原先是没有toTable API的,如何为它们增加该API的呢?答案是利用了Scala的包对象(package object),该特性主要用于兼容旧版本的库或对某些类型的API进行增强。具体而言,toTable API其实是实现在DataSetConversions和DataStreamConversions两个类中,然后在包对象中对他们进行实例化。而定位到toTable的实现时,会看到它们其实是间接调用了特定环境对象的fromDataStream/fromDataSet方法并将当前的DataStream跟DataSet传递给这两个方法并通过方法返回得到Table对象。fromDataStream/fromDataSet方法对在实现时会调用跟registerDataStream/registerDataSet方法对相同的内部注册方法。

fromDataStream/fromDataSet方法通常主要的场景在于为DataStream/DataSet转换为Table对象提供便利,它本身也进行了隐式注册。然而,你也可以对得到的Table对象,再次调用registerTable来进行显式注册,不过通常没有必要。

因此,综合而言,注册DataStream跟DataSet的对应关系如下:

DataSet-DataStream-inner-table-mapping

以上我们已经分析了所有的Table source的注册方式,有多种register系列方法并最终对应了内部各种XXXTable对象。稍显混乱,其实这些XXXTable对象是有联系的,并且所有的register系列方法最终都调用了TableEnvironment的registerTableInternal方法。因此其实注册Table source的内部原理是一致的,我们来分析一下。

TableEnvironment内部会以一个SchemaPlus类型的数据结构,它是Calcite中的数据结构,用来存储被注册的表、函数等在内的一系列对象(这些对象统称为Calcite中的Schema)。由此可见它无法直接接受Flink自定义的类似于TableSouce这样的对象,那么这里存在一个问题就是两个框架的衔接问题。这就是Flink定义那么多内部XXXTable类型的原因之一,我们来梳理一下它们之间的关系如下:

table-source-mapping-and-relationship

上图中的XXXTable对象同时以括号标明了在注册时它是由什么对象转化而来。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group