Apache Flink fault tolerance源码剖析(二)

文章目录
  1. 1. 周期性的检查点触发机制
    1. 1.1. ScheduledTrigger
    2. 1.2. startCheckpointScheduler
    3. 1.3. stopCheckpointScheduler
    4. 1.4. triggerCheckpoint
  2. 2. 基于Actor的消息驱动的协同机制
    1. 2.1. AbstractCheckpointMessage
    2. 2.2. TriggerCheckpoint
      1. 2.2.1. 触发消息
      2. 2.2.2. 消息处理
    3. 2.3. DeclineCheckpoint
      1. 2.3.1. 触发消息
      2. 2.3.2. 消息处理
    4. 2.4. AcknowledgeCheckpoint
      1. 2.4.1. 触发消息
      2. 2.4.2. 消息处理
    5. 2.5. NotifyCheckpointComplete
      1. 2.5.1. 触发消息
      2. 2.5.2. 消息处理
  3. 3. 小结

继续Flink Fault Tolerance机制剖析。上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制。这篇涉及到一个非常关键的类——CheckpointCoordinator

org.apache.flink.runtime.checkpoint.CheckpointCoordinator

该类可以理解为检查点的协调器,用来协调operatorstate的分布式快照。

周期性的检查点触发机制

检查点的触发机制是基于定时器的周期性触发。这涉及到一个定时器的实现类ScheduledTrigger

ScheduledTrigger

触发检查点的定时任务类。其实现就是调用triggerCheckpoint方法。这个方法后面会具体介绍。

1
2
3
4
5
6
7
8
public void run() {
try {
triggerCheckpoint(System.currentTimeMillis());
}
catch (Exception e) {
LOG.error("Exception while triggering checkpoint", e);
}
}

startCheckpointScheduler

启动触发检查点的定时任务的方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
// make sure all prior timers are cancelled
stopCheckpointScheduler();
try {
// Multiple start calls are OK
checkpointIdCounter.start();
} catch (Exception e) {
String msg = "Failed to start checkpoint ID counter: " + e.getMessage();
throw new RuntimeException(msg, e);
}
periodicScheduling = true;
currentPeriodicTrigger = new ScheduledTrigger();
timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
}
}

方法的实现包含两个主要动作:

  • 启动检查点ID计数器checkpointIdCounter
  • 启动触发检查点的定时任务

stopCheckpointScheduler

关闭定时任务的方法,用来释放资源,重置一些标记变量。

triggerCheckpoint

该方法是触发一个新的检查点的核心逻辑。

首先,方法中会去判断一个flag:triggerRequestQueued。该标识表示是否一个检查点的触发请求不能被立即执行。

1
2
3
4
5
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another checkpoint while one was queued already");
return false;
}

如果不能被立即执行,则直接返回。

不能被立即执行的原因是:还有其他处理没有完成。

接着检查正在并发处理的未完成的检查点:

1
2
3
4
5
6
7
8
9
// if too many checkpoints are currently in progress, we need to mark that a request is queued
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel();
currentPeriodicTrigger = null;
}
return false;
}

如果未完成的检查点过多,大于允许的并发处理的检查点数目的阈值,则将当前检查点的触发请求设置为不能立即执行,如果定时任务已经启动,则取消定时任务的执行,并返回。

以上这些检查处于基于锁机制实现的同步代码块中。

接着检查需要被触发检查点的task是否都处于运行状态:

1
2
3
4
5
6
7
8
9
10
11
ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee != null && ee.getState() == ExecutionState.RUNNING) {
triggerIDs[i] = ee.getAttemptId();
} else {
LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getSimpleName());
return false;
}
}

只要有一个task不满足条件,则不会触发检查点,并立即返回。

然后检查是否所有需要ack检查点的task都处于运行状态:

1
2
3
4
5
6
7
8
9
10
11
12
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
ev.getSimpleName());
return false;
}
}

如果有一个task不满足条件,则不会触发检查点,并立即返回。

以上条件都满足(即没有return false;),才具备触发一个检查点的基本条件。

下一步,获得checkpointId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final long checkpointID;
if (nextCheckpointId < 0) {
try {
// this must happen outside the locked scope, because it communicates
// with external services (in HA mode) and may block for a while.
checkpointID = checkpointIdCounter.getAndIncrement();
}
catch (Throwable t) {
int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
return false;
}
}
else {
checkpointID = nextCheckpointId;
}

这依赖于该方法的另一个参数nextCheckpointId,如果其值为-1,则起到标识的作用,指示checkpointId将从外部获取(比如Zookeeper,后续文章会谈及检查点ID的生成机制)。

接着创建一个PendingCheckpoint对象:

1
final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);

该类表示一个待处理的检查点。

与此同时,会定义一个针对当前检查点超时进行资源清理的取消器canceller。该取消器主要是针对检查点没有释放资源的情况进行资源释放操作,同时还会调用triggerQueuedRequests方法启动一个触发检查点的定时任务,如果有的话(取决于triggerRequestQueued是否为true)。

然后会再次进入同步代码段,对上面的是否新建检查点的判断条件做二次检查,防止产生竞态条件。这里做二次检查的原因是,中间有一段关于获得checkpointId的代码,不在同步块中。

检查后,如果触发检查点的条件仍然是满足的,那么将上面创建的PendingCheckpoint对象加入集合中:

1
pendingCheckpoints.put(checkpointID, checkpoint);

同时会启动针对当前检查点的超时取消器:

1
timer.schedule(canceller, checkpointTimeout);

接下来会发送消息给task以真正触发检查点(基于消息驱动的协同机制):

1
2
3
4
5
for (int i = 0; i < tasksToTrigger.length; i++) {
ExecutionAttemptID id = triggerIDs[i];
TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}

基于Actor的消息驱动的协同机制

上面已经谈到了检查点的触发机制是基于定时任务的周期性触发,那么定时任务的启停机制又是什么?Flink使用的是基于AKKA的Actor模型的消息驱动机制。

CheckpointCoordinatorDeActivator是一个Actor的实现,它用于基于消息来驱动检查点的定时任务的启停:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void handleMessage(Object message) {
if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
if (status == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
// we ignore all other messages
}

Actor会收到Job状态的变化通知:JobStatusChanged。一旦变成RUNNING,那么检查点的定时任务会被立即启动;否则会被立即关闭。

Actor被创建的代码是CheckpointCoordinator中的createActivatorDeactivator方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
if (jobStatusListener == null) {
Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
// wrap the ActorRef in a AkkaActorGateway to support message decoration
jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
}
return jobStatusListener;
}
}

既然,是基于消息驱动机制,那么就需要各种类型的消息对应不同的业务逻辑。这些消息在Flink中被定义在package:org.apache.flink.runtime.messages.checkpoint中。

类图如下:

flink-fault-tolerance-2_message-class-diagram

AbstractCheckpointMessage

检查点消息的基础抽象类,提供了三个公共属性(从构造器注入):

  • job:JobID的实例,表示当前这条消息实例的归属;
  • taskExecutionId:ExecutionAttemptID的实例,表示检查点的源/目的任务
  • checkpointId:当前消息协调的检查点ID

除此之外,该实现仅仅override了hashCodeequals方法。

TriggerCheckpoint

该消息由JobManager发送给TaskManager,用于告诉一个task触发它的检查点。

触发消息

位于CheckpointCoordinator类的triggerCheckpoint中,上面已经提及过。

1
2
3
4
5
for (int i = 0; i < tasksToTrigger.length; i++) {
ExecutionAttemptID id = triggerIDs[i];
TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}

消息处理

TaskManagerhandleCheckpointingMessage实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
case message: TriggerCheckpoint =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, timestamp)
} else {
log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
}

主要是触发检查点屏障Barrier

DeclineCheckpoint

该消息由TaskManager发送给JobManager,用于告诉检查点协调器:检查点的请求还没有能够被处理。这种情况通常发生于:某task已处于RUNNING状态,但在内部可能还没有准备好执行检查点。

它除了AbstractCheckpointMessage需要的三个属性外,还需要用于关联检查点的timestamp

触发消息

位于Task类的triggerCheckpointBarrier方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
if (!success) {
DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
jobManager.tell(decline);
}
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new RuntimeException(
"Error while triggering checkpoint for " + taskName,
t));
}
}
}
};

消息处理

位于JobManagerhandleCheckpointMessage

具体的实现在CheckpointCoordinatorreceiveDeclineMessage中:

首先从接收的消息中(DeclineCheckpoint)获得检查点编号:

