// Both executors below are sized by the CPU core count reported by Hardware.
val numberProcessors = Hardware.getNumberCPUCores()

// Scheduled pool; its threads come from an ExecutorThreadFactory built with "jobmanager-future".
val futureExecutor = Executors.newScheduledThreadPool(
  numberProcessors,
  new ExecutorThreadFactory("jobmanager-future"))

// Fixed-size pool; its threads come from an ExecutorThreadFactory built with "jobmanager-io".
// It is also handed to the high-availability services created below.
val ioExecutor = Executors.newFixedThreadPool(
  numberProcessors,
  new ExecutorThreadFactory("jobmanager-io"))

val timeout = AkkaUtils.getTimeout(configuration)

// we have to first start the JobManager ActorSystem because this determines the port if 0
// was chosen before. The method startActorSystem will update the configuration correspondingly.
val jobManagerSystem = startActorSystem(
  configuration,
  listeningAddress,
  listeningPort)

val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
  configuration,
  ioExecutor,
  AddressResolution.NO_ADDRESS_RESOLUTION)

val metricRegistry = new MetricRegistryImpl(
  MetricRegistryConfiguration.fromConfiguration(configuration))
try {
    // good, we are allowed to deploy
    if (!slot.setExecutedVertex(this)) {
        throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
    }

    // race double check, did we fail/cancel and do we need to release the slot?
    if (this.state != DEPLOYING) {
        slot.releaseSlot();
        return;
    }

    // Guarded so the formatted message is only built when INFO logging is enabled.
    if (LOG.isInfoEnabled()) {
        LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
                attemptNumber, getAssignedResourceLocation().getHostname()));
    }

    // Build the descriptor for this attempt and submit it through the slot's TaskManager gateway.
    final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            taskState,
            attemptNumber);

    final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

    final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
    // NOTE(review): the end of this try block (and any catch/finally) is not part of this excerpt.
// Create one ResultPartition (and its writer) per produced-partition descriptor.
// `counter` is declared outside this excerpt and indexes both arrays in lock-step.
for (ResultPartitionDeploymentDescriptor desc : resultPartitionDeploymentDescriptors) {
    ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);

    this.producedPartitions[counter] = new ResultPartition(
            taskNameWithSubtaskAndId,
            this,
            jobId,
            partitionId,
            desc.getPartitionType(),
            desc.getNumberOfSubpartitions(),
            desc.getMaxParallelism(),
            networkEnvironment.getResultPartitionManager(),
            resultPartitionConsumableNotifier,
            ioManager,
            desc.sendScheduleOrUpdateConsumersMessage());

    // Initialize the corresponding writer for each partition.
    writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);

    ++counter;
}

// Consumed intermediate result partitions
this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
this.inputGatesById = new HashMap<>();
// Loop until the state transition either succeeds or a terminal decision is made.
// A failed transitionState(...) call falls through and the state is re-read on the next iteration.
while (true) {
    ExecutionState current = this.executionState;

    if (current == ExecutionState.CREATED) {
        // If the current execution state is CREATED, move it to DEPLOYING.
        if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
            // success, we can start our work
            break;
        }
    }
    else if (current == ExecutionState.FAILED) {
        // If the current execution state is FAILED, send the notification and leave the method.
        // we were immediately failed. tell the TaskManager that we reached our final state
        notifyFinalState();
        if (metrics != null) {
            metrics.close();
        }
        return;
    }
    else if (current == ExecutionState.CANCELING) {
        // If the current execution state is CANCELING, change it to CANCELED and leave the method.
        if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
            // we were immediately canceled. tell the TaskManager that we reached our final state
            notifyFinalState();
            if (metrics != null) {
                metrics.close();
            }
            return;
        }
    }
    else {
        // Any other state at this point is unexpected and is treated as an error.
        if (metrics != null) {
            metrics.close();
        }
        throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
    }
}
// if the clock is not already set, then assign a default TimeServiceProvider
// (timer setup)
if (timerService == null) {
    ThreadFactory timerThreadFactory =
            new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());

    timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
}

// Materialize the operator chain information that was wired together in the JobGraph.
operatorChain = new OperatorChain<>(this);
headOperator = operatorChain.getHeadOperator();

// task specific initialization
// Annotator's note: the name of this init step is rather odd. According to the annotator it mainly
// handles the case where an operator uses a custom checkpointing mechanism, yet it carries a very
// generic name. NOTE(review): init() is not visible here; confirm against its implementation.
init();

// save the work of reloading state, etc, if the task is already canceled
if (canceled) {
    throw new CancelTaskException();
}
// we need to make sure that any triggers scheduled in open() cannot be // executed before all operators are opened synchronized (lock) {
// both the following operations are protected by the lock // so that we avoid race conditions in the case that initializeState() // registers a timer, that fires before the open() is called.
This integrates the timer as a service in StreamTask that StreamOperators can use by calling a method on the StreamingRuntimeContext. This also ensures that the timer callbacks cannot be called concurrently with other methods on the StreamOperator. This behaviour is verified by an ITCase.
protectedvoidrun()throws Exception { // cache processor reference on the stack, to make the code more JIT friendly final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } }