1. 程式人生 > 實用技巧 >Flink例項(六十七):自定義時間和視窗的操作符(十二)Flink事件時間何時觸發視窗計算

Flink例項(六十七):自定義時間和視窗的操作符(十二)Flink事件時間何時觸發視窗計算

目錄


思考:
    什麼時候才會觸發視窗計算?
    既然使用的是事件時間那麼必然會涉及到水位線(water_mark),水位線在其中扮演的角色是什麼?
    此時我們帶著疑問,一步一步的探究
注意:
1、本篇部落格中的所有解釋都是在滾動視窗的前提下
2、瀏覽本部落格前觀看本欄另外一篇部落格“Flink時間概念與水位線”尤佳。

Q:為什麼要在滾動視窗的前提下進行解釋?
A:因為滾動視窗相比較滑動和會話來說更容易讓大家理解,在本篇部落格中著重的是討論水位線在視窗觸發下的場景,因此當然採用大家較容易理解的視窗來為大家解釋。

Q:那我要是想了解水位線在其他視窗下的場景呢?
A:在本欄的其他部落格有詳細介紹。

1.1 瀏覽本部落格前你需要了解的知識點

    flink內部是如何劃分視窗的?

    首先Windows的時間範圍是一個自然時間範圍,比如你定義了一個視窗:timeWindow(Time.seconds(3));那麼其windows會將視窗中的事件按照3S進行劃分(左閉右開)

[10:11:00,10:11:03)
[10:11:03,10:11:06)
… …
[10:11:21,10:11:24)
… …


    當一個Event Time = 10:11:22的記錄到來時就會生成如下視窗,此時這條訊息就存放在這個視窗中;

[10:11:21,10:11:24]

    觸發的條件?

    a、water_mark時間 >= window_end_time只是第一個條件
    b、在[window_start_time,window_end_time)區間中還需要有資料存在,如果沒有資料同樣是不會觸發的。

    何時第一次觸發?

    當water_marker >= windows_end_time視窗結束時間,就會觸發視窗操作。
(最新的water_marker時間戳會在過去的windows_end_time視窗結束時間中逐一進行比較,如果發現有 >= 的情況就會觸發視窗操作)

1.2 示例:觸發視窗計算

示例
最大亂序事件10秒,視窗時間3秒

    第一次觸發計算

輸入資料:
1,1538359882000
2,1538359886000
3,1538359892000
4,1538359893000
5,1538359894000

    定義下方輸出資料中的water_mark

  val watermark: DataStream[(String, Long, String, Int)] = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long, String, Int)] {
    val maxOutOfOrderness = 10000L //最大允許的亂序時間是10s
    var currentMaxTimestamp = 0L
    var a: Watermark = _

    override def getCurrentWatermark: Watermark = {
      a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      a
    }


    override def extractTimestamp(element: (String, Long, String, Int), previousElementTimestamp: Long): Long = {
      val timestamp = element._2
      currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
      val end = if (!a.toString.contains("-")) {
        val regEx = "[^0-9]";
        val p = Pattern.compile(regEx);
        val m = p.matcher(a.toString);
        val L_number = m.replaceAll("").trim()
        format.format(L_number.toLong)
      } else a.toString
      println("timestamp:" + element._1 + "," + element._2 + "," + element._3 + "|" +
        s"${a.toString}($end)" + "|" + format.format(currentMaxTimestamp - maxOutOfOrderness))
      val lll: Long = System.currentTimeMillis()

      timestamp
    }
  })

 當我們依次輸入資料的時候,在輸入完5,1538359894000這條資料後,最新的water_marker是:10:11:23。由於該條記錄的水位線為:10:11:24 > 10:11:23,因此水位線會進行更新,變成10:11:24,而10:11:24 >= window end time(10:11:24),所以此時就會觸發計算操作。

    此時的觸發操作計算的是event_time(事件時間)在[10:11:21,10:11:24)視窗之間的資料(也就是隻會計算 1,1538359882000 這一條資料)

