Flink Stream Graph

Basic information

  • Flink version: 1.8

此篇依序說明從使用者的Flink job如何生成Stream Graph。
首先以SocketWindowWordCount示例代表使用者開發的Flink job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// ...
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// ...
env.execute("Socket Window WordCount");
}

Get StreamExecutionEnvironment and Get Data Source

  • final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • 進入env instanceof ContextEnvironment分支,並取得StreamExecutionEnvironment object
  • env.socketTextStream(…);
    • 調用StreamExecutionEnvironment @org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
    • 在StreamExecutionEnvironment內部連續調用後,會來到socketTextStream(String hostname, int port, String delimiter, long maxRetry)
      • new SocketTextStreamFunction(…)
        • SocketTextStreamFunction class @org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
      • addSource(…)
        • 最終調用addSource(SourceFunction function, String sourceName, TypeInformation typeInfo)
        • 回傳DataStreamSource<> object @org/apache/flink/streaming/api/datastream/DataStreamSource.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Excerpt of StreamExecutionEnvironment showing how a socket source is registered.
 * Members that are not part of this call path are omitted from the listing.
 */
@Public
public abstract class StreamExecutionEnvironment {
/**
 * Creates a source stream backed by a SocketTextStreamFunction built from the given
 * hostname, port, delimiter and maxRetry, registered under the name "Socket Stream".
 */
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
/**
 * Convenience overload: delegates to the three-argument addSource with a null
 * typeInfo, so the type information is determined there.
 */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
return addSource(function, sourceName, null);
}
/**
 * Wraps the source function in a source operator and returns a DataStreamSource for it.
 *
 * @param function   the user source function
 * @param sourceName name given to the resulting source
 * @param typeInfo   type information of the produced elements; when null it is resolved below
 * @return a new DataStreamSource bound to this environment
 */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
if (typeInfo == null) {
// prefer the type the function itself reports, if it implements ResultTypeQueryable
if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
// otherwise ask TypeExtractor to derive it from the function's class
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
// extraction failed: keep a MissingTypeInfo placeholder carrying the source name
// and the exception instead of failing at this point
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
}
// the source counts as parallel only if the function implements ParallelSourceFunction
boolean isParallel = function instanceof ParallelSourceFunction;
// clean() is not shown in this excerpt — presumably closure cleaning; confirm against the full class
clean(function);
StreamSource<OUT, ?> sourceOperator;
// stoppable functions get the stoppable operator variant, everything else a plain StreamSource
if (function instanceof StoppableFunction) {
sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
} else {
sourceOperator = new StreamSource<>(function);
}
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Excerpt of SocketTextStreamFunction showing only its constructors.
 * Field declarations and the remaining members are omitted from the listing.
 */
@PublicEvolving
public class SocketTextStreamFunction implements SourceFunction<String> {
/**
 * Delegates to the five-argument constructor, using DEFAULT_CONNECTION_RETRY_SLEEP
 * as the delay between retries.
 */
public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries) {
this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
}
/**
 * Validates the arguments and stores them.
 *
 * @param hostname            must not be null
 * @param port                must be in the range 1..65535
 * @param delimiter           stored as given (not validated here)
 * @param maxNumRetries       zero or larger, or -1 for infinite retries
 * @param delayBetweenRetries zero or positive
 */
public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries, long delayBetweenRetries) {
checkArgument(port > 0 && port < 65536, "port is out of range");
checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
this.hostname = checkNotNull(hostname, "hostname must not be null");
this.port = port;
this.delimiter = delimiter;
this.maxNumRetries = maxNumRetries;
this.delayBetweenRetries = delayBetweenRetries;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Excerpt of DataStreamSource showing only its constructor.
 * The isParallel field declaration is omitted from the listing.
 */
@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
/**
 * Builds a SourceTransformation from the source name, operator and output type,
 * using the environment's parallelism, and hands it to the superclass constructor.
 * A source that is not parallel is then forced to parallelism 1.
 */
public DataStreamSource(StreamExecutionEnvironment environment,
TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
boolean isParallel, String sourceName) {
super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
this.isParallel = isParallel;
// override the environment parallelism for non-parallel sources
if (!isParallel) {
setParallelism(1);
}
}
}

FlatMapFunction and Add Transform

  • text.flatMap(…)
    • 調用DataStreamSource的父類SingleOutputStreamOperator類的父類DataStream @org/apache/flink/streaming/api/datastream/DataStream.java
      • transform(…)
        • OneInputTransformation resultTransform = new OneInputTransformation<>(…)
          • OneInputTransformation object @org/apache/flink/streaming/api/transformations/OneInputTransformation.java
        • SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, resultTransform);
          • SingleOutputStreamOperator object @org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
        • getExecutionEnvironment().addOperator(resultTransform);
          • getExecutionEnvironment()
            • 取得在初始化時設定的environment,實際上為StreamExecutionEnvironment object
          • addOperator(…)
            • 調用StreamExecutionEnvironment的addOperator(…)
              • 將入參transformation加入到transformations class變量中
              • transformations是List< StreamTransformation<?>>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Excerpt of DataStream showing how flatMap turns a user function into a
 * transformation. Fields such as transformation and environment, and the other
 * members, are omitted from the listing.
 */
@Public
public class DataStream<T> {
/**
 * Determines the output type of the flat-map function via TypeExtractor, wraps the
 * function in a StreamFlatMap operator, and delegates to transform under the
 * operator name "Flat Map".
 */
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
/**
 * Creates a OneInputTransformation whose input is this stream's transformation,
 * registers it with the execution environment, and returns a new stream wrapping it.
 *
 * @param operatorName name of the operator
 * @param outTypeInfo  type information of the operator's output
 * @param operator     the one-input operator to apply
 * @return a SingleOutputStreamOperator built on the new transformation
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// the new transformation uses the environment's parallelism
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
// record the transformation in the environment
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
}
1
2
3
4
5
6
7
8
9
10
/**
 * Excerpt of StreamExecutionEnvironment showing where transformations are collected.
 */
@Public
public abstract class StreamExecutionEnvironment {
// transformations registered so far, in the order addOperator was called
protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
/**
 * Appends the given transformation to the transformations list.
 * A null argument is rejected by Preconditions.checkNotNull.
 */
@Internal
public void addOperator(StreamTransformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
}

Env Execute and Generate Stream Graph

  • execute(…)
    • 調用StreamContextEnvironment的execute(…)
      • this.getStreamGraph();
        • 調用StreamContextEnvironment父類StreamExecutionEnvironment的getStreamGraph()
        • transformations即為之前的Class變量
        • StreamGraphGenerator.generate(…)
          • 在StreamGraphGenerator內部連續調用後,會遍歷StreamExecutionEnvironment的List< StreamTransformation<?>>,將StreamTransformation作為入參傳入transform(StreamTransformation<?> transform)
            • transform(…)
              • 最終都會調用與StreamTransformation對應的transformXXX方法,例如transformOneInputTransform
              • 內部會遞迴的調用transform(…),逐一加上StreamEdge、StreamNode
                • 並非每個StreamTransformation都會在StreamGraph新增StreamEdge與StreamNode,邏輯上的StreamTransformation則不添加,而是將資訊合併到其他StreamTransformation的StreamEdge中;例如PartitionTransformation
              • StreamNode
                • 儲存了UDF(User-Defined Function)、輸入輸出的序列化方法、所有輸入輸出的StreamEdge對象
              • StreamEdge
                • 儲存上下游的StreamNode訊息
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Excerpt of StreamContextEnvironment showing the start of execute.
 * The rest of the method is elided.
 */
@PublicEvolving
public class StreamContextEnvironment extends StreamExecutionEnvironment {
/**
 * Rejects a null job name, obtains the StreamGraph via getStreamGraph(),
 * and sets the job name on it. The remaining steps are not shown here.
 */
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
StreamGraph streamGraph = this.getStreamGraph();
streamGraph.setJobName(jobName);
// ...
}
}
1
2
3
4
5
6
7
8
9
10
/**
 * Excerpt of StreamExecutionEnvironment showing how the StreamGraph is produced.
 * The transformations field is declared elsewhere in the class.
 */
@Public
public abstract class StreamExecutionEnvironment {
/**
 * Generates the StreamGraph from the collected transformations by delegating to
 * StreamGraphGenerator.generate.
 *
 * @throws IllegalStateException if no transformation has been registered
 */
@Internal
public StreamGraph getStreamGraph() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
return StreamGraphGenerator.generate(this, transformations);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * Excerpt of StreamGraphGenerator showing the entry point and the per-transformation
 * dispatch. The fields used below (env, streamGraph, alreadyTransformed, LOG) and
 * the individual transformXXX methods are omitted from the listing.
 */
@Internal
public class StreamGraphGenerator {
/**
 * Static entry point: creates a generator for the given environment and runs it
 * over the supplied transformations.
 */
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
return new StreamGraphGenerator(env).generateInternal(transformations);
}
/**
 * Calls transform on every transformation in list order and returns the
 * generator's streamGraph.
 */
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
for (StreamTransformation<?> transformation: transformations) {
transform(transformation);
}
return streamGraph;
}
/**
 * Transforms a single StreamTransformation by dispatching on its concrete type to
 * the matching transformXXX method, then copies per-transformation settings
 * (buffer timeout, uid, user hash, resources) onto the stream graph.
 *
 * @param transform the transformation to process
 * @return the ids produced for this transformation; the stored result is returned
 *         directly when the transformation was already processed
 * @throws IllegalStateException if the transformation type is not handled below
 */
private Collection<Integer> transform(StreamTransformation<?> transform) {
// already handled: return the stored ids instead of transforming again
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
Collection<Integer> transformedIds;
// dispatch on the concrete transformation type; the first matching branch wins
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
// a negative buffer timeout is skipped here and not copied onto the graph
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
// resources are applied only when both the minimum and the preferred spec are present
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
return transformedIds;
}
}

Transformation(StreamTransformation), DataStream and StreamGraph

  • DataStream是較常接觸到的概念,其代表同一種類型元素構成的數據流
  • 在構建DataStream或其子類時,需要傳入StreamTransformation object
    • StreamTransformation代表從一個或多個DataStream生成新的DataStream的操作;換言之,DataStream的Transformation,封裝了StreamTransformation
    • 在使用者調用DataStream的Transformation時,可以傳入UDF,而StreamOperator是運行時調用UDF的具體實現
  • 使用者在DataStream上通過Transformation時,會將StreamTransformation加入到StreamExecutionEnvironment的List< StreamTransformation<?>>中,並且在執行前,轉換成StreamGraph

Example

  • DataStream#flatMap(FlatMapFunction flatMapper)
    • Transformation: flatMap
    • StreamOperator: StreamFlatMap<> @org/apache/flink/streaming/api/operators/StreamFlatMap.java
    • UDF: FlatMapFunction @org/apache/flink/api/common/functions/FlatMapFunction.java
    • StreamTransformation: OneInputTransformation @org/apache/flink/streaming/api/transformations/OneInputTransformation.java
      • 使用getExecutionEnvironment().addOperator(resultTransform),加入到StreamExecutionEnvironment的List< StreamTransformation<?>>中
    • Return DataStream: SingleOutputStreamOperator @org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
      • 構建時,使用OneInputTransformation作為參數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Excerpt of DataStream used to illustrate how the flatMap transformation maps to a
 * StreamOperator, a UDF, a StreamTransformation and a returned DataStream.
 * Fields such as transformation and environment are omitted from the listing.
 */
@Public
public class DataStream<T> {
/**
 * Determines the output type of the user's FlatMapFunction via TypeExtractor, wraps
 * the function in a StreamFlatMap operator, and delegates to transform under the
 * operator name "Flat Map".
 */
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
/**
 * Creates a OneInputTransformation whose input is this stream's transformation,
 * registers it with the execution environment, and returns a new stream wrapping it.
 *
 * @param operatorName name of the operator
 * @param outTypeInfo  type information of the operator's output
 * @param operator     the one-input operator to apply
 * @return a SingleOutputStreamOperator built on the new transformation
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// the new transformation uses the environment's parallelism
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
// record the transformation in the environment
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
}
1
2
3
4
5
6
/**
 * Stream operator for flat-map: extends AbstractUdfStreamOperator parameterized with
 * a FlatMapFunction and implements OneInputStreamOperator. The body is elided here.
 */
@Internal
public class StreamFlatMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
// ...
}
1
2
3
4
5
/**
 * User-defined function interface for flat-map, with input type T and output type O.
 * Declared as a serializable functional interface; its method is elided here.
 */
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
// ...
}
1
2
3
4
/**
 * StreamTransformation subtype with input type IN and output type OUT.
 * The body is elided here.
 */
@Internal
public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
// ...
}
1
2
3
4
/**
 * DataStream subtype returned by DataStream#transform. The body is elided here.
 */
@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
// ...
}