1
final long checkpointId = message.getCheckpointId();

接下来的逻辑是判断当前检查点是否是未完成的检查点:isPendingCheckpoint

接下来分为三种情况对待:

  • 如果是未完成的检查点,并且相关资源没有被释放(检查点没有被discarded
1
2
3
4
isPendingCheckpoint = true;
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
rememberRecentCheckpointId(checkpointId);

isPendingCheckpointtrue,根据检查点编号,将检查点从未完成的检查点集合中移除,discard检查点,记住最近的检查点(将其保持到到一个最近的检查点列表中)。

接下来查找是否还有待处理的检查点,根据检查点时间戳来判断:

1
2
3
4
5
6
7
8
9
boolean haveMoreRecentPending = false;
Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
while (entries.hasNext()) {
PendingCheckpoint p = entries.next().getValue();
if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
haveMoreRecentPending = true;
break;
}
}

根据标识haveMoreRecentPending来进入不同的处理逻辑:

1
2
3
4
5
6
7
if (!haveMoreRecentPending && !triggerRequestQueued) {
LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
triggerCheckpoint(System.currentTimeMillis());
} else if (!haveMoreRecentPending) {
LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
triggerQueuedRequests();
}

如果有需要处理的检查点,并且当前能立即处理,则立即触发检查点定时任务;如果有需要处理的检查点,但不能立即处理,则触发入队的定时任务。

  • 如果是未完成的检查点,并且检查点已经被discarded

抛出IllegalStateException异常

  • 如果不是未完成的检查点

如果在最近未完成的检查点列表中找到,则有可能表示消息来迟了,将isPendingCheckpoint置为true,否则将isPendingCheckpoint置为false.

最后返回isPendingCheckpoint

AcknowledgeCheckpoint

该消息是一个应答信号,表示某个独立的task的检查点已经完成。也是由TaskManager发送给JobManager。该消息会携带task的状态:

  • state
  • stateSize

触发消息

RuntimeEnvironment类的acknowledgeCheckpoint方法。

消息处理

具体的实现在CheckpointCoordinatorreceiveAcknowledgeMessage中,开始的实现同receiveDeclineMessage,也是判断当前接收到的消息中包含的检查点是否是待处理的检查点。如果是,并且也没有discard掉,则执行如下逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) {
if (checkpoint.isFullyAcknowledged()) {
completed = checkpoint.toCompletedCheckpoint();
completedCheckpointStore.addCheckpoint(completed);
LOG.info("Completed checkpoint " + checkpointId + " (in " +
completed.getDuration() + " ms)");
LOG.debug(completed.getStates().toString());
pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);
dropSubsumedCheckpoints(completed.getTimestamp());
onFullyAcknowledgedCheckpoint(completed);
triggerQueuedRequests();
}
}

检查点首先应答相关的task,如果检查点已经完全应答完成,则将检查点转换成CompletedCheckpoint,然后将其加入completedCheckpointStore列表,并从pendingCheckpoints中移除。然后调用dropSubsumedCheckpoints它会从pendingCheckpointsdiacard所有时间戳小于当前检查点的时间戳,并从集合中移除。

最后,如果该检查点被转化为已完成的检查点,则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (completed != null) {
final long timestamp = completed.getTimestamp();
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
}
}
statsTracker.onCompletedCheckpoint(completed);
}

迭代所有待commit的task,发送NotifyCheckpointComplete消息。同时触发状态跟踪器的onCompletedCheckpoint回调方法。

NotifyCheckpointComplete

该消息由JobManager发送给TaskManager,用于告诉一个task它的检查点已经得到完成确认,task可以向第三方提交该检查点。

触发消息

位于CheckpointCoordinator类的receiveAcknowledgeMessage方法中,当检查点acktask完成,转化为CompletedCheckpoint之后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (completed != null) {
final long timestamp = completed.getTimestamp();
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
}
}
statsTracker.onCompletedCheckpoint(completed);
}

消息处理

TaskManagerhandleCheckpointingMessage

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
case message: NotifyCheckpointComplete =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.notifyCheckpointComplete(checkpointId)
} else {
log.debug(
s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
}

主要是触发tasknotifyCheckpointComplete方法。

小结

这篇文章主要讲解了检查点的基于定时任务的周期性的触发机制,以及基于Akka的Actor模型的消息驱动的协同处理机制。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group