延續上篇Stream Graph產生,此篇說明Job Graph生成的過程與目的
透過下方的連續調用會到Job Graph的生成主要邏輯代碼StreamingJobGraphGenerator的createJobGraph()@org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
|
|
@org/apache/flink/streaming/api/graph/StreamGraph.java
@org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@org/apache/flink/runtime/jobgraph/JobGraph.java
createJobGraph
defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(…)
- defaultStreamGraphHasher為StreamGraphHasherV2@org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
- 目的
- 是產生每個StreamNode擁有唯一Hash Id
- 擁有相同邏輯的Stream Graph,會得到相同的Hashed Stream Graph
- 方法
- 使用Source作為起點,廣度優先遍歷傳入的streamGraph,逐一產生每個StreamNode的Hash Id
- 若使用者有指定UID,則使用UID作為產生Hash Id的入參,否則使用StreamNode相關訊息作為產生Hash Id的入參
- 無法直接使用StreamNode id來作為Hash Id的入參是因為其是一個靜態遞增變量,同樣的Stream Graph可能會得到不同的Hash Id輸出
setChaining
- 目的
- 生成JobVertex,JobEdge,並盡量將多個StreamNode Chain在一起
- 將能Chain的兩個或多個節點合成一個節點,減少傳輸量,與序列化/反序列化的過程
- 方法
- 使用Source作為起點,廣度優先遍歷傳入的streamGraph,將盡量多的StreamNode chain在一起,並產生JobVertex
- 根據isChaninable判斷是否能Chain
- 將StreamNode中的config配置到StreamConfig中,其中包括序列化器、StreamOperator、Checkpoint等相關配置,準備發送給JobManager和 TaskManager
- 目的
- setPhysicalEdges
- 目的
- 將每個JobVertex的入邊集合序列化到對應JobVertex的StreamConfig中
- 目的
- setSlotSharing
- 目的
- 根據group name,為每個JobVertex指定所屬的SlotSharingGroup
- 針對Iteration的頭尾設置CoLocationGroup
- 以及针对 Iteration的头尾设置 CoLocationGroup
- 目的
- configureCheckpointing(
- 目的
- 配置checkpoint
- 目的
- configureRestartStrategy()
- 目的
- 配置重啟策略
- 目的
|
|
|
|
Summary
- Stream Graph是邏輯是的DAG圖,不需要關心JobManager如何調度每個Operator,Job Graph是對Stream Graph進行切分與合併,因為有些Stream Node是可以被Chain在一起,並且一起被JobManager安排調度的,換句話說,Job Graph的DAG內的每一個頂點就是JobManger的一個調度單位
- 一個JobVertex包含一個或多個Operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet