1. 程式人生 > 實用技巧 >Flink例項(五十五):自定義時間和視窗的操作符(十)TimestampAssigner介面 (一)設定事件時間

Flink例項(五十五):自定義時間和視窗的操作符(十)TimestampAssigner介面 (一)設定事件時間

在flink中設定事件時間時需要將時間的表示轉換為毫秒

如果不需要轉換

def main(args: Array[String]): Unit = {

    // ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // 將時間特性設定為事件時間
    env.setParallelism(1)

    val clickStream = env
      .fromElements(
        UserClickLog("user_2", "1500", "click", "page_1"),
        UserClickLog(
"user_2", "2000", "click", "page_1") ) .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 選擇事件時間的欄位 // ... }

如果需要轉換

def main(args: Array[String]): Unit = {

    // ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 將時間特性設定為事件時間

    val clickStream = env
      .fromElements(
        UserClickLog(
"user_2", "2019-11-16 17:30:00", "click", "page_1") ) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0)) { override def extractTimestamp(t: UserClickLog): Long = { val dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") val dateTime
= DateTime.parse(t.eventTime, dateTimeFormatter) dateTime.getMillis // 返回事件時間 } } ) // ... }

Time.seconds(0):MaxOutOfOrderness延遲時間, 水位線用於延遲視窗的觸發時間