Run Flink Application by Bash Script Source Code Trace
Basic information
- Flink version: 1.8
Run Flink Application by Bash Script
When using the Bash script, a SocketWindowWordCount Flink application can be submitted to the JobManager.
- This invokes main of the CliFrontend class @org/apache/flink/client/cli/CliFrontend.java, which consists of two main steps:
  - Create a CliFrontend object
  - Call parseParameters on the CliFrontend object, which submits the user's Flink application logic to the JobManager
- Creating the CliFrontend object first requires constructing its arguments. Among them, customCommandLines is built by the static function loadCustomCommandLines(…); the returned List contains FlinkYarnSessionCli and DefaultCLI
CliFrontend Class
- parseParameters(…)
  - Calls the action function that matches the args parameter; the action here is run, so run(params) is called
    - run(…)
      - Uses buildProgram(runOptions) to obtain a PackagedProgram object
      - Uses getActiveCustomCommandLine(commandLine) to obtain a CustomCommandLine<?> object, which here is a DefaultCLI object @org/apache/flink/client/cli/DefaultCLI.java
      - Calls runProgram(…) to submit the user's Flink application logic to the JobManager
The following sections describe the two steps above, buildProgram(…) and runProgram(…), in order.
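The dispatch described above can be sketched in plain Java without any Flink dependency. This is only an illustrative model: CliDispatchSketch, CommandLineSketch, YarnCliSketch and DefaultCliSketch are hypothetical names, and the activation rules (the YARN stand-in reacting to "-m yarn-cluster", the default stand-in always being active) are assumptions made for the sketch, not a copy of Flink's code.

```java
import java.util.Arrays;
import java.util.List;

final class CliDispatchSketch {

    /** Hypothetical, simplified command-line abstraction; not Flink's real CustomCommandLine. */
    interface CommandLineSketch {
        String id();
        boolean isActive(String[] params);
    }

    /** Stand-in for FlinkYarnSessionCli. Assumed rule: active only for "-m yarn-cluster". */
    static final class YarnCliSketch implements CommandLineSketch {
        public String id() { return "yarn-cluster"; }
        public boolean isActive(String[] params) {
            for (int i = 0; i + 1 < params.length; i++) {
                if ("-m".equals(params[i]) && "yarn-cluster".equals(params[i + 1])) {
                    return true;
                }
            }
            return false;
        }
    }

    /** Stand-in for DefaultCLI. Assumed rule: always active, so it acts as the fallback. */
    static final class DefaultCliSketch implements CommandLineSketch {
        public String id() { return "default"; }
        public boolean isActive(String[] params) { return true; }
    }

    /** Returns the candidates in the order the text lists them. */
    static List<CommandLineSketch> loadCommandLines() {
        return Arrays.asList(new YarnCliSketch(), new DefaultCliSketch());
    }

    /** Picks the first candidate that reports itself active. */
    static CommandLineSketch getActive(List<CommandLineSketch> candidates, String[] params) {
        for (CommandLineSketch cli : candidates) {
            if (cli.isActive(params)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command line is active");
    }

    /** Models the run action: choose the active command line, then hand off to it. */
    static String run(String[] params) {
        return "run via " + getActive(loadCommandLines(), params).id();
    }

    /** The first argument selects the action; the remaining arguments are passed on. */
    static String parseParameters(String[] args) {
        if (args.length == 0) {
            return "no action given";
        }
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                return run(params);
            default:
                return "unsupported action: " + action;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseParameters(new String[] {"run", "SocketWindowWordCount.jar"}));
    }
}
```

With no YARN option among the parameters, the sketch falls through to the default stand-in, which mirrors why the trace above ends up with a DefaultCLI object.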
PackagedProgram Class
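- The buildProgram(…) function of CliFrontend ultimately returns a PackagedProgram object @org/apache/flink/client/program/PackagedProgram.java
- A PackagedProgram object mainly encapsulates the entry class of the user's Flink application, its jar file, the classpath entries, and the user-supplied arguments
- When submitting through the Bash script, the entry class provides a main method, so hasMainMethod(mainClass) returns true and this.program is set to null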
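The main-method check mentioned above can be illustrated with standard Java reflection. The following is a minimal sketch, assuming the check amounts to "a public static main(String[]) exists"; EntryPointCheckSketch, PlanProgram, WithMain and AsPlan are hypothetical names introduced for illustration, and classify(…) only models the decision between a plan-style program and an interactive one.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class EntryPointCheckSketch {

    /** Hypothetical stand-in for a plan-style program interface. */
    interface PlanProgram {
        String getPlan();
    }

    /** Entry class that only exposes a main method, like a typical user application. */
    static final class WithMain {
        public static void main(String[] args) { }
    }

    /** Entry class that implements the plan-style interface instead. */
    static final class AsPlan implements PlanProgram {
        public String getPlan() { return "plan"; }
    }

    /** True if the class has a public static main(String[]) method. */
    static boolean hasMainMethod(Class<?> entryClass) {
        try {
            Method main = entryClass.getMethod("main", String[].class);
            int modifiers = main.getModifiers();
            return Modifier.isStatic(modifiers) && Modifier.isPublic(modifiers);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Models the decision: plan-style programs are kept, main-only programs leave program unset. */
    static String classify(Class<?> entryClass) {
        if (PlanProgram.class.isAssignableFrom(entryClass)) {
            return "plan program";
        }
        if (hasMainMethod(entryClass)) {
            return "interactive (program = null)";
        }
        return "invalid entry class";
    }

    public static void main(String[] args) {
        System.out.println(classify(WithMain.class));
        System.out.println(classify(AsPlan.class));
        System.out.println(classify(Object.class));
    }
}
```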
runProgram Function of CliFrontend Class
- ClusterDescriptor clusterDescriptor
  - customCommandLine is passed in by the caller; here it is the DefaultCLI object @org/apache/flink/client/cli/DefaultCLI.java
  - customCommandLine.createClusterDescriptor(…) returns a StandaloneClusterDescriptor object @org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
- clusterDescriptor.retrieve(clusterId)
  - Calls retrieve(…) on the StandaloneClusterDescriptor object
  - The client variable is assigned a RestClusterClient object @org/apache/flink/client/program/rest/RestClusterClient.java
- executeProgram(…)
  - client.run(…)
    - Calls run(PackagedProgram prog, int parallelism) of ClusterClient @org/apache/flink/client/program/ClusterClient.java, the parent class of RestClusterClient
    - run(PackagedProgram prog, int parallelism)
      - new ContextEnvironmentFactory(…)
        - Creates a ContextEnvironmentFactory object @org/apache/flink/client/program/ContextEnvironmentFactory.java
        - Assigns the ContextEnvironmentFactory object to factory
      - ContextEnvironment.setAsContext(factory)
        - Through ContextEnvironment @org/apache/flink/client/program/ContextEnvironment.java, assigns the ContextEnvironmentFactory object to the contextEnvironmentFactory field of ExecutionEnvironment @org/apache/flink/api/java/ExecutionEnvironment.java
        - As a result, when the user's Flink application calls StreamExecutionEnvironment.getExecutionEnvironment(), the environment it obtains is the one produced by this ContextEnvironmentFactory
      - prog.invokeInteractiveModeForExecution()
        - Calls the entry function of the user's Flink application
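The factory hand-off described above follows a common pattern: a static factory slot is filled before the user's main method runs, so that an environment lookup inside user code returns a cluster-backed environment instead of a local one. The sketch below models only that pattern in plain Java; all names (ContextEnvironmentSketch, Env, EnvFactory, LocalEnv, ContextEnv, runWithClient) are hypothetical, and clearing the slot afterwards is an assumption of the sketch.

```java
final class ContextEnvironmentSketch {

    /** Hypothetical, minimal execution environment abstraction. */
    interface Env {
        String describe();
    }

    /** Hypothetical factory abstraction; a single method so a lambda can implement it. */
    interface EnvFactory {
        Env create();
    }

    /** Environment used when no context factory has been installed. */
    static final class LocalEnv implements Env {
        public String describe() { return "local"; }
    }

    /** Environment that remembers which client it should submit through. */
    static final class ContextEnv implements Env {
        private final String clientName;
        ContextEnv(String clientName) { this.clientName = clientName; }
        public String describe() { return "context(" + clientName + ")"; }
    }

    // Static slot that plays the role of the context environment factory field.
    private static EnvFactory contextFactory;

    static void setAsContext(EnvFactory factory) { contextFactory = factory; }

    static void unsetContext() { contextFactory = null; }

    /** What user code calls: prefers the installed factory, else falls back to local. */
    static Env getExecutionEnvironment() {
        return contextFactory != null ? contextFactory.create() : new LocalEnv();
    }

    /** Stand-in for the user's main method. */
    static String userMain() {
        return getExecutionEnvironment().describe();
    }

    /** Models the client side: install the factory, invoke user code, then clean up. */
    static String runWithClient(String clientName) {
        setAsContext(() -> new ContextEnv(clientName));
        try {
            return userMain();
        } finally {
            unsetContext();
        }
    }

    public static void main(String[] args) {
        System.out.println(userMain());
        System.out.println(runWithClient("rest-client"));
    }
}
```

The user code itself never changes: the same userMain() yields a local or a context environment depending solely on whether the caller installed a factory first.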
User Flink Application
- env.execute(…)
  - Calls the execute(…) function of ContextEnvironment
    - client.run(…).getJobExecutionResult();
      - Calls run(JobWithJars program, int parallelism) of ClusterClient, the parent class of RestClusterClient; this method returns a JobSubmissionResult
        - Prepares the graph required for execution and calls submitJob(…) of RestClusterClient, which submits the Flink application to the JobManager
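The call chain above has the shape of a template method: the parent client prepares the job representation and delegates the actual submission to a subclass. The following plain-Java sketch models that shape only; SubmitFlowSketch and its nested types are hypothetical names, the "graph" is reduced to a name plus a parallelism value, and no network call is made.

```java
import java.util.ArrayList;
import java.util.List;

final class SubmitFlowSketch {

    /** Hypothetical, heavily reduced job graph: just a name and a parallelism. */
    static final class JobGraphSketch {
        final String name;
        final int parallelism;
        JobGraphSketch(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical submission result. */
    static final class ResultSketch {
        final String jobName;
        ResultSketch(String jobName) { this.jobName = jobName; }
    }

    /** Parent client: builds the graph, then delegates submission to the subclass. */
    abstract static class ClientSketch {
        ResultSketch run(String program, int parallelism) {
            JobGraphSketch graph = new JobGraphSketch(program, parallelism);
            return submitJob(graph);
        }

        abstract ResultSketch submitJob(JobGraphSketch graph);
    }

    /** Subclass standing in for a REST-based client; it only records what it would send. */
    static final class RestClientSketch extends ClientSketch {
        final List<String> submitted = new ArrayList<>();

        @Override
        ResultSketch submitJob(JobGraphSketch graph) {
            submitted.add(graph.name + "@p" + graph.parallelism);
            return new ResultSketch(graph.name);
        }
    }

    /** Environment handed to user code; execute(…) forwards to the client. */
    static final class ContextEnvSketch {
        private final ClientSketch client;
        private final int parallelism;

        ContextEnvSketch(ClientSketch client, int parallelism) {
            this.client = client;
            this.parallelism = parallelism;
        }

        ResultSketch execute(String jobName) {
            return client.run(jobName, parallelism);
        }
    }

    public static void main(String[] args) {
        RestClientSketch client = new RestClientSketch();
        ContextEnvSketch env = new ContextEnvSketch(client, 1);
        System.out.println(env.execute("SocketWindowWordCount").jobName);
        System.out.println(client.submitted);
    }
}
```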
@org/apache/flink/client/cli/DefaultCLI.java
@org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@org/apache/flink/client/program/rest/RestClusterClient.java
@org/apache/flink/client/program/ClusterClient.java
@org/apache/flink/client/program/ContextEnvironment.java
@org/apache/flink/api/java/ExecutionEnvironment.java