輸出資料為:
當前資料 | 當前資料的水位線 , 計算並更新水位線
timestamp:1,1538359882000,10:11:22|Watermark@-10000(Watermark@-0000),10:11:12
timestamp:2,1538359886000,10:11:26|Watermark@1538359872000(10:11:12),10:11:16
timestamp:3,1538359892000,10:11:32|Watermark@1538359876000(10:11:16),10:11:22
timestamp:4,1538359893000,10:11:33|Watermark@1538359882000(10:11:22), 10:11:23
timestamp:5,1538359894000,10:11:34|Watermark@1538359883000(10:11:23), 10:11:24
第一次觸發計算: (1,1538359882000, 10:11:22,1)

    何時第二次觸發計算

輸入資料:
1,1538359882000
2,1538359886000
3,1538359892000
4,1538359893000
5,1538359894000
6,1538359896000
7,1538359897000

輸出資料為:
timestamp:1,1538359882000,10:11:22|Watermark@-10000(Watermark@-0000),10:11:12
timestamp:2,1538359886000,10:11:26|Watermark@1538359872000(10:11:12),10:11:16
timestamp:3,1538359892000,10:11:32|Watermark@1538359876000(10:11:16),10:11:22
timestamp:4,1538359893000,10:11:33|Watermark@1538359882000(10:11:22), 10:11:23
timestamp:5,1538359894000,10:11:34|Watermark@1538359883000(10:11:23), 10:11:24
第一次觸發計算:> (1,1538359882000, 10:11:22,1)
timestamp:6,1538359896000,10:11:36|Watermark@1538359884000(10:11:24),10:11:26
timestamp:7,1538359897000,10:11:37|Watermark@1538359886000(10:11:26), 10:11:27
第二次觸發計算:> (2,1538359886000, 10:11:26,1)

 第一次的視窗[10:11:21,10:11:24)已經被使用了,那麼程式下次觸發的視窗則是[10:11:24,10:11:27),因此我輸入的7,1538359897000這條資料,而該條資料中的water_mark則會更新成10:11:27而10:11:27 >= window_end_time(10:11:27),因此會觸發操作。
    (此時計算的是[10:11:24,10:11:27)視窗內的資料,在上面的輸入資料中只有2,1538359886000這條資料屬於這個視窗,因此計算的是該條資料)

    何時觸發多個視窗計算?

輸入資料:
1,1538359882000
2,1538359886000
3,1538359892000
4,1538359893000
5,1538359894000
6,1538359907000
輸出資料:
timestamp:1,1538359882000,10:11:22|Watermark@-10000(Watermark@-0000),10:11:12
timestamp:2,1538359886000,10:11:26|Watermark@1538359872000(10:11:12),10:11:16
timestamp:3,1538359892000,10:11:32|Watermark@1538359876000(10:11:16),10:11:22
timestamp:4,1538359893000,10:11:33|Watermark@1538359882000(10:11:22), 10:11:23
timestamp:5,1538359894000,10:11:34|Watermark@1538359883000(10:11:23), 10:11:24
第一次觸發計算:> (1,1538359882000, 10:11:22,1)
timestamp:6,1538359907000,10:11:47|Watermark@1538359884000(10:11:24),10:11:37
觸發計算:> (2,1538359886000, 10:11:26,1)
觸發計算:> (3,1538359892000, 10:11:32,1)
觸發計算:> (4,1538359893000, 10:11:33,1)
觸發計算:> (5,1538359894000, 10:11:34,1)

視窗[10:11:21,10:11:24)之間的資料已經在第一次進行了觸發 後面的 6,1538359907000 資料之所以會導致視窗操作是因為:10:11:37 >= 視窗時間中的windows_end_Time [10:11:34,10:11:37);
   而此時在[10:11:34,10:11:37)這個視窗之上還有

                      [10:11:21,10:11:24)已經在第一次進行觸發了
[10:11:24,10:11:27)
[10:11:27,10:11:31)
[10:11:31,10:11:34)
[10:11:34,10:11:37)
… ….


   由於timestamp 2,3,4,5的event_time 分別在這些視窗之間,因此輸出的話就會在這次觸發操作中全部進行輸出了;計算的是每個視窗內的資料。