Apache Flink流分区器剖析

文章目录
  1. 1. StreamPartitioner
    1. 1.1. GlobalPartitioner
    2. 1.2. ForwardPartitioner
    3. 1.3. ShufflePartitioner
    4. 1.4. HashPartitioner
    5. 1.5. BroadcastPartitioner
    6. 1.6. RebalancePartitioner
    7. 1.7. RescalePartitioner
    8. 1.8. CustomPartitionerWrapper
  2. 2. 小结

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向。

StreamPartitioner

StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法:

1
public abstract StreamPartitioner<T> copy();

但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于——各个分区器需要实现channel选择的接口方法:

1
int[] selectChannels(T record, int numChannels);

该方法针对当前的record以及所有的channel数目,返回一个针对当前这条记录采用的output channel的索引数组。(注意这里返回的是数组,说明一个记录可能会输出到多个channel这点我们后面会谈到)。

该接口方法来自于StreamPartitioner实现的接口ChannelSelector

分区器整体类图:

flink-data-stream-partitioner_class-diagram

GlobalPartitioner

全局分区器,其实现很简单——默认选择了索引为0的channel进行输出。

1
2
3
4
5
6
private int[] returnArray = new int[] { 0 };
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,int numberOfOutputChannels) {
return returnArray;
}

ForwardPartitioner

该分区器将记录转发给在本地运行的下游的(归属于subtask)的operattion。其实现跟上面的GlobalPartitioner一致,就不贴代码了。

ShufflePartitioner

混洗分区器,该分区器会在所有output channel中选择一个随机的进行输出。

1
2
3
4
5
6
7
private int[] returnArray = new int[1];
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,int numberOfOutputChannels) {
returnArray[0] = random.nextInt(numberOfOutputChannels);
return returnArray;
}

HashPartitioner

hash分区器,该分区器对key进行hash后计算得到channel索引。它通过构造器获得KeySelector的实例(该实例用来获取当前记录的key)。

获得key后,通过其hashcodenumberOfOutputChannels取模后计算得出最终输出的channel的索引。

1
2
3
4
5
6
7
8
9
10
11
12
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
Object key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
return returnArray;
}

BroadcastPartitioner

广播分区器,用于将该记录广播给下游的所有的subtask。这里采用了两个标记:

  • set
  • setNumber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
if (set && setNumber == numberOfOutputChannels) {
return returnArray;
} else {
this.returnArray = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
returnArray[i] = i;
}
set = true;
setNumber = numberOfOutputChannels;
return returnArray;
}
}

从上面的实现可见,它返回了一个跟numberOfOutputChannels相等的数组(数组的大小就是即将输出到channel的个数)。

RebalancePartitioner

重平衡分区器,用于实现类似于round-robin这样的轮转模式的分区器。通过累加、取模的形式来实现对输出channel的切换。

1
2
3
4
5
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
return this.returnArray;
}

RescalePartitioner

也是以round-robin的形式将元素分区到下游subtask的子集中。

上游操作所发送的元素被分区到下游操作的哪些子集,依赖于上游和下游操作的并行度。例如,如果上游操作的并行度为2,而下游操作的并行度为4,那么一个上游操作会分发元素给两个下游操作,同时另一个上游操作会分发给另两个下游操作。相反的,如果下游操作的并行度为2,而上游操作的并行度为4,那么两个上游操作会分发数据给一个下游操作,同时另两个上游操作会分发数据给另一个下游操作。

在上下游的并行度不是呈倍数关系的情况下,下游操作会有数量不同的来自上游操作的输入。具体的实现代码同RebalancePartitioner

CustomPartitionerWrapper

自定义分区器包装器,该包装器封装了对于自定义的分区器的实现。自定义的分区测量依赖于Partitioner接口。它提供了自定义分区器的契约。核心接口方法是:

1
2
3
4
5
6
7
8
/**
* Computes the partition for the given key.
*
* @param key The key.
* @param numPartitions The number of partitions to partition into.
* @return The partition index.
*/
int partition(K key, int numPartitions);

该接口方法的描述很清晰,通过给定的key以及numPartitions返回partition的index.

CustomPartitionerWrapper通过构造器注入Partitioner的实例,然后在selectChannels方法中通过partition接口来获得最终的channel索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
K key = null;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
}
returnArray[0] = partitioner.partition(key,
numberOfOutputChannels);
return returnArray;
}

小结

以上的这些分区器,最终会体现在DataStream的API中用来对数据流进行物理分区。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group