1. 程式人生 > 程式設計 >Flink 系列(七)—— Flink 狀態管理與檢查點機制

Flink 系列(七)—— Flink 狀態管理與檢查點機制

一、狀態分類

相對於其他流計算框架,Flink 一個比較重要的特性就是其支援有狀態計算。即你可以將中間的計算結果進行儲存,並提供給後續的計算使用:

https://github.com/heibaiying

具體而言,Flink 又將狀態 (State) 分為 Keyed State 與 Operator State:

2.1 運算元狀態

運算元狀態 (Operator State):顧名思義,狀態是和運算元進行繫結的,一個運算元的狀態不能被其他運算元所訪問到。官方檔案上對 Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個運算元狀態是與一個併發的運算元例項所繫結的,即假設運算元的並行度是 2,那麼其應有兩個對應的運算元狀態:

https://github.com/heibaiying

2.2 鍵控狀態

鍵控狀態 (Keyed State) :是一種特殊的運算元狀態,即狀態是根據 key 值進行區分的,Flink 會為每類鍵值維護一個狀態例項。如下圖所示,每個顏色代表不同 key 值,對應四個不同的狀態例項。需要注意的是鍵控狀態只能在 KeyedStream 上進行使用,我們可以通過 stream.keyBy(...) 來得到 KeyedStream

https://github.com/heibaiying

二、狀態程式設計

2.1 鍵控狀態

Flink 提供了以下資料格式來管理和儲存鍵控狀態 (Keyed State):

  • ValueState:儲存單值型別的狀態。可以使用 update(T) 進行更新,並通過 T value()
    進行檢索。
  • ListState:儲存列表型別的狀態。可以使用 add(T)addAll(List) 新增元素;並通過 get() 獲得整個列表。
  • ReducingState:用於儲存經過 ReduceFunction 計算後的結果,使用 add(T) 增加元素。
  • AggregatingState:用於儲存經過 AggregatingState 計算後的結果,使用 add(IN) 新增元素。
  • FoldingState:已被標識為廢棄,會在未來版本中移除,官方推薦使用 AggregatingState 代替。
  • MapState:維護 Map 型別的狀態。

以上所有增刪改查方法不必硬記,在使用時通過語法提示來呼叫即可。這裡給出一個具體的使用示例:假設我們正在開發一個監控系統,當監控資料超過閾值一定次數後,需要發出報警資訊。這裡之所以要達到一定次數,是因為由於偶發原因,偶爾一次超過閾值並不能代表什麼,故需要達到一定次數後才觸發報警,這就需要使用到 Flink 的狀態程式設計。相關程式碼如下:

public class ThresholdWarning extends 
    RichFlatMapFunction<Tuple2<String,Long>,Tuple2<String,List<Long>>> {

    // 通過ListState來儲存非正常資料的狀態
    private transient ListState<Long> abnormalData;
    // 需要監控的閾值
    private Long threshold;
    // 觸發報警的次數
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold,Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) {
        // 通過狀態名稱(控制程式碼)獲取狀態例項,如果不存在則會自動建立
        abnormalData = getRuntimeContext().getListState(
            new ListStateDescriptor<>("abnormalData",Long.class));
    }

    @Override
    public void flatMap(Tuple2<String,Long> value,Collector<Tuple2<String,List<Long>>> out)
        throws Exception {
        Long inputValue = value.f1;
        // 如果輸入值超過閾值,則記錄該次不正常的資料資訊
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }
        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
        // 如果不正常的資料出現達到一定次數,則輸出報警資訊
        if (list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ",list));
            // 報警資訊輸出後,清空狀態
            abnormalData.clear();
        }
    }
}
複製程式碼

呼叫自定義的狀態監控,這裡我們使用 a,b 來代表不同型別的監控資料,分別對其資料進行監控:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String,Long>> tuple2DataStreamSource = env.fromElements(
    Tuple2.of("a",50L),Tuple2.of("a",80L),400L),100L),200L),Tuple2.of("b",500L),600L),700L));
