Flink例項(二十一):自定義時間和視窗的操作符(二)KeyedProcessFunction(二)
阿新 • • 發佈:2020-10-08
KeyedProcessFunction
KeyedProcessFunction用來操作KeyedStream。KeyedProcessFunction會處理流的每一個元素,輸出為0個、1個或者多個元素。所有的Process Function都繼承自RichFunction介面,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法:
- processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一個元素都會呼叫這個方法,呼叫結果將會放在Collector資料型別中輸出。Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務。Context還可以將結果輸出到別的流(side outputs)。
- onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一個回撥函式。當之前註冊的定時器觸發時呼叫。引數timestamp為定時器所設定的觸發的時間戳。Collector為輸出結果的集合。OnTimerContext和processElement的Context引數一樣,提供了上下文的一些資訊,例如firing trigger的時間資訊(事件時間或者處理時間)。
時間服務和定時器
Context和OnTimerContext所持有的TimerService物件擁有以下方法:
currentProcessingTime(): Long
currentWatermark(): Long
返回當前水位線的時間戳registerProcessingTimeTimer(timestamp: Long): Unit
會註冊當前key的processing time的timer。當processing time到達定時時間時,觸發timer。registerEventTimeTimer(timestamp: Long): Unit
會註冊當前key的event time timer。當水位線大於等於定時器註冊的時間時,觸發定時器執行回撥函式。deleteProcessingTimeTimer(timestamp: Long): Unit
deleteEventTimeTimer(timestamp: Long): Unit
刪除之前註冊的事件時間定時器,如果沒有此時間戳的定時器,則不執行。
當定時器timer觸發時,執行回撥函式onTimer()。processElement()方法和onTimer()方法是同步(不是非同步)方法,這樣可以避免併發訪問和操作狀態。
針對每一個key和timestamp,只能註冊一個定期器。也就是說,每一個key可以註冊多個定時器,但在每一個時間戳只能註冊一個定時器。KeyedProcessFunction預設將所有定時器的時間戳放在一個優先佇列中。在Flink做檢查點操作時,定時器也會被儲存到狀態後端中。
例項一
舉個例子說明KeyedProcessFunction如何操作KeyedStream。
下面的程式展示瞭如何監控溫度感測器的溫度值,如果溫度值在一秒鐘之內(processing time)連續上升,報警。
scala version
val warnings = readings
.keyBy(r => r.id)// 此處鍵的型別是String,與接下來一處標紅處對應
.process(new TempIncreaseAlertFunction)
class TempIncrease extends KeyedProcessFunction[String, SensorReading, String] {
// 懶載入;
// 狀態變數會在檢查點操作時進行持久化,例如hdfs
// 只會初始化一次,單例模式
// 在當機重啟程式時,首先去持久化裝置尋找名為`last-temp`的狀態變數,如果存在,則直接讀取。不存在,則初始化。
// 用來儲存最近一次溫度
// 預設值是0.0
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
new ValueStateDescriptor[Double]("last-temp", Types.of[Double])
)
// 預設值是0L
lazy val timer: ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long])
)
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// 使用`.value()`方法取出最近一次溫度值,如果來的溫度是第一條溫度,則prevTemp為0.0
val prevTemp = lastTemp.value()
// 將到來的這條溫度值存入狀態變數中
lastTemp.update(value.temperature)
// 如果timer中有定時器的時間戳,則讀取
val ts = timer.value()
if (prevTemp == 0.0 || value.temperature < prevTemp) {
ctx.timerService().deleteProcessingTimeTimer(ts)
timer.clear()
} else if (value.temperature > prevTemp && ts == 0) {
val oneSecondLater = ctx.timerService().currentProcessingTime() + 1000L
ctx.timerService().registerProcessingTimeTimer(oneSecondLater)
timer.update(oneSecondLater)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("感測器ID是 " + ctx.getCurrentKey + " 的感測器的溫度連續1s上升了!")
timer.clear()
}
}
例項二
通過socketTextStream讀取9999埠資料,統計在一定時間內不同型別商品的銷售總額度,如果持續銷售額度為0,則執行定時器通知老闆,是不是賣某種型別商品的員工偷懶了(只做功能演示,根據個人業務來使用,比如統計UV等操作)
mport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector object ProcessFuncationScala { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost", 9999) val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1))).setParallelism(4) typeAndData
.keyBy(0)//此處鍵的型別是Tuple,與接下來標紅處相對應
.process(new MyprocessFunction()).print("結果") env.execute() } /** * 實現: * 根據key分類,統計每個key進來的資料量,定期統計數量,如果數量為0則預警 */ class MyprocessFunction extends KeyedProcessFunction[Tuple,(String,String),String]{ //統計間隔時間 val delayTime : Long = 1000 * 10 lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]])) override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = { printf("定時器觸發,時間為:%d,狀態為:%s,key為:%s\n",timestamp,state.value(),ctx.getCurrentKey) if(state.value()._2==0){ //該時間段資料為0,進行預警 printf("型別為:%s,資料為0,預警\n",state.value()._1) } //定期資料統計完成後,清零 state.update(state.value()._1,0) //再次註冊定時器執行 val currentTime: Long = ctx.timerService().currentProcessingTime() ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) } override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = { printf("狀態值:%s,state是否為空:%s\n",state.value(),(state.value()==null)) if(state.value() == null){ //獲取時間 val currentTime: Long = ctx.timerService().currentProcessingTime() //註冊定時器十秒後觸發 ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) printf("定時器註冊時間:%d\n",currentTime+10000L) state.update(value._1,value._2.toInt) } else{ //統計資料 val key: String = state.value()._1 var count: Long = state.value()._2 count += value._2.toInt //更新state值 state.update((key,count)) } println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value) printf("狀態值:%s\n",state.value()) //返回處理後結果 out.collect("處理後返回資料->"+value) } } }