Run Flink Application by Bash Script Source Code Trace

Basic information

  • Flink version: 1.8

在使用Bash Script時,可以使用下面方式Submit一個SocketWindowWordCount的Flink application至JobManager

./flink run ../examples/streaming/SocketWindowWordCount.jar --port 9000

  • 此時會調用CliFrontend Class 的main @org/apache/flink/client/cli/CliFrontend.java
    主要分成兩個主要步驟:
    1. 創建CliFrontend object
    2. 調用CliFrontend object的parseParameters,將使用者實作的Flink application邏輯Submit至JobManager
  • 創建CliFrontend object時,會先構造其所需的參數,當中customCommandLines是透過Static function loadCustomCommandLines(…)構造,回傳的List當中,包含FlinkYarnSessionCli與DefaultCLI
// Entry point of the Flink command-line client (Flink 1.8).
// main() loads the configuration and the custom command lines, builds a
// CliFrontend, and runs the requested action inside the security context.
public class CliFrontend {
public static void main(final String[] args) {
// ...
// 3. load the custom command lines
final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);
try {
final CliFrontend cli = new CliFrontend(
configuration,
customCommandLines);
// Install the security context before executing the user's action.
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// parseParameters dispatches to the action (run/list/...) and yields the exit code.
int retCode = SecurityUtils.getInstalledContext()
.runSecured(() -> cli.parseParameters(args));
System.exit(retCode);
}
catch (Throwable t) {
// ...
}
}
// Builds the list of CustomCommandLine implementations.
// FlinkYarnSessionCli is loaded reflectively because the YARN classes may be
// absent from the classpath; failure to load it is logged and tolerated.
public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
customCommandLines.add(
loadCustomCommandLine(flinkYarnSessionCLI,
configuration,
configurationDirectory,
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
customCommandLines.add(new DefaultCLI(configuration));
return customCommandLines;
}
}

CliFrontend Class

  • parseParameters(…)
    • 根據參數args調用對應的Action function,此處的Action為run,故調用run(params)
      - run(…)
    • 使用buildProgram(runOptions)取得PackagedProgram object
    • 使用getActiveCustomCommandLine(commandLine)取得CustomCommandLine<?> object,此處為DefaultCLI Object @org/apache/flink/client/cli/DefaultCLI.java
    • 調用runProgram(…) Submit使用者的Flink application邏輯至JobManager

下列章節依序分別描述上述buildProgram與runProgram(…)兩步驟

public class CliFrontend {
private final Configuration configuration;
private final List<CustomCommandLine<?>> customCommandLines;
private final Options customCommandLineOptions;
private final FiniteDuration clientTimeout;
private final int defaultParallelism;
public CliFrontend(
Configuration configuration,
List<CustomCommandLine<?>> customCommandLines) throws Exception {
this.configuration = Preconditions.checkNotNull(configuration);
this.customCommandLines = Preconditions.checkNotNull(customCommandLines);
try {
FileSystem.initialize(this.configuration);
} catch (IOException e) {
throw new Exception("Error while setting the default " +
"filesystem scheme from configuration.", e);
}
this.customCommandLineOptions = new Options();
for (CustomCommandLine<?> customCommandLine : customCommandLines) {
customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
/**
* Parses the command line arguments and starts the requested action.
*
* @param args command line arguments of the client.
* @return The return code of the program
*/
public int parseParameters(String[] args) {
// ...
// remove action from parameters
final String[] params = Arrays.copyOfRange(args, 1, args.length);
try {
// do action
switch (action) {
case ACTION_RUN:
run(params);
return 0;
case ACTION_LIST:
# ...
}
} catch (CliArgsException ce) {
// ...
}
}
/**
* Executions the run action.
*
* @param args Command line arguments for the run action.
*/
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
final RunOptions runOptions = new RunOptions(commandLine);
// evaluate help flag
if (runOptions.isPrintHelp()) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
if (runOptions.getJarFilePath() == null) {
throw new CliArgsException("The program JAR file was not specified.");
}
final PackagedProgram program;
try {
LOG.info("Building program from JAR file");
program = buildProgram(runOptions);
}
catch (FileNotFoundException e) {
throw new CliArgsException("Could not build the program from JAR file.", e);
}
final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
try {
runProgram(customCommandLine, commandLine, runOptions, program);
} finally {
program.deleteExtractedLibraries();
}
}
PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException ProgramInvocationException {
String[] programArgs = options.getProgramArgs();
String jarFilePath = options.getJarFilePath();
List<URL> classpaths = options.getClasspaths();
if (jarFilePath == null) {
throw new IllegalArgumentException("The program JAR file was not specified.");
}
File jarFile = new File(jarFilePath);
// ...
// Get assembler class
String entryPointClass = options.getEntryPointClassName();
PackagedProgram program = entryPointClass == null ?
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
return program;
}
public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
for (CustomCommandLine<?> cli : customCommandLines) {
if (cli.isActive(commandLine)) {
return cli;
}
}
throw new IllegalStateException("No command-line ran.");
}
}

PackagedProgram Class

  • 在CliFrontend的buildProgram(…) function中,最終回傳一個PackagedProgram object @org/apache/flink/client/program/PackagedProgram.java
  • PackagedProgram object主要封裝了使用者的Flink application的入口類、jar文件、classpath路徑、用戶配置的參數
  • 透過Bash script submit的方式會使hasMainMethod(mainClass) 回傳True,故此時的this.program = null
// Wraps a packaged user program: entry-point class, JAR file, classpath
// entries and the user-supplied program arguments.
public class PackagedProgram {
public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
// ...
// load the entry point class
this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
// if the entry point is a program, instantiate the class and get the plan
if (Program.class.isAssignableFrom(this.mainClass)) {
Program prg = null;
try {
prg = InstantiationUtil.instantiate(this.mainClass.asSubclass(Program.class), Program.class);
} catch (Exception e) {
// validate that the class has a main method at least.
// the main method possibly instantiates the program properly
if (!hasMainMethod(mainClass)) {
throw new ProgramInvocationException("The given program class implements the " +
Program.class.getName() + " interface, but cannot be instantiated. " +
"It also declares no main(String[]) method as alternative entry point", e);
}
} catch (Throwable t) {
throw new ProgramInvocationException("Error while trying to instantiate program class.", t);
}
this.program = prg;
} else if (hasMainMethod(mainClass)) {
// Entry class only declares main(String[]): program stays null and the
// job will be executed in interactive mode (invokeInteractiveModeForExecution).
this.program = null;
} else {
throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
Program.class.getName() + " interface.");
}
}
}

runProgram Function of CliFrontend Class

  • ClusterDescriptor clusterDescriptor
    • customCommandLine為外部傳入,實際為DefaultCLI Object @org/apache/flink/client/cli/DefaultCLI.java
    • customCommandLine.createClusterDescriptor(…)會獲得StandaloneClusterDescriptor object @org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
  • clusterDescriptor.retrieve(clusterId)
    • 調用StandaloneClusterDescriptor object的retrieve(…)
    • client變數會被賦值為RestClusterClient object @org/apache/flink/client/program/rest/RestClusterClient.java
  • executeProgram(…)
    • client.run(…)
      • 調用RestClusterClient object父類別ClusterClient @org/apache/flink/client/program/ClusterClient.java 的run(PackagedProgram prog, int parallelism)
      • run(PackagedProgram prog, int parallelism)
        • new ContextEnvironmentFactory(…)
          • 獲取ContextEnvironmentFactory object @org/apache/flink/client/program/ContextEnvironmentFactory.java
          • 將ContextEnvironmentFactory object賦值給factory
        • ContextEnvironment.setAsContext(factory)
          • 將ContextEnvironmentFactory object透過ContextEnvironment @org/apache/flink/client/program/ContextEnvironment.java賦值給ExecutionEnvironment @org/apache/flink/api/java/ExecutionEnvironment.java 的contextEnvironmentFactory,
          • 讓使用者的Flink application在實作邏輯時,使用StreamExecutionEnvironment.getExecutionEnvironment()獲得的ExecutionEnvironment即是ContextEnvironmentFactory object
        • prog.invokeInteractiveModeForExecution()
          • 調用使用者的Flink application入口函數
  • env.execute(…)
    • 調用ContextEnvironment的execute(…) function
      • client.run(…).getJobExecutionResult();
        • 調用RestClusterClient父類ClusterClient的JobSubmissionResult的run(JobWithJars program, int parallelism)
          • 準備執行所需的Graph,並調用RestClusterClient的submitJob(…),將Flink application submit至JobManager