tuple2DataStreamSource
    .keyBy(0)
    .flatMap(new ThresholdWarning(100L,3))  // 超過100的閾值3次後就進行報警
    .printToErr();
env.execute("Managed Keyed State");
複製程式碼

輸出如下結果如下:

https://github.com/heibaiying

2.2 狀態有效期

以上任何型別的 keyed state 都支援配置有效期 (TTL) ,示例如下:

StateTtlConfig ttlConfig = StateTtlConfig
    // 設定有效期為 10 秒
    .newBuilder(Time.seconds(10))  
    // 設定有效期更新規則,這裡設定為當建立和寫入時,都重置其有效期到規定的10秒
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    /*設定只要值過期就不可見,另外一個可選值是ReturnExpiredIfNotCleanedUp,
     代表即使值過期了,但如果還沒有被物理刪除,就是可見的*/
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData",Long.class);
descriptor.enableTimeToLive(ttlConfig);
複製程式碼

2.3 運算元狀態

相比於鍵控狀態,運算元狀態目前支援的儲存型別只有以下三種:

  • ListState:儲存列表型別的狀態。
  • UnionListState:儲存列表型別的狀態,與 ListState 的區別在於:如果並行度發生變化,ListState 會將該運算元的所有併發的狀態例項進行彙總,然後均分給新的 Task;而 UnionListState 只是將所有併發的狀態例項彙總起來,具體的劃分行為則由使用者進行定義。
  • BroadcastState:用於廣播的運算元狀態。

這裡我們繼續沿用上面的例子,假設此時我們不需要區分監控資料的型別,只要有監控資料超過閾值並達到指定的次數後,就進行報警,程式碼如下:

public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String,List<Tuple2<String,Long>>>> implements CheckpointedFunction {

    // 非正常資料
    private List<Tuple2<String,Long>> bufferedData;
    // checkPointedState
    private transient ListState<Tuple2<String,Long>> checkPointedState;
    // 需要監控的閾值
    private Long threshold;
    // 次數
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold,Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意這裡獲取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore().
            getListState(new ListStateDescriptor<>("abnormalData",TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {
                })));
        // 如果發生重啟,則需要從快照中將狀態進行恢復
        if (context.isRestored()) {
            for (Tuple2<String,Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String,List<Tuple2<String,Long>>>> out) {
        Long inputValue = value.f1;
        // 超過閾值則進行記錄
        if (inputValue >= threshold) {
            bufferedData.add(value);
        }
        // 超過指定次數則輸出報警資訊
        if (bufferedData.size() >= numberOfTimes) {
             // 順便輸出狀態例項的hashcode
             out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值警報!",bufferedData));
            bufferedData.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 在進行快照時,將資料儲存到checkPointedState
        checkPointedState.clear();
        for (Tuple2<String,Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }
}
複製程式碼

呼叫自定義運算元狀態,這裡需要將並行度設定為 1:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟檢查點機制
env.enableCheckpointing(1000);
// 設定並行度為1
DataStreamSource<Tuple2<String,Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
    Tuple2.of("a",700L));
tuple2DataStreamSource
    .flatMap(new ThresholdWarning(100L,3))
    .printToErr();
env.execute("Managed Keyed State");
}
複製程式碼

此時輸出如下:

https://github.com/heibaiying

在上面的呼叫程式碼中,我們將程式的並行度設定為 1,可以看到三次輸出中狀態例項的 hashcode 全是一致的,證明它們都同一個狀態例項。假設將並行度設定為 2,此時輸出如下:

https://github.com/heibaiying

