1. 程式人生 > 實用技巧 >Flink例項(二十一):自定義時間和視窗的操作符(二)KeyedProcessFunction(二)

Flink例項(二十一):自定義時間和視窗的操作符(二)KeyedProcessFunction(二)

KeyedProcessFunction

  KeyedProcessFunction用來操作KeyedStream。KeyedProcessFunction會處理流的每一個元素,輸出為0個、1個或者多個元素。所有的Process Function都繼承自RichFunction介面,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法:

  • processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一個元素都會呼叫這個方法,呼叫結果將會放在Collector資料型別中輸出。Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務。Context還可以將結果輸出到別的流(side outputs)。
  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一個回撥函式。當之前註冊的定時器觸發時呼叫。引數timestamp為定時器所設定的觸發的時間戳。Collector為輸出結果的集合。OnTimerContext和processElement的Context引數一樣,提供了上下文的一些資訊,例如firing trigger的時間資訊(事件時間或者處理時間)。

時間服務和定時器

Context和OnTimerContext所持有的TimerService物件擁有以下方法:

  • currentProcessingTime(): Long
    返回當前處理時間
  • currentWatermark(): Long返回當前水位線的時間戳
  • registerProcessingTimeTimer(timestamp: Long): Unit會註冊當前key的processing time的timer。當processing time到達定時時間時,觸發timer。
  • registerEventTimeTimer(timestamp: Long): Unit會註冊當前key的event time timer。當水位線大於等於定時器註冊的時間時,觸發定時器執行回撥函式。
  • deleteProcessingTimeTimer(timestamp: Long): Unit
    刪除之前註冊處理時間定時器。如果沒有這個時間戳的定時器,則不執行。
  • deleteEventTimeTimer(timestamp: Long): Unit刪除之前註冊的事件時間定時器,如果沒有此時間戳的定時器,則不執行。

  當定時器timer觸發時,執行回撥函式onTimer()。processElement()方法和onTimer()方法是同步(不是非同步)方法,這樣可以避免併發訪問和操作狀態。

  針對每一個key和timestamp,只能註冊一個定期器。也就是說,每一個key可以註冊多個定時器,但在每一個時間戳只能註冊一個定時器。KeyedProcessFunction預設將所有定時器的時間戳放在一個優先佇列中。在Flink做檢查點操作時,定時器也會被儲存到狀態後端中。

例項一

舉個例子說明KeyedProcessFunction如何操作KeyedStream。

下面的程式展示瞭如何監控溫度感測器的溫度值,如果溫度值在一秒鐘之內(processing time)連續上升,報警。

scala version

val warnings = readings
  .keyBy(r => r.id)// 此處鍵的型別是String,與接下來一處標紅處對應
  .process(new TempIncreaseAlertFunction)
  class TempIncrease extends KeyedProcessFunction[String, SensorReading, String] {
    // 懶載入;
    // 狀態變數會在檢查點操作時進行持久化,例如hdfs
    // 只會初始化一次,單例模式
    // 在當機重啟程式時,首先去持久化裝置尋找名為`last-temp`的狀態變數,如果存在,則直接讀取。不存在,則初始化。
    // 用來儲存最近一次溫度
    // 預設值是0.0
    lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
      new ValueStateDescriptor[Double]("last-temp", Types.of[Double])
    )

    // 預設值是0L
    lazy val timer: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer", Types.of[Long])
    )

    override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
      // 使用`.value()`方法取出最近一次溫度值,如果來的溫度是第一條溫度,則prevTemp為0.0
      val prevTemp = lastTemp.value()
      // 將到來的這條溫度值存入狀態變數中
      lastTemp.update(value.temperature)

      // 如果timer中有定時器的時間戳,則讀取
      val ts = timer.value()

      if (prevTemp == 0.0 || value.temperature < prevTemp) {
        ctx.timerService().deleteProcessingTimeTimer(ts)
        timer.clear()
      } else if (value.temperature > prevTemp && ts == 0) {
        val oneSecondLater = ctx.timerService().currentProcessingTime() + 1000L
        ctx.timerService().registerProcessingTimeTimer(oneSecondLater)
        timer.update(oneSecondLater)
      }
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
      out.collect("感測器ID是 " + ctx.getCurrentKey + " 的感測器的溫度連續1s上升了!")
      timer.clear()
    }
  }

例項二

  通過socketTextStream讀取9999埠資料,統計在一定時間內不同型別商品的銷售總額度,如果持續銷售額度為0,則執行定時器通知老闆,是不是賣某種型別商品的員工偷懶了(只做功能演示,根據個人業務來使用,比如統計UV等操作)

mport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
 
object ProcessFuncationScala {
 
 
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
    val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1))).setParallelism(4)
    typeAndData
    .keyBy(
0)//此處鍵的型別是Tuple,與接下來標紅處相對應
    .process(new MyprocessFunction()).print("結果") env.execute() } /** * 實現: * 根據key分類,統計每個key進來的資料量,定期統計數量,如果數量為0則預警 */ class MyprocessFunction extends KeyedProcessFunction[Tuple,(String,String),String]{ //統計間隔時間 val delayTime : Long = 1000 * 10 lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]])) override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = { printf("定時器觸發,時間為:%d,狀態為:%s,key為:%s\n",timestamp,state.value(),ctx.getCurrentKey) if(state.value()._2==0){ //該時間段資料為0,進行預警 printf("型別為:%s,資料為0,預警\n",state.value()._1) } //定期資料統計完成後,清零 state.update(state.value()._1,0) //再次註冊定時器執行 val currentTime: Long = ctx.timerService().currentProcessingTime() ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) } override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = { printf("狀態值:%s,state是否為空:%s\n",state.value(),(state.value()==null)) if(state.value() == null){ //獲取時間 val currentTime: Long = ctx.timerService().currentProcessingTime() //註冊定時器十秒後觸發 ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) printf("定時器註冊時間:%d\n",currentTime+10000L) state.update(value._1,value._2.toInt) } else{ //統計資料 val key: String = state.value()._1 var count: Long = state.value()._2 count += value._2.toInt //更新state值 state.update((key,count)) } println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value) printf("狀態值:%s\n",state.value()) //返回處理後結果 out.collect("處理後返回資料->"+value) } } }