Flink运行时之基于Netty的网络通信(下)

文章目录
  1. 1. 客户端核心处理器
  2. 2. 服务端核心处理器

客户端核心处理器

这一篇,我们分析一下客户端协议栈中的核心的处理器PartitionRequestClientHandler,该处理器用于处理服务端的响应消息。

我们以客户端获取到响应之后回调该处理器的channelRead方法为入口来进行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
//当没有待解析的原始消息时,直接解码消息,否则将消息加入到stagedMessages队列中,等待排队处理
if (!bufferListener.hasStagedBufferOrEvent() && stagedMessages.isEmpty()) {
decodeMsg(msg);
}
else {
stagedMessages.add(msg);
}
}
catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}

这里涉及到两个对象,首先是bufferListener,用于感知可用Buffer的事件侦听器,它是内部实现的BufferListenerTask类型。其次是stagedMessages,用于接收原始未解码消息的队列。

解码方法decodeMsg的主要逻辑包含对两种类型消息的解析。一种是服务端的错误响应消息ErrorResponse,另一种是正常的Buffer请求响应消息BufferResponse。对于错误响应消息会判断是否是致命错误,如果是致命错误,则直接通知所有的InputChannel并关闭它们;如果不是,则让该消息对应的InputChannel按不同情况处理。我们重点关注对BufferResponse的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
//根据响应消息里的receiverId,从注册map里获取到接收该消息的RemoteInputChannel实例
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
//如果该响应没有对应的接收者,则释放该Buffer,同时通知服务端取消该请求
if (inputChannel == null) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return true;
}
//接下来才进入到真正的解析逻辑
return decodeBufferOrEvent(inputChannel, bufferOrEvent);
}

在decodeBufferOrEvent中,它会对该消息具体是Buffer还是Event进行区分,如果是Buffer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
if (bufferOrEvent.isBuffer()) {
//空Buffer
if (bufferOrEvent.getSize() == 0) {
inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber);
return true;
}
//获得Buffer提供者,如果为空,则通知服务端取消请求
BufferProvider bufferProvider = inputChannel.getBufferProvider();
if (bufferProvider == null) {
cancelRequestFor(bufferOrEvent.receiverId);
return false;
}
while (true) {
//从Buffer提供者请求Buffer,以放置响应结果数据
Buffer buffer = bufferProvider.requestBuffer();
//如果请求到Buffer,则读取数据同时触发InputChannel的onBuffer回调
//该方法在前文分析输入通道时我们早已提及过,它会将Buffer加入到队列中
if (buffer != null) {
buffer.setSize(bufferOrEvent.getSize());
bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer());
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
return true;
}
//否则进入等待模式,当有Buffer可用时,会触发bufferListener的onEvent方法
else if (bufferListener.waitForBuffer(bufferProvider, bufferOrEvent)) {
releaseNettyBuffer = false;
return false;
}
else if (bufferProvider.isDestroyed()) {
return false;
}
}
}

如果从Buffer提供者没有获取到Buffer,说明当前没有可用的Buffer资源了,那么将进入等待模式。这里等待Buffer可用是基于事件侦听机制,这个机制是如何实现的呢?在上面的waitForBuffer方法的实现中,通过将当前的BufferListenerTask的bufferListener实例反向注册到Buffer提供者,当Buffer提供者中有Buffer可用时,将会触发bufferListener的onEvent回调方法。这里需要注意的是,当Buffer提供者中的Buffer从无到有,说明有Buffer被回收了,所以onEvent方法是被回收Buffer的线程所调用,而非Netty的I/O线程。

到此,我们才获取到可用的Buffer并读取了响应消息的原始数据,但数据还没有被解码。是不是解码的过程也发生在onEvent方法中呢?其实不然,在onEvent方法里,它将对原始消息的处理权交还给了Netty的I/O线程:

1
2
3
4
5
6
7
8
9
10
11
if (buffer != null) {
if (availableBuffer.compareAndSet(null, buffer)) {
ctx.channel().eventLoop().execute(this);
success = true;
}
else {
throw new IllegalStateException("Received a buffer notification, " +
" but the previous one has not been handled yet.");
}
}

代码段中会通过上下文对象获取到Channel所处的EventLoop,然后通过它的execute方法接收一个Runnable实例并在新线程执行。这里接收的this就是当前的bufferListener实例(因为BufferListenerTask也实现了Runnable接口)。所以在BufferListenerTask的onEvent方法中其实存在着一个线程执行的桥接过程。

以上就是NettyClient接收到NettyServer的响应后的处理器逻辑。由于Buffer资源受限,这里并没有直接将原始消息直接交与Netty的I/O线程并写到Buffer中,而是采取了队列缓存原始消息外加Buffer可用事件通知的机制来进行处理。

服务端核心处理器

服务端有两个核心处理器,分别是PartitionRequestServerHandler和PartitionRequestQueue。其中,PartitionRequestServerHandler会依赖PartitionRequestQueue的实例。

我们先来看PartitionRequestServerHandler,它是一种通道流入处理器(ChannelInboundHandler),主要用于初始化数据传输同时分发事件。

首先,PartitionRequestServerHandler会在Channel启动时创建一个容量至少为1的BufferPool。当然最关键的方法还是消息的处理方法channelRead0。

Netty提供了一个简化版的ChannelInboundHandler的实现,名为SimpleChannelInboundHandler。通过继承这个类,你可以非常方便得专注于实现自己的业务逻辑。因此,SimpleChannelInboundHandler类已经对ChannelInboundHandler的channelRead接口方法提供了基础实现,然后提供了名为channelRead0的抽象方法供派生类扩展。

从channelRead0方法的实现来看,客户端的请求消息被划分为三类:

  • 常规的结果分区请求;
  • 任务事件请求;
  • 其他请求;

我们分别来看针对这三类请求消息的处理逻辑,首先是常规的结果分区请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (msgClazz == PartitionRequest.class) {
PartitionRequest request = (PartitionRequest) msg;
try {
//构建结果子分区视图对象,并将其“加入队列”
ResultSubpartitionView subpartition =
partitionProvider.createSubpartitionView(
request.partitionId,
request.queueIndex,
bufferPool);
outboundQueue.enqueue(subpartition, request.receiverId);
}
catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
}

代码段中的outboundQueue是PartitionRequestQueue的实例,这里注意不要被其类名误导,它本身并不是一个队列数据结构的实现,但它内部的处理机制确实借助了队列结构来排队请求。outboundQueue同时也是在协议栈中紧随着PartitionRequestServerHandler的流入处理器PartitionRequestQueue的实例,这一点下文还会提到。

接着是任务事件请求:

1
2
3
4
5
6
7
8
9
else if (msgClazz == TaskEventRequest.class) {
TaskEventRequest request = (TaskEventRequest) msg;
//针对事件请求,将会通过任务事件分发器进行分发,如果分发失败,将会以错误消息予以响应
if (!taskEventDispatcher.publish(request.partitionId, request.event)) {
respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."),
request.receiverId);
}
}

什么情况下会导致事件分发失败呢?当事件分发时根据其partitionId如果找不到对应的侦听者时,就会认为事件分发失败。

除了上面两种请求之外的其他请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
//如果是取消请求,则调用队列的取消方法
else if (msgClazz == CancelPartitionRequest.class) {
CancelPartitionRequest request = (CancelPartitionRequest) msg;
outboundQueue.cancel(request.receiverId);
}
//如果是关闭请求,则关闭队列
else if (msgClazz == CloseRequest.class) {
outboundQueue.close();
}
else {
LOG.warn("Received unexpected client request: {}", msg);
}

从上面的代码段可见,PartitionRequestServerHandler主要起到消息分发的作用。因此我们会重点分析消息的处理者PartitionRequestQueue。

我们首先分析一下PartitionRequestServerHandler在处理消息时调用的PartitionRequestQueue的实例方法enqueue和cancel起到了什么作用。enqueue方法的实现如下:

1
2
3
public void enqueue(ResultSubpartitionView partitionQueue, InputChannelID receiverId) throws Exception {
ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(partitionQueue, receiverId));
}

可以看到它把原先的ResultSubpartitionView包装为SequenceNumberingSubpartitionView。然后调用fireUserEventTriggered来触发管道中的下一个ChannelInboundHandler的userEventTriggered方法。

SequenceNumberingSubpartitionView是什么?它是PartitionRequestQueue内部实现的一个ResultSubpartitionView的包装器。该包装器对原始的ResultSubpartitionView做了两件事:对每个即将返回的Buffer累加序列号同时保存相应的接收者(InputChannel)编号。

Buffer的序列号主要用于跟客户端校验消费Buffer的过程是否跟服务端的处理过程保持一致,这主要用于防止Buffer丢失。

那么下一个ChannelInboundHandler是谁呢?我们先回顾一下,在PartitionRequestProtocol协议中所组建的管道中的处理器的顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
public ChannelHandler[] getServerChannelHandlers() {
PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool);
return new ChannelHandler[] {
messageEncoder,
createFrameLengthDecoder(),
messageDecoder,
serverHandler,
queueOfPartitionQueues
};
}

从上面的代码可见,queueOfPartitionQueues这一实例既作为参数传入PartitionRequestServerHandler的构造器又在ChannelHandler数组中充当处理器。而此处的queueOfPartitionQueues跟PartitionRequestServerHandler中的outboundQueue指向同一个对象。而因为enqueue方法的调用者是PartitionRequestServerHandler的实例方法,所以,下一个ChannelInboundHandler的实例其实就是这里的outboundQueue本身。

所以,fireUserEventTriggered方法的调用,将会触发同一个PartitionRequestQueue实例的userEventTriggered方法。在userEventTriggered方法的实现中,也是按照不同的消息类型来区分处理的。首先当然是SequenceNumberingSubpartitionView类型:

1
2
3
4
5
6
7
8
9
if (msg.getClass() == SequenceNumberingSubpartitionView.class) {
boolean triggerWrite = queue.isEmpty();
//将消息强制转型并加入队列
queue.add((SequenceNumberingSubpartitionView) msg);
//如果队列在消息加入前是空的,则说明可以响应消息给客户端了
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}

看完了enqueue方法,下面我们来看cancel如何实现:

1
2
3
public void cancel(InputChannelID receiverId) {
ctx.pipeline().fireUserEventTriggered(receiverId);
}

该调用对应了userEventTriggered中的另一段处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
else if (msg.getClass() == InputChannelID.class) {
InputChannelID toCancel = (InputChannelID) msg;
//如果当前InputChannelID已包含在释放过的集合中,那么直接返回
if (released.contains(toCancel)) {
return;
}
//如果当前的结果子分区视图不为空且其接收者编号跟当前待取消的编号相等,则释放相关资源,并将该编号加入已释放集合
if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) {
currentPartitionQueue.releaseAllResources();
markAsReleased(currentPartitionQueue.receiverId);
currentPartitionQueue = null;
}
else {
int size = queue.size();
//遍历队列,将接收者编号跟当前准备取消的InputChannelID进行比较,
//如果相等则对视图的相关资源进行释放同时将编号加入已释放集合
for (int i = 0; i < size; i++) {
SequenceNumberingSubpartitionView curr = queue.poll();
if (curr.getReceiverId().equals(toCancel)) {
curr.releaseAllResources();
markAsReleased(curr.receiverId);
}
else {
queue.add(curr);
}
}
}
}

接下来,我们来分析一下处理器输出响应消息的writeAndFlushNextMessageIfPossible方法。在分析该方法的实现之前,我们先看一下,该方法何时会触发?当前在PartitionRequestQueue中该方法共有三个调用点。

第一个调用点位于ChannelInboundHandler的channelWritabilityChanged事件回调方法中。

channelWritabilityChanged方法是ChannelInboundHandler的接口方法,当Channel的可写状态发生改变时会被调用。Channel的isWritable()方法可以用来检测其可写性。可写性的阈值范围可以通过Channel.config().setWriteHighWaterMark()以及Channel.config().setWriteLowWaterMark()进行设置。

第二个调用点位于userEventTriggered回调方法中,这在我们上文分析该方法时已经提及过。

第三个调用点处于PartitionRequestQueue内部对ChannelFutureListener接口的实现类WriteAndFlushNextMessageIfPossibleListener中。

ChannelFutureListener用于注册到ChannelFuture中,当I/O操作完成之后,会触发对其方法operationComplete的调用。

而WriteAndFlushNextMessageIfPossibleListener的实现,就是在其operationComplete方法中触发了对writeAndFlushNextMessageIfPossible方法的调用。那么WriteAndFlushNextMessageIfPossibleListener何时会被注册到ChannelFuture呢,毕竟不注册是不会触发operationComplete的。而注册点正好位于writeAndFlushNextMessageIfPossible的实现中。

现在,我们就来分析该方法的实现,其核心代码段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//如果channel的状态为可写才会继续执行如下逻辑
if (channel.isWritable()) {
while (true) {
//如果当前结果子分区视图为空,同时队列里也没有待处理的记录了,则退出循环
if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
return;
}
//从结果子分区视图获得待响应的原始数据
buffer = currentPartitionQueue.getNextBuffer();
//如果为null,则不做响应,继续循环处理队列中的记录
if (buffer == null) {
if (currentPartitionQueue.registerListener(null)) {
currentPartitionQueue = null;
}
else if (currentPartitionQueue.isReleased()) {
markAsReleased(currentPartitionQueue.getReceiverId());
Throwable cause = currentPartitionQueue.getFailureCause();
if (cause != null) {
ctx.writeAndFlush(new NettyMessage.ErrorResponse(
new ProducerFailedException(cause),
currentPartitionQueue.receiverId));
}
currentPartitionQueue = null;
}
}
//buffer不为null,给予客户端响应
else {
//构建出最终的响应对象,这里就能看出,为什么要实现SequenceNumberingSubpartitionView这一包装器了
//因为这里用到了sequenceNumber以及receiverId
BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(),
currentPartitionQueue.getReceiverId());
//如果该Buffer并不是数据,而是表示子分区消费结束的事件,则会进行特殊的处理
if (!buffer.isBuffer() &&
EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() ==
EndOfPartitionEvent.class) {
//通知子分区消费完成,并释放相关资源
currentPartitionQueue.notifySubpartitionConsumed();
currentPartitionQueue.releaseAllResources();
markAsReleased(currentPartitionQueue.getReceiverId());
currentPartitionQueue = null;
}
//将响应对象写入网络准备发送给请求客户端,这里就是第三个调用点中注册ChannelFutureListener的位置了
//等到Netty的I/O线程处理完成后,将会触发writeAndFlushNextMessageIfPossible被再次调用
//从而形成了处理数据与注册回调之间的循环
channel.writeAndFlush(resp).addListener(writeListener);
return;
}
}
}

以上就是PartitionRequestQueue的核心逻辑,它自身不是队列结构的实现,但是它内部采用队列来对用于响应数据的ResultSubpartitionView进行缓冲,从而保证了服务端的响应速度处于合适的范围。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group