【原始碼】Flink 運算元 chain 在一起的條件
阿新 • • 發佈:2020-12-09
Flink 任務的圖結構中,部分運算元是 chain 在一起的,因為 chain 在一起有很多好處(減少序列化和網路開銷,提高效率),而運算元 chain 在一起是需要條件的
Flink 任務在生成 JobGraph 的時候,會加入 chain 的概念,會判斷運算元能否 chain 在一起
首先在 env 中不能 禁用 運算元chain
env.disableOperatorChaining()
其次chain 在一起的運算元,還需要滿足如下 7 個條件:
1、下游節點只有一個輸入邊
2、上下游運算元在同一個 slotSharingGroup
3、上下游運算元可以 chain 在一起
4、上下游運算元資料分發策略是 ForwardPartitioner
5、上下游運算元資料 shuffleMode 不是 BATCH(是: PIPELINED,UNDEFINED)
6、上下游運算元併發度一樣
7、StreamGraph 允許 運算元 chain 在一起 (env 允許)
// 檢測上下游是否能 chain 在一起 public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); // 下游節點輸入邊為 1 return downStreamVertex.getInEdges().size() == 1 //上下游是同一個 sharingGroup && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 上下游運算元策略能 chain 在一起 && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) // 上下游邊的分割槽策略是 ForwardPartitioner && (edge.getPartitioner() instanceof ForwardPartitioner)// shuffleMode 不是 batch && edge.getShuffleMode() != ShuffleMode.BATCH // 上下游併發度一樣 && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // streamGraph 是可以 chain && streamGraph.isChainingEnabled(); } @VisibleForTesting static boolean areOperatorsChainable( StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) { StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory(); StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory(); // 上下游運算元 有工廠類 if (downStreamOperator == null || upStreamOperator == null) { return false; } // 上游的策略是 NEVER , 下游的策略不是 ALWAYS if (upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER || downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) { return false; } // yielding operators cannot be chained to legacy sources if (downStreamOperator instanceof YieldingOperatorFactory) { // unfortunately the information that vertices have been chained is not preserved at this point return !getHeadOperator(upStreamVertex, streamGraph).isStreamSource(); } // 其他 都是可以 chain 的 return true; }
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文