// runProgram: obtains a ClusterDescriptor from the active CustomCommandLine,
// retrieves (or deploys) a ClusterClient, and executes the packaged program.
public class CliFrontend {
private <T> void runProgram(
CustomCommandLine<T> customCommandLine,
CommandLine commandLine,
RunOptions runOptions,
PackagedProgram program) throws ProgramInvocationException, FlinkException {
final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
try {
final T clusterId = customCommandLine.getClusterId(commandLine);
final ClusterClient<T> client;
// directly deploy the job if the cluster is started in job mode and detached
if (clusterId == null && runOptions.getDetachedMode()) {
// ...
} else {
if (clusterId != null) {
// An existing cluster was addressed: retrieve a client for it
// (for DefaultCLI this yields a RestClusterClient).
client = clusterDescriptor.retrieve(clusterId);
shutdownHook = null;
} else {
// ...
}
try {
// ...
executeProgram(program, client, userParallelism);
} finally {
// ...
}
}
} finally {
// ...
}
}
// Runs the program through the cluster client and reports the outcome:
// blocking execution prints the JobExecutionResult, detached submission
// only prints the JobID.
protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
logAndSysout("Starting execution of program");
final JobSubmissionResult result = client.run(program, parallelism);
if (null == result) {
throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
"ExecutionEnvironment.execute()");
}
if (result.isJobExecutionResult()) {
logAndSysout("Program execution finished");
JobExecutionResult execResult = result.getJobExecutionResult();
System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
}
} else {
logAndSysout("Job has been submitted with JobID " + result.getJobID());
}
}
}

@org/apache/flink/client/cli/DefaultCLI.java

// Fallback command line: always active, targets a standalone cluster.
public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
@Override
public StandaloneClusterDescriptor createClusterDescriptor(
CommandLine commandLine) throws FlinkException {
// Fold command-line overrides into the configuration before building the descriptor.
final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
return new StandaloneClusterDescriptor(effectiveConfiguration);
}
}

@org/apache/flink/client/deployment/StandaloneClusterDescriptor.java

// Descriptor for an already-running standalone cluster; "retrieve" simply
// builds a REST client pointing at it.
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
@Override
public RestClusterClient<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
try {
return new RestClusterClient<>(config, standaloneClusterId);
} catch (Exception e) {
throw new ClusterRetrieveException("Couldn't retrieve standalone cluster", e);
}
}
}

@org/apache/flink/client/program/rest/RestClusterClient.java