可以看到此時兩次輸出中狀態例項的 hashcode 是不一致的,代表它們不是同一個狀態例項,這也就是上文提到的,一個運算元狀態是與一個併發的運算元例項所繫結的。同時這裡只輸出兩次,是因為在併發處理的情況下,執行緒 1 可能拿到 5 個非正常值,執行緒 2 可能拿到 4 個非正常值,因為要大於 3 次才能輸出,所以在這種情況下就會出現只輸出兩條記錄的情況,所以需要將程式的並行度設定為 1。

三、檢查點機制

3.1 CheckPoints

為了使 Flink 的狀態具有良好的容錯性,Flink 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,Flink 定期在資料流上生成 checkpoint barrier ,當某個運算元收到 barrier 時,即會基於當前狀態生成一份快照,然後再將該 barrier 傳遞到下游運算元,下游運算元接收到該 barrier 後,也基於當前狀態生成一份快照,依次傳遞直至到最後的 Sink 運算元上。當出現異常後,Flink 就可以根據最近的一次的快照資料將所有運算元恢復到先前的狀態。

https://github.com/heibaiying

3.2 開啟檢查點

預設情況下,檢查點機制是關閉的,需要在程式中進行開啟:

// 開啟檢查點機制,並指定狀態檢查點之間的時間間隔
env.enableCheckpointing(1000); 

// 其他可選配置如下:
// 設定語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 設定兩個檢查點之間的最小時間間隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 設定執行Checkpoint操作時的超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 設定最大併發執行的檢查點的數量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 將檢查點持久化到外部儲存
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的儲存點時,是否將作業回退到該檢查點
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
複製程式碼

3.3 儲存點機制

儲存點機制 (Savepoints) 是檢查點機制的一種特殊的實現,它允許你通過手工的方式來觸發 Checkpoint,並將結果持久化儲存到指定路徑中,主要用於避免 Flink 叢集在重啟或升級時導致狀態丟失。示例如下:

# 觸發指定id的作業的Savepoint,並將結果儲存到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]
複製程式碼

更多命令和配置可以參考官方檔案:savepoints

四、狀態後端

4.1 狀態管理器分類

預設情況下,所有的狀態都儲存在 JVM 的堆記憶體中,在狀態資料過多的情況下,這種方式很有可能導致記憶體溢位,因此 Flink 該提供了其它方式來儲存狀態資料,這些儲存方式統一稱為狀態後端 (或狀態管理器):

https://github.com/heibaiying

主要有以下三種:

1. MemoryStateBackend

預設的方式,即基於 JVM 的堆記憶體進行儲存,主要適用於本地開發和除錯。

2. FsStateBackend

基於檔案系統進行儲存,可以是本地檔案系統,也可以是 HDFS 等分散式檔案系統。 需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的資料仍然是儲存在 TaskManager 的記憶體中的,只有在 checkpoint 時,才會將狀態快照寫入到指定檔案系統上。

3. RocksDBStateBackend

RocksDBStateBackend 是 Flink 內建的第三方狀態管理器,採用嵌入式的 key-value 型資料庫 RocksDB 來儲存正在進行的資料。等到 checkpoint 時,再將其中的資料持久化到指定的檔案系統中,所以採用 RocksDBStateBackend 時也需要配置持久化儲存的檔案系統。之所以這樣做是因為 RocksDB 作為嵌入式資料庫安全性比較低,但比起全檔案系統的方式,其讀取速率更快;比起全記憶體的方式,其儲存空間更大,因此它是一種比較均衡的方案。

4.2 配置方式

Flink 支援使用兩種方式來配置後端管理器:

第一種方式:基於程式碼方式進行配置,只對當前作業生效:

// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
複製程式碼

配置 RocksDBStateBackend 時,需要額外匯入下面的依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
複製程式碼

第二種方式:基於 flink-conf.yaml 配置檔案的方式進行配置,對所有部署在該叢集上的作業都生效:

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
複製程式碼

注:本篇文章所有示例程式碼下載地址:flink-state-management

參考資料

更多大資料系列文章可以參見 GitHub 開源專案大資料入門指南