
Flink Examples (106): Operators with Custom Time and Windows (12): Custom Window Assigners for Weeks and Months

Custom WindowAssigner

Tumbling windows of a day, an hour or a minute are easy to define with the built-in tumbling window assigners.

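A tumbling window covering a calendar week (starting on Sunday or Monday) or a calendar month (starting on the 1st) is a different matter. It is hard or impossible to build with the existing API. The built-in tumbling windows have a fixed size and are aligned to the epoch (1970-01-01 00:00:00 UTC, a Thursday). A 7-day window therefore starts on Sunday or Monday only if you compute an offset by hand. A month has no fixed size at all, since it runs from 28 to 31 days.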
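The sketch below shows why a plain 7-day tumbling window does not give calendar weeks. It re-implements, in plain Scala without any Flink dependency, the start-of-window arithmetic that Flink's `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)` uses for tumbling windows. The object name `EpochAlignment` and the sample timestamp are illustrative only. All results are in UTC.

```scala
import java.time.{DayOfWeek, Instant, ZoneOffset}

object EpochAlignment {
  val DayMs: Long = 24L * 60 * 60 * 1000
  val WeekMs: Long = 7 * DayMs

  // Start of the fixed-size window containing `timestamp`, counted from the epoch shifted by `offset`
  // (a re-implementation of the arithmetic Flink uses for tumbling windows)
  def windowStart(timestamp: Long, offset: Long, size: Long): Long =
    timestamp - Math.floorMod(timestamp - offset, size)

  // Day of the week (in UTC) on which the 7-day window containing `timestamp` starts
  def startDay(timestamp: Long, offset: Long): DayOfWeek =
    Instant.ofEpochMilli(windowStart(timestamp, offset, WeekMs)).atZone(ZoneOffset.UTC).getDayOfWeek

  def main(args: Array[String]): Unit = {
    val ts = 1583895064000L // 2020-03-11 02:51:04 UTC, a Wednesday
    println(startDay(ts, 0L))        // THURSDAY: 1970-01-01 was a Thursday
    println(startDay(ts, 4 * DayMs)) // MONDAY: the epoch shifted by four days
  }
}
```

The four-day offset works only because a week has a fixed length, and the result is still a UTC week. A week in local time needs the time-zone difference folded into the offset as well. A month has no fixed `size`, so no offset turns this formula into calendar months.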

This calls for a custom window assigner.

```scala
package com.atguigu.exercise.ETLHIVE

import java.util
import java.util.{Calendar, Collections}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * Tumbling event-time windows that follow the calendar: one window per week
 * (Sunday to Saturday) or per month (starting on the 1st), computed in the
 * JVM's default time zone.
 *
 * @param tag "week" or "month"
 */
class CustomWindowAssigner[T](tag: String) extends WindowAssigner[T, TimeWindow] {

  // Fail when the job is built rather than with a MatchError at runtime
  require(tag == "week" || tag == "month", s"unsupported tag: $tag (expected week or month)")

  // The core method of a window assigner: return the window(s) each element belongs to
  override def assignWindows(element: T, timestamp: Long,
                             context: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
    val (start, end) = tag match {
      case "month" => getTimestampFromMon(timestamp)
      case "week"  => getTimestampFromWeek(timestamp)
    }
    // Tumbling windows: every element belongs to exactly one window
    Collections.singletonList(new TimeWindow(start, end))
  }

  // The cast is required in Scala. EventTimeTrigger is a Trigger[Object, TimeWindow] and Trigger
  // is invariant in its element type, so this does not compile without it. The Java version seems
  // to work without the cast, but the (start, end) tuple above is clumsy in Java, so I stuck with Scala.
  override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[T, TimeWindow] =
    EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]]

  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer

  // The windows are based on event time
  override def isEventTime: Boolean = true

  // Resets the time of day to 00:00:00.000
  private def truncateToDay(calendar: Calendar): Unit = {
    calendar.set(Calendar.HOUR_OF_DAY, 0)
    calendar.set(Calendar.MINUTE, 0)
    calendar.set(Calendar.SECOND, 0)
    calendar.set(Calendar.MILLISECOND, 0)
  }

  /**
   * Returns the month containing `timestamp` as (start, end), start inclusive and end
   * exclusive, which is how Flink's TimeWindow interprets its bounds.
   * e.g. (UTC+8) 2020-03-12 11:35:13 (timestamp = 1583984113960L)
   * result: (1582992000000, 1585670400000) => [2020-03-01 00:00:00, 2020-04-01 00:00:00)
   */
  def getTimestampFromMon(timestamp: Long): (Long, Long) = {
    val calendar = Calendar.getInstance()
    calendar.setTimeInMillis(timestamp)
    truncateToDay(calendar)
    calendar.set(Calendar.DAY_OF_MONTH, 1)
    val start = calendar.getTimeInMillis
    calendar.add(Calendar.MONTH, 1) // the 1st of the next month, whatever this month's length
    (start, calendar.getTimeInMillis)
  }

  /**
   * Returns the week (starting on Sunday) containing `timestamp` as (start, end),
   * start inclusive and end exclusive.
   * e.g. (UTC+8) 2020-03-11 10:51:04 (timestamp = 1583895064000L)
   * result: (1583596800000, 1584201600000) => [2020-03-08 00:00:00, 2020-03-15 00:00:00)
   */
  def getTimestampFromWeek(timestamp: Long): (Long, Long) = {
    val calendar = Calendar.getInstance()
    // Set Sunday as the first day explicitly: the default depends on the locale (Monday in much of Europe)
    calendar.setFirstDayOfWeek(Calendar.SUNDAY)
    calendar.setTimeInMillis(timestamp)
    truncateToDay(calendar)
    calendar.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY)
    val start = calendar.getTimeInMillis
    calendar.add(Calendar.DAY_OF_MONTH, 7)
    (start, calendar.getTimeInMillis)
  }
}
```
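The sketch below cross-checks the boundary examples in the comments above. It also gives a starting point for the Monday-based weeks mentioned at the top. It computes the same half-open [start, end) ranges with `java.time` instead of `Calendar`, and it takes the time zone as an explicit argument. `CalendarBounds`, `monthBounds` and `weekBounds` are hypothetical names. The `firstDay` parameter is an assumption, added to support weeks that start on Monday.

```scala
import java.time.{DayOfWeek, Instant, ZoneId}
import java.time.temporal.TemporalAdjusters

object CalendarBounds {
  val Shanghai: ZoneId = ZoneId.of("Asia/Shanghai")

  // [start, end) in epoch millis of the calendar month containing `timestamp`
  def monthBounds(timestamp: Long, zone: ZoneId): (Long, Long) = {
    val first = Instant.ofEpochMilli(timestamp).atZone(zone).toLocalDate.withDayOfMonth(1)
    (first.atStartOfDay(zone).toInstant.toEpochMilli,
      first.plusMonths(1).atStartOfDay(zone).toInstant.toEpochMilli)
  }

  // [start, end) in epoch millis of the week containing `timestamp`, beginning on `firstDay`
  def weekBounds(timestamp: Long, zone: ZoneId, firstDay: DayOfWeek = DayOfWeek.SUNDAY): (Long, Long) = {
    val first = Instant.ofEpochMilli(timestamp).atZone(zone).toLocalDate
      .`with`(TemporalAdjusters.previousOrSame(firstDay))
    (first.atStartOfDay(zone).toInstant.toEpochMilli,
      first.plusWeeks(1).atStartOfDay(zone).toInstant.toEpochMilli)
  }

  def main(args: Array[String]): Unit = {
    println(monthBounds(1583984113960L, Shanghai))                  // (1582992000000,1585670400000)
    println(weekBounds(1583895064000L, Shanghai))                   // (1583596800000,1584201600000)
    println(weekBounds(1583895064000L, Shanghai, DayOfWeek.MONDAY)) // (1583683200000,1584288000000)
  }
}
```

To use this logic in the Flink assigner, `getTimestampFromMon` and `getTimestampFromWeek` could delegate to these functions. Because `ZoneId` is serializable, the zone could also be passed to the assigner's constructor. That is an assumption on my part; the code above simply uses the JVM default.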


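```scala
import org.apache.flink.streaming.api.scala._

// Input data
case class Top100Input(event_id: String, date_d: String, timeStamp: Long, uid: Long, weekTag: String, monthTag: String)

// Usage: the assigner works on event time, so dStream must already carry event-time
// timestamps and watermarks (presumably taken from the timeStamp field)
val dStream: DataStream[Top100Input] = ...

// Weekly windows
dStream
      .keyBy(_.weekTag)
      .window(new CustomWindowAssigner[Top100Input]("week"))
      // ...followed by a window function such as .aggregate(...) or .process(...)

// Monthly windows
dStream
      .keyBy(_.monthTag)
      .window(new CustomWindowAssigner[Top100Input]("month"))
```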