Apache Flink源码解析之stream-window

文章目录
  1. 1. Window
    1. 1.1. GlobalWindow
    2. 1.2. TimeWindow
  2. 2. WindowAssigner
    1. 2.1. 内置的WindowAssigner
      1. 2.1.1. GlobalWindows
      2. 2.1.2. TumblingEventTimeWindows
      3. 2.1.3. TumblingProcessingTimeWindows
      4. 2.1.4. SlidingProcessingTimeWindows
      5. 2.1.5. SlidingEventTimeWindows
  3. 3. evictors
    1. 3.1. 内置的Evitor
      1. 3.1.1. TimeEvitor
      2. 3.1.2. CountEvictor
      3. 3.1.3. DeltaEvictor
  4. 4. Time
  5. 5. Trigger
    1. 5.1. TriggerContext
    2. 5.2. TrigerResult
    3. 5.3. 内置的Trigger
      1. 5.3.1. EventTimeTrigger
      2. 5.3.2. ProcessingTimeTrigger
      3. 5.3.3. ContinuousEventTimeTrigger
      4. 5.3.4. ContinuousProcessingTimeTrigger
      5. 5.3.5. CountTrigger
      6. 5.3.6. PurgingTrigger
      7. 5.3.7. DeltaTrigger
  6. 6. 小结

window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析。本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下。

Window

一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达。

Flink的根窗口对象是一个抽象类,只提供了一个抽象方法:

1
public abstract long maxTimestamp();

用于获取最大的时间戳。Flink提供了两个窗口的具体实现。在实现Window时,子类应该override equalshashCode这两个方法,以使得在逻辑上两个相等的window被认为是同一个。

GlobalWindow

GlobalWindow是一个全局窗口,被实现为单例模式。其maxTimestamp被设置为Long.MAX_VALUE

该类内部有一个静态类定义了GlobalWindow的序列化器:Serializer

TimeWindow

TimeWindow表示一个时间间隔窗口,这体现在其构造器需要注入的两个属性:

  • start : 时间间隔的起始
  • end : 时间间隔的截止

TimeWindow表示的时间间隔为[start, end)。其maxTimestamp的实现为:

1
2
3
public long maxTimestamp() {
return end - 1;
}

其equals的实现中,除了常规比较(比较引用,比较Class的实例),还会比较start,end这两个属性。

TimeWindow也在内部实现了序列化器,该序列化器主要针对startend两个属性。

WindowAssigner

元素的窗口分配器。用于将元素分配给一个或者多个窗口。该抽象类定义了三个抽象方法:

  • assignWindows :将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合
  • getDefaultTrigger :返回跟WindowAssigner关联的默认触发器
  • getWindowSerializer :返回WindowAssigner分配的窗口的序列化器

内置的WindowAssigner

整个类型继承图如下:

flink-stream-window_window-assigner-all-class-diagram

下面会谈到很多基于时间的窗口,这里有两个概念,分别是时间类型窗口类型

时间类型:

  • eventTime :用户赋予的自定义的时间戳(事件时间戳)
  • processingTime : 执行当前task的subtask主机的本地时间戳(系统时间戳)

