Flink例項(五十六):自定義時間和視窗的操作符(十一)TimestampAssigner介面 (二)生成水印的三個過載方法
阿新 • • 發佈:2020-10-24
https://www.cnblogs.com/Springmoon-venn/p/11403665.html
Timestamp 和Watermark 的概念:
1. Timestamp和Watermark都是基於事件的時間欄位生成的 2. Timestamp和Watermark是兩個不同的東西,並且一旦生成都跟事件資料沒有關係了(所有即使事件中不再包含生成Timestamp和Watermark的欄位也沒關係) 3. 事件資料和 Timestamp 一一對應(事件在流中傳遞以StreamRecord物件表示,value 和 timestamp 是它的兩個成員變數) 4. Watermark 在生成之後與事件資料沒有直接關係,Watermark 作為一個訊息,和事件資料一樣在流中傳遞(Watermark 和StreamRecord 具有相同的父類:StreamElement) 5. Timestamp 與 Watermark 在生成之後,會在下游window運算元中做比較,判斷事件資料是否是過期資料 6. 只有window運算元才會用Watermark判斷事件資料是否過期
Flink 在流上手動生成水印有三個過載的方法(忽略過期的一個)
1 assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]
此方法是資料流的快捷方式,其中已知元素時間戳在每個並行流中單調遞增。在這種情況下,系統可以通過跟蹤上升時間戳自動且完美地生成水印。
val input = env.addSource(source) .map(json => { val id = json.get("id").asText() val createTime = json.get("createTime").asText() val amt= json.get("amt").asText() LateDataEvent("key", id, createTime, amt) }) // flink auto create timestamp & watermark .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)
注:這種方法建立時間戳與水印最簡單,返回一個long型別的數字就可以了
2.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
基於給定的水印生成器生成水印,即使沒有新元素到達也會定期檢查給定水印生成器的新水印,以指定允許延遲時間
val input = env.addSource(source) .map(json => { val id = json.get("id").asText() val createTime = json.get("createTime").asText() val amt = json.get("amt").asText() LateDataEvent("key", id, createTime, amt) }) // assign timestamp & watermarks periodically(定期生成水印) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) { override def extractTimestamp(element: LateDataEvent): Long = { println("want watermark : " + sdf.parse(element.createTime).getTime) sdf.parse(element.createTime).getTime } })
3.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
此方法僅基於流元素建立水印,對於通過[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]處理的每個元素,
呼叫[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,如果返回的水印值大於以前的水印,會發出新的水印,
此方法可以完全控制水印的生成,但是要注意,每秒生成數百個水印會影響效能
val input = env.addSource(source) .map(json => { val id = json.get("id").asText() val createTime = json.get("createTime").asText() val amt = json.get("amt").asText() LateDataEvent("key", id, createTime, amt) }) // assign timestamp & watermarks every event .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() { // check extractTimestamp emitted watermark is non-null and large than previously override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp) } // generate next watermark override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = { val eventTime = sdf.parse(element.createTime).getTime eventTime } })
注:本文基於全部事件時間