1. 程式人生 > 實用技巧 >Flink例項(十三):Flink DataStream 八大分割槽策略與自定義分割槽器

Flink例項(十三):Flink DataStream 八大分割槽策略與自定義分割槽器

分割槽策略決定了一條資料如何傳送給下游。Flink中預設提供了八大分割槽策略(也叫分割槽器)。

本文基於Flink 1.9.0總結Flink DataStream中的八大分割槽策略以及手動實現一個自定義分割槽器。

八大分割槽策略繼承關係圖

  • ChannelSelector: 介面,決定將記錄寫入哪個Channel。有3個方法:
  1. void setup(int numberOfChannels): 初始化輸出Channel的數量。
  2. int selectChannel(T record): 根據當前記錄以及Channel總數,決定應將記錄寫入下游哪個Channel。八大分割槽策略的區別主要在這個方法的實現上。
  3. boolean isBroadcast(): 是否是廣播模式。決定了是否將記錄寫入下游所有Channel
  • StreamPartitioner:抽象類,也是所有流分割槽器GlobalPartitioner,ShufflePartitioner,RebalancePartitioner,RescalePartitioner,BroadcastPartitioner,ForwardPartitioner,KeyGroupStreamPartitioner,CustomPartitioner的基類。

注意:

  1. 這裡以及下邊提到的Channel可簡單理解為下游Operator的某個例項。
  2. Flink 中改變並行度,預設RebalancePartitioner
    分割槽策略。
  3. 分割槽策略,可在Flink WebUI上直觀看出,如REBALANCE,即使用了RebalancePartitioner分割槽策略;SHUFFLE,即使用了ShufflePartitioner分割槽策略。

GlobalPartitioner: DataStream => DataStream

GlobalPartitioner,GLOBAL分割槽。將記錄輸出到下游Operator的第一個例項。

selectChannel實現

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        
//對每條記錄,只選擇下游operator的第一個Channel return 0; }

API使用

dataStream
    .setParallelism(2)
    // 採用GLOBAL分割槽策略重分割槽
    .global()
    .print()
    .setParallelism(1);

ShufflePartitioner: DataStream => DataStream

ShufflePartitionerSHUFFLE分割槽。將記錄隨機輸出到下游Operator的每個例項。

selectChannel實現

private Random random = new Random();

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    //對每條記錄,隨機選擇下游operator的某個Channel
    return random.nextInt(numberOfChannels);
}

API使用

dataStream
    .setParallelism(2)
    // 採用SHUFFLE分割槽策略重分割槽
    .shuffle()
    .print()
    .setParallelism(4);

RebalancePartitioner: DataStream => DataStream

RebalancePartitioner,REBALANCE分割槽。將記錄以迴圈的方式輸出到下游Operator的每個例項。

selectChannel實現

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    //第一條記錄,輸出到下游的第一個Channel;第二條記錄,輸出到下游的第二個Channel...如此迴圈
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
}

API使用

dataStream
        .setParallelism(2)
        // 採用REBALANCE分割槽策略重分割槽
        .rebalance()
        .print()
        .setParallelism(4);

RescalePartitioner: DataStream => DataStream

RescalePartitioner,RESCALE分割槽。基於上下游Operator的並行度,將記錄以迴圈的方式輸出到下游Operator的每個例項。舉例: 上游並行度是2,下游是4,則上游一個並行度以迴圈的方式將記錄輸出到下游的兩個並行度上;上游另一個並行度以迴圈的方式將記錄輸出到下游另兩個並行度上。若上游並行度是4,下游並行度是2,則上游兩個並行度將記錄輸出到下游一個並行度上;上游另兩個並行度將記錄輸出到下游另一個並行度上。

selectChannel實現

private int nextChannelToSendTo = -1;

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    if (++nextChannelToSendTo >= numberOfChannels) {
        nextChannelToSendTo = 0;
    }
    return nextChannelToSendTo;
}

API示例

dataStream
    .setParallelism(2)
    // 採用RESCALE分割槽策略重分割槽
    .rescale()
    .print()
    .setParallelism(4);

BroadcastPartitioner: DataStream => DataStream

BroadcastPartitioner,BROADCAST分割槽。廣播分割槽將上游資料集輸出到下游Operator的每個例項中。適合於大資料集Join小資料集的場景。

selectChannel實現

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    //廣播分割槽不支援選擇Channel,因為會輸出到下游每個Channel中
    throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
}

@Override
public boolean isBroadcast() {
    //啟用廣播模式,此時Channel選擇器會選擇下游所有Channel
    return true;
}

API示例

dataStream
    .setParallelism(2)
    // 採用BROADCAST分割槽策略重分割槽
    .broadcast()
    .print()
    .setParallelism(4);

ForwardPartitioner

ForwardPartitioner,FORWARD分割槽。將記錄輸出到下游本地的operator例項。ForwardPartitioner分割槽器要求上下游運算元並行度一樣。上下游Operator同屬一個SubTasks

selectChannel實現

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
}

API示例

dataStream
    .setParallelism(2)
    // 採用FORWARD分割槽策略重分割槽
    .forward()
    .print()
    .setParallelism(2);

KeyGroupStreamPartitioner(HASH方式):

KeyGroupStreamPartitioner,HASH分割槽。將記錄按Key的Hash值輸出到下游Operator例項。

selectChannel實現

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}

// KeyGroupRangeAssignment中的方法
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

// KeyGroupRangeAssignment中的方法
public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

// KeyGroupRangeAssignment中的方法
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

API示例

dataStream
    .setParallelism(2)
    // 採用HASH分割槽策略重分割槽
    .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) value -> value.f0)
    .print()
    .setParallelism(4);

CustomPartitionerWrapper

CustomPartitionerWrapper,CUSTOM分割槽。通過Partitioner例項的partition方法(自定義的)將記錄輸出到下游。

selectChannel實現

Partitioner<K> partitioner;
KeySelector<T, K> keySelector;
public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
    this.partitioner = partitioner;
    this.keySelector = keySelector;
}
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
    }
    return partitioner.partition(key, numberOfChannels);
}

自定義分割槽器將指定的Key分到指定的分割槽

// 自定義分割槽器,將不同的Key(使用者ID)分到指定的分割槽
// key: 根據key的值來分割槽
// numPartitions: 下游運算元並行度
static class CustomPartitioner implements Partitioner<String> {
      @Override
      public int partition(String key, int numPartitions) {
          switch (key){
              case "user_1":
                  return 0;
              case "user_2":
                  return 1;
              case "user_3":
                  return 2;
              default:
                  return 3;
          }
      }
  }

使用自定義分割槽器

dataStream
    .setParallelism(2)
    // 採用CUSTOM分割槽策略重分割槽
    .partitionCustom(new CustomPartitioner(),0)
    .print()
    .setParallelism(4);