Apache Flink fault tolerance源码剖析(一)

文章目录
  1. 1. Checkpointed
    1. 1.1. CheckpointedAsynchronously
    2. 1.2. MessageAcknowledgingSourceBase
    3. 1.3. FlinkKafkaConsumerBase
    4. 1.4. StatefulSequenceSource
  2. 2. StreamOperator
    1. 2.1. AbstractStreamOperator
    2. 2.2. AbstractUdfStreamOperator
  3. 3. 小结

因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题。上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理。当然原理归原理,原理体现在代码实现里并不是想象中的那么直观。这里的源码剖析也是我学习以及理解的过程。

作为源码解析Flink Fault Tolerance的首篇文章,我们先暂且不谈太有深度的东西,先来了解一下:Flink哪里涉及到检查点/快照机制来保存/恢复状态?这就是本篇要谈的主要内容。

跟Flink官方文档的说明一样,在文章中会混杂着检查点快照这两个术语,不要太过于纠结它们,某种程度它们是一致的。

从这篇文章开始,当我谈及一个类,我会给出它的全限定名,以方便大家对照。

在Flink中,需要具备Fault Tolerance能力的通常是两类对象:function以及operator

其中function通常通过实现Checkpointed来达到这个目的,而operator通过实现StreamOpeator(该接口中包含了快照、恢复状态的接口方法)。

我们会分别来分析这两个接口,然后列举一些典型的需要具备Fault Tolerance功能的对象,并分析它们的实现。

Checkpointed

org.apache.flink.streaming.api.checkpoint.Checkpointed

该接口提供给那些需要持久化状态的functionoperator使用。该接口是同步模式的快照机制。

两个接口方法:

  • snapshotState:获得function或者operator的当前状态(快照),这个状态必须反映该function之前的变更所产生的最终结果

该方法接收两个参数,第一个参数是checkpointId,表示该检查点的ID,第二个参数checkpointTimestamp,检查点的时间戳,被JobManagerSystem.currentTimeMillis()驱动

  • restoreState:用于从之前的检查点中恢复functionoperator的状态,需要注意的是该方法的调用会早于open方法

CheckpointedAsynchronously

org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously

该接口继承自Checkpointed,属于标记型接口,用于跟Checkpointed同步快照机制进行区分。

MessageAcknowledgingSourceBase

org.apache.flink.streaming.api.functions.source

该类是Flink内置的众多SouceFunction之一,也是基于具备ack(确认机制)的Message Queue的实现模板之一。

该类的完整签名:

1
2
3
public abstract class MessageAcknowledgingSourceBase<Type, UId>
extends RichSourceFunction<Type>
implements Checkpointed<SerializedCheckpointData[]>, CheckpointListener

首先我们从该类的实现接口中可以看到它希望保存的状态是:SerializedCheckpointData[]:

SerializedCheckpointData表示作为快照数据的被序列化元素的集合

有两个非常重要的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
/** The list gathering the IDs of messages emitted during the current checkpoint */
private transient List<UId> idsForCurrentCheckpoint;
/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
/**
* Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
* a checkpoint, ids may be processed again. This happens when the checkpoint completed but the
* ids for a checkpoint haven't been acknowledged yet.
*/
private transient Set<UId> idsProcessedButNotAcknowledged;
  • idsForCurrentCheckpoint:该集合存储着当前检查点覆盖范围内,消费掉的消息的ID集合
  • pendingCheckpoints:该集合收集的是:检查点以及该检查点中那些已经被触发处理的但没有完成的(或没有收到完成通知的)消息的ID集合对(Tuple2)
  • idsProcessedButNotAcknowledged:它用于存储那些已经被处理过的消息的ID,这些消息的ack在检查点完成之后。也就是说,如果从该检查点开始恢复,那么这些id的消息可能会被重放。

看到上面的定义,虽然都是用来存储跟消息ID相关的“集合”,但却是三种不同的数据结构,而且前两个是有序的,最后一个是无序的

后面的文章我们会谈到检查点分:PendingCheckpoint(未完成的)和CompletedCheckpoint(已完成的)

来看Checkpointed的两个接口方法的实现:

1
2
3
4
5
6
7
8
9
10
public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
idsForCurrentCheckpoint = new ArrayList<>(64);
return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
}

首先是构建快照的方法,它将当前检查点checkpointId及其相关的消费走的消息集合idsForCurrentCheckpoint构建成一个元组Tuple2加入待处理的检查点中pendingCheckpoints。接着重新初始化了idsForCurrentCheckpoint(因为当前这个检查点的快照已经生成了,所以跟当前检查点相关的元素也需要清空掉)。

1
2
3
4
5
6
7
8
public void restoreState(SerializedCheckpointData[] state) throws Exception {
pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
// build a set which contains all processed ids. It may be used to check if we have
// already processed an incoming message.
for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
}
}

接下来是恢复状态的逻辑,它的过程也很简单。首先反序列化状态对象到pendingCheckpoints,然后遍历整个元组集合,针对每个没有完成的检查点元组checkpoint,提取出每个这些checkpoint对应的消息ID集合,将他们全部加入idsProcessedButNotAcknowledged集合中去。

从构建快照的snapshotState方法中,我们看到针对每个checkpointId都将其涵盖范围内的所有的ID集合拼装成一个二元组加入到pendingCheckpoints。而随着消息被消费,如果到最后该checkpointId对应的所有消息ID都被完全处理也就是说该检查点变成了CompletedCheckpoint,那么如何将该二元组从pendingCheckpoints移除?Flink提供了一个CheckpointListener它会在某个检查点完成之后给出通知,客户程序可以订阅它然后进行相应的回调处理notifyCheckpointComplete,该类的回调实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
Tuple2<Long, List<UId>> checkpoint = iter.next();
long id = checkpoint.f0;
if (id <= checkpointId) {
LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
acknowledgeIDs(checkpointId, checkpoint.f1);
// remove deduplication data
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
// remove checkpoint data
iter.remove();
}
else {
break;
}
}
}

获取到已完成的checkpointId,然后遍历整个pendingCheckpoints集合,找到所有checkpointId小于当前已完成的checkpointId,然后完成三个动作:

  • ack 该checkpointId对应的所有这些消息的ID
  • 将这些消息的ID从idsProcessedButNotAcknowledged中移除
  • 将该二元组从pendingCheckpoints中移除

为什么这里判断条件是<=呢,因为checkpointId是时序递增的,而且Flink保证如果某个检查点完成,那么比该检查点小的检查点肯定也完成了。因为,检查点越小与其有关的消息集合越早被处理。

另外一个需要注意的是,该方法中的acknowledgeIDs是抽象方法,待具体类根据自己的ack机制实现。

RabbitMQ 对接Flink的Source —— RMQSource就是通过继承MessageAcknowledgingSourceBase 实现的

FlinkKafkaConsumerBase

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

FlinkKafkaConsumerBase是Flink实现基于Kafka的Source的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制,只不过这里实现的是异步模式的检查点机制CheckpointedAsynchronously

该类的完整签名如下:

1
2
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>

从签名来看,它快照的数据类型是:

1
HashMap<KafkaTopicPartition, Long>

该类型描述了kafka消息消费的具体的信息(包含topic,partition,offset)。

然后继续看snapshotState方法:

1
2
3
4
5
6
7
8
9
10
11
// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
//noinspection unchecked
HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone();
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingCheckpoints.put(checkpointId, currentOffsets);
while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingCheckpoints.remove(0);
}

这是其核心代码。它先将offsetsState克隆一份,作为当前检查点的快照,然后放入pendingCheckpoints作为待处理的检查点集合。

这里有一个安全性检查:如果待处理的安全点集合大于默认设定的阈值(100),则移除集合中第一个检查点,这么做的目的是为了防止集合太大导致内存泄漏

restoreState方法的实现,只是将待恢复的偏移量快照对象赋予当前对象的偏移量而已。

MessageAcknowledgingSourceBase,为了得到检查点完成的通知,FlinkKafkaConsumerBase也实现了CheckpointListener接口,以在检查点完成时进行回调处理。来看看notifyCheckpointComplete方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
HashMap<KafkaTopicPartition, Long> checkpointOffsets;
// the map may be asynchronously updates when snapshotting state, so we synchronize
synchronized (pendingCheckpoints) {
final int posInMap = pendingCheckpoints.indexOf(checkpointId);
if (posInMap == -1) {
LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
return;
}
//noinspection unchecked
checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingCheckpoints.remove(0);
}
}
if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
LOG.debug("Checkpoint state was empty.");
return;
}
commitOffsets(checkpointOffsets);

可以看到核心代码都进行了同步保护,因为pendingCheckpoints很可能会被异步更新。它先根据完成了的检查点,获得其在pendingCheckpoints中的索引。如果判断索引不存在,则直接退出。否则,移除该索引对应的快照信息,然后将小于当前索引(较旧的)的快照信息也一并移除(这一点我之前解释过,因为所有的检查点都是按时间递增有序的)。最后将当前完成的检查点对应的消息的偏移量进行commit,也即commitOffsets。只不过这里该方法被定义为抽象方法,因为Kafka不同版本的API差别的原因,由适配不同版本的consumer各自实现。

Flink以Kafka作为Source的具体实现机制,不是本文的重点,后续可以另开文章进行讲解

StatefulSequenceSource

org.apache.flink.streaming.api.functions.source

这是一个有状态的、给定起始和截止元素的并行序列发射器,由于它需要提供exactly once保证,所以它实现了Checkpointed接口。
它主要是用来维护一个称之为collected的发射进度状态,对其进行快照以便于实现fault tolerance

StreamOperator

org.apache.flink.streaming.api.operators. StreamOperator

StreamOperator内置了我们上面谈到的几个跟检查点相关的接口方法:

  • snapshotOperatorState
  • restoreState
  • notifyOfCompletedCheckpoint

这三个方法来自于我们之前谈到的Checkpointed以及CheckpointListener。这也由此可见,在operator中快照机制由可选项变成了必选项。

这是不难理解的,因为operator处于运行时,诸如分区信息都是必须要做快照的。

这里需要注意的是snapshotOperatorState方法,它返回值为StreamTaskState。它是表示task所有状态的一个容器对象,它包含了三类状态:

  • operatorState
  • functionState
  • kvStates

这不是本文的重点,后面的文章再谈

AbstractStreamOperator

org.apache.flink.streaming.api.operators.AbstractStreamOperator

AbstractStreamOperatorStreamOperator的抽闲类,为operator的实现提供模板,当然也为以上的三个跟快照相关的接口方法的实现提供了模板。

来看snapshotOperatorState方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
// here, we deal with key/value state snapshots
StreamTaskState state = new StreamTaskState();
if (stateBackend != null) {
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
stateBackend.snapshotPartitionedState(checkpointId, timestamp);
if (partitionedSnapshots != null) {
state.setKvStates(partitionedSnapshots);
}
}
return state;
}

可以看到它依赖于一个叫stateBackend的东西,在之前一篇文章中我们有谈及过,它是state最终的持久化机制的实现。并且从注释可以看到这里只提供了针对key/value状态的快照模板。

AbstractUdfStreamOperator

org.apache.flink.streaming.api.operators

该抽象类继承自AbstractStreamOperator,用于进一步为operator的实现提供模板,不过从类名可以看出来,它主要是为用户定义函数(udf)的operator提供模板。

snapshotOperatorState方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);
if (userFunction instanceof Checkpointed) {
@SuppressWarnings("unchecked")
Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
Serializable udfState;
try {
udfState = chkFunction.snapshotState(checkpointId, timestamp);
}
catch (Exception e) {
throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
}
if (udfState != null) {
try {
AbstractStateBackend stateBackend = getStateBackend();
StateHandle<Serializable> handle =
stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
state.setFunctionState(handle);
}
catch (Exception e) {
throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
+ e.getMessage(), e);
}
}
}
return state;
}

这里我们终于再次看到了Checkpointed接口。这是因为function只是静态的函数,它的运行还必须借助于operator,因此其状态也必须借助于operator来帮助其与Flink的运行时交互以达到最终的持久化的目的。

函数状态的持久化代码:

1
2
3
4
AbstractStateBackend stateBackend = getStateBackend();
StateHandle<Serializable> handle =
stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
state.setFunctionState(handle);

小结

本篇剖析了Flink针对Function以及Operator如何做快照以及如何恢复的实现。虽然,还没有涉及到fault tolerance的最终实现机制,但是这是我们的入口。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group