Flink Job Graph

Continuing from the previous post on Stream Graph generation, this post explains how and why the Job Graph is generated.
The chain of calls below leads to the core Job Graph generation logic, createJobGraph() in StreamingJobGraphGenerator @org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

@PublicEvolving
public class StreamContextEnvironment extends StreamExecutionEnvironment {
	@Override
	public JobExecutionResult execute(String jobName) throws Exception {
		Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
		StreamGraph streamGraph = this.getStreamGraph();
		streamGraph.setJobName(jobName);
		transformations.clear();
		// execute the programs
		if (ctx instanceof DetachedEnvironment) {
			// ...
		} else {
			return ctx
				.getClient()
				.run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
				.getJobExecutionResult();
		}
	}
}

public abstract class ClusterClient<T> {
	public JobSubmissionResult run(FlinkPlan compiledPlan,
			List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
			throws ProgramInvocationException {
		JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
		return submitJob(job, classLoader);
	}

	public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
		JobGraph job;
		if (optPlan instanceof StreamingPlan) {
			job = ((StreamingPlan) optPlan).getJobGraph();
			job.setSavepointRestoreSettings(savepointSettings);
		} else {
			// ...
		}
		// ...
		return job;
	}
}
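
For context, this call chain is driven by an ordinary user program: env.execute() builds the StreamGraph and hands it to the ClusterClient, which converts it into a JobGraph before submission. A minimal sketch (the class name and the trivial pipeline are illustrative only):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobGraphDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// source -> map -> sink: three StreamNodes that the generator may later chain
		env.fromElements(1, 2, 3)
			.map(new MapFunction<Integer, Integer>() {
				@Override
				public Integer map(Integer value) {
					return value * 2;
				}
			})
			.print();

		// When submitted through the CLI, this call ends up in StreamContextEnvironment.execute
		// -> ClusterClient.run -> StreamGraph.getJobGraph -> StreamingJobGraphGenerator.
		env.execute("job-graph-demo");
	}
}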

@org/apache/flink/streaming/api/graph/StreamGraph.java

@Internal
public class StreamGraph extends StreamingPlan {
	@SuppressWarnings("deprecation")
	@Override
	public JobGraph getJobGraph(@Nullable JobID jobID) {
		// ...
		return StreamingJobGraphGenerator.createJobGraph(this, jobID);
	}
}

@org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

@Internal
public class StreamingJobGraphGenerator {
	public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
		return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
	}

	private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
		this.streamGraph = streamGraph;
		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
		// ...
		jobGraph = new JobGraph(jobID, streamGraph.getJobName());
	}

	private JobGraph createJobGraph() {
		// make sure that all vertices start immediately
		jobGraph.setScheduleMode(ScheduleMode.EAGER);

		// Generate deterministic hashes for the nodes in order to identify them across
		// submission iff they didn't change.
		Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

		// Generate legacy version hashes for backwards compatibility
		List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
		for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
			legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
		}

		Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

		setChaining(hashes, legacyHashes, chainedOperatorHashes);
		setPhysicalEdges();
		setSlotSharingAndCoLocation();
		configureCheckpointing();

		JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);

		// set the ExecutionConfig last when it has been finalized
		try {
			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
		}
		catch (IOException e) {
			throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
					"This indicates that non-serializable types (like custom serializers) were registered");
		}
		return jobGraph;
	}
}

@org/apache/flink/runtime/jobgraph/JobGraph.java

public class JobGraph implements Serializable {
	/**
	 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
	 * the given name and the given execution configuration (see {@link ExecutionConfig}).
	 * The ExecutionConfig will be serialized and can't be modified afterwards.
	 *
	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
	 * @param jobName The name of the job.
	 */
	public JobGraph(JobID jobId, String jobName) {
		this.jobID = jobId == null ? new JobID() : jobId;
		this.jobName = jobName == null ? "(unnamed job)" : jobName;
		try {
			setExecutionConfig(new ExecutionConfig());
		} catch (IOException e) {
			// this should never happen, since an empty execution config is always serializable
			throw new RuntimeException("bug, empty execution config is not serializable");
		}
	}
}

createJobGraph

  • defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(…)

    • defaultStreamGraphHasher is a StreamGraphHasherV2 @org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
    • Purpose
      • Generate a unique hash ID for every StreamNode
      • Stream Graphs with the same logic produce the same hashed Stream Graph
    • Method
      • Starting from the sources, traverse the streamGraph breadth-first and generate a hash ID for each StreamNode in turn (see the simplified hashing sketch after the StreamGraphHasherV2 code below)
      • If the user specified a UID, it is used as the input for the hash ID; otherwise StreamNode-related information is used as the input
        • The StreamNode id itself cannot serve as the hash input because it comes from a statically incremented counter, so the same Stream Graph could yield different hash IDs across submissions
  • setChaining

    • Purpose
      • Generate the JobVertex and JobEdge instances, chaining as many StreamNodes together as possible
      • Merging two or more chainable nodes into one node reduces data transfer as well as serialization/deserialization overhead
    • Method
      • Starting from the sources, recursively (depth-first) traverse the streamGraph, chain as many StreamNodes together as possible, and create the JobVertex instances
      • Whether two nodes can be chained is decided by isChainable (see the sketch after the createChain code below)
      • Copy each StreamNode's configuration into a StreamConfig, including the serializers, the StreamOperator, checkpoint settings, and so on, ready to be shipped to the JobManager and TaskManagers
  • setPhysicalEdges
    • Purpose
      • Serialize each JobVertex's set of incoming edges into that JobVertex's StreamConfig
  • setSlotSharingAndCoLocation
    • Purpose
      • Assign each JobVertex to a SlotSharingGroup based on its group name (set from user code; see the usage sketch after this list)
      • Set up a CoLocationGroup for the head and tail of each Iteration
  • configureCheckpointing()
    • Purpose
      • Configure checkpointing (enabled from user code; see the usage sketch after this list)
  • configureRestartStrategy()
    • Purpose
      • Configure the restart strategy
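
The UID, slot sharing group name, and checkpoint settings consumed by the steps above all originate from user code. A minimal usage sketch (all names here are illustrative only):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobGraphSettingsDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Read later by configureCheckpointing(): checkpoint every 10 seconds.
		env.enableCheckpointing(10_000L);

		env.fromElements(1, 2, 3)
			.map(new MapFunction<Integer, Integer>() {
				@Override
				public Integer map(Integer value) {
					return value + 1;
				}
			})
			// uid(): used by StreamGraphHasherV2 as the hash input for this node,
			// keeping its ID stable across resubmissions.
			.uid("plus-one-map")
			// slotSharingGroup(): the group name consumed by setSlotSharingAndCoLocation().
			.slotSharingGroup("demo-group")
			.print();

		env.execute("settings-demo");
	}
}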
public class StreamGraphHasherV2 implements StreamGraphHasher {

	/**
	 * Returns a map with a hash for each {@link StreamNode} of the {@link
	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
	 * identify nodes across job submissions if they didn't change.
	 *
	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
	 * computed from the transformation's user-specified id (see
	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
	 *
	 * <p>The generated hash is deterministic with respect to:
	 * <ul>
	 * <li>node-local properties (node ID),
	 * <li>chained output nodes, and
	 * <li>input nodes hashes
	 * </ul>
	 *
	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
	 */
	@Override
	public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
		// The hash function used to generate the hash
		final HashFunction hashFunction = Hashing.murmur3_128(0);
		final Map<Integer, byte[]> hashes = new HashMap<>();

		Set<Integer> visited = new HashSet<>();
		Queue<StreamNode> remaining = new ArrayDeque<>();

		// We need to make the source order deterministic. The source IDs are
		// not returned in the same order, which means that submitting the same
		// program twice might result in different traversal, which breaks the
		// deterministic hash assignment.
		List<Integer> sources = new ArrayList<>();
		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
			sources.add(sourceNodeId);
		}
		Collections.sort(sources);

		//
		// Traverse the graph in a breadth-first manner. Keep in mind that
		// the graph is not a tree and multiple paths to nodes can exist.
		//

		// Start with source nodes
		for (Integer sourceNodeId : sources) {
			remaining.add(streamGraph.getStreamNode(sourceNodeId));
			visited.add(sourceNodeId);
		}

		StreamNode currentNode;
		while ((currentNode = remaining.poll()) != null) {
			// Generate the hash code. Because multiple path exist to each
			// node, we might not have all required inputs available to
			// generate the hash code.
			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled(), streamGraph)) {
				// Add the child nodes
				for (StreamEdge outEdge : currentNode.getOutEdges()) {
					StreamNode child = streamGraph.getTargetVertex(outEdge);
					if (!visited.contains(child.getId())) {
						remaining.add(child);
						visited.add(child.getId());
					}
				}
			} else {
				// We will revisit this later.
				visited.remove(currentNode.getId());
			}
		}
		return hashes;
	}
}
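
generateNodeHash is not shown above. When the user did not set a UID, it falls back to a deterministic hash over exactly the three inputs listed in the Javadoc: the node's position in the traversal, its chainable outputs, and its inputs' hashes. The following is a simplified sketch of that computation (method and parameter names are illustrative), not the actual Flink implementation:

import java.util.List;
import java.util.Map;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;

public final class DeterministicHashSketch {

	// Simplified sketch of the non-UID hash path; NOT the real Flink code.
	static byte[] deterministicHash(
			HashFunction hashFunction,
			Map<Integer, byte[]> hashes,   // hashes of already-visited nodes
			int chainableOutEdgeCount,     // number of chainable out-edges of this node
			List<byte[]> inputHashes) {    // hashes of this node's input nodes

		Hasher hasher = hashFunction.newHasher();

		// Node-local part: the current traversal position (hashes.size()) is hashed
		// instead of the StreamNode id, which comes from a static counter and would
		// make the result non-deterministic across submissions.
		hasher.putInt(hashes.size());

		// The chainable outputs also influence the hash.
		for (int i = 0; i < chainableOutEdgeCount; i++) {
			hasher.putInt(hashes.size());
		}

		byte[] hash = hasher.hash().asBytes();

		// Mix in the input nodes' hashes so the result reflects the topology.
		for (byte[] inputHash : inputHashes) {
			for (int j = 0; j < hash.length; j++) {
				hash[j] = (byte) (hash[j] * 37 ^ inputHash[j]);
			}
		}
		return hash;
	}
}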
@Internal
public class StreamingJobGraphGenerator {

	/**
	 * Sets up task chains from the source {@link StreamNode} instances.
	 *
	 * <p>This will recursively create all {@link JobVertex} instances.
	 */
	private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
			createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
		}
	}

	private List<StreamEdge> createChain(
			Integer startNodeId,
			Integer currentNodeId,
			Map<Integer, byte[]> hashes,
			List<Map<Integer, byte[]>> legacyHashes,
			int chainIndex,
			Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

		if (!builtVertices.contains(startNodeId)) {

			List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

			for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
				if (isChainable(outEdge, streamGraph)) {
					chainableOutputs.add(outEdge);
				} else {
					nonChainableOutputs.add(outEdge);
				}
			}

			for (StreamEdge chainable : chainableOutputs) {
				transitiveOutEdges.addAll(
						createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
			}

			for (StreamEdge nonChainable : nonChainableOutputs) {
				transitiveOutEdges.add(nonChainable);
				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
			}

			List<Tuple2<byte[], byte[]>> operatorHashes =
					chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

			byte[] primaryHashBytes = hashes.get(currentNodeId);

			for (Map<Integer, byte[]> legacyHash : legacyHashes) {
				operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
			}

			chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
			chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
			chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

			StreamConfig config = currentNodeId.equals(startNodeId)
					? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
					: new StreamConfig(new Configuration());

			setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

			if (currentNodeId.equals(startNodeId)) {

				config.setChainStart();
				config.setChainIndex(0);
				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
				config.setOutEdgesInOrder(transitiveOutEdges);
				config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

				for (StreamEdge edge : transitiveOutEdges) {
					connect(startNodeId, edge);
				}

				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

			} else {

				Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

				if (chainedConfs == null) {
					chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
				}
				config.setChainIndex(chainIndex);
				StreamNode node = streamGraph.getStreamNode(currentNodeId);
				config.setOperatorName(node.getOperatorName());
				chainedConfigs.get(startNodeId).put(currentNodeId, config);
			}

			config.setOperatorID(new OperatorID(primaryHashBytes));

			if (chainableOutputs.isEmpty()) {
				config.setChainEnd();
			}
			return transitiveOutEdges;

		} else {
			return new ArrayList<>();
		}
	}
}
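
isChainable, which createChain uses above to split out-edges into chainable and non-chainable, lives in the same class. Its conditions (shown here as in Flink 1.7; details may vary across versions): the downstream node has exactly one input, both nodes share a slot sharing group and parallelism, the chaining strategies allow it, the edge uses a ForwardPartitioner, and chaining is enabled:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
	StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
	StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

	StreamOperator<?> headOperator = upStreamVertex.getOperator();
	StreamOperator<?> outOperator = downStreamVertex.getOperator();

	return downStreamVertex.getInEdges().size() == 1               // exactly one input
			&& outOperator != null
			&& headOperator != null
			&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
			&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
			&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
				headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
			&& (edge.getPartitioner() instanceof ForwardPartitioner)  // no repartitioning
			&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
			&& streamGraph.isChainingEnabled();
}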

Summary

  • The Stream Graph is a logical DAG; it does not concern itself with how the JobManager schedules each operator. The Job Graph is produced by splitting and merging the Stream Graph, because some Stream Nodes can be chained together and scheduled by the JobManager as one unit. In other words, every vertex in the Job Graph DAG is one scheduling unit for the JobManager.
  • A JobVertex contains one or more operators; a JobVertex's inputs are JobEdges and its outputs are IntermediateDataSets.