Apache Flink源码解析之stream-operator

文章目录
  1. 1. StreamOperator
    1. 1.1. StreamSource
      1. 1.1.1. NonTimestampContext
      2. 1.1.2. ManualWatermarkContext
      3. 1.1.3. AutomaticWatermarkContext
    2. 1.2. OneInputStreamOperator
    3. 1.3. TwoInputStreamOperator
    4. 1.4. 辅助实现类
      1. 1.4.1. Output
      2. 1.4.2. TimeCharacteristic
      3. 1.4.3. TimestampedCollector
      4. 1.4.4. AbstractStreamOperator
      5. 1.4.5. AbstractUdfStreamOperator
      6. 1.4.6. ChainingStrategy
    5. 1.5. 内置的Operator实现
      1. 1.5.1. StreamCounter
      2. 1.5.2. StreamProject
      3. 1.5.3. StreamFilter
      4. 1.5.4. StreamMap
      5. 1.5.5. StreamFlatMap
      6. 1.5.6. StreamGroupedFold
      7. 1.5.7. StreamGroupedReduce
      8. 1.5.8. StreamSink
  2. 2. 小结

前面我们谈论了Flink stream中的transformation。你可以将transformation看成编写Flink程序并构建流式处理程序的必要组成部分(静态表现形式);而本篇我们将探讨transformation在Flink运行时对应的动态表现形式——operator。他们之间的映射关系见下图:

flink-stream-operator_transformation-operator

具体的探讨可以查看前文:Flink中的一些核心概念

StreamOperator

所有operator的最终基类,operator的分类方式,按照输入流个数不同分为:

  • 无输入:StreamSource
  • 单个流输入:OneInputStreamOperator
  • 两个流输入:TwoInputStreamOperator

跟生命周期有关的核心抽象方法:

  • setup : 实例化operator
  • open :该方法会在任何元素被处理之前执行,它的实现通常包含了operator的初始化逻辑
  • close :该方法在所有的元素都进入到operator被处理之后调用
  • dispose :该方法在operator生命周期的最后阶段执行,主要用于回收资源

StreamOperator及其实现类中还包含了一些状态恢复与保存相关的逻辑,但这些不是本文的主题,所有暂时不做探讨。

先来看一下整个package的类关系图:

flink-stream-operator_all-class-diagram

我们整个剖析方式大致也按照以上operator的分类方式以及类的层次结构来。

StreamSource

作为一个流处理DAG的起点,source operator相比其他operator无疑是特别的(从类的继承关系图也可以看出来)。

它需要接受SourceFunction的实例。并且我们可以看到,它的chaining strategyHEAD(它表示operator不能有前置operator,但可以作为其他operator的前置operator,下文会谈到)。

1
this.chainingStrategy = ChainingStrategy.HEAD;

StreamSource的实现略显复杂,因为它涉及到我们前面文章谈SourceFunction时谈到的SourceFunction.SourceContext的实现。在这里提供了三个实现,分别对应我们之前谈到的Flink对事件时间的三个分类:

flink-stream-operator_streamsource

  • NonTimestampContext:针对ProcessingTime,该SourceContext将时间戳设置为-1,并且不发射watermark
  • AutomaticWatermarkContext:针对IngestionTime,提供自动的watermark发射机制的SourceContext
  • ManualWatermarkContext:针对EventTime的人工发射watermarkSourceContext

它们之间的对应关系也体现在其run方法的实现中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
switch (timeCharacteristic) {
case EventTime:
ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
break;
case IngestionTime:
ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
break;
case ProcessingTime:
ctx = new NonTimestampContext<>(this, lockingObject, collector);
break;
default:
throw new Exception(String.valueOf(timeCharacteristic));
}

run方法内部会调用SourceFunctionrun方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
}

StreamSource通过一个属性:canceledOrStopped来控制sourceFunction的停止。

整个StreamSource的运行逻辑由run来表述,通过cancel来控制停止逻辑。

NonTimestampContext

NonTimestampContext会忽略时间戳,因此它的实现里稍微特别一点的地方在下面的这两个方法:

1
2
3
4
public void collectWithTimestamp(T element, long timestamp) {
// ignore the timestamp
collect(element);
}

以及

1
2
3
4
public void emitWatermark(Watermark mark) {
owner.checkAsyncException();
// do nothing else
}

第一个方法忽略了时间戳,第二个方法不发送watermark

ManualWatermarkContext

无需特别说明

AutomaticWatermarkContext

该类是自动发送watermark的实现,在构造器中接收参数watermarkInterval来指定自动发送watermark的时间间隔。具体的实现机制是,新建一个独立的发射线程,以指定的时间间隔发射:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
this.scheduleExecutor = Executors.newScheduledThreadPool(1);
this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final long currentTime = System.currentTimeMillis();
if (currentTime > nextWatermarkTime) {
// align the watermarks across all machines. this will ensure that we
// don't have watermarks that creep along at different intervals because
// the machine clocks are out of sync
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
synchronized (lockingObjectParam) {
if (currentTime > nextWatermarkTime) {
outputParam.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime += watermarkInterval;
}
}
}
}
}, 0, watermarkInterval, TimeUnit.MILLISECONDS);

除了这种基于时间的以固定频率发射watermark的机制,在collect方法被调用时,也会检查当前的时间戳,如果达到发送条件也会触发emit watermark

而因为该类实现的是自动发送,在构造器中实现一个定时发送机制,所以emitWatermark方法也就不需要再实现发送逻辑(因为已不再需要用户程序调用emitWatermark方法了),而该方法在该类中的主要任务是负责停止自动发送。停止自动发送的触发条件是收到最后一个元素的信号(将最后一个元素的时间戳设置为Long.MAX_VALUE),emitWatermark收到该标识后,再将其往下游传递并关闭定时发送线程。

