1. 程式人生 > 其它 >Flink sql 之 微批處理與MiniBatchIntervalInferRule (原始碼分析)

Flink sql 之 微批處理與MiniBatchIntervalInferRule (原始碼分析)

本文原始碼基於flink1.14

平臺使用者在使用我們的flinkSql時經常會開啟minaBatch來優化狀態讀寫

所以從原始碼的角度具體解讀一下miniBatch的原理

先看一下flinksql是如何觸發miniBatch的優化的

主要就是這個Calcite的rule了,來具體看一下

在對應的match方法中

會根據miniBatch的型別判斷,是否需要新增一個Assigner的節點

這個assigner是幹嘛的呢?這個Assinger是一個execNode和視窗的assigner是不一樣的,這裡主要是為了傳送水印的

沒錯,miniBatch攢一批的實現原理就是通過水印,來作為一批的標識

來具體看看

分為處理時間和事件時間

先看看處理時間

邏輯比較簡單,就是當前微批的開始時間大於當前水印,就傳送一個當前的微批的開始時間的水印

然後,事件時間的沒什麼意思,就是水印直接往下游轉發了

接著,攢微批已經將完了,來看下具體聚合運算元怎麼優化微批計算的吧

來看個StreamExecGroupAggregate這個聚合ExecNode的邏輯

既然是execNode來直接看它的translateToPlanInternal()方法

原來是直接在execNode裡面做了特殊處理,不過也是,每個運算元的優化都不一樣也不太好抽象出來

這裡還是 先看看不使用微批的時候是怎麼處理的,然後來對比一下

沒用微批這裡是封裝成了一個KeyedProcessOperator的運算元,裡面傳的aggFunction直接就是一個KeyedProcessFunction

看下具體處理groupAggFunction

這裡沒有開minibatch的邏輯比較簡單

每來一條資料,先讀狀態accState是一個valueState然後,呼叫聚合函式的accumlate來計算,然後用新得到的累加器更新狀態

可以看到這樣做的問題還是比較大的

第一,每一條資料都要讀寫狀態開銷很大

第二,每條資料都要呼叫計算,有很多虛擬函式的呼叫

因此,讓我們看看MIniBatch是如何做的吧

回到上面,我們看到MiniBatch是建立的一個KeyedMapBundleOperator,裡面的引數是MiniBatchGroupAggFunction

看下KeyedMapBundleOperator

先從一個bundle獲取和資料同key的資料,來看下這個bundle是什麼

ok,就是一個本地map,然後走addInput()

來看下MiniBatchGroupAggFunction的addInput方法

其實就是把,來的資料加到map對應key的Value是一個list裡面去了

最後來看當微批攢夠觸發onTrigger會走到finishBundle()方法

先從buffer獲取每一個key對應的value是一個list

然後讀取狀態state資料

直接for迴圈遍歷微批的資料

然後呼叫聚合函式的accumulate不停計算

最後將計算好的累加器accumulator存到狀態裡面去

是不是很簡單

這樣微批處理就完成了,減少了狀態的頻繁訪問,是一個很不錯的優化