1. 程式人生 > 實用技巧 >《Flink 原理與實現:詳解 Flink 中的狀態管理》

《Flink 原理與實現:詳解 Flink 中的狀態管理》

使用 Keyed State

首先看一下 Keyed State 下,我們可以用哪些原子狀態:

  • ValueState:即型別為 T 的單值狀態。這個狀態與對應的 key 繫結,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值。
  • ListState:即 key 上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍歷狀態值。
  • ReducingState:這種狀態通過使用者傳入的 reduceFunction,每次呼叫add方法新增值的時候,會呼叫 reduceFunction,最後合併到一個單一的狀態值。
  • FoldingState:跟 ReducingState 有點類似,不過它的狀態值型別可以與add方法中傳入的元素型別不同(這種狀態將會在 Flink 未來版本中被刪除)。
  • MapState:即狀態值為一個 map。使用者通過putputAll方法新增元素。

以上所有的狀態型別,都有一個clear方法,可以清除當前 key 對應的狀態。

需要注意的是,以上所述的 State 物件,僅僅用於與狀態進行互動(更新、刪除、清空等),而真正的狀態值,有可能是存在記憶體、磁碟、或者其他分散式儲存系統中。相當於我們只是持有了這個狀態的控制代碼 (state handle)。

接下來看下,我們如何得到這個狀態控制代碼。Flink 通過StateDescriptor

來定義一個狀態。這是一個抽象類,內部定義了狀態名稱、型別、序列化器等基礎資訊。與上面的狀態對應,從StateDescriptor派生了ValueStateDescriptor,ListStateDescriptor等 descriptor。

具體如下:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)


(4 條訊息)Flink 原理與實現:詳解 Flink 中的狀態管理_xorxos 的專欄 - CSDN 部落格_flink 狀態管理

【本文轉自Flink 原理與實現:詳解 Flink 中的狀態管理

Flink 原理與實現系列文章 :

Flink 原理與實現:架構和拓撲概覽
Flink 原理與實現:如何生成 StreamGraph
Flink 原理與實現:如何生成 JobGraph
Flink 原理與實現:如何生成 ExecutionGraph 及物理執行圖
Flink 原理與實現:Operator Chain 原理

上面 Flink 原理與實現的文章中,有引用 word count 的例子,但是都沒有包含狀態管理。也就是說,如果一個 task 在處理過程中掛掉了,那麼它在記憶體中的狀態都會丟失,所有的資料都需要重新計算。從容錯和訊息處理的語義上 (at least once, exactly once),Flink 引入了 state 和 checkpoint。

首先區分一下兩個概念,state 一般指一個具體的 task/operator 的狀態。而 checkpoint 則表示了一個 Flink Job,在一個特定時刻的一份全域性狀態快照,即包含了所有 task/operator 的狀態。

Flink 通過定期地做 checkpoint 來實現容錯和恢復。

State

Keyed State 和 Operator State

Flink 中包含兩種基礎的狀態:Keyed State 和 Operator State。

Keyed State

顧名思義,就是基於 KeyedStream 上的狀態。這個狀態是跟特定的 key 繫結的,對 KeyedStream 流上的每一個 key,可能都對應一個 state。

Operator State

與 Keyed State 不同,Operator State 跟一個特定 operator 的一個併發例項繫結,整個 operator 只對應一個 state。相比較而言,在一個 operator 上,可能會有很多個 key,從而對應多個 keyed state。

舉例來說,Flink 中的 Kafka Connector,就使用了 operator state。它會在每個 connector 例項中,儲存該例項中消費 topic 的所有 (partition, offset) 對映。

原始狀態和 Flink 託管狀態 (Raw and Managed State)

Keyed State 和 Operator State,可以以兩種形式存在:原始狀態和託管狀態。

託管狀態是由 Flink 框架管理的狀態,如 ValueState, ListState, MapState 等。

下面是 Flink 整個狀態框架的類圖,還是比較複雜的,可以先掃一眼,看到後面再回過來看:

通過框架提供的介面,我們來更新和管理狀態的值。

而 raw state 即原始狀態,由使用者自行管理狀態具體的資料結構,框架在做 checkpoint 的時候,使用 byte[] 來讀寫狀態內容,對其內部資料結構一無所知。

通常在 DataStream 上的狀態推薦使用託管的狀態,當實現一個使用者自定義的 operator 時,會使用到原始狀態。

下文中所提到的狀態,如果沒有特殊說明,均為託管狀態。

使用 Keyed State

首先看一下 Keyed State 下,我們可以用哪些原子狀態:

  • ValueState:即型別為 T 的單值狀態。這個狀態與對應的 key 繫結,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值。
  • ListState:即 key 上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍歷狀態值。
  • ReducingState:這種狀態通過使用者傳入的 reduceFunction,每次呼叫add方法新增值的時候,會呼叫 reduceFunction,最後合併到一個單一的狀態值。
  • FoldingState:跟 ReducingState 有點類似,不過它的狀態值型別可以與add方法中傳入的元素型別不同(這種狀態將會在 Flink 未來版本中被刪除)。
  • MapState:即狀態值為一個 map。使用者通過putputAll方法新增元素。

以上所有的狀態型別,都有一個clear方法,可以清除當前 key 對應的狀態。

需要注意的是,以上所述的 State 物件,僅僅用於與狀態進行互動(更新、刪除、清空等),而真正的狀態值,有可能是存在記憶體、磁碟、或者其他分散式儲存系統中。相當於我們只是持有了這個狀態的控制代碼 (state handle)。

接下來看下,我們如何得到這個狀態控制代碼。Flink 通過StateDescriptor來定義一個狀態。這是一個抽象類,內部定義了狀態名稱、型別、序列化器等基礎資訊。與上面的狀態對應,從StateDescriptor派生了ValueStateDescriptor,ListStateDescriptor等 descriptor。

具體如下:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

接下來我們看一下建立和使用 ValueState 的例子:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
    /**
     * ValueState狀態控制代碼. 第一個值為count,第二個值為sum。
     */
    private transient ValueState<Tuple2<Long, Long>> sum;
 
    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // 獲取當前狀態值
        Tuple2<Long, Long> currentSum = sum.value();
 
        // 更新
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
 
        // 更新狀態值
        sum.update(currentSum);
        
        // 如果count >=2 清空狀態值,重新計算
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }
 
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // 狀態名稱
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 狀態型別
                        Tuple2.of(0L, 0L)); // 狀態預設值
        sum = getRuntimeContext().getState(descriptor);
    }
}
 
// ...
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();
 
// the printed output will be (1,4) and (1,5)

由於狀態需要從RuntimeContext中建立和獲取,因此如果要使用狀態,必須使用 RichFunction。普通的 Function 是無狀態的。

KeyedStream 上的 scala api 則提供了一些語法糖,讓建立和使用狀態更加方便:

val stream: DataStream[(String, Int)] = ...
 
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Inside Keyed State

上面以 Keyed State 為例講了如何使用狀態,接下來我們從程式碼層面分析一下,框架在內部做了什麼事情。

先看下上面例子中open方法中獲取狀態控制代碼的程式碼:

    sum = getRuntimeContext().getState(descriptor);

它呼叫了RichFlatMapFunction.getRuntimeContext().getState方法,最終會呼叫StreamingRuntimeContext.getState方法:

    public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getState(stateProperties);
    }

checkPreconditionsAndGetKeyedStateStore方法中:

    KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
    return keyedStateStore;

即返回了AbstractStreamOperator.keyedStateStore變數。這個變數的初始化在AbstractStreamOperator.initState方法中:

    private void initKeyedState() {
        try {
            TypeSerializer<Object> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
            // create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer
            if (null != keySerializer) {
                KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
 
                long estimatedStateSizeInMB = config.getStateSize();
 
                this.keyedStateBackend = container.createKeyedStateBackend(
                        keySerializer,
                        // The maximum parallelism == number of key group
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        subTaskKeyGroupRange,
                        estimatedStateSizeInMB);
 
                this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
            }
 
        // ...
    }

它先呼叫StreamTask.createKeyedStateBackend方法建立 stateBackend,然後將 stateBackend 傳入 DefaultKeyedStateStore。

StreamTask.createKeyedStateBackend方法通過它內部的 stateBackend 來建立 keyed statebackend:

    backend = stateBackend.createKeyedStateBackend(
            getEnvironment(),
            getEnvironment().getJobID(),
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            estimatedStateSizeInMB,
            getEnvironment().getTaskKvStateRegistry());

看一下 statebackend 的初始化,在StreamTask.createStateBackend方法中,這個方法會根據配置項state.backend的值建立 backend,其中內建的 backend 有jobmanager,filesystem,rocksdb

jobmanager的 state backend 會把狀態儲存在 job manager 的記憶體中。
filesystem會把狀態存在檔案系統中,有可能是本地檔案系統,也有可能是 HDFS、S3 等分散式檔案系統。
rocksdb會把狀態存在 rocksdb 中。

所以可以看到,建立了 state backend 之後,建立 keyed stated backend,實際上就是呼叫具體的 state backend 來建立。我們以 filesystem 為例,實際就是FsStateBackend.createKeyedStateBackend方法,這個方法也很簡單,直接返回了HeapKeyedStateBackend物件。

先不展開說HeapKeyedStateBackend類,我們返回去看建立 keyed state,最終返回的是DefaultKeyedStateStore物件,它的getState,getListState,getReducingState等方法,都是對底層 keyed state backend 的一層封裝,keyedStateBackend.getPartitionedState來返回具體的 state handle(DefaultKeyedStateStore.getPartitionedState方法)。

這個方法實際呼叫了AbstractKeyedStateBackend.getPartitionedState方法,HeapKeyedStateBackendRocksDBKeyedStateBackend都從這個基類派生。

這個類有一個成員變數:

    private final HashMap<String, InternalKvState<?>> keyValueStatesByName;

它儲存了的一個對映。map value 中的 InternalKvState,實際為建立的HeapValueState,HeapListState,RocksDBValueState,RocksDBListStat等實現。

回到上面AbstractKeyedStateBackend.getPartitionedState,正常的程式碼路徑下,它會呼叫AbstractKeyedStateBackend.getOrCreateKeyedState方法來建立這個 InternalKvState,其方法如下:

        S state = stateDescriptor.bind(new StateBackend() {
            @Override
            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
            }
 
            @Override
            public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
            }
        // ...

AbstractKeyedStateBackend.createValueStateAbstractKeyedStateBackend.createListState等方法是 AbstractKeyedStateBackend 的抽象方法,具體還是在 HeapKeyedStateBackend、RocksDBKeyedStateBackend 等類中實現的,所以這裡建立的 state 只是一個代理,它 proxy 了具體的上層實現。在我們的例子中,最後繞了一個圈,呼叫的仍然是HeapKeyedStateBackend.createValueState方法,並將 state name 對應的 state handle 放入到 keyValueStatesByName 這個 map 中,保證在一個 task 中只有一個同名的 state handle。

回來看HeapKeyedStateBackend,這個類有一個成員變數:

    private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();

它的 key 為 state name, value 為 StateTable,用來儲存這個 state name 下的狀態值。它會將所有的狀態值儲存在記憶體中。

它的createValueState方法:

        StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
        return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);

即先註冊 StateTable,然後返回一個 HeapValueState。

這裡整理一下從應用層面建立一個 ValueState 的 state handle 的過程:

sum = getRuntimeContext().getState(descriptor) (app code)
  --> RichFlatMapFunction.getRuntimeContext().getState
  --> StreamingRuntimeContext.getState
    --> KeyedStateStore.getState(stateProperties)
    --> AbstractStreamOperator.keyedStateStore.getState
      --> DefaultKeyedStateStore.getState
      --> DefaultKeyedStateStore.getPartitionedState
      --> AbstractKeyedStateBackend.getPartitionedState
      --> AbstractKeyedStateBackend.getOrCreateKeyedState
        --> HeapKeyedStateBackend.createValueState
        --> HeapKeyedStateBackend.tryRegisterStateTable
        --> return new HeapValueState        

而從框架層面看,整個呼叫流程如下:

Task.run
  --> StreamTask.invoke
  --> StreamTask.initializeState
  --> StreamTask.initializeOperators
    --> AbstractStreamOperator.initializeState
    --> AbstractStreamOperator.initKeyedState
      --> StreamTask.createKeyedStateBackend
        --> MemoryStateBackend.createKeyedStateBackend
          --> HeapKeyedStateBackend.<init>

整體來看,建立一個 state handle 還是挺繞的,中間經過了多層封裝和代理。

建立完了 state handle,接下來看看如何獲取和更新狀態值。

首先需要講一下 HeapState 在記憶體中是如何組織的,還是以最簡單的 HeapValueState 為例,
具體的資料結構,是在其基類AbstractHeapState中,以StateTable<K, N, SV> stateTable的形式存在的,其中 K 代表 Key 的型別,N 代表 state 的 namespace(這樣屬於不同 namespace 的 state 可以重名),SV 代表 state value 的型別。

StateTable類內部資料結構如下:

    protected final KeyGroupRange keyGroupRange;
    /** Map for holding the actual state objects. */
    private final List<Map<N, Map<K, ST>>> state;
    /** Combined meta information such as name and serializers for this state */
    protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;

最核心的資料結構是state成員變數,它儲存了一個 list,其值型別為Map<N, Map<K, ST>>,即按 namespace 和 key 分組的兩級 map。那麼它為什麼是一個 list 呢,這裡就要提到keyGroupRange成員變量了,它代表了當前 state 所包含的 key 的一個範圍,這個範圍根據當前的 sub task id 以及最大併發進行計算,在AbstractStreamOperator.initKeyedState方法中:

                KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());

舉例來說,如果當前 task 的併發是 2,最大併發是 128,那麼 task-1 所屬的 state backend 的 keyGroupRange 為 [0,63],而 task-2 所屬的 state backend 的 keyGroupRange 為 [64,127]。

這樣,task-1 中的 StateTable.state 這個 list,最大 size 即為 64。獲取特定 key 的 state value 時,會先計算 key 的 hash 值,然後用 hash 值 % 最大併發,這樣會得到一個 [0,127] 之間的 keyGroup,到這個 list 中 get 到這個下標的Map<N, Map<K,V>>值,然後根據 namespace + key 二級獲取到真正的 state value。

看到這裡,有人可能會問,對於一個 key,如何保證在 task-1 中,它計算出來的 keyGroup 一定是在 [0,63] 之間,在 task-2 中一定是在 [64,127] 之間呢?

原因是,在 KeyedStream 中,使用了KeyGroupStreamPartitioner這種 partitioner 來向下遊 task 分發 keys,而這個類過載的selectChannels方法如下:

        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
        return returnArray;

這裡關鍵是KeyGroupRangeAssignment.assignKeyToParallelOperator方法,它中間呼叫了KeyGroupRangeAssignment.assignToKeyGroup方法來確定一個 key 所屬的 keyGroup,這個跟 state backend 計算 keyGroup 是同一個方法。然後根據這個 keyGroup,它會計算出擁有這個 keyGroup 的 task,並將這個 key 傳送到此 task。所以能夠保證,從 KeyedStream 上 emit 到下游 task 的資料,它的 state 所屬的 keyGroup 一定是在當前 task 的 keyGroupRange 中的。

上面已經提到了獲取 ValueState 的值,這裡貼一下程式碼,結合一下就很容易理解了:

        Map<N, Map<K, V>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
        if (namespaceMap == null) {
            return stateDesc.getDefaultValue();
        }
 
        Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
        if (keyedMap == null) {
            return stateDesc.getDefaultValue();
        }
 
        V result = keyedMap.get(backend.getCurrentKey());
        if (result == null) {
            return stateDesc.getDefaultValue();
        }
 
        return result;

而更新值則通過ValueState.update方法進行更新,這裡就不貼程式碼了。

上面講了最簡單的 ValueState,其他型別的 state,其實也是基本一樣的,只不過 stateTable 中狀態值的型別不同而已。如 HeapListState,它的狀態值型別為 ArrayList;HeapMapState,它的狀態值型別為 HashMap。而值型別的不同,導致了在 State 上的介面也有所不同,如 ListState 會有add方法,MapState 有putget方法。在這裡就不展開說了。

Checkpoint

到上面為止,都是簡單的關於狀態的讀寫,而且狀態都還是隻在 Task 本地,接下來就會涉及到 checkpoint。
所謂 checkpoint,就是在某一時刻,將所有 task 的狀態做一個快照 (snapshot),然後儲存到 memory/file system/rocksdb 等。

關於 Flink 的分散式快照,請參考分散式 Snapshot 和 Flink Checkpointing 簡介及相關論文,這裡不詳述了。

Flink 的 checkpoint,是由CheckpointCoordinator來協調的,它位於 JobMaster 中。但是其實在 ExecutionGraph 中已經建立了,見ExecutionGraph.enableSnapshotCheckpointing方法。

當 Job 狀態切換到 RUNNING 時,CheckpointCoordinatorDeActivator(從 JobStatusListener 派生)會觸發回撥coordinator.startCheckpointScheduler();,根據配置的 checkpoint interval 來定期觸發 checkpoint。

每個 checkpoint 由 checkpoint ID 和 timestamp 來唯一標識,其中 checkpoint ID 可以是 standalone(基於記憶體)的,也可能是基於 ZK 的。
已經完成的 checkpoint,儲存在 CompletedCheckpointStore 中,可以是 StandaloneCompletedCheckpointStore(儲存在 JobMaster 記憶體中),也可以是 ZooKeeperCompletedCheckpointStore(儲存在 ZK 中),甚至是自己實現的 store,比如基於 HDFS 的。

觸發 checkpoint 的方法在 CheckpointCoordinator.ScheduledTrigger 中,只有一行:

    triggerCheckpoint(System.currentTimeMillis(), true);

這個方法比較長,它會先做一系列檢查,如檢查 coordinator 自身的狀態(是否被 shutdown),還會檢查與上次 checkpoint 的時間間隔、當前的併發 checkpoint 數是否超過限制,如果都沒問題,再檢查所有 task 的狀態是否都為 RUNNING,都沒問題之後,觸發每個 Execution 的 checkpoint:

    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

看下Execution.triggerCheckpoint方法:

    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final SimpleSlot slot = assignedResource;
 
        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

很簡單,通過 RPC 呼叫向 TaskManager 觸發當前 JOB 的 checkpoint,然後一路呼叫下去:

RpcTaskManagerGateway.triggerCheckpoint
  --> TaskExecutorGateway.triggerCheckpoint
  --> TaskExecutor.triggerCheckpoint
    --> task.triggerCheckpointBarrier
    --> StatefulTask.triggerCheckpoint
    --> StreamTask.triggerCheckpoint
    --> StreamTask.performCheckpoint

具體做 checkpoint 的時候,會先向下游廣播 checkpoint barrier,然後呼叫StreamTask.checkpointState方法做具體的 checkpoint,實際會呼叫到StreamTask.executeCheckpointing方法。

checkpoint 裡,具體操作為,遍歷每個 StreamTask 中的所有 operator:

  1. 呼叫 operator 的snapshotState(FSDataOutputStream out, long checkpointId, long timestamp)方法,儲存 operator state,這個結果會返回 operator state handle,儲存於nonPartitionedStates中。這裡實際處理的時候,只有當 user function 實現了Checkpointed介面,才會做 snapshot。需要注意的是,此介面已經 deprecated,被CheckpointedFunction代替,而對CheckpointedFunction的 snapshot 會在下面的第 2 步中來做,因此這兩個介面一般來說是 2 選 1 的。
  2. 呼叫 operator 的snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions)方法,返回OperatorSnapshotResult物件。注意雖然每個 snapshot 方法返回的都是一個 RunnableFuture,不過目前實際上還是同步做的 checkpoint(可以比較容易改成非同步)。

    1. 這裡會先呼叫AbstractStreamOperator.snapshotState方法,為 rich function 做 state snapshot
    2. 呼叫operatorStateBackend.snapshot方法,對 operator state 做 snapshot。
    3. 呼叫keyedStateBackend.snapshot方法,對 keyed state 做 snapshot。
    4. 呼叫timerServiceBackend.snapshot方法,對 processing time/event time window 中註冊的 timer 回撥做 snapshot(恢復狀態的時候必須也要恢復 timer 回撥)
  3. 呼叫StreamTask.runAsyncCheckpointingAndAcknowledge方法確認上面的 snapshot 是否都成功,如果成功,則會向 CheckpointCoordinator 傳送 ack 訊息。
  4. CheckpointCoordinator 收到 ack 訊息後,會檢查本地是否存在這個 pending 的 checkpoint,並且這個 checkpoint 是否超時,如果都 OK,則判斷是否收到所有 task 的 ack 訊息,如果是,則表示已經完成 checkpoint,會得到一個 CompletedCheckpoint 並加入到 completedCheckpointStore 中。

在上面的 checkpoint 過程中,如果 state backend 選擇的是 jobmanager,那麼最終返回的 state handle 為 ByteStreamStateHandle,這個 state handle 中包含了 snapshot 後的所有狀態資料。而如果是 filesystem,則 state handle 只會包含資料的檔案控制代碼,資料則在 filesystem 中,這個下面會再細說。

Filesystem State Backend

上面提到的都是比較簡單的基於記憶體的 state backend,在實際生產中是不太可行的。因此一般會使用 filesystem 或者 rocksdb 的 state backend。我們先講一下基於 filesystem 的 state backend。

基於記憶體的 state backend 實現為MemoryStateBackend,基於檔案系統的 state backend 的實現為FsStateBackend。FsStateBackend 有一個策略,當狀態的大小小於 1MB(可配置,最大 1MB)時,會把狀態資料直接儲存在 meta data file 中,避免出現很小的狀態檔案。

FsStateBackend 另外一個成員變數就是basePath,即 checkpoint 的路徑。實際做 checkpoint 時,生成的路徑為:<base-path>/<job-id>/chk-<checkpoint-id>/

而且 filesystem 推薦使用分散式檔案系統,如 HDFS 等,這樣在 fail over 時可以恢復,如果是本地的 filesystem,那恢復的時候是會有問題的。

回到 StreamTask,在做 checkpoint 的時候,是通過CheckpointStateOutputStream寫狀態的,FsStateBack 會使用FsCheckpointStreamFactory,然後通過FsCheckpointStateOutputStream去寫具體的狀態,這個實現也比較簡單,就是一個帶 buffer 的寫檔案系統操作。最後向上層返回的 StreamStateHandle,視狀態的大小,如果狀態特別小,則會直接返回帶狀態資料的ByteStreamStateHandle,否則會返回FileStateHandle,這個 state handle 包含了狀態檔名和大小。

需要注意的是,雖然 checkpoint 是寫入到檔案系統中,但是基於 FsStateBackend 建立的 keyed state backend,仍然是HeapKeyedStateBackend,也就是說,keyed state 的讀寫仍然是會在記憶體中的,只有在做 checkpoint 的時候才會持久化到檔案系統中。

RocksDB State Backend

RocksDB 跟上面的都略有不同,它會在本地檔案系統中維護狀態,KeyedStateBackend 等會直接寫入本地 rocksdb 中。同時它需要配置一個遠端的 filesystem uri(一般是 HDFS),在做 checkpoint 的時候,會把本地的資料直接複製到 filesystem 中。fail over 的時候從 filesystem 中恢復到本地。

從 RocksDBStateBackend 創建出來的 RocksDBKeyedStateBackend,更新的時候會直接以 key + namespace 作為 key,然後把具體的值更新到 rocksdb 中。

如果是 ReducingState,則在add的時候,會先從 rocksdb 中讀取已有的值,然後根據使用者的 reduce function 進行 reduce,再把新值寫入 rocksdb。

做 checkpoint 的時候,會首先在本地對 rockdb 做 checkpoint(rocksdb 自帶的 checkpoint 功能),這一步是同步的。然後將 checkpoint 非同步複製到遠端檔案系統中。最後返回RocksDBStateHandle

RocksDB 克服了 HeapKeyedStateBackend 受記憶體限制的缺點,同時又能夠持久化到遠端檔案系統中,比較適合在生產中使用。

Queryable State

Queryable State,顧名思義,就是可查詢的狀態,表示這個狀態,在流計算的過程中就可以被查詢,而不像其他流計算框架,需要儲存到外部系統中才能被查詢。目前可查詢的 state 主要針對 partitionable state,如 keyed state 等。

簡單來說,當用戶在 job 中定義了 queryable state 之後,就可以在外部,通過QueryableStateClient,通過 job id, state name, key 來查詢所對應的狀態的實時的值。

queryable state 目前支援兩種方法來定義:

  • 通過KeyedStream.asQueryableState方法,生成一個 QueryableStream,需要注意的是,這個 stream 類似於一個 sink,是不能再做 transform 的。 實現上,生成 QueryableStream 就是為當前 stream 加上一個 operator:QueryableAppendingStateOperator,它的processElement方法,每來一個元素,就會呼叫state.add去更新狀態。因此這種方式有一個限制,只能使用 ValueDescriptor, FoldingStateDescriptor 或者 ReducingStateDescriptor,而不能是 ListStateDescriptor,因為它可能會無限增長導致 OOM。此外,由於不能在 stream 後面再做 transform,也是有一些限制。
  • 通過 managed keyed state。

      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
    new ValueStateDescriptor<>(
          "average", // the state name
          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
          Tuple2.of(0L, 0L)); 
      descriptor.setQueryable("query-name"); // queryable state name

這個只需要將具體的 state descriptor 標識為 queryable 即可,這意味著可以將一個 pipeline 中間的 operator 的 state 標識為可查詢的。

首先根據 state descriptor 的配置,會在具體的 TaskManager 中建立一個 KvStateServer,用於 state 查詢,它就是一個簡單的 netty server,通過KvStateServerHandler來處理請求,查詢 state value 並返回。

但是一個 partitionable state,可能存在於多個 TaskManager 中,因此需要有一個路由機制,當 QueryableStateClient 給定一個 query name 和 key 時,要能夠知道具體去哪個 TaskManager 中查詢。

為了做到這點,在 Job 的 ExecutionGraph(JobMaster)上會有一個用於定位 KvStateServer 的 KvStateLocationRegistry,當在 TaskManager 中註冊了一個 queryable KvStateServer 時,就會呼叫JobMaster.notifyKvStateRegistered,通知 JobMaster。

具體流程如下圖:

這個設計看起來很美好,通過向流計算實時查詢狀態資料,免去了傳統的儲存等的開銷。但實際上,除了上面提到的狀態型別的限制之外,也會受 netty server 以及 state backend 本身的效能限制,因此並不適用於高併發的查詢。

參考資料:

  1. Dynamic Scaling: Key Groups
  2. Stateful Stream Processing
  3. Working with State
  4. Scaling to large state
  5. Queryable state design doc