Flink Task Lifecycle

Basic information

  • Flink version: 1.9

Task Lifecycle

This post walks through the flow from the JobMaster creating the ExecutionGraph to submitting tasks to the TaskExecutor, and then through the lifecycle of Task and StreamTask.

About Objects and Concepts

  • A Task is the basic unit of distributed execution in Flink.
  • An Operator corresponds to one or more ExecutionVertex instances; the number of ExecutionVertex instances equals the Operator's parallelism.
  • A Task contains a pipeline made up of the ExecutionVertex of one or more chained operators:
    • When the JobGraph is generated, Flink decides whether two operators can be chained together; a run of chained operators is merged into a single JobVertex.
    • When the ExecutionGraph is generated, each JobVertex is expanded into its parallelized form as ExecutionVertex instances.
    • Each ExecutionVertex corresponds to one parallel subtask of its JobVertex.
  • A Task is responsible for passing elements through the operator chain; elements include input elements (records), watermarks, and checkpoint barriers.
  • Each Task runs in its own thread; multiple tasks can share a single task slot (see the official documentation).
  • StreamTask is the base of all the different task subtypes in Flink's streaming engine.
  • During its lifecycle, a StreamTask drives its operators through the OperatorChain.
  • An operator handles each element type with a dedicated method (see the sketch after this list):
    • Input element: processElement()
    • Watermark: processWatermark()
    • Checkpoint barrier: snapshotState(), invoked asynchronously
  • During its lifecycle, an operator invokes the developer's user-defined function (UDF) code.
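
To make the three hooks above concrete, here is a hedged sketch of a user-defined one-input operator. The class UpperCaseOperator is hypothetical (not part of the Flink codebase), but the overridden methods follow the AbstractStreamOperator / OneInputStreamOperator signatures of Flink 1.9.

import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical operator for illustration only -- not taken from the Flink codebase.
public class UpperCaseOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        // Input element: transform the record and hand it to the Output,
        // which forwards it to the next operator in the chain.
        output.collect(element.replace(element.getValue().toUpperCase()));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Watermark: the default implementation advances event time (firing timers)
        // and then forwards the watermark downstream.
        super.processWatermark(mark);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        // Checkpoint barrier: called when a barrier triggers a snapshot of this
        // operator's state; heavy state is materialized asynchronously.
        super.snapshotState(context);
    }
}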

Build and Deploy Execution Graph

The Dispatcher (the parent class of StandaloneDispatcher) drives the JobMaster through the RPC interface wrapped by JobManagerRunner. During initialization the JobMaster calls createAndRestoreExecutionGraph(...), which in turn calls createExecutionGraph(...); at this point the JobMaster holds the freshly created ExecutionGraph in its executionGraph field.

JobManagerRunner then calls JobMaster's start(); inside the JobMaster the call chain eventually reaches scheduleExecutionGraph(), which invokes ExecutionGraph's scheduleForExecution() method.

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {

    public JobMaster(
            RpcService rpcService,
            JobMasterConfiguration jobMasterConfiguration,
            ResourceID resourceId,
            JobGraph jobGraph,
            HighAvailabilityServices highAvailabilityService,
            SlotPoolFactory slotPoolFactory,
            SchedulerFactory schedulerFactory,
            JobManagerSharedServices jobManagerSharedServices,
            HeartbeatServices heartbeatServices,
            JobManagerJobMetricGroupFactory jobMetricGroupFactory,
            OnCompletionActions jobCompletionActions,
            FatalErrorHandler fatalErrorHandler,
            ClassLoader userCodeLoader) throws Exception {
        // ...
        this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
        // ...
    }

    /**
     * Start the rpc service and begin to run the job.
     *
     * @param newJobMasterId The necessary fencing token to run the job
     * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
     */
    public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
        // make sure we receive RPC and async calls
        start();

        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }

    private void scheduleExecutionGraph() {
        // ...
        try {
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }
}

ExecutionGraph's scheduleForExecution() chooses a slot-allocation strategy based on the scheduleMode field; see scheduleLazy(...) and scheduleEager(...) for the concrete code. Here we follow scheduleEager(...): it first requests slot resources for every ExecutionJobVertex, and once all allocations succeed it deploys at the granularity of individual Execution instances via Execution's deploy() method.

public class ExecutionGraph implements AccessExecutionGraph {

    public void scheduleForExecution() throws JobException {
        // ...
        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

            final CompletableFuture<Void> newSchedulingFuture;

            switch (scheduleMode) {
                case LAZY_FROM_SOURCES:
                    newSchedulingFuture = scheduleLazy(slotProvider);
                    break;

                case EAGER:
                    newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                    break;

                default:
                    throw new JobException("Schedule mode is invalid.");
            }

            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;

                newSchedulingFuture.whenComplete(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null && !(throwable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(ExceptionUtils.stripCompletionException(throwable));
                        }
                    });
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }

    /**
     * @param slotProvider The resource provider from which the slots are allocated
     * @param timeout The maximum time that the deployment may take, before a
     *                TimeoutException is thrown.
     * @return Future which is completed once the {@link ExecutionGraph} has been scheduled.
     *         The future can also be completed exceptionally if an error happened.
     */
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
        // ...
        // allocate the slots (obtain all their futures)
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // these calls are not blocking, they only return futures
            Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
                slotProvider,
                queued,
                LocationPreferenceConstraint.ALL,
                allPreviousAllocationIds,
                timeout);

            allAllocationFutures.addAll(allocationFutures);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        return allAllocationsFuture.thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })
            // Generate a more specific failure message for the eager scheduling
            .exceptionally(
                // ...
                });
    }
}

In Execution's deploy(), the task is submitted through TaskManagerGateway's submitTask(...); the TaskManagerGateway instance here is an RpcTaskManagerGateway, so the corresponding TaskManagerRunner process receives the task over Akka RPC.

public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

    /**
     * Deploys the execution to the previously assigned resource.
     *
     * @throws JobException if the execution cannot be deployed to the assigned resource
     */
    public void deploy() throws JobException {
        assertRunningInJobMasterMainThread();

        final LogicalSlot slot = assignedResource;
        // ...
        try {
            // ...
            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskRestore,
                attemptNumber);

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

            // We run the submission in the future executor so that the serialization of large TDDs does not block
            // the main thread and sync back to the main thread once submission is completed.
            CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                    (ack, failure) -> {
                        // only respond to the failure case
                        if (failure != null) {
                            if (failure instanceof TimeoutException) {
                                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                                markFailed(new Exception(
                                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                        + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                            } else {
                                markFailed(failure);
                            }
                        }
                    },
                    jobMasterMainThreadExecutor);
        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }
}

Create and run Task

TaskExecutor's submitTask(...) receives the task submission over RPC. Once all the data needed to initialize the task has been assembled, it creates a Task object; one Task object is created per submitted task.
After creation, it calls Task's startTaskThread() to start the task's thread.

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

    @Override
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd,
            JobMasterId jobMasterId,
            Time timeout) {

        try {
            final JobID jobId = tdd.getJobId();
            final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
            // ...
            // deserialize the pre-serialized information
            final JobInformation jobInformation;
            final TaskInformation taskInformation;
            try {
                jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
                taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
            }

            if (!jobId.equals(jobInformation.getJobId())) {
                throw new TaskSubmissionException(
                    "Inconsistent job ID information inside TaskDeploymentDescriptor (" +
                        tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
            }

            TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
                jobInformation.getJobId(),
                jobInformation.getJobName(),
                taskInformation.getJobVertexId(),
                tdd.getExecutionAttemptId(),
                taskInformation.getTaskName(),
                tdd.getSubtaskIndex(),
                tdd.getAttemptNumber());

            InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
                jobManagerConnection.getJobManagerGateway(),
                taskInformation.getJobVertexId(),
                tdd.getExecutionAttemptId(),
                taskManagerConfiguration.getTimeout());

            TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
            CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
            GlobalAggregateManager aggregateManager = jobManagerConnection.getGlobalAggregateManager();

            LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
            ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
            PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();

            final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
                jobId,
                tdd.getAllocationId(),
                taskInformation.getJobVertexId(),
                tdd.getSubtaskIndex());

            final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

            final TaskStateManager taskStateManager = new TaskStateManagerImpl(
                jobId,
                tdd.getExecutionAttemptId(),
                localStateStore,
                taskRestore,
                checkpointResponder);

            Task task = new Task(
                jobInformation,
                taskInformation,
                tdd.getExecutionAttemptId(),
                tdd.getAllocationId(),
                tdd.getSubtaskIndex(),
                tdd.getAttemptNumber(),
                tdd.getProducedPartitions(),
                tdd.getInputGates(),
                tdd.getTargetSlotNumber(),
                taskExecutorServices.getMemoryManager(),
                taskExecutorServices.getIOManager(),
                taskExecutorServices.getNetworkEnvironment(),
                taskExecutorServices.getBroadcastVariableManager(),
                taskStateManager,
                taskManagerActions,
                inputSplitProvider,
                checkpointResponder,
                aggregateManager,
                blobCacheService,
                libraryCache,
                fileCache,
                taskManagerConfiguration,
                taskMetricGroup,
                resultPartitionConsumableNotifier,
                partitionStateChecker,
                getRpcService().getExecutor());

            log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

            boolean taskAdded;

            try {
                taskAdded = taskSlotTable.addTask(task);
            } catch (SlotNotFoundException | SlotNotActiveException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            if (taskAdded) {
                task.startTaskThread();

                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                // ...
            }
        } catch (TaskSubmissionException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }
}

Task implements Runnable and owns a dedicated executing thread, so every Task object runs in its own thread.
In Task's run(), loadAndInstantiateInvokable() obtains an AbstractInvokable instance via reflection; its subclass StreamTask is the base of all the different stream task variants, such as SourceStreamTask, OneInputStreamTask, and TwoInputStreamTask.
run() then calls AbstractInvokable's invoke(), which here resolves to StreamTask's invoke().
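
loadAndInstantiateInvokable() itself is elided from the Task listing below. A simplified sketch of what it does (the real method wraps reflection failures in more specific exceptions) looks roughly like this:

// Simplified sketch of Task#loadAndInstantiateInvokable: the invokable class named
// in the TaskDeploymentDescriptor is loaded with the user-code class loader and
// instantiated through its (Environment) constructor. Requires java.lang.reflect.Constructor.
private static AbstractInvokable loadAndInstantiateInvokable(
        ClassLoader classLoader, String className, Environment environment) throws Exception {

    // e.g. className = "org.apache.flink.streaming.runtime.tasks.OneInputStreamTask"
    Class<? extends AbstractInvokable> invokableClass =
        Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);

    // every invokable is expected to expose a constructor taking the runtime Environment
    Constructor<? extends AbstractInvokable> ctor = invokableClass.getConstructor(Environment.class);
    return ctor.newInstance(environment);
}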

public class Task implements Runnable, TaskActions, CheckpointListener {

    /**
     * Starts the task's thread.
     */
    public void startTaskThread() {
        executingThread.start();
    }

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        // ...

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------
            // ...

            // ----------------------------------------------------------------
            // register the task with the network stack
            // this operation may fail if the system does not have enough
            // memory to run the necessary data exchanges
            // the registration must also strictly be undone
            // ----------------------------------------------------------------
            // ...

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------
            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                aggregateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------
            // ...
        }
        catch (Throwable t) {
            // ...
        }
        finally {
            // ...
        }
    }
}
}

In StreamTask's invoke(), an OperatorChain is created; it represents all the (parallelized) operators contained in this task.
While the OperatorChain is initialized, it wires up the upstream/downstream relationships between the operators it manages, including element forwarding, which is implemented by its inner classes ChainingOutput and CopyingChainingOutput.
Both hold a OneInputStreamOperator field referring to the downstream operator of the current operator in the chain; when a ChainingOutput receives data emitted by the current operator, it calls the downstream operator's processElement() directly.
ChainingOutput implements the Output interface, and every StreamOperator holds an Output member used to collect the data the current operator has finished processing. A trimmed-down sketch of this hand-off follows the OperatorChain listing below.

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            // ...

            operatorChain = new OperatorChain<>(this, recordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            // make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();

                // make sure no new timers can come
                timerService.quiesce();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();

            LOG.debug("Closed operators for task {}", getName());

            // make sure all buffered data is flushed
            operatorChain.flushOutputs();

            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            // ...
        }
    }
}
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

    public OperatorChain(
            StreamTask<OUT, OP> containingTask,
            List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters) {

        final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        final StreamConfig configuration = containingTask.getConfiguration();

        headOperator = configuration.getStreamOperator(userCodeClassloader);

        // we read the chained configs, and the order of record writer registrations by output name
        Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

        // create the final output stream writers
        // we iterate through all the out edges from this job vertex and create a stream output
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

        // from here on, we need to make sure that the output writers are shut down again on failure
        boolean success = false;
        try {
            for (int i = 0; i < outEdgesInOrder.size(); i++) {
                StreamEdge outEdge = outEdgesInOrder.get(i);

                RecordWriterOutput<?> streamOutput = createStreamOutput(
                    recordWriters.get(i),
                    outEdge,
                    chainedConfigs.get(outEdge.getSourceId()),
                    containingTask.getEnvironment());

                this.streamOutputs[i] = streamOutput;
                streamOutputMap.put(outEdge, streamOutput);
            }

            // we create the chain of operators and grab the collector that leads into the chain
            List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
            this.chainEntryPoint = createOutputCollector(
                containingTask,
                configuration,
                chainedConfigs,
                userCodeClassloader,
                streamOutputMap,
                allOps);

            if (headOperator != null) {
                WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
                headOperator.setup(containingTask, configuration, output);

                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
            }

            // add head operator to end of chain
            allOps.add(headOperator);

            this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

            success = true;
        }
        finally {
            // make sure we clean up after ourselves in case of a failure after acquiring
            // the first resources
            if (!success) {
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output != null) {
                        output.close();
                    }
                }
            }
        }
    }
}
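
The hand-off between chained operators described above can be sketched roughly as follows. This is a trimmed-down version of the ChainingOutput inner class; metrics, record copying (CopyingChainingOutput), stream-status checks, and side-output handling are omitted.

// Trimmed-down sketch of OperatorChain.ChainingOutput: "collecting" a record
// simply means calling processElement() on the next chained operator directly,
// with no serialization or network transfer in between.
static class ChainingOutput<T> implements Output<StreamRecord<T>> {

    // the downstream operator in the same chain
    protected final OneInputStreamOperator<T, ?> operator;

    ChainingOutput(OneInputStreamOperator<T, ?> operator) {
        this.operator = operator;
    }

    @Override
    public void collect(StreamRecord<T> record) {
        try {
            // hand the record straight to the next operator in the chain
            operator.setKeyContextElement1(record);
            operator.processElement(record);
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }

    @Override
    public void emitWatermark(Watermark mark) {
        try {
            // watermarks travel through the chain by the same direct call
            operator.processWatermark(mark);
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        // side outputs omitted in this sketch
    }

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        // latency markers omitted in this sketch
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}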

Summary

  • After the JobMaster has built the ExecutionGraph, tasks are submitted to the TaskManagerRunner process over Akka RPC.
  • The TaskManagerRunner process creates a dedicated Task thread for every submitted task.
  • Depending on Flink's slot-sharing policies (SlotSharingGroup and CoLocationGroup), a Task thread can share a task slot with other tasks (see the sketch below).
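
For completeness, slot sharing can be influenced from the DataStream API. The job below is purely illustrative (it is not taken from the text above); it pins operators to named slot sharing groups so that their subtasks cannot end up in the same task slot.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative only: steering slot sharing from the DataStream API.
// Operators in different slot sharing groups never share a task slot.
public class SlotSharingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            .map(value -> value.toUpperCase())
            .slotSharingGroup("preprocessing")  // map subtasks are scheduled in the "preprocessing" group
            .keyBy(value -> value)
            .reduce((left, right) -> left + right)
            .slotSharingGroup("aggregation")    // downstream operators inherit this group unless overridden
            .print();

        env.execute("slot sharing example");
    }
}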