Apache Flink fault tolerance源码剖析完结篇

文章目录
  1. 1. 恢复机制实现
  2. 2. 检查点触发机制
  3. 3. 基于Akka消息驱动的协调机制
  4. 4. 基于Zookeeper的高可用
  5. 5. 保存点
  6. 6. 状态终端
  7. 7. 基于Barrier机制的一致性保证
  8. 8. 完整的检查点流程示例
  9. 9. 小结

这篇文章是对Flinkfault tolerance的一个总结。虽然还有些细节没有涉及到,但是基本的实现要点在这个系列中都已提及。

回顾这个系列,每篇文章都至少涉及一个知识点。我们来挨个总结一下。

恢复机制实现

Flink中通常需要进行状态恢复的对象是operator以及function。它们通过不同的方式来达到状态快照以及状态恢复的能力。其中function通过实现Checkpointed的接口,而operator通过实现StreamOpeator接口。这两个接口的行为是类似的。

当然对于数据源组件而言(SourceFunction),要想使得Flink具备完整的失败恢复能力,需要外部数据提供者具备重新消费数据的能力(Apache Kafka提供的message offset机制具备这样的能力,Flink的kafka-connector也利用了这一点来实现数据源的失败恢复,具体的实现见FlinkKafkaConsumerBase)。

检查点触发机制

检查点根据状态的不同,分为:

  • PendingCheckpoint:正在处理的检查点
  • CompletedCheckpoint:完成了的检查点

PendingCheckpoint表示一个检查点已经被创建,但还没有得到所有该应答的task的应答。一旦所有的task都给予应答,那么它将会被转化为一个CompletedCheckpoint

检查点的触发机制是基于时间的周期性触发。触发检查点的驱动者是JobManager,而检查点的执行者则是TaskManager

检查点的触发需要满足很多条件,比如需要所有的task都具备触发检查点的条件等等,检查点才能被触发执行,如果检查点定时任务在执行时遇到上一次正在执行的任务还没有完成,那么当前定时任务将先“入队”,等待上一次任务完成。

基于Akka消息驱动的协调机制

Flink运行时的控制中心是JobManager,检查点的触发由JobManager发起,真正的检查点的执行者为TaskManager。Flink的JobManager以及TaskManager之间利用Akka进行消息通信。因此,检查点的协调机制也基于Akka之上(通过消息来驱动),Flink定义了多个不同的消息对象来驱动检查点执行,比如DeclineCheckpointTriggerCheckpointAcknowledgeCheckpoint等。

基于Zookeeper的高可用

Flink提供了两种恢复模式RecoverMode

  • STANDALONE
  • ZOOKEEPER

STANDALONE表示不对JobManager的失败进行恢复。而ZOOKEEPER表示JobManager将基于Zookeeper实现HA(高可用)。

flink-fault-tolerance-3-overview

作为Flink高可用的实现机制,Zookeeper被用来生成原子的&单调递增的检查点ID,并存储已完成的检查点。

而检查点ID生成器以及已完成的检查点的存储合起来被称之为检查点恢复服务

保存点

所谓的保存点,其实是用户人为触发的一种特殊的检查点。其本质就是检查点,但它相比检查点有两点不同:

  • 用户自行触发
  • 当有新的已完成的检查点产生的时候,不会自动失效

flink-fault-tolerance-4_savepoint-and-checkpoint

保存点是用户人为触发的,如何触发呢?这依赖于Flink提供的client,用户可以通过client(CLI)来触发一个保存点。用户执行触发保存点操作后,client会通过akkaJobManager发一个消息,JobManager接着通知各TaskManager触发检查点。检查点触发完成后,TaskManager会执行JobManager的回调,在回调中JobManager会告知触发保存点的结果(也是通过akka给客户端发消息)。保存点它不会随着新的已完成的检查点产生而自动失效。另外,不同于检查点的是,保存点并不像检查点一样将状态作为自己的一部分一并保存。保存点不存储状态,它只通过一个指针指向具体的检查点所属的状态。

保存点的存储。Flink支持两种形式的保存点的存储:memoryfilesystem。推荐在生产环境下使用filesystem(可以利用hdfs等提供持久化保证)。因为基于memory的保存点存储机制是将保存点存储在JobManager的内存中。一旦JobManager宕机,那么保存点的信息将没有办法被恢复。

状态终端

在Flink中被直接支持的最终状态有:

  • ValueState : 单值状态
  • ListState : 集合状态
  • FoldingState : folding状态,for FoldFunction
  • ReducingState : reducing状态,for ReduceFunction

但最终结合检查点机制进行存储和恢复的状态表示是KvState,它表示通用的用户定义的键值对状态,可以简单得将其看做上面被最终支持的状态的容器。而KvStateSnapshot表示状态KvState的快照,用于对状态进行恢复。StateHandleoperator提供操作状态的接口,将状态从面向存储介质的原始表示还原为对象表示。

状态终端用来对状态进行持久化存储,Flink支持多个状态终端:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend(第三方开发者实现)

基于Barrier机制的一致性保证

Flink提供两种不同的一致性保证:

  • EXACTLY_ONCE:恰巧一次
  • AT_LEAST_ONCE:至少一次

其中EXACTLY_ONCE支持对数据处理精确度要求较高的使用场景,但有时会产生明显的时延。而AT_LEAST_ONCE应对于需要低延时,但对数据的准确性要求并不高的场景。

需要注意的是这里的一致性保证并不是指被处理的元素流过Stream Dataflow的保证,而是指operator在最后一次改变状态之后,后续的数据对状态的改变产生的最终影响(结合检查点)。

一致性保证离不开Flink的checkpoint barrier

单个数据流视角,barrier示意:

flink-stream-fault-tolerance_stream-barriers

分布式多input channel视角,barrier示意图:

flink-stream-fault-tolerance_stream-aligning

该图演示的是多barrier aligning(对齐),但只有EXACTLY_ONCE一致性时才会要求这一点

JobManager将指示source发射barriers。当某个operator从其输入中接收到一个CheckpointBarrier,它将会意识到当前正处于前一个检查点和后一个检查点之间。一旦某operator从它的所有input channel中接收到checkpoint barrier。那么它将意识到该检查点已经完成了。它可以触发operator特殊的检查点行为并将该barrier广播给下游的operator

应对两种不同的一致性保证,Flink提供了两个不同的CheckpointBarrierHandler的实现,它们的对应关系是:

  • BarrierBuffer - EXACTLY_ONCE
  • BarrierTracker - AT_LEAST_ONCE

BarrierBuffer通过阻塞已接收到barrierinput channel并缓存被阻塞的channel中后续流入的数据流,直到所有的barrier都接收到或者不满足特定的检查点的条件后,才会释放这些被阻塞的channel,这个机制被称之为——aligning(对齐)。正是这种机制来实现EXACTLY_ONCE的一致性(它将检查点中的数据精准得隔离开)。

BarrierTrack的实现就要简单地多,它仅仅是对数据流中的barrier进行跟踪,但是数据流中的元素buffer是直接放行的。这种情况会导致同一个检查点中可能会预先混入后续检查点的元素,从而只能提供AT_LEAST_ONCE的一致性。

完整的检查点流程示例

flink-fault-tolerance-7_checkpoint-1

flink-fault-tolerance-7_checkpoint-2

flink-fault-tolerance-7_checkpoint-3

flink-fault-tolerance-7_checkpoint-4

小结

本文是Flink fault tolerance系列的完结篇,对关键概念和流程进行了总结和梳理。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group