OneInputStreamOperator

单一输入流的operator接口,继承自StreamOperator。提供了两个接口方法:

  • processElement:处理到达该operator的一个元素
  • processWatermark:处理一个Watermark

TwoInputStreamOperator

支持两个流作为输入的operator,同样继承自StreamOperator。扩充了多个接口方法:

  • processElement1 : 处理来自第一个输入的某个元素
  • processElement2 : 处理来自第二个输入的某个元素
  • processWatermark1 : 处理来自第一个输入的一个Watermark
  • processWatermark2 : 处理来自第二个输入的一个Watermark

辅助实现类

Output

Collector的扩展,增加了发射WaterMark的功能。该接口主要供operator用于发射元素或者WaterMark

  • emitWatermark : 该发射WaterMark将广播给下游的所有operator

TimeCharacteristic

Flink在涉及到时间相关的处理时,将时间划分为三类。而时间类型的定义在Flink中就是用该枚举来表示:

  • ProcessingTime
  • IngestionTime
  • EventTime

这三种时间类型之前我们曾多次提及,这里不再啰嗦

TimestampedCollector

Output的包装器实现,它用于给元素设置时间戳

AbstractStreamOperator

该抽象类为实现一个具体的operator提供基本的支持,Flink内置提供的operator全部都直接或间接继承自AbstractStreamOperator

它内部包含了三大类的属性:

  • 配置属性
  • 运行时属性
  • 键值对状态属性

大都数方法都是辅助方法,值得一提的是setup方法。从这里我们可以看到所有operator标识符的生成方式:

1
String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();

可以看到标识是由”_”间隔的三段拼接而成。三段分别是:类名,vertex id,以及当前subtask的索引。

然后基于此标识,创建了用于存储状态的stateBackend

1
stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);

stateBackenddispose方法中会被关闭。

AbstractStreamOperator并没有对open/close等生命周期方法提供具体的实现,这些方法的具体实现被后延至后面谈到的AbstractUdfStreamOperator中。

AbstractUdfStreamOperator

该类主要针对operator生命周期相关的方法(open/close/dispose)提供了模板实现。而这些实现都统一针对用户定义的Function的实例(简称udf)。

ChainingStrategy

该枚举定义了operatorchain strategy(链接策略)。当一个operator链接到其前置operator时,意味着它们将在同一个线程上执行。StreamOperator的默认值是HEAD,这意味着它将没有前置operator,不过它有可能成为其他operator的前置operator。大部分StreamOperator将该枚举以ALWAYS覆盖,表示它们将链接到一个前置operator

它的三个枚举值:

  • ALWAYS :上面已经提到过,它允许将当前operator链接到某前置operator,这是提升性能的良好实践,它能够提升operator的并行度
  • NEVER :该策略不支持operator被链接到某前置operator也不支持被作为其他operator的前置operator
  • HEAD :该策略表示operator没有前置operator,不过可以作为其他operatorchain header

内置的Operator实现

StreamCounter

元素累加器,没有什么特别的

StreamProject

这里需要解释一下,此处的project,并非通常所指的项目的意思,而是投射、投影的意思。你可以将其类比于SQL中的SELECT子句。因此他允许你选择你需要的fields集合。这通过其构造器的一个字段索引数组来指定:

processElement方法中,它依次遍历所有需要的字段索引,将元素中需要的字段提取出来,放入一个用于输出的outTuple,最后再将其发射出去:

1
2
3
4
5
6
public void processElement(StreamRecord<IN> element) throws Exception {
for (int i = 0; i < this.numFields; i++) {
outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
}
output.collect(element.replace(outTuple));
}

StreamFilter

filter operator,处理逻辑很简单,根据自定义的FilterFunction方法,对每个元素进行过滤,如果满足过滤条件,则将该元素emit出去。

StreamMap

map operator,根据传入的MapFunction,对每个元素应用map操作后将其发射出去。

StreamFlatMap

flatmap operator接收FlatMapFunction函数,有一些特别之处:在其open方法中,它初始化了一个TimestampedCollector,作为传递给FlatMapFunctioncollector,该collector是给那些特定的userFunction使用的,并且用于给他们操作的元素设置时间戳。

StreamGroupedFold

分组的fold operatorfold函数的执行依赖于一个初始化值initialValue。因此这里涉及到状态保存。并且状态是跟具体的分区关联的。因此,在open方法的实现中,需要获得跟分区关联的ValueState

1
2
ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
values = getPartitionedState(stateId);

processElement方法的实现,涉及到一系列的操作:从ValueState中获取数据,作为“新”的初始值跟当前元素一起进行fold函数运算,获得结果后更新ValueState,然后将获得的结果emit出去。

StreamGroupedReduce

按分组进行reduce操作的operator.

基于特定的状态名称:

1
private static final String STATE_NAME = "_op_state";

构建状态id

1
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);

然后再获取状态值:

1
values = getPartitionedState(stateId);

以上两个动作在open方法中实现

processElement方法中,分为两种情况:

  • 如果之前已存在状态值,那么拿当前值跟之前的状态值做reduce并获得结果,将结果再次更新到最新状态并emit出去
  • 如果之前不存在状态值,那么直接将当前值更新到状态中,并将当前值emit出去

StreamSink

sink operator,通常是流处理的最后一个operator。它接收SinkFunction的实例。在processElement中依次调用其invoke方法。

小结

本文主要探讨了stream transformation的运行时形式operator的大致实现。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group