1. 程式人生 > 實用技巧 >Flink例項(114):自定義時間和視窗的操作符(十三)自定義視窗分配器之設定視窗開始與結束時刻

Flink例項(114):自定義時間和視窗的操作符(十三)自定義視窗分配器之設定視窗開始與結束時刻

1.自定義視窗分配器(flink1.11.2)

package com.atguigu.exercise.ETL.caiutil

import java.text.SimpleDateFormat
import java.util
import java.util.{Collections, Date}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * Window assigner that places every element into exactly one five-minute
 * event-time window whose bounds fall on five-minute marks (:00, :05, ...).
 *
 * The window is `[start, end)` with `start` being the element timestamp
 * rounded down to a multiple of five minutes since the epoch and
 * `end = start + 5 minutes`.
 *
 * @tparam T type of the elements being windowed
 */
class CustomWindowAssigner[T] extends WindowAssigner[T, TimeWindow] {

  /** Width of one window in minutes. */
  private final val WindowMinutes = 5

  /** Width of one window in milliseconds. */
  private final val WindowMillis = WindowMinutes * 60L * 1000L

  /**
   * Assigns the element to the single five-minute window containing `timestamp`.
   *
   * @param t                     the element (not inspected)
   * @param timestamp             the element's timestamp in epoch milliseconds
   * @param windowAssignerContext assigner context (not used)
   * @return a one-element collection holding the window
   */
  override def assignWindows(
      t: T,
      timestamp: Long,
      windowAssignerContext: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
    val (start, end) = getTimestampFromFiveMinute(timestamp)
    Collections.singletonList(new TimeWindow(start, end))
  }

  /**
   * Fires windows using the event-time trigger.
   *
   * `EventTimeTrigger.create()` is not typed as `Trigger[T, TimeWindow]`, and
   * Scala does not accept it without an explicit cast, so the cast is kept.
   */
  override def getDefaultTrigger(
      streamExecutionEnvironment: StreamExecutionEnvironment): Trigger[T, TimeWindow] =
    EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]]

  /** Serializer for the `TimeWindow` instances produced by this assigner. */
  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer

  /** This assigner works on event time. */
  override def isEventTime: Boolean = true

  /**
   * Returns the five-minute range that contains the given timestamp.
   *
   * The bounds are computed with plain epoch arithmetic instead of formatting
   * the timestamp as a local-time string and parsing it back. The string
   * round trip depended on the JVM default time zone, was ambiguous during a
   * daylight-saving fall-back hour (the returned range could lie one hour
   * before the timestamp), and allocated several formatters per element.
   * `floorMod` keeps the result correct for negative timestamps as well.
   *
   * @param timestamp epoch milliseconds
   * @return `(start, end)` in epoch milliseconds, start inclusive, end exclusive
   */
  def getTimestampFromFiveMinute(timestamp: Long): (Long, Long) = {
    val start = timestamp - java.lang.Math.floorMod(timestamp, WindowMillis)
    (start, start + WindowMillis)
  }

  /**
   * Renders the five-minute bucket of a timestamp as `yyyyMMddHHmm` strings
   * in the JVM default time zone.
   *
   * For minutes 55-59 the end string carries the minute value `60` (for
   * example `...1160`), exactly as before; a lenient `SimpleDateFormat` parse
   * rolls that over into the next hour. The window computation no longer uses
   * this method; it is kept for callers that want the textual form.
   *
   * @param timeinfo epoch milliseconds as a decimal string
   * @return `(startTime, endTime)` strings
   * @throws NumberFormatException if `timeinfo` is not a valid long
   */
  def getByinterMinute(timeinfo: String): (String, String) = {
    val date = new Date(timeinfo.toLong)
    val hourPrefix = new SimpleDateFormat("yyyyMMddHH").format(date)
    val minuteOfHour = new SimpleDateFormat("mm").format(date).toInt

    // Round the minute down to the bucket start: 0, 5, 10, ..., 55.
    val bucketStart = (minuteOfHour / WindowMinutes) * WindowMinutes

    // Two-digit ASCII rendering that does not depend on the default locale.
    def twoDigits(value: Int): String = if (value < 10) s"0$value" else value.toString

    val startTime = hourPrefix + twoDigits(bucketStart)
    val endTime = hourPrefix + twoDigits(bucketStart + WindowMinutes)
    (startTime, endTime)
  }

  /** Manual smoke check: computes the range for a sample timestamp and discards it. */
  def main(args: Array[String]): Unit = {
    val testtime = getTimestampFromFiveMinute(1536268066000L)
  }
}

2.主程式

package com.atguigu.exercise.day4

import java.time.Duration

import com.atguigu.day2.{SensorReading, SensorSource}
import com.atguigu.exercise.ETL.caiutil.CustomWindowAssigner
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Demo job for `CustomWindowAssigner`: reads sensor readings, keys them by
 * sensor id, windows them with the custom assigner and prints the start and
 * end of every window that fires.
 */
object CustomWindowTest {

  /** Emits one line per fired window containing the window's start and end timestamps. */
  private class WindowBoundsReporter
      extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {
    override def process(
        key: String,
        context: Context,
        elements: Iterable[SensorReading],
        out: Collector[String]): Unit = {
      val window = context.window
      out.collect(s"startTime${window.getStart} endTime${window.getEnd}")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Event time is taken from the reading itself; watermarks tolerate
    // up to five seconds of out-of-order data.
    val watermarks = WatermarkStrategy
      .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
        override def extractTimestamp(reading: SensorReading, recordTimestamp: Long): Long =
          reading.timestamp
      })

    val readings = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(watermarks)

    readings
      .keyBy(reading => reading.id)
      .window(new CustomWindowAssigner)
      .process(new WindowBoundsReporter)
      .print()

    env.execute()
  }

}