Basic information
- Flink version: 1.9
Task Lifecycle
此篇依序說明Flink從JobMaster
創建ExecutionGraph
到提交至TaskExector
的流程,以及Task
與StreamTask
的生命週期
About Objects and Concepts
- Task為Flink中分布式執行的基礎單元
- Operator的包含一個或多個
ExecutionVertex
,而ExecutionVertex
的數量與Operator的parallel相同 - Task中包含一個或多個Chained operator的
ExecutionVertex
所組成的pipeline- 在生成
JobGraph
時會根據條件判斷能否將兩個Operator串聯在一起,並將一連串的Chained operator生成一個JobVertex
- 生成
ExecutionGraph
時,將JobVertex
展開為並行化版本的ExecutionVertex
- 每一個
ExecutionVertex
對應JobVertex
的一個並行子任務
- 在生成
- Task負責Element在Operator chain中傳遞,Element包含Input element,Watermark和Checkpoint barriers
- Task為獨立執行緒,多個Task可以包含在一個Task shot內,參考官方文檔
StreamTask
為Flink流式引擎中所有不同Task子類型的基礎StreamTask
在其生命週期中,透過OperatorChain
操作Operator- Operator透過不同的方法處理不同類型的Element
- Input element: processElement()
- Watermark: processWatermark()
- Checkpoint barriers: 異步調用snapshotState()
- Operator中會在其生命週期中,調用開發者的實作代碼(UDF)
Build and Deploy Executeion Graph
Dispatcher
(StandaloneDispatcher
的父類)透過JobManagerRunner
封裝的RPC接口操作JobMaster
,在JobMaster
在初始化時調用createAndRestoreExecutionGraph(...)
,其再調用createExecutionGraph(...)
,此時JobMaster
就擁有創建好的ExecutionGraph
,存放在executionGraph變量中。
接著JobManagerRunner
調用JobMaster
的start()
,在JobMaster
內部會一路調用至scheduleExecutionGraph()
,其內部調用ExecutionGraph
的scheduleForExecution()
方法。
|
|
ExecutionGraph
的scheduleForExecution()
方法中會根據scheduleMode變量判斷,使用不同的策略申請Slot資源,具體代碼參考scheduleLazy(...)
或scheduleEager(...)
,這裡代碼追縱scheduleEager(...)
;在其代碼流程中,會先為每個ExecutionJobVertex
申請Slot資源;申請成功之後,會在Execution
的粒度上進行部署,透過Execution
的deploy()
方法。
|
|
Execution
的deploy()
中,使用TaskManagerGateway
的submitTask(...)
進行Task提交,TaskManagerGateway
的instance為RpcTaskManagerGateway
,此時對應的TaskManagerRunner
process 就會透過Akka RPC接收到Task。
|
|
Create and run Task
TaskExector
的submitTask(...)
會接收來自RPC Task的提交,在準備初始化Task
一切所需的數據完備後,創建Task
object;每一個提交的Task,皆會創建一個Task
object。
創建完畢後,透過調用Task
的startTaskThread()
執行Thread。
|
|
Task
繼承Thread.實作Runnable,故每一個Task
object都是一個獨立的Thread。Task
的run()
方法中,透過loadAndInstantiateInvokable()
方法反射獲得AbstractInvokable
的instance,其子類StreamTask
是所有不同Task子類型的基礎,例如SourceStreamTask、OneInputStreamTask、TwoInputStreamTask。run()
中調用AbstractInvokable
的invoke()
方法,對應StreamTask
的invoke()
。
|
|
在StreamTask
的invoke()
中,會創建OperatorChain
,其表示此Task中包含的所有Operator(並行化版本)。OperatorChain
的初始化中,會創建所有其負責的Operator的上下游關係,包含Element的傳遞,透過其內部類ChainingOutput
或CopyingChainingOutput
進行實作。
其二者都擁有一個OneInputStreamOperator
變量,其代表著OperatorChain
中當前Operator的下游Operator;當ChainingOutput
接收到當前Operator提交的數據時,直接調用下游Operator的processElement()
方法。ChainingOutput
的父類Output
,在每個StreamOperator都擁有Output
成員,用於收集當前Operator處理完的數據。
|
|
|
|
Summary
- JobMaster創建完Execution graph後,透過Akka RPC提交至
TaskManagerRunner
process。 TaskManagerRunner
process將每個提交的Task創建對應Task
thread。Task
thread可以根據Flink slot策略(SlotSharingGroup與CoLocationGroup),與其他Task共享一個Task slot。