// Cluster client that talks to the JobManager through the REST endpoint.
public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {
// Blocking entry point: submits the JobGraph and, unless detached,
// waits for the job result and converts it to a JobExecutionResult.
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());
final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
if (isDetached()) {
// Detached: return as soon as the submission is acknowledged.
try {
return jobSubmissionFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not submit job",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}
} else {
// Attached: chain a request for the final job result and block on it.
final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
ignored -> requestJobResult(jobGraph.getJobID()));
final JobResult jobResult;
try {
jobResult = jobResultFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not retrieve the execution result.",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}
try {
this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
return lastJobExecutionResult;
} catch (JobExecutionException e) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
}
}
}
// Asynchronous submission: serializes the JobGraph to a temporary file,
// uploads it together with the user JARs and distributed-cache artifacts,
// and sends the submit request; the temp file is deleted afterwards.
@Override
public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
// Step 1: write the JobGraph to a temp file via Java serialization.
CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
try {
final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
objectOut.writeObject(jobGraph);
}
return jobGraphFile;
} catch (IOException e) {
throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
}
}, executorService);
// Step 2: assemble the request body plus the collection of files to upload.
CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
List<String> jarFileNames = new ArrayList<>(8);
List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
Collection<FileUpload> filesToUpload = new ArrayList<>(8);
filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
for (Path jar : jobGraph.getUserJars()) {
jarFileNames.add(jar.getName());
filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
}
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
}
final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
jobGraphFile.getFileName().toString(),
jarFileNames,
artifactFileNames);
return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
});
// Step 3: send the submit request, retrying on connection problems.
final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
requestAndFileUploads -> sendRetriableRequest(
JobSubmitHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
requestAndFileUploads.f0,
requestAndFileUploads.f1,
isConnectionProblemOrServiceUnavailable())
);
// Step 4: best-effort cleanup of the temporary JobGraph file.
submissionFuture
.thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
.thenAccept(jobGraphFile -> {
try {
Files.delete(jobGraphFile);
} catch (IOException e) {
log.warn("Could not delete temporary file {}.", jobGraphFile, e);
}
});
return submissionFuture
.thenApply(
(JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID()))
.exceptionally(
(Throwable throwable) -> {
throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
});
}
}

@org/apache/flink/client/program/ClusterClient.java

// Base cluster client: turns a PackagedProgram / plan into a JobGraph and
// submits it to the cluster.
public abstract class ClusterClient<T> {
// Runs a packaged program. For interactive-mode programs (main-method entry
// point), installs a ContextEnvironmentFactory so that the user code's
// ExecutionEnvironment.getExecutionEnvironment() routes execution back
// through this client.
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
throws ProgramInvocationException, ProgramMissingJobException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
// ...
}
else if (prog.isUsingInteractiveMode()) {
log.info("Starting program in interactive mode (detached: {})", isDetached());
// ...
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
prog.getSavepointSettings());
// Make the factory the context for ExecutionEnvironment lookups.
ContextEnvironment.setAsContext(factory);
try {
// invoke main method
prog.invokeInteractiveModeForExecution();
if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
throw new ProgramMissingJobException("The program didn't contain a Flink job.");
}
if (isDetached()) {
// in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
}
else {
// in blocking mode, we execute all Flink jobs contained in the user code and then return here
return this.lastJobExecutionResult;
}
}
finally {
// Always restore the previous context, even if the user code threw.
ContextEnvironment.unsetContext();
}
}
else {
throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
}
}
// Convenience overload: run without savepoint restore settings.
public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
return run(program, parallelism, SavepointRestoreSettings.none());
}
// Optimizes the plan of the JobWithJars, then delegates to the plan-based run.
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
throws CompilerException, ProgramInvocationException {
ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
}
OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}
// Converts the compiled plan into a JobGraph and submits it (for
// RestClusterClient this ends in the REST submitJob call).
public JobSubmissionResult run(FlinkPlan compiledPlan,
List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
throws ProgramInvocationException {
JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
return submitJob(job, classLoader);
}
}

@org/apache/flink/client/program/ContextEnvironment.java

// ExecutionEnvironment whose execute() routes the job through a ClusterClient
// instead of running locally.
public class ContextEnvironment extends ExecutionEnvironment {
// Registers the factory so getExecutionEnvironment() creates context environments.
static void setAsContext(ContextEnvironmentFactory factory) {
initializeContextEnvironment(factory);
}
// Builds the plan from the user program and submits it via the client,
// blocking until the execution result is available.
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
this.userCodeClassLoader);
this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointSettings).getJobExecutionResult();
return this.lastJobExecutionResult;
}
}

@org/apache/flink/api/java/ExecutionEnvironment.java

@Public
public abstract class ExecutionEnvironment {
// Installs the factory that subsequent getExecutionEnvironment() calls will use.
protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
}
// Returns the context environment if a factory was installed (e.g. by the
// CLI client); otherwise falls back to a local environment.
public static ExecutionEnvironment getExecutionEnvironment() {
return contextEnvironmentFactory == null ?
createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
}
}
}