Spark Study Notes 10: How Task Execution Results Are Returned

When a ShuffleMapTask or ResultTask finishes executing, its result is passed back to the Driver.

1. Return flow


The return flow involves both the Executor and the Driver.

2. TaskRunner.run

    override def run() {
      ......
      try {
        ......
        // Run the actual task and measure its runtime.
        taskStart = System.currentTimeMillis()
        val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
        val taskFinish = System.currentTimeMillis()
        // If the task has been killed, let's fail it.
        if (task.killed) {
          throw new TaskKilledException
        }
        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()
        ......
        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
        val serializedDirectResult = ser.serialize(directResult)
        val resultSize = serializedDirectResult.limit
        // directSend = sending directly back to the driver
        val serializedResult = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        ......
      } finally {
        ......
      }
    }
  }
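(1) Task.run is called to start the task's computation. A ShuffleMapTask returns a MapStatus object. MapStatus has two implementations: CompressedMapStatus and HighlyCompressedMapStatus. MapStatus is used to look up the BlockManager that holds the map output, as well as the size of each output partition. The two implementations mainly differ in how they compress the output partition sizes.
(2) The task's result is serialized.
(3) A DirectTaskResult object is created to wrap the serialized task result.
(4) The DirectTaskResult object is itself serialized.
(5) The serialized DirectTaskResult is handled differently depending on its size. If maxResultSize is set (greater than 0) and the size exceeds it, the result is dropped and only an IndirectTaskResult carrying the block ID and size is sent. Otherwise, if the size reaches akka's frame size minus the reserved bytes, the result is stored as a block in the BlockManager and an IndirectTaskResult pointing at that block is sent. In all other cases the serialized DirectTaskResult is sent to the driver directly.
(6) CoarseGrainedExecutorBackend.statusUpdate is called, which sends a StatusUpdate message to the DriverActor.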
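The three-way choice in step (5) can be condensed into a small stand-alone sketch. This is only an illustration of the branching order seen in the excerpt above: the object `ResultRouting`, the `Delivery` type, and the `choose` function are hypothetical names and not part of Spark.

```scala
// Simplified model of the size-based choice made in TaskRunner.run.
// All names in this object are illustrative assumptions, not Spark APIs.
object ResultRouting {
  sealed trait Delivery
  case object Dropped extends Delivery         // too large: only a block id and size are sent
  case object ViaBlockManager extends Delivery // stored as a block; the driver fetches it later
  case object Direct extends Delivery          // bytes travel inside the StatusUpdate message

  def choose(resultSize: Long, maxResultSize: Long, frameSize: Long, reserved: Long): Delivery =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped // a limit of 0 disables this check
    else if (resultSize >= frameSize - reserved) ViaBlockManager
    else Direct
}
```

Note that the order of the checks matters: the maxResultSize test comes first, so an oversized result is never written to the BlockManager.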

3. DriverActor handling of the StatusUpdate message

      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                "from unknown executor $sender with ID $executorId")
          }
        }
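(1) TaskSchedulerImpl.statusUpdate is called.
(2) State is updated: once the task is in a finished state, its cores are added back to the executor's freeCores and makeOffers(executorId) is called so that new tasks can be offered to that executor. An update from an unknown executor is ignored with a warning.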
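The bookkeeping in step (2) amounts to a map lookup followed by a counter update. A minimal sketch, assuming a plain mutable map in place of executorDataMap; `CoreBookkeeping`, `ExecutorInfo`, and `onTaskFinished` are hypothetical names used only for illustration:

```scala
import scala.collection.mutable

// Illustrative stand-in for the driver's per-executor bookkeeping; not Spark code.
object CoreBookkeeping {
  final class ExecutorInfo(var freeCores: Int)

  // Returns true if the cores were released, false if the executor is unknown
  // (the case where the real handler only logs a warning).
  def onTaskFinished(
      executors: mutable.Map[String, ExecutorInfo],
      executorId: String,
      cpusPerTask: Int): Boolean =
    executors.get(executorId) match {
      case Some(info) =>
        info.freeCores += cpusPerTask // the finished task gives its cores back
        true
      case None =>
        false
    }
}
```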

4. TaskSchedulerImpl.statusUpdate

            activeTaskSets.get(taskSetId).foreach { taskSet =>
              if (state == TaskState.FINISHED) {
                taskSet.removeRunningTask(tid)
                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                taskSet.removeRunningTask(tid)
                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
              }
            }
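For a FINISHED task, TaskResultGetter.enqueueSuccessfulTask is called. It creates a Runnable and hands it to a thread pool for execution. The Runnable's main job is to obtain the task's computed result and then call TaskSchedulerImpl.handleSuccessfulTask. For FAILED, KILLED, or LOST tasks, TaskResultGetter.enqueueFailedTask is called instead.
Along this path, the thread running the Runnable creates a CompletionEvent and pushes it onto the message queue of DAGSchedulerEventProcessLoop. The event loop thread then reads the message and calls DAGSchedulerEventProcessLoop.onReceive to dispatch it.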
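The hand-off described here (a pool thread does the work, then posts an event for a separate event loop thread) can be sketched with plain JDK concurrency classes. Everything in this block is an assumption made for illustration: `ResultGetterSketch`, `Completion`, and `eventQueue` are hypothetical stand-ins, and decoding a UTF-8 string stands in for deserializing the task result.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, ThreadFactory}

// Illustrative model of "enqueue a Runnable, then post an event"; not Spark code.
object ResultGetterSketch {
  // Hypothetical stand-in for a CompletionEvent.
  final case class Completion(taskId: Long, value: String)

  // Stand-in for the event loop's message queue.
  val eventQueue = new LinkedBlockingQueue[Completion]()

  // Daemon threads, so an idle pool does not keep the JVM alive.
  private val pool = Executors.newFixedThreadPool(2, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "result-getter-sketch")
      t.setDaemon(true)
      t
    }
  })

  // The decoding runs on a pool thread, so the caller returns immediately.
  def enqueueSuccessfulTask(taskId: Long, serialized: Array[Byte]): Unit =
    pool.execute(new Runnable {
      override def run(): Unit = {
        val value = new String(serialized, "UTF-8") // stands in for deserialization
        eventQueue.put(Completion(taskId, value))   // picked up later by the event loop
      }
    })

  def shutdown(): Unit = pool.shutdown()
}
```

Moving the result handling off the message-handling thread keeps that thread free to process further status updates while large results are being fetched or decoded.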

5. DAGSchedulerEventProcessLoop.onReceive

  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    ......
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    .......
  }
DAGScheduler.handleTaskCompletion is called to process the CompletionEvent message.

6. DAGScheduler.handleTaskCompletion

Only the case where the task succeeds is covered here. There are two kinds of task result.

6.1. ResultTask result

          case rt: ResultTask[_, _] =>
            stage.resultOfJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  ......
                  // taskSucceeded runs some user code that might throw an exception. Make sure
                  // we are resilient against that.
                  try {
                    job.listener.taskSucceeded(rt.outputId, event.result)
                  } catch {
                    case e: Exception =>
                      // TODO: Perhaps we want to mark the stage as failed?
                      job.listener.jobFailed(new SparkDriverExecutionException(e))
                  }
                }
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
            }
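(1) State is updated (in the elided part of the code).
(2) JobWaiter.taskSucceeded is called to notify the JobWaiter that the task has completed. Because this call runs user code that may throw, any exception is caught and reported through jobFailed.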
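To make the notification in step (2) concrete, here is a minimal sketch of a listener that counts successful tasks and wakes up a waiting caller once all of them are in. It is a simplified model written for this note, not Spark's JobWaiter; the names `JobWaiterSketch`, `Waiter`, `awaitResult`, and `isDone` are assumptions.

```scala
// Illustrative listener that waits for a fixed number of task results; not Spark code.
object JobWaiterSketch {
  final class Waiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
    private var finishedTasks = 0
    private var done = totalTasks == 0 // a job with no tasks is finished at once

    // Called once per successful task with the partition index and its result.
    def taskSucceeded(index: Int, result: T): Unit = synchronized {
      resultHandler(index, result) // user code: may throw, so callers should guard it
      finishedTasks += 1
      if (finishedTasks == totalTasks) {
        done = true
        notifyAll() // wake any thread blocked in awaitResult
      }
    }

    // Blocks the calling thread until every task has reported success.
    def awaitResult(): Unit = synchronized {
      while (!done) wait()
    }

    def isDone: Boolean = synchronized(done)
  }
}
```

A caller would construct a `Waiter` with a handler that stores each partition's result, submit the job, and then block in `awaitResult()`.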

6.2. ShuffleMapTask result

          case smt: ShuffleMapTask =>
            ......
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
            } else {
              stage.addOutputLoc(smt.partitionId, status)
            }
            if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
              markStageAsFinished(stage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)
              if (stage.shuffleDep.isDefined) {
                // We supply true to increment the epoch number here in case this is a
                // recomputation of the map outputs. In that case, some nodes may have cached
                // locations with holes (from when we detected the error) and will need the
                // epoch incremented to refetch them.
                // TODO: Only increment the epoch number if this is not the first time
                //       we registered these map outputs.
                mapOutputTracker.registerMapOutputs(
                  stage.shuffleDep.get.shuffleId,
                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)
              }
              clearCacheLocs()
              if (stage.outputLocs.exists(_ == Nil)) {
                // Some tasks had failed; let's resubmit this stage
                // TODO: Lower-level scheduler should also deal with this
                logInfo("Resubmitting " + stage + " (" + stage.name +
                  ") because some of its tasks had failed: " +
                  stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
                submitStage(stage)
              } else {
                val newlyRunnable = new ArrayBuffer[Stage]
                for (stage <- waitingStages) {
                  logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
                }
                for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                  newlyRunnable += stage
                }
                waitingStages --= newlyRunnable
                runningStages ++= newlyRunnable
                for {
                  stage <- newlyRunnable.sortBy(_.id)
                  jobId <- activeJobForStage(stage)
                } {
                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                  submitMissingTasks(stage, jobId)
                }
              }
            }
          }
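(1) Stage.addOutputLoc is called to record the map output, unless the completion comes from an executor already marked as failed, in which case it is ignored.
(2) The stage is checked for completion (it is still in runningStages and has no pending tasks). If it has completed, the following steps are performed.
(3) The stage is marked as finished.
(4) MapOutputTracker.registerMapOutputs is called to register all map outputs of the stage.
(5) If any partition of the stage has no output location because its task failed, the stage is resubmitted.
(6) Otherwise, the stages in waitingStages that have no missing parent stages are collected, and DAGScheduler.submitMissingTasks is called for each of them in order of stage ID.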
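Steps (5) and (6) both reduce to simple collection operations. The sketch below models them with plain Scala collections under stated assumptions: stages are represented by integer IDs, output locations by strings, and `StageSketch`, `missingPartitions`, and `newlyRunnable` are hypothetical helpers rather than DAGScheduler methods.

```scala
// Illustrative model of the two checks made after a map stage finishes; not Spark code.
object StageSketch {
  // outputLocs(i) lists the known output locations of map partition i; Nil marks a hole
  // left by a failed task, which is what triggers resubmission of the stage.
  def missingPartitions(outputLocs: Seq[List[String]]): Seq[Int] =
    outputLocs.zipWithIndex.collect { case (Nil, i) => i }

  // A waiting stage becomes runnable once none of its parent stages is still missing.
  // The result is sorted by stage id, matching the submission order in the excerpt.
  def newlyRunnable(waiting: Set[Int], missingParents: Int => List[Int]): List[Int] =
    waiting.filter(s => missingParents(s).isEmpty).toList.sorted
}
```

For example, `missingPartitions` returns the indices of the holes when some partitions have an empty location list, and `newlyRunnable` returns only the waiting stages whose parent list is empty.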