窗口类型:

  • Sliding:滑动窗口,可能会重叠(某个元素可能会身处多个窗口中)
  • Tumbling:非重叠窗口(在assignWindows方法中返回的一般都是Collections.singletonList()

GlobalWindows

该分配器对应于窗口GlobalWindow,它将所有的元素分配给同一个GlobalWindow(本质上而言,GlobalWindow也只有一个实例)。跟GlobalWindow的实现方式一样,GlobalWindows也被实现为单例模式。

方法实现:

  • assignWindows :方法的实现即返回存放GlobalWindow单实例的集合对象
  • getDefaultTrigger :的实现是返回一个不做任何动作的NerverTrigger

TumblingEventTimeWindows

依据给定的窗口大小,结合event-time,返回存储TimeWindow单实例的集合。getDefaultTrigger方法返回EventTimeTrigger类型的实例。

TumblingProcessingTimeWindows

依据给定窗口的大小,结合processing-time,返回存储TimeWindow单实例的集合。需要注意的是,这里依据的是运行当前任务所在主机的本地时间戳。getDefaultTrigger方法返回的是ProcessingTimeTrigger类型的实例。

SlidingProcessingTimeWindows

Sliding窗口不同于Tumbling窗口,它除了指定窗口的大小,还要指定一个滑动值,即slide。所谓的滑动窗口可以这么理解,比如:一分钟里每十秒钟。这里一分钟是窗口大小,每十秒即为滑动值。

在Sliding窗口中,assignWindows方法返回的就不再是单个窗口了,而是窗口的集合。首先计算出窗口的个数:size/slide,然后循环初始化给定的size内不同slide的窗口对象。

SlidingEventTimeWindows

类似SlidingProcessingTimeWindows只不过窗口的start参数的计算方式依赖于系统时间戳。

evictors

evitor : 中文译为驱逐者;顾名思义其用于剔除窗口中的某些元素

它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前

该接口只定义了一个方法:

1
int evict(Iterable<StreamRecord<T>> elements, int size, W window);

接口的返回值即表示要剔除元素的个数。

内置的Evitor

Flink内置实现了三个Evitor

  • TimeEvitor
  • CountEvitor
  • DeltaEvitor

TimeEvitor

这个Evitor基于给定的保留时间(keep time)作为剔除规则,大致的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
int toEvict = 0;
long currentTime = Iterables.getLast(elements).getTimestamp();
long evictCutoff = currentTime - windowSize;
for (StreamRecord<Object> record: elements) {
if (record.getTimestamp() > evictCutoff) {
break;
}
toEvict++;
}
return toEvict;
}

大致的逻辑是,先取出最后一个元素的时间戳作为“当前”时间,然后减去期望中的“窗口大小”,得到一个基准时间戳(只需要比基准时间戳大的元素)。

然后从第一个元素开始循环比较每一个元素,如果比基准时间戳小,则累加剔除统计数,一旦发现某个元素的时间戳大于基准时间戳,则直接跳出循环,不再累加了(因为本地窗口中元素是基于时间有序的,这一点由Flink运行时来保证,如果从某个元素开始其时间戳大于基准时间戳,则后续的所有元素都满足这一条件,因此也就没必要再循环下去了)。

CountEvictor

基于容量的Evictor,它通过比对evict方法的第二个参数size来判断应该剔除多少个元素。具体的实现:

1
2
3
4
5
6
7
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
if (size > maxCount) {
return (int) (size - maxCount);
} else {
return 0;
}
}

DeltaEvictor

基于给定的阈值thresholddeltaFunction来进行判断。也是拿当前元素跟最后一个元素一起计算delta跟阈值做对比。

Time

Flink中仅有一个类Time来定义窗口的时间间隔。该时间默认指执行环境下的时间。创建一个Time对象,需要两个参数:

  • size : 时间间隔的大小(数值)
  • unit : TimeUnit的实例,表示时间间隔的单位

该类提供的很多静态方法提供对不同unit的设置。

Trigger

Trigger(触发器)用于决定某个窗口的元素集合什么时候触发计算以及结果什么时候被emit。

以粗粒度来看,Flink主要提供了三种形式的触发方式:

  • 按元素
  • 按系统时间
  • 按事件时间

这体现为Trigger的三个主要的抽象方法:

  • onElement :针对每个元素触发,这主要针对于那些基于元素的触发器,比如后面我们将看到的CountTrigger
  • onProcessingTime :被processing-time(Flink系统时间时间戳)定时器触发
  • onEventTime :被event-time(事件时间戳)定时器触发

以上这些方法中都有一个共同的参数:TriggerContext

TriggerContext

顾名思义,它提供触发器执行时的上下文信息,但它只是Trigger的内部接口:

  • getCurrentWatermark :返回当前的watermark
  • registerProcessingTimeTimer :注册一个系统时间的定时器,触发onProcessingTime
  • registerEventTimeTimer :注册一个事件时间的定时器,触发onEventTime
  • deleteProcessingTimeTimer :删除系统时间的定时器
  • deleteEventTimeTimer :删除事件时间的定时器
  • getPartitionedState :用于失败恢复的获取状态的接口

其中,registerXXX/deleteXXX模式对主要针对上面两种基于时间的触发器。而最后一个方法getKeyValueState也是非常重要的,因为它用于获取窗口相关的状态,比如后面谈到的一些触发器是依赖于一些上下文状态的,那些状态的获取就是依靠这个方法。

TrigerResult

Trigger中定义的三个触发方法被调用后,最终要返回一个结果以决定触发之后产生的行为(比如是调用window function还是将窗口丢弃),这个定义触发器触发结果行为是通过TriggerResult来表达的。它是一个枚举类型,有这么几个枚举值:

  • FIRE :window将会被应用window Function进行计算,然后将结果emit出去,但元素并没有被清洗,仍然在window中
  • PURGE :清除window中的元素
  • FIRE_AND_PURGE :同时具备FIREPURGE两种属性产生的行为
  • CONTINUE :不做任何操作

内置的Trigger

Flink内置实现了很多触发器,完整的类图如下:

flink-stream-window_trigger-all-class-diagram

这些触发器都具有一些共性,这里一并说明:

  • 由于Flink在Trigger中已事先将各种触发器类型的回调封装为不同的方法(onXXX),所以后续各种不同的触发器类型的核心逻辑将主要在其特定相关的onXXX方法中,而无关的onXXX方法将直接返回TriggerResult.CONTINUE其实个人认为这种设计方式有欠妥当,因为不利于扩展
  • 因为有不少触发类型依赖于上下文的某些状态值(比如下文典型的ContinuousXXXTrigger),这些状态值将通过TriggerContextgetPartitionedState方法进行存取

EventTimeTrigger

基于事件时间的触发器,对应onEventTime

ProcessingTimeTrigger

基于当前系统时间的触发器,对应onProcessingTime

ContinuousEventTimeTrigger

该触发器是基于事件时间的按照指定时间间隔持续触发的触发器,它的首次触发取决于Watermark。首次触发的判断位于onElement中,它注册下一次(也是首次)触发eventTime 定时器的时机,然后将其first状态标识为false。具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);
if (first.value()) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
first.update(false);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}

持续的触发依赖于在onEventTime中不断注册下一次触发的定时器:

1
2
3
4
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}

ContinuousProcessingTimeTrigger

基于系统时间的按照指定时间间隔持续触发的触发器,它也是基于保存的状态值fire-timestamp来判断是否需要触发,不过它的循环注册过程是在onElement中。

CountTrigger

基于一个给定的累加值触发,由于累加值不是基于时间而是基于元素的,所有其触发机制实现在onElement中,逻辑很简单,先累加如果大于给定的阈值则触发:

1
2
3
4
5
6
7
8
9
10
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
ValueState<Long> count = ctx.getPartitionedState(stateDesc);
long currentCount = count.value() + 1;
count.update(currentCount);
if (currentCount >= maxCount) {
count.update(0L);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}

PurgingTrigger

该触发器类似于一个包装器,用于将任何给定的触发器转变成purging触发器。它的实现机制是,它接收一个trigger实例,然后在各个onXXX回调上执行该实例的相应的onXXX并获得TriggerResult的实例,进行相应的判断,最后返回FIRE_AND_PURGE枚举值。

DeltaTrigger

基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值,则触发。因为是基于元素的,所以主要逻辑实现在onElement中。

小结

本篇还是侧重于分析跟窗口有关的概念,就目前来看它们并没有太多的关联性,这一点我们在后续会剖析它们如何关联起来实现完整的窗口机制的。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat