Flink例項(114):自定義時間和視窗的操作符(十三)自定義視窗分配器之設定視窗開始與結束時刻
阿新 • • 發佈:2020-12-31
1.自定義視窗分配器(flink1.11.2)
package com.atguigu.exercise.ETL.caiutil

import java.text.SimpleDateFormat
import java.util
import java.util.{Collections, Date}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * Window assigner that places every element into exactly one five-minute
 * event-time window `[start, start + 5 min)`.
 *
 * Window boundaries are computed directly on the epoch-millisecond timestamp,
 * so they do not depend on the JVM default time zone and stay correct across
 * daylight-saving transitions.
 *
 * @tparam T type of the elements being windowed
 */
class CustomWindowAssigner[T] extends WindowAssigner[T, TimeWindow] {

  /** Length of one window in milliseconds (five minutes). */
  private val windowSizeMillis: Long = 5L * 60L * 1000L

  /**
   * Assigns the element to the single five-minute window containing `timestamp`.
   *
   * @param t                     the element (not inspected)
   * @param timestamp             event-time timestamp of the element, in epoch milliseconds
   * @param windowAssignerContext assigner context (not used)
   * @return a singleton collection holding the window for `timestamp`
   */
  override def assignWindows(
      t: T,
      timestamp: Long,
      windowAssignerContext: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
    val (start, end) = getTimestampFromFiveMinute(timestamp)
    Collections.singletonList(new TimeWindow(start, end))
  }

  // EventTimeTrigger.create() is typed as Trigger[Object, TimeWindow] on the Java side;
  // the cast is required for the Scala compiler to accept it as Trigger[T, TimeWindow].
  override def getDefaultTrigger(
      streamExecutionEnvironment: StreamExecutionEnvironment): Trigger[T, TimeWindow] =
    EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]]

  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer

  /** This assigner works on event time. */
  override def isEventTime: Boolean = true

  /**
   * Returns the five-minute range that contains the given timestamp.
   *
   * The start is the timestamp rounded down to a multiple of five minutes since
   * the epoch; the end is five minutes later. `Math.floorMod` keeps the rounding
   * direction correct for negative timestamps as well.
   *
   * @param timestamp timestamp in epoch milliseconds
   * @return `(start, end)` in epoch milliseconds, with `start <= timestamp < end`
   */
  def getTimestampFromFiveMinute(timestamp: Long): (Long, Long) = {
    val start = timestamp - Math.floorMod(timestamp, windowSizeMillis)
    (start, start + windowSizeMillis)
  }

  /**
   * Formats the five-minute bucket of a timestamp as `yyyyMMddHHmm` strings in
   * the JVM default time zone.
   *
   * The end string uses minute `60` for the last bucket of an hour (for example
   * `...1360` rather than the next hour's `...1400`).
   *
   * @param timeinfo timestamp in epoch milliseconds, as a decimal string
   * @return `(startTime, endTime)` formatted as `yyyyMMddHH` followed by a two-digit minute
   * @throws NumberFormatException if `timeinfo` is not a valid long
   */
  def getByinterMinute(timeinfo: String): (String, String) = {
    val date = new Date(timeinfo.toLong)
    val hour = new SimpleDateFormat("yyyyMMddHH").format(date)
    val minute = new SimpleDateFormat("mm").format(date).toInt

    // Round the minute down to a multiple of five; the bucket ends five minutes later.
    val startMinute = minute / 5 * 5
    val endMinute = startMinute + 5

    def twoDigits(m: Int): String = if (m < 10) s"0$m" else m.toString

    (hour + twoDigits(startMinute), hour + twoDigits(endMinute))
  }

  /** Ad-hoc smoke check: computes the window for a sample timestamp and discards it. */
  def main(args: Array[String]): Unit = {
    val testtime = getTimestampFromFiveMinute(1536268066000L)
  }
}
2.主程式
package com.atguigu.exercise.day4

import java.time.Duration

import com.atguigu.day2.{SensorReading, SensorSource}
import com.atguigu.exercise.ETL.caiutil.CustomWindowAssigner
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Demo job: reads sensor readings, keys them by sensor id, windows them with
 * [[CustomWindowAssigner]] and prints the start/end of every fired window.
 */
object CustomWindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Watermarks tolerate 5 seconds of out-of-orderness; event time comes from the reading itself.
    val watermarks = WatermarkStrategy
      .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
        override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp
      })

    val stream = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(watermarks)

    val windowed = stream
      .keyBy(reading => reading.id)
      .window(new CustomWindowAssigner)

    // Emits one line per fired window describing its boundaries.
    val windowBounds = windowed.process(
      new ProcessWindowFunction[SensorReading, String, String, TimeWindow]() {
        override def process(
            key: String,
            context: Context,
            elements: Iterable[SensorReading],
            out: Collector[String]): Unit = {
          val window = context.window
          out.collect(s"startTime${window.getStart} endTime${window.getEnd}")
        }
      })

    windowBounds.print()

    env.execute()
  }
}