Flink例項(十三):Flink DataStream 八大分割槽策略與自定義分割槽器
分割槽策略決定了一條資料如何傳送給下游。Flink中預設提供了八大分割槽策略(也叫分割槽器)。
本文基於Flink 1.9.0總結Flink DataStream中的八大分割槽策略以及手動實現一個自定義分割槽器。
八大分割槽策略繼承關係圖
ChannelSelector
: 介面,決定將記錄寫入哪個Channel
。有3個方法:
void setup(int numberOfChannels)
: 初始化輸出Channel
的數量。int selectChannel(T record)
: 根據當前記錄以及Channel
總數,決定應將記錄寫入下游哪個Channel
。八大分割槽策略的區別主要在這個方法的實現上。boolean isBroadcast()
: 是否是廣播模式。決定了是否將記錄寫入下游所有Channel
。
StreamPartitioner
:抽象類,也是所有流分割槽器GlobalPartitioner
,ShufflePartitioner
,RebalancePartitioner
,RescalePartitioner
,BroadcastPartitioner
,ForwardPartitioner
,KeyGroupStreamPartitioner
,CustomPartitioner
的基類。
注意:
- 這裡以及下邊提到的
Channel
可簡單理解為下游Operator
的某個例項。 - Flink 中改變並行度,預設
RebalancePartitioner
- 分割槽策略,可在Flink WebUI上直觀看出,如
REBALANCE
,即使用了RebalancePartitioner
分割槽策略;SHUFFLE
,即使用了ShufflePartitioner
分割槽策略。
GlobalPartitioner: DataStream => DataStream
GlobalPartitioner
,GLOBAL分割槽
。將記錄輸出到下游Operator
的第一個例項。
selectChannel
實現
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//對每條記錄,只選擇下游operator的第一個Channel return 0; }
API使用
dataStream .setParallelism(2) // 採用GLOBAL分割槽策略重分割槽 .global() .print() .setParallelism(1);
ShufflePartitioner: DataStream => DataStream
ShufflePartitioner
,SHUFFLE分割槽
。將記錄隨機輸出到下游Operator
的每個例項。
selectChannel
實現
private Random random = new Random(); @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //對每條記錄,隨機選擇下游operator的某個Channel return random.nextInt(numberOfChannels); }
API使用
dataStream .setParallelism(2) // 採用SHUFFLE分割槽策略重分割槽 .shuffle() .print() .setParallelism(4);
RebalancePartitioner: DataStream => DataStream
RebalancePartitioner
,REBALANCE分割槽
。將記錄以迴圈的方式輸出到下游Operator
的每個例項。
selectChannel
實現
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //第一條記錄,輸出到下游的第一個Channel;第二條記錄,輸出到下游的第二個Channel...如此迴圈 nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; }
API使用
dataStream .setParallelism(2) // 採用REBALANCE分割槽策略重分割槽 .rebalance() .print() .setParallelism(4);
RescalePartitioner: DataStream => DataStream
RescalePartitioner
,RESCALE分割槽
。基於上下游Operator
的並行度,將記錄以迴圈的方式輸出到下游Operator
的每個例項。舉例: 上游並行度是2,下游是4,則上游一個並行度以迴圈的方式將記錄輸出到下游的兩個並行度上;上游另一個並行度以迴圈的方式將記錄輸出到下游另兩個並行度上。若上游並行度是4,下游並行度是2,則上游兩個並行度將記錄輸出到下游一個並行度上;上游另兩個並行度將記錄輸出到下游另一個並行度上。
selectChannel
實現
private int nextChannelToSendTo = -1; @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { if (++nextChannelToSendTo >= numberOfChannels) { nextChannelToSendTo = 0; } return nextChannelToSendTo; }
API示例
dataStream .setParallelism(2) // 採用RESCALE分割槽策略重分割槽 .rescale() .print() .setParallelism(4);
BroadcastPartitioner: DataStream => DataStream
BroadcastPartitioner
,BROADCAST分割槽
。廣播分割槽將上游資料集輸出到下游Operator
的每個例項中。適合於大資料集Join小資料集的場景。
selectChannel
實現
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //廣播分割槽不支援選擇Channel,因為會輸出到下游每個Channel中 throw new UnsupportedOperationException("Broadcast partitioner does not support select channels."); } @Override public boolean isBroadcast() { //啟用廣播模式,此時Channel選擇器會選擇下游所有Channel return true; }
API示例
dataStream .setParallelism(2) // 採用BROADCAST分割槽策略重分割槽 .broadcast() .print() .setParallelism(4);
ForwardPartitioner
ForwardPartitioner
,FORWARD分割槽
。將記錄輸出到下游本地的operator例項。ForwardPartitioner
分割槽器要求上下游運算元並行度一樣。上下游Operator
同屬一個SubTasks
。
selectChannel
實現
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return 0; }
API示例
dataStream .setParallelism(2) // 採用FORWARD分割槽策略重分割槽 .forward() .print() .setParallelism(2);
KeyGroupStreamPartitioner(HASH方式):
KeyGroupStreamPartitioner
,HASH分割槽
。將記錄按Key的Hash值輸出到下游Operator
例項。
selectChannel
實現
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); } // KeyGroupRangeAssignment中的方法 public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); } // KeyGroupRangeAssignment中的方法 public static int assignToKeyGroup(Object key, int maxParallelism) { return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); } // KeyGroupRangeAssignment中的方法 public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; }
API示例
dataStream .setParallelism(2) // 採用HASH分割槽策略重分割槽 .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) value -> value.f0) .print() .setParallelism(4);
CustomPartitionerWrapper
CustomPartitionerWrapper
,CUSTOM分割槽
。通過Partitioner
例項的partition
方法(自定義的)將記錄輸出到下游。
selectChannel
實現
Partitioner<K> partitioner; KeySelector<T, K> keySelector; public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) { this.partitioner = partitioner; this.keySelector = keySelector; } @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance(), e); } return partitioner.partition(key, numberOfChannels); }
自定義分割槽器將指定的Key分到指定的分割槽
// 自定義分割槽器,將不同的Key(使用者ID)分到指定的分割槽 // key: 根據key的值來分割槽 // numPartitions: 下游運算元並行度 static class CustomPartitioner implements Partitioner<String> { @Override public int partition(String key, int numPartitions) { switch (key){ case "user_1": return 0; case "user_2": return 1; case "user_3": return 2; default: return 3; } } }
使用自定義分割槽器
dataStream .setParallelism(2) // 採用CUSTOM分割槽策略重分割槽 .partitionCustom(new CustomPartitioner(),0) .print() .setParallelism(4);