1. 程式人生 > 實用技巧 >Flink例項(二十):自定義時間和視窗的操作符(一)KeyedProcessFunction(一)

Flink例項(二十):自定義時間和視窗的操作符(一)KeyedProcessFunction(一)

前言

  在Flink中比如某些運算元(join,coGroup,keyBy,groupBy)要求在資料元上定義key。另外有些運算元操作,例如reduce,groupReduce,Aggregate,Windows需要資料在處理之前根據key進行分組。

在Flink中資料模型不是基於Key,Value格式處理的,因此不需將資料處理成鍵值對的格式,key是“虛擬的”,可以人為的來指定,實際資料處理過程中根據指定的key來對資料進行分組,DataSet中使用groupBy來指定key,DataStream中使用keyBy來指定key。那麼如何指定keys呢?

一.使用Tuples來指定key

  定義元組來指定key可以指定tuple中的第幾個元素當做key,或者指定tuple中的聯合元素當做key。需要使用org.apache.flink.api.java.tuple.TupleXX

包下的tuple,最多支援25個元素且Tuple必須new建立。

  如果Tuple是巢狀的格式,例如:DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds,如果指定keyBy(0)則會使用內部的整個Tuple2作為key。如果想要使用內部Tuple2中的Float格式當做key,可以使用keyBy("f0.f1")這樣的形式指定。

  這裡需要注意,在Flink的Tuple中指定的key的下標從0開始算起,這裡不像Scala中的Tuple從1開始算起,同時一般需要指定key的函式中都可以有兩種寫法,一種是直接寫數字0,1,2等等,還有一種是寫字串的形式前面的0,1,2對應的字串的表達形式為f1,f2,f3。

  如果需要指定多個欄位當做聯合的Key,可以寫成keyBy(0,1),如果寫成字串形式在字串中指定多個key,還可以寫成keyBy("f0","f1")的形式。

二.使用Field Expression來指定key

可以使用Field Expression來指定key,一般作用的物件可以是類物件,或者巢狀的Tuple格式的資料。

對於這種形式的使用,注意點如下:

1.對於類物件可以使用類中的欄位來指定key,類物件定義需要注意:

  • 類的訪問級別必須是public
  • 必須寫出預設的空的建構函式
  • 類中所有的欄位必須是public的或者必須有getter,setter方法。
  • Flink必須支援欄位的型別。

2.對於巢狀的Tuple型別的Tuple資料可以使用"xx.f0"表示巢狀tuple中第一個元素,也可以直接使用”xx.0”來表示第一個元素。

三.使用Key Selector Functions來指定key

使用key Selector這種方式選擇key,非常方便,可以從資料型別中指定想要的key.

KeyedStream<String, String> keyBy = socketText.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String line) throws Exception {
                return line.split("\t")[2];
            }
        });

作者:叫我不矜持
連結:https://www.jianshu.com/p/faaa059453fb
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。