Flink例項(五十五):自定義時間和視窗的操作符(十)TimestampAssigner介面 (一)設定事件時間
阿新 • • 發佈:2020-10-24
在flink中設定事件時間時需要將時間的表示轉換為毫秒
如果不需要轉換
def main(args: Array[String]): Unit = { // ... env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 將時間特性設定為事件時間 env.setParallelism(1) val clickStream = env .fromElements( UserClickLog("user_2", "1500", "click", "page_1"), UserClickLog("user_2", "2000", "click", "page_1") ) .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 選擇事件時間的欄位 // ... }
如果需要轉換
def main(args: Array[String]): Unit = { // ... env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 將時間特性設定為事件時間 val clickStream = env .fromElements( UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1") ) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0)) { override def extractTimestamp(t: UserClickLog): Long = { val dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") val dateTime= DateTime.parse(t.eventTime, dateTimeFormatter) dateTime.getMillis // 返回事件時間 } } ) // ... }
Time.seconds(0):MaxOutOfOrderness延遲時間, 水位線用於延遲視窗的觸發時間