1. 程式人生 > 實用技巧 >【原始碼】Flink 運算元 chain 在一起的條件

【原始碼】Flink 運算元 chain 在一起的條件

Flink 任務的圖結構中,部分運算元是 chain 在一起的,因為 chain 在一起有很多好處(減少序列化和網路開銷,提高效率),而運算元 chain 在一起是需要條件的

Flink 任務在生成 JobGraph 的時候,會加入 chain 的概念,會判斷運算元能否 chain 在一起

首先在 env 中不能 禁用 運算元chain

env.disableOperatorChaining()

其次chain 在一起的運算元,還需要滿足如下 7 個條件:

1、下游節點只有一個輸入邊
2、上下游運算元在同一個 slotSharingGroup
3、上下游運算元可以 chain 在一起
4、上下游運算元資料分發策略是 ForwardPartitioner
5、上下游運算元資料 shuffleMode 不是 BATCH(是: PIPELINED,UNDEFINED)
6、上下游運算元併發度一樣
7、StreamGraph 允許 運算元 chain 在一起 (env 允許)

// 檢測上下游是否能 chain 在一起
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
  StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
  StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

  // 下游節點輸入邊為 1
  return downStreamVertex.getInEdges().size() == 1
    //
上下游是同一個 sharingGroup && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 上下游運算元策略能 chain 在一起 && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) // 上下游邊的分割槽策略是 ForwardPartitioner && (edge.getPartitioner() instanceof ForwardPartitioner)
// shuffleMode 不是 batch && edge.getShuffleMode() != ShuffleMode.BATCH // 上下游併發度一樣 && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // streamGraph 是可以 chain && streamGraph.isChainingEnabled(); } @VisibleForTesting static boolean areOperatorsChainable( StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) { StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory(); StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory(); // 上下游運算元 有工廠類 if (downStreamOperator == null || upStreamOperator == null) { return false; } // 上游的策略是 NEVER , 下游的策略不是 ALWAYS if (upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER || downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) { return false; } // yielding operators cannot be chained to legacy sources if (downStreamOperator instanceof YieldingOperatorFactory) { // unfortunately the information that vertices have been chained is not preserved at this point return !getHeadOperator(upStreamVertex, streamGraph).isStreamSource(); } // 其他 都是可以 chain 的 return true; }

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文