1. 程式人生 > 實用技巧 >【原始碼】Flink 三層圖結構——StreamGraph 生成前準備 Transformation

【原始碼】Flink 三層圖結構——StreamGraph 生成前準備 Transformation

最近一直在看 StreamGraph 生成的原始碼,剛好有點思路,準備動手了發現,
如果不說下 Transformation 後面的 StreamGraph 會差比較多意思,
所以先做點鋪墊。

## Transformation

Transformation 類是 Flink 轉換運算元的基類,實現類有下面這些

AbstractMultipleInputTransformation
CoFeedbackTransformation
FeedbackTransformation
KeyedMultipleInputTransformation
LegacySourceTransformation
MultipleInputTransformation
OneInputTransformation
PartitionTransformation
PhysicalTransformation
SelectTransformation
SideOutputTransformation
SinkTransformation
SourceTransformation
SplitTransformation
TwoInputTransformation
UnionTransformation

類圖:

從這些 Transformation 中也可以看出Flink 支援的轉換型別: Source、Sink、一個輸入、兩個輸入、多個輸入、Union、側輸出、Select、分割槽 等轉換操作

## source Transformation 的起始

env.addSource(new SimpleStringSource)

呼叫 StreamExecutionEnvironment.scala 的 addSource 方法

def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
    require(function 
!= null, "Function must not be null.") val cleanFun = scalaClean(function) val typeInfo = implicitly[TypeInformation[T]] asScalaStream(javaEnv.addSource(cleanFun, typeInfo)) }

然後呼叫 javaEnv.addSource 方法

StreamExecutionEnvironment.java

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo) {
    
return addSource(function, "Custom Source", typeInfo); } public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) { TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo); boolean isParallel = function instanceof ParallelSourceFunction; clean(function); // 建立 StreamSource final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); // 使用 StreamSource 建立 DataStreamSource 同時建立 Source 的Transformation 了, this 指 env return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName); }

DataStreamSource.java 使用輸入的 sourceName, operator, outTypeInfo, Parallelism 建立 LegacySourceTransformation

public DataStreamSource(
      StreamExecutionEnvironment environment,
      TypeInformation<T> outTypeInfo,
      StreamSource<T, ?> operator,
      boolean isParallel,
      String sourceName) {
    super(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

    this.isParallel = isParallel;
    if (!isParallel) {
      setParallelism(1);
    }
  }

最終呼叫到 DataStream.java 的 DataStream 方法,將生成的 LegacySourceTransformation 放入到 DataStream 中

public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
    this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
    this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
  }

addSource 返回一個 DataStreamSource ,transformation 是 LegacySourceTransformation,並攜帶 StreamExecutionEnvironment 物件,繼續後面運算元的呼叫

## map 運算元看 Transformation

stream
  .map(str => str)

程式碼執行到 map 這一行時,會呼叫到 DataStream.scala 的 map 方法

def map[R: TypeInformation](fun: T => R): DataStream[R] = {
    if (fun == null) {
      throw new NullPointerException("Map function must not be null.")
    }
    val cleanFun = clean(fun)
    val mapper = new MapFunction[T, R] {
      def map(in: T): R = cleanFun(in)
    }
    // 又呼叫 map
    map(mapper)
  }

def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R] = {
    if (mapper == null) {
      throw new NullPointerException("Map function must not be null.")
    }

    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
    // stream.map 呼叫到 DataStream.java 中了
    asScalaStream(stream.map(mapper, outType).asInstanceOf[JavaStream[R]])
  } 

注: Flink 主要功能還是在 Java 程式碼中, Scala Api 就像個外殼,用 Scala 包裝了一下,方便 Scala 程式碼呼叫,實際上還是會呼叫到 Java 程式碼上去

DataStream.java 的 map 方法

這裡呼叫 transform 方法,要構建 Transformation 了,對於這個測試的寫法來說, outputType 是 "String", Transformation 名是 "Map"

SimpleOperatorFactory.of(operator)) 獲取的工廠類是: SimpleUdfStreamOperatorFactory str => str 就是 Udf

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // 可以看到 transform ,這裡的 outputType 是 String 了
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
  }

public <R> SingleOutputStreamOperator<R> transform(
      String operatorName,
      TypeInformation<R> outTypeInfo,
      OneInputStreamOperator<T, R> operator) {

    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
  }

DataStream.java 的 doTransform 方法建立 map 運算元對於的 OneInputTransformation, 同時建立一個新的 DataStream: SingleOutputStreamOperator

// 真正建立 Transformation
  protected <R> SingleOutputStreamOperator<R> doTransform(
      String operatorName,
      TypeInformation<R> outTypeInfo,
      StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    // 檢驗和設定 transformation 輸出型別
    transformation.getOutputType();

    // 建立 一個輸入的 Transformation, this.transformation 上一運算元的 Transformation 做為 當前運算元的 輸入 Transformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
        this.transformation,
        operatorName,
        operatorFactory,
        outTypeInfo,
        environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
      // 建立 一個輸出的 StreamOperator 也是 DataStream, 也 攜帶 environment
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    // 講 建立的 Transformation 放到 ExecutionEnvironment 的 transformations 列表中
    getExecutionEnvironment().addOperator(resultTransform);
    // 返回 SingleOutputStreamOperator
    return returnStream;
  }

所以,執行完 map 後,返回的也是一個新的 DataStream,這不像有些使用者,objectA.methodA().methodB() 每次都返回原來的 objectA

## sink Transformation 的起始

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    // 檢驗輸出和設定輸出型別
    transformation.getOutputType();

    // configure the type if needed
    // 檢查輸入方法型別
    if (sinkFunction instanceof InputTypeConfigurable) {
      ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }
    // 建立一個 sinkOperator
    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
    // 使用 sinkOperator 建立 DataStreamSink , 同是建立 SinkTransformation
    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
    // 把 SinkTransformation 新增到 transformations
    getExecutionEnvironment().addOperator(sink.getTransformation());
    // 返回 DataStreamSink
    return sink;
  }

DataStreamSink.java 建立 DataStreamSink 的時候,用當前的 DataStream 和 StreamSink 做引數, 當前的 DataStream 做為 StreamSink 的 input Transformation

protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
    this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
  }

在建立 Sink 的 DataStream 的時候,將 前一個運算元生成的 DataStream 傳入 做為了 Sink 的 input Transformation。

## 總結

從 env.addSource.map.addSink 最簡單的 Flink 程式,可以看到 Flink 建立 StreamGraph 前的 Transformation 生成過程,其他如: flatMap、filter、union、process 基本類似,其他如 join、window、forward 也相差不大

比如:

val join = process.join(map)
      .where(str => str)
      .equalTo(str => str)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .apply(new JoinFunction[String, String, String] {
        override def join(first: String, second: String): String = {
          first + ";" + second
        }
      })

從 apply 追下去,會 看到 在 WindowedStream.java 的 apply 方法中 呼叫了 input.transform(opName, resultType, operator) 生成了 一個輸出的 Transformation

@PublicEvolving
  public <R> SingleOutputStreamOperator<R> transform(
      String operatorName,
      TypeInformation<R> outTypeInfo,
      OneInputStreamOperator<T, R> operator) {

    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
  }

  protected <R> SingleOutputStreamOperator<R> doTransform(
      String operatorName,
      TypeInformation<R> outTypeInfo,
      StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
        this.transformation,
        operatorName,
        operatorFactory,
        outTypeInfo,
        environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;

Transformation 就是 使用者程式碼,轉換成 Flink 運算元的結果,Transformation

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文