Stage提交

调用流程：
1. org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted
2. org.apache.spark.scheduler.DAGScheduler.submitStage
3. org.apache.spark.scheduler.DAGScheduler.submitMissingTasks
4. org.apache.spark.scheduler.TaskScheduler.submitTasks

   
    
     /**
      * Handles the submission of job `jobId`, whose result is computed from `finalRDD`
      * over `partitions`.
      *
      * The final ResultStage is created first. If that throws, the failure is logged,
      * reported through `listener.jobFailed`, and the method returns immediately.
      * Otherwise:
      *  - an ActiveJob is registered;
      *  - a SparkListenerJobStart event is posted with the latest StageInfo of the job's
      *    registered stages;
      *  - the final stage is handed to submitStage.
      * The method finishes by calling submitWaitingStages().
      */
     private[scheduler] def handleJobSubmitted(jobId: Int,
         finalRDD: RDD[_],
         func: (TaskContext, Iterator[_]) => _,
         partitions: Array[Int],
         callSite: CallSite,
         listener: JobListener,
         properties: Properties) {
       // Creating the stage may throw, e.g. when the job runs on a HadoopRDD whose underlying
       // HDFS files have been deleted; such a failure is reported as a failure of the job.
       val finalStage: ResultStage =
         try {
           newResultStage(finalRDD, partitions.length, jobId, callSite)
         } catch {
           case e: Exception =>
             logWarning("Creating new stage failed due to exception - job: " + jobId, e)
             listener.jobFailed(e)
             return
         }

       if (finalStage != null) {
         val activeJob =
           new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
         // Must run before getMissingParentStages below, which is logged right after.
         clearCacheLocs()
         logInfo("Got job %s (%s) with %d output partitions".format(
           activeJob.jobId, callSite.shortForm, partitions.length))
         logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
         logInfo("Parents of final stage: " + finalStage.parents)
         logInfo("Missing parents: " + getMissingParentStages(finalStage))

         val submittedAt = clock.getTimeMillis()
         jobIdToActiveJob(jobId) = activeJob
         activeJobs += activeJob
         finalStage.resultOfJob = Some(activeJob)

         // Latest StageInfo of every stage of this job that is still registered.
         val infosOfJobStages =
           for {
             stageId <- jobIdToStageIds(jobId).toArray
             registered <- stageIdToStage.get(stageId)
           } yield registered.latestInfo
         listenerBus.post(
           SparkListenerJobStart(activeJob.jobId, submittedAt, infosOfJobStages, properties))

         // Submit the final stage; submitStage deals with its missing parents first.
         submitStage(finalStage)
       }
       submitWaitingStages()
     }
   
  

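handleJobSubmitted 创建好 finalStage 后，通过 submitStage 方法提交它。submitStage 会递归地先提交 finalStage 所依赖、但尚未完成的父 Stage：只要还有缺失的父 Stage，当前 Stage 就会被放入 waitingStages 中等待，而不是立即执行；只有在没有缺失的父 Stage 时，才会真正提交当前 Stage。具体代码如下：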

   
    
     /**
      * Submits `stage`, but first recursively submits any of its missing parent stages.
      *
      *  - If no active job owns the stage, the stage is aborted.
      *  - If the stage is already waiting, running or failed, nothing is done.
      *  - If some parents are missing, each of them is submitted (recursively) and `stage`
      *    is added to waitingStages instead of being run now.
      *  - Otherwise the stage's tasks are submitted through submitMissingTasks.
      */
     private def submitStage(stage: Stage) {
       activeJobForStage(stage) match {
         case None =>
           abortStage(stage, "No active job for stage " + stage.id, None)

         case Some(jobId) =>
           logDebug("submitStage(" + stage + ")")
           val alreadyScheduled =
             waitingStages(stage) || runningStages(stage) || failedStages(stage)
           if (!alreadyScheduled) {
             // Parent stages reported missing by getMissingParentStages, lowest stage id first.
             val missingParents = getMissingParentStages(stage).sortBy(_.id)
             logDebug("missing: " + missingParents)
             if (missingParents.nonEmpty) {
               missingParents.foreach(parent => submitStage(parent))
               // Not run now: remember the stage in waitingStages.
               waitingStages += stage
             } else {
               logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
               submitMissingTasks(stage, jobId)
             }
           }
       }
     }
   
  

从上面的代码可以看到，一个 Stage 在没有缺失的父 Stage 时，最终由 submitMissingTasks 为其创建 Task 并提交，其源代码如下：

   
    
     /**
      * Called when stage's parents are available and we can now do its task.
      *
      * Creates one task per partition of `stage` that still needs computing and hands them
      * to the TaskScheduler as a single TaskSet:
      *  1. work out which partitions still need computing;
      *  2. compute the preferred locations of those partitions;
      *  3. serialize and broadcast the task binary: (rdd, shuffleDep) or (rdd, func);
      *  4. build the ShuffleMapTasks / ResultTasks and submit them via taskScheduler.submitTasks.
      *
      * If computing locations, serializing the binary, or building the tasks fails, the stage
      * is aborted, removed from runningStages, and the method returns. If there is nothing
      * left to compute, the stage is marked finished right away.
      *
      * @param stage the stage whose missing tasks are submitted
      * @param jobId id of the job the stage was submitted for (not read by this method;
      *              the TaskSet uses stage.firstJobId)
      */
     private def submitMissingTasks(stage: Stage, jobId: Int) {
       logDebug("submitMissingTasks(" + stage + ")")
       // Get our pending tasks and remember them in our pendingTasks entry
       stage.pendingTasks.clear()

       // First figure out the indexes of partition ids to compute.
       // allPartitions holds every partition index of the stage; partitionsToCompute only
       // those that still need work:
       //  - ShuffleMapStage (every stage of the DAG except the last one): partitions that
       //    have no output location registered yet;
       //  - ResultStage (the last stage of the DAG): partitions the job has not finished.
       val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
         stage match {
           case stage: ShuffleMapStage =>
             val allPartitions = 0 until stage.numPartitions
             val filteredPartitions = allPartitions.filter { id => stage.outputLocs(id).isEmpty }
             (allPartitions, filteredPartitions)
           case stage: ResultStage =>
             val job = stage.resultOfJob.get
             val allPartitions = 0 until job.numPartitions
             val filteredPartitions = allPartitions.filter { id => !job.finished(id) }
             (allPartitions, filteredPartitions)
         }
       }

       // Create internal accumulators if the stage has no accumulators initialized.
       // Reset internal accumulators only if this stage is not partially submitted
       // Otherwise, we may override existing accumulator values from some tasks
       if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
         stage.resetInternalAccumulators()
       }

       val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull

       runningStages += stage
       // SparkListenerStageSubmitted should be posted before testing whether tasks are
       // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
       // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
       // event.
       outputCommitCoordinator.stageStart(stage.id)

       // Preferred locations of every partition to compute, so that tasks run close to their
       // data. For a ResultStage the key is the index into job.partitions, while the RDD
       // partition used for locality is job.partitions(index).
       val taskIdToLocations = try {
         stage match {
           case s: ShuffleMapStage =>
             partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
           case s: ResultStage =>
             val job = s.resultOfJob.get
             partitionsToCompute.map { id =>
               val p = job.partitions(id)
               (id, getPreferredLocs(stage.rdd, p))
             }.toMap
         }
       } catch {
         case NonFatal(e) =>
           stage.makeNewStageAttempt(partitionsToCompute.size)
           listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
           abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
           runningStages -= stage
           return
       }

       stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
       listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

       // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
       // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
       // the serialized copy of the RDD and for each task we will deserialize it, which means each
       // task gets a different copy of the RDD. This provides stronger isolation between tasks that
       // might modify state of objects referenced in their closures. This is necessary in Hadoop
       // where the JobConf/Configuration object is not thread-safe.
       val taskBinary: Broadcast[Array[Byte]] = try {
         // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
         // For ResultTask, serialize and broadcast (rdd, func).
         val taskBinaryBytes: Array[Byte] = stage match {
           case stage: ShuffleMapStage =>
             closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
           case stage: ResultStage =>
             closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
         }
         sc.broadcast(taskBinaryBytes)
       } catch {
         // In the case of a failure during serialization, abort the stage.
         case e: NotSerializableException =>
           abortStage(stage, "Task not serializable: " + e.toString, Some(e))
           runningStages -= stage
           // Abort execution
           return
         case NonFatal(e) =>
           abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
           runningStages -= stage
           return
       }

       // One ShuffleMapTask / ResultTask per partition to compute; they form the TaskSet below.
       val tasks: Seq[Task[_]] = try {
         stage match {
           case stage: ShuffleMapStage =>
             partitionsToCompute.map { id =>
               val locs = taskIdToLocations(id)
               val part = stage.rdd.partitions(id)
               new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                 taskBinary, part, locs, stage.internalAccumulators)
             }
           case stage: ResultStage =>
             val job = stage.resultOfJob.get
             partitionsToCompute.map { id =>
               val p: Int = job.partitions(id)
               val part = stage.rdd.partitions(p)
               val locs = taskIdToLocations(id)
               new ResultTask(stage.id, stage.latestInfo.attemptId,
                 taskBinary, part, locs, id, stage.internalAccumulators)
             }
         }
       } catch {
         case NonFatal(e) =>
           abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
           runningStages -= stage
           return
       }

       if (tasks.size > 0) {
         logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
         stage.pendingTasks ++= tasks
         logDebug("New pending tasks: " + stage.pendingTasks)
         // Wrap the tasks in a TaskSet and hand it to the TaskScheduler.
         taskScheduler.submitTasks(new TaskSet(
           tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
         stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
       } else {
         // Because we posted SparkListenerStageSubmitted earlier, we should mark
         // the stage as completed here in case there are no tasks to run
         markStageAsFinished(stage, None)

         val debugString = stage match {
           case stage: ShuffleMapStage =>
             s"Stage ${stage} is actually done; " +
               s"(available: ${stage.isAvailable}," +
               s"available outputs: ${stage.numAvailableOutputs}," +
               s"partitions: ${stage.numPartitions})"
           case stage: ResultStage =>
             s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
         }
         logDebug(debugString)
       }
     }
   
  

下一节将介绍 taskScheduler.submitTasks 方法，讲解 Task 是如何被提交的。