Basic information
- Flink version: 1.9
Task Lifecycle
此篇依序說明Flink從JobMaster創建ExecutionGraph到提交至TaskExector的流程,以及Task與StreamTask的生命週期
About Objects and Concepts
- Task為Flink中分布式執行的基礎單元
- Operator的包含一個或多個
ExecutionVertex,而ExecutionVertex的數量與Operator的parallel相同 - Task中包含一個或多個Chained operator的
ExecutionVertex所組成的pipeline- 在生成
JobGraph時會根據條件判斷能否將兩個Operator串聯在一起,並將一連串的Chained operator生成一個JobVertex - 生成
ExecutionGraph時,將JobVertex展開為並行化版本的ExecutionVertex - 每一個
ExecutionVertex對應JobVertex的一個並行子任務
- 在生成
- Task負責Element在Operator chain中傳遞,Element包含Input element,Watermark和Checkpoint barriers
- Task為獨立執行緒,多個Task可以包含在一個Task shot內,參考官方文檔
StreamTask為Flink流式引擎中所有不同Task子類型的基礎StreamTask在其生命週期中,透過OperatorChain操作Operator- Operator透過不同的方法處理不同類型的Element
- Input element: processElement()
- Watermark: processWatermark()
- Checkpoint barriers: 異步調用snapshotState()
- Operator中會在其生命週期中,調用開發者的實作代碼(UDF)
Build and Deploy Executeion Graph
Dispatcher(StandaloneDispatcher的父類)透過JobManagerRunner封裝的RPC接口操作JobMaster,在JobMaster在初始化時調用createAndRestoreExecutionGraph(...),其再調用createExecutionGraph(...),此時JobMaster就擁有創建好的ExecutionGraph,存放在executionGraph變量中。
接著JobManagerRunner調用JobMaster的start(),在JobMaster內部會一路調用至scheduleExecutionGraph(),其內部調用ExecutionGraph的scheduleForExecution()方法。
|
|
ExecutionGraph的scheduleForExecution()方法中會根據scheduleMode變量判斷,使用不同的策略申請Slot資源,具體代碼參考scheduleLazy(...)或scheduleEager(...),這裡代碼追縱scheduleEager(...);在其代碼流程中,會先為每個ExecutionJobVertex申請Slot資源;申請成功之後,會在Execution的粒度上進行部署,透過Execution的deploy()方法。
|
|
Execution的deploy()中,使用TaskManagerGateway的submitTask(...)進行Task提交,TaskManagerGateway的instance為RpcTaskManagerGateway,此時對應的TaskManagerRunner process 就會透過Akka RPC接收到Task。
|
|
Create and run Task
TaskExector的submitTask(...)會接收來自RPC Task的提交,在準備初始化Task一切所需的數據完備後,創建Task object;每一個提交的Task,皆會創建一個Task object。
創建完畢後,透過調用Task的startTaskThread()執行Thread。
|
|
Task繼承Thread.實作Runnable,故每一個Task object都是一個獨立的Thread。Task的run()方法中,透過loadAndInstantiateInvokable()方法反射獲得AbstractInvokable的instance,其子類StreamTask是所有不同Task子類型的基礎,例如SourceStreamTask、OneInputStreamTask、TwoInputStreamTask。run()中調用AbstractInvokable的invoke()方法,對應StreamTask的invoke()。
|
|
在StreamTask的invoke()中,會創建OperatorChain,其表示此Task中包含的所有Operator(並行化版本)。OperatorChain的初始化中,會創建所有其負責的Operator的上下游關係,包含Element的傳遞,透過其內部類ChainingOutput或CopyingChainingOutput進行實作。
其二者都擁有一個OneInputStreamOperator變量,其代表著OperatorChain中當前Operator的下游Operator;當ChainingOutput接收到當前Operator提交的數據時,直接調用下游Operator的processElement()方法。ChainingOutput的父類Output,在每個StreamOperator都擁有Output成員,用於收集當前Operator處理完的數據。
|
|
|
|
Summary
- JobMaster創建完Execution graph後,透過Akka RPC提交至
TaskManagerRunnerprocess。 TaskManagerRunnerprocess將每個提交的Task創建對應Taskthread。Taskthread可以根據Flink slot策略(SlotSharingGroup與CoLocationGroup),與其他Task共享一個Task slot。