Basic information
- Flink version: 1.8
Flink Stream Graph
This article walks through, step by step, how a user's Flink job is turned into a Stream Graph.
The SocketWindowWordCount example stands in for a user-developed Flink job.
Get StreamExecutionEnvironment and Get Data Source
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  - Takes the env instanceof ContextEnvironment branch and obtains a StreamExecutionEnvironment object
- env.socketTextStream(…);
  - Calls StreamExecutionEnvironment @org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
  - After a chain of calls inside StreamExecutionEnvironment, execution reaches socketTextStream(String hostname, int port, String delimiter, long maxRetry)
    - new SocketTextStreamFunction(…)
      - SocketTextStreamFunction class @org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
    - addSource(…)
      - Ultimately calls addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)
        - Returns a DataStreamSource<> object @org/apache/flink/streaming/api/datastream/DataStreamSource.java
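The call chain above, where convenience overloads funnel into one full method that builds the source function and passes it to addSource, can be sketched in plain Java without any Flink dependency. This is only a simplified model: every class name below (SketchSocketFunction, SketchSourceStream, SketchSourceEnv) is hypothetical, and the default delimiter, retry count, and source name are assumptions of the sketch rather than values taken from this article.

```java
import java.util.Objects;

// Hypothetical stand-in for a socket source function: it only records its settings.
class SketchSocketFunction {
    final String hostname;
    final int port;
    final String delimiter;
    final long maxRetry;

    SketchSocketFunction(String hostname, int port, String delimiter, long maxRetry) {
        this.hostname = Objects.requireNonNull(hostname);
        this.port = port;
        this.delimiter = Objects.requireNonNull(delimiter);
        this.maxRetry = maxRetry;
    }
}

// Hypothetical stand-in for the stream handle returned by addSource.
class SketchSourceStream {
    final SketchSocketFunction function;
    final String sourceName;

    SketchSourceStream(SketchSocketFunction function, String sourceName) {
        this.function = function;
        this.sourceName = sourceName;
    }
}

class SketchSourceEnv {
    // Convenience overloads fill in a default (assumed here) and delegate downwards.
    SketchSourceStream socketTextStream(String hostname, int port) {
        return socketTextStream(hostname, port, "\n");
    }

    SketchSourceStream socketTextStream(String hostname, int port, String delimiter) {
        return socketTextStream(hostname, port, delimiter, 0L);
    }

    // The fullest overload creates the source function and hands it to addSource.
    SketchSourceStream socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
        return addSource(new SketchSocketFunction(hostname, port, delimiter, maxRetry), "Socket Stream");
    }

    // Wraps the function in a source stream object and returns it.
    SketchSourceStream addSource(SketchSocketFunction function, String sourceName) {
        return new SketchSourceStream(function, sourceName);
    }
}

class SketchSourceDemo {
    public static void main(String[] args) {
        SketchSourceStream text = new SketchSourceEnv().socketTextStream("localhost", 9000);
        System.out.println(text.sourceName + " -> " + text.function.hostname + ":" + text.function.port);
    }
}
```

Keeping the construction logic in a single overload means the shorter signatures cannot drift apart in behaviour; they differ only in the defaults they supply.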
FlatMapFunction and Add Transform
- text.flatMap(…)
  - Calls flatMap on DataStream, the parent class of SingleOutputStreamOperator, which is in turn the parent class of DataStreamSource @org/apache/flink/streaming/api/datastream/DataStream.java
  - transform(…)
    - OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(…)
      - OneInputTransformation object @org/apache/flink/streaming/api/transformations/OneInputTransformation.java
    - SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
      - SingleOutputStreamOperator object @org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
    - getExecutionEnvironment().addOperator(resultTransform);
      - getExecutionEnvironment()
        - Returns the environment set at initialization, which is in fact the StreamExecutionEnvironment object
      - addOperator(…)
        - Calls addOperator(…) of StreamExecutionEnvironment
          - Appends the transformation argument to the transformations class field
          - transformations is a List<StreamTransformation<?>>
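The three steps inside transform(…) can be sketched in plain Java as follows: create a transformation that points at its input, wrap it in a new stream, then register it with the environment. This is a simplified model and not Flink's implementation; ToyTransformation, ToyEnvironment, and ToyStream are hypothetical names, and the operator is reduced to a name string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical counterpart of a transformation: an operation name plus its upstream input.
class ToyTransformation {
    final String name;
    final ToyTransformation input; // null for a source

    ToyTransformation(String name, ToyTransformation input) {
        this.name = name;
        this.input = input;
    }
}

// Hypothetical counterpart of the environment: it only collects transformations.
class ToyEnvironment {
    final List<ToyTransformation> transformations = new ArrayList<>();

    void addOperator(ToyTransformation transformation) {
        transformations.add(transformation);
    }
}

// Hypothetical counterpart of a stream: a handle on the environment
// and on the transformation that produces the stream.
class ToyStream {
    final ToyEnvironment environment;
    final ToyTransformation transformation;

    ToyStream(ToyEnvironment environment, ToyTransformation transformation) {
        this.environment = environment;
        this.transformation = transformation;
    }

    ToyEnvironment getExecutionEnvironment() {
        return environment;
    }

    // Mirrors the three steps listed above.
    ToyStream transform(String operatorName) {
        // 1. The new transformation keeps a reference to the current one as its input.
        ToyTransformation resultTransform = new ToyTransformation(operatorName, this.transformation);
        // 2. The returned stream wraps the new transformation.
        ToyStream returnStream = new ToyStream(environment, resultTransform);
        // 3. The environment records the transformation for later graph generation.
        getExecutionEnvironment().addOperator(resultTransform);
        return returnStream;
    }

    ToyStream flatMap() {
        return transform("Flat Map");
    }
}

class ToyTransformDemo {
    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        ToyStream text = new ToyStream(env, new ToyTransformation("Socket Stream", null));
        ToyStream words = text.flatMap();
        System.out.println(env.transformations.size());
        System.out.println(words.transformation.name + " <- " + words.transformation.input.name);
    }
}
```

In this sketch the source transformation is never registered directly; it stays reachable only through the input reference of the transformation that consumes it.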
Env Execute and Generate Stream Graph
- execute(…)
  - Calls execute(…) of StreamContextEnvironment
    - this.getStreamGraph();
      - Calls getStreamGraph() of StreamExecutionEnvironment, the parent class of StreamContextEnvironment
        - transformations is the class field described earlier
      - StreamGraphGenerator.generate(…)
        - After a chain of calls inside StreamGraphGenerator, it iterates over the StreamExecutionEnvironment's List<StreamTransformation<?>> and passes each StreamTransformation to transform(StreamTransformation<?> transform)
        - transform(…)
          - Ultimately dispatches to the transformXXX method that matches the StreamTransformation, for example transformOneInputTransform
          - Internally calls transform(…) recursively, adding StreamEdges and StreamNodes one by one
          - Not every StreamTransformation adds a StreamEdge and a StreamNode to the StreamGraph; purely logical StreamTransformations add none, and their information is merged into the StreamEdge of other StreamTransformations, for example PartitionTransformation
        - StreamNode
          - Stores the UDF (User-Defined Function), the input and output serializers, and all incoming and outgoing StreamEdge objects
        - StreamEdge
          - Stores information about its upstream and downstream StreamNodes
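The recursive traversal described above can be sketched in plain Java: physical transformations become nodes and edges, while a logical (virtual) one adds nothing itself and only attaches its partitioner to the edge created downstream. This is a simplified model under stated assumptions, not Flink's StreamGraphGenerator. All Mini* classes are hypothetical, nodes are reduced to a name, and the cache of already handled transformations is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical transformation: virtual ones (such as a partitioning step) never become nodes.
class MiniTransformation {
    final int id;
    final String name;
    final MiniTransformation input; // null for a source
    final boolean virtual;

    MiniTransformation(int id, String name, MiniTransformation input, boolean virtual) {
        this.id = id; this.name = name; this.input = input; this.virtual = virtual;
    }
}

// Edge between two node ids; partitioner is null unless a virtual step sat in between.
class MiniEdge {
    final int sourceId;
    final int targetId;
    final String partitioner;

    MiniEdge(int sourceId, int targetId, String partitioner) {
        this.sourceId = sourceId; this.targetId = targetId; this.partitioner = partitioner;
    }
}

class MiniGraph {
    final Map<Integer, String> nodes = new LinkedHashMap<>(); // node id -> operator name
    final List<MiniEdge> edges = new ArrayList<>();
}

// What a transformation exposes downstream: the producing node and a pending partitioner.
class MiniUpstream {
    final int nodeId;
    final String partitioner;

    MiniUpstream(int nodeId, String partitioner) {
        this.nodeId = nodeId; this.partitioner = partitioner;
    }
}

class MiniGraphGenerator {
    private final MiniGraph graph = new MiniGraph();
    private final Map<Integer, MiniUpstream> alreadyHandled = new HashMap<>();

    static MiniGraph generate(List<MiniTransformation> transformations) {
        MiniGraphGenerator generator = new MiniGraphGenerator();
        for (MiniTransformation t : transformations) {
            generator.transform(t);
        }
        return generator.graph;
    }

    // Dispatches by kind and caches the result so shared inputs are handled once.
    private MiniUpstream transform(MiniTransformation t) {
        MiniUpstream cached = alreadyHandled.get(t.id);
        if (cached != null) {
            return cached;
        }
        MiniUpstream result = t.virtual ? transformVirtual(t) : transformPhysical(t);
        alreadyHandled.put(t.id, result);
        return result;
    }

    // Recurses into the input first, then adds this node and the connecting edge.
    private MiniUpstream transformPhysical(MiniTransformation t) {
        MiniUpstream up = (t.input == null) ? null : transform(t.input);
        graph.nodes.put(t.id, t.name);
        if (up != null) {
            graph.edges.add(new MiniEdge(up.nodeId, t.id, up.partitioner));
        }
        return new MiniUpstream(t.id, null);
    }

    // Adds no node and no edge; it only passes its partitioner on to the next edge.
    private MiniUpstream transformVirtual(MiniTransformation t) {
        MiniUpstream up = transform(t.input);
        return new MiniUpstream(up.nodeId, t.name);
    }
}

class MiniGraphDemo {
    public static void main(String[] args) {
        MiniTransformation source = new MiniTransformation(1, "Socket Stream", null, false);
        MiniTransformation flatMap = new MiniTransformation(2, "Flat Map", source, false);
        MiniTransformation keyBy = new MiniTransformation(3, "HASH", flatMap, true);
        MiniTransformation reduce = new MiniTransformation(4, "Reduce", keyBy, false);
        MiniGraph graph = MiniGraphGenerator.generate(Arrays.asList(flatMap, reduce));
        System.out.println(graph.nodes.values());
        for (MiniEdge e : graph.edges) {
            System.out.println(e.sourceId + " -> " + e.targetId + " via " + e.partitioner);
        }
    }
}
```

The demo registers only the two physical downstream transformations; the source and the virtual step are discovered through the input references during recursion.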
Transformation(StreamTransformation), DataStream and StreamGraph
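- DataStream is the more familiar concept: it represents a stream of elements of the same type
- Constructing a DataStream or one of its subclasses requires passing in a StreamTransformation object
- A StreamTransformation represents the operation that produces a new DataStream from one or more DataStreams; in other words, each Transformation on a DataStream wraps a StreamTransformation
- When calling a Transformation on a DataStream, the user can pass in a UDF; the StreamOperator is the concrete implementation that invokes the UDF at runtime
- Each Transformation the user applies to a DataStream adds a StreamTransformation to the StreamExecutionEnvironment's List<StreamTransformation<?>>, and this list is converted into a StreamGraph before execution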
Example
- DataStream#flatMap(FlatMapFunction<T, R> flatMapper)
  - Transformation: flatMap
  - StreamOperator: StreamFlatMap<> @org/apache/flink/streaming/api/operators/StreamFlatMap.java
  - UDF: FlatMapFunction @org/apache/flink/api/common/functions/FlatMapFunction.java
  - StreamTransformation: OneInputTransformation @org/apache/flink/streaming/api/transformations/OneInputTransformation.java
    - Added to the StreamExecutionEnvironment's List<StreamTransformation<?>> via getExecutionEnvironment().addOperator(resultTransform)
  - Return DataStream: SingleOutputStreamOperator @org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
    - Constructed with the OneInputTransformation as its argument
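The relationship between the UDF and the operator in this example can be sketched in plain Java: the operator owns the user function and calls it once per incoming element, collecting whatever the function emits. This is only an illustrative model. UdfFlatMap and UdfFlatMapOperator are hypothetical names, and a java.util.function.Consumer stands in for the output collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical UDF contract: one input element may emit zero or more outputs.
interface UdfFlatMap<T, R> {
    void flatMap(T value, Consumer<R> out);
}

// Hypothetical runtime operator: it owns the UDF and invokes it for each element.
class UdfFlatMapOperator<T, R> {
    private final UdfFlatMap<T, R> userFunction;
    private final List<R> emitted = new ArrayList<>();

    UdfFlatMapOperator(UdfFlatMap<T, R> userFunction) {
        this.userFunction = userFunction;
    }

    // Called once per incoming element; the UDF decides what gets emitted.
    void processElement(T element) {
        userFunction.flatMap(element, emitted::add);
    }

    List<R> getEmitted() {
        return emitted;
    }
}

class UdfOperatorDemo {
    // Runs a word-splitting UDF over the given lines and returns every emitted word.
    static List<String> splitWords(String... lines) {
        UdfFlatMap<String, String> splitter = (line, out) -> {
            for (String word : line.split("\\s")) {
                out.accept(word);
            }
        };
        UdfFlatMapOperator<String, String> operator = new UdfFlatMapOperator<>(splitter);
        for (String line : lines) {
            operator.processElement(line);
        }
        return operator.getEmitted();
    }

    public static void main(String[] args) {
        System.out.println(splitWords("to be", "or not")); // prints [to, be, or, not]
    }
}
```

The user only writes the lambda; when and how often it runs is decided by the operator, which is the separation the list above describes.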