1. 程式人生 > 程式設計 >Flink 系列(六)—— Flink 視窗模型

Flink 系列(六)—— Flink 視窗模型

一、視窗概念

在大多數場景下,我們需要統計的資料流都是無界的,因此我們無法等待整個資料流終止後才進行統計。通常情況下,我們只需要對某個時間範圍或者數量範圍內的資料進行統計分析:如每隔五分鐘統計一次過去一小時內所有商品的點選量;或者每發生1000次點選後,都去統計一下每個商品點選率的佔比。在 Flink 中,我們使用視窗 (Window) 來實現這類功能。按照統計維度的不同,Flink 中的視窗可以分為 時間視窗 (Time Windows) 和 計數視窗 (Count Windows) 。

二、Time Windows

Time Windows 用於以時間為維度來進行資料聚合,具體分為以下四類:

2.1 Tumbling Windows

滾動視窗 (Tumbling Windows) 是指彼此之間沒有重疊的視窗。例如:每隔1小時統計過去1小時內的商品點選量,那麼 1 天就只能分為 24 個視窗,每個視窗彼此之間是不存在重疊的,具體如下:

https://github.com/heibaiying

這裡我們以詞頻統計為例,給出一個具體的用例,程式碼如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收socket上的資料輸入
DataStreamSource<String> streamSource = env.socketTextStream("hadoop001"
,9999,"\n",3); streamSource.flatMap(new FlatMapFunction<String,Tuple2<String,Long>>() { @Override public void flatMap(String value,Collector<Tuple2<String,Long>> out) throws Exception { String[] words = value.split("\t"); for (String word : words) { out.collect(new
Tuple2<>(word,1L)); } } }).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒統計一次每個單詞出現的數量 env.execute("Flink Streaming"); 複製程式碼

測試結果如下:

https://github.com/heibaiying

2.2 Sliding Windows

滑動視窗用於滾動進行聚合分析,例如:每隔 6 分鐘統計一次過去一小時內所有商品的點選量,那麼統計視窗彼此之間就是存在重疊的,即 1天可以分為 240 個視窗。圖示如下:

https://github.com/heibaiying

可以看到 window 1 - 4 這四個視窗彼此之間都存在著時間相等的重疊部分。想要實現滑動視窗,只需要在使用 timeWindow 方法時額外傳遞第二個引數作為滾動時間即可,具體如下:

// 每隔3秒統計一次過去1分鐘內的資料
timeWindow(Time.minutes(1),Time.seconds(3))
複製程式碼

2.3 Session Windows

當使用者在進行持續瀏覽時,可能每時每刻都會有點選資料,例如在活動區間內,使用者可能頻繁的將某類商品加入和移除購物車,而你只想知道使用者本次瀏覽最終的購物車情況,此時就可以在使用者持有的會話結束後再進行統計。想要實現這類統計,可以通過 Session Windows 來進行實現。

https://github.com/heibaiying

具體的實現程式碼如下:

// 以處理時間為衡量標準,如果10秒內沒有任何資料輸入,就認為會話已經關閉,此時觸發統計
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件時間為衡量標準    
window(EventTimeSessionWindows.withGap(Time.seconds(10)))
複製程式碼

2.4 Global Windows

最後一個視窗是全域性視窗, 全域性視窗會將所有 key 相同的元素分配到同一個視窗中,其通常配合觸發器 (trigger) 進行使用。如果沒有相應觸發器,則計算將不會被執行。

https://github.com/heibaiying

這裡繼續以上面詞頻統計的案例為例,示例程式碼如下:

// 當單詞累計出現的次數每達到10次時,則觸發計算,計算整個視窗內該單詞出現的總數
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();
複製程式碼

三、Count Windows

Count Windows 用於以數量為維度來進行資料聚合,同樣也分為滾動視窗和滑動視窗,實現方式也和時間視窗完全一致,只是呼叫的 API 不同,具體如下:

// 滾動計數視窗,每1000次點選則計算一次
countWindow(1000)
// 滑動計數視窗,每10次點選發生後,則計算過去1000次點選的情況
countWindow(1000,10)
複製程式碼

實際上計數視窗內部就是呼叫的我們上一部分介紹的全域性視窗來實現的,其原始碼如下:

public WindowedStream<T,KEY,GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}


public WindowedStream<T,GlobalWindow> countWindow(long size,long slide) {
    return window(GlobalWindows.create())
        .evictor(CountEvictor.of(size))
        .trigger(CountTrigger.of(slide));
}
複製程式碼

參考資料

Flink Windows: ci.apache.org/projects/fl…

更多大資料系列文章可以參見 GitHub 開源專案大資料入門指南