Blog: http://blog.csdn.net/yueqian_zhu/


This section covers how a specific task actually runs and how its final result is handled.

Let's look at the run method executed by the task thread; see the comments in the code:
        override def run(): Unit = {
        val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
        val deserializeStartTime = System.currentTimeMillis()
        Thread.currentThread.setContextClassLoader(replClassLoader)
        val ser = env.closureSerializer.newInstance()
        logInfo(s"Running $taskName (TID $taskId)")
        // Send a StatusUpdate to the driver marking the task as RUNNING; nothing else happens here
        execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
        var taskStart: Long = 0
        startGCTime = computeTotalGcTime()
    
        try {
          // Split serializedTask into the task files, jars and the task bytes
          val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
          // Download the jars, files, etc. that the task needs
          updateDependencies(taskFiles, taskJars)
          // Deserialize the actual Task object
          task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
          task.setTaskMemoryManager(taskMemoryManager)
    
          // If this task has been killed before we deserialized it, let's quit now. Otherwise,
          // continue executing the task.
          if (killed) {
            // Throw an exception rather than returning, because returning within a try{} block
            // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
            // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
            // for the task.
            throw new TaskKilledException
          }
    
          logDebug("Task " + taskId + "'s epoch is " + task.epoch)
          env.mapOutputTracker.updateEpoch(task.epoch)
    
          // Run the actual task and measure its runtime.
          taskStart = System.currentTimeMillis()
          val value = try {
            // Run the task; see the analysis of task.run below
            task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
          } finally {
            // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
            // when changing this, make sure to update both copies.
            // Release the memory allocated by this task
            val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
            if (freedMemory > 0) {
              val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
              if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
                throw new SparkException(errMsg)
              } else {
                logError(errMsg)
              }
            }
          }
          val taskFinish = System.currentTimeMillis()
    
          // If the task has been killed, let's fail it.
          if (task.killed) {
            throw new TaskKilledException
          }
    
          val resultSer = env.serializer.newInstance()
          val beforeSerialization = System.currentTimeMillis()
          // Serialize the task's result value
          val valueBytes = resultSer.serialize(value)
          val afterSerialization = System.currentTimeMillis()
    
          for (m <- task.metrics) {
            // Deserialization happens in two parts: first, we deserialize a Task object, which
            // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
            m.setExecutorDeserializeTime(
              (taskStart - deserializeStartTime) + task.executorDeserializeTime)
            // We need to subtract Task.run()'s deserialization time to avoid double-counting
            m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
            m.setJvmGCTime(computeTotalGcTime() - startGCTime)
            m.setResultSerializationTime(afterSerialization - beforeSerialization)
          }
    
          val accumUpdates = Accumulators.values
          val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
          val serializedDirectResult = ser.serialize(directResult)
          val resultSize = serializedDirectResult.limit
          // The final result has been serialized into serializedDirectResult; how it is sent back depends on its size
    
          // directSend = sending directly back to the driver
          val serializedResult: ByteBuffer = {
            // Case 1: the serialized result exceeds maxResultSize (1 GB by default) -- drop it
            if (maxResultSize > 0 && resultSize > maxResultSize) {
              logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
                s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
                s"dropping it.")
              ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
            } 
            // Case 2: the serialized result exceeds the Akka frame size (10 MB by default): store it as a block
            // in the BlockManager and wrap the returned BlockId in an IndirectTaskResult
            else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
              val blockId = TaskResultBlockId(taskId)
              env.blockManager.putBytes(
                blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
              logInfo(
                s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
              ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
            } else {
              // Case 3: small results are sent back to the driver directly
              logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
              serializedDirectResult
            }
          }
    
          execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    
        } catch {
          case ffe: FetchFailedException =>
            val reason = ffe.toTaskEndReason
            execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    
          case _: TaskKilledException | _: InterruptedException if task.killed =>
            logInfo(s"Executor killed $taskName (TID $taskId)")
            execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    
          case cDE: CommitDeniedException =>
            val reason = cDE.toTaskEndReason
            execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    
          case t: Throwable =>
            // Attempt to exit cleanly by informing the driver of our failure.
            // If anything goes wrong (or this was a fatal exception), we will delegate to
            // the default uncaught exception handler, which will terminate the Executor.
            logError(s"Exception in $taskName (TID $taskId)", t)
    
            val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
              task.metrics.map { m =>
                m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
                m.setJvmGCTime(computeTotalGcTime() - startGCTime)
                m
              }
            }
            val taskEndReason = new ExceptionFailure(t, metrics)
            execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(taskEndReason))
    
            // Don't forcibly exit unless the exception was inherently fatal, to avoid
            // stopping other tasks unnecessarily.
            if (Utils.isFatalError(t)) {
              SparkUncaughtExceptionHandler.uncaughtException(t)
            }
    
        } finally {
          // Release memory used by this thread for shuffles
          env.shuffleMemoryManager.releaseMemoryForThisThread()
          // Release memory used by this thread for unrolling blocks
          env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
          // Release memory used by this thread for accumulators
          Accumulators.clear()
          runningTasks.remove(taskId)
        }
      }
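Both size thresholds in the code above are configurable. A minimal sketch, assuming standard Spark 1.x property names (the values here are arbitrary examples, not recommendations):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.driver.maxResultSize", "2g") // cap on serialized results returned to the driver (1g by default)
      .set("spark.akka.frameSize", "50")       // Akka frame size in MB (10 by default); larger results go through the BlockManager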

Now let's see what task.run does:
        final def run(taskAttemptId: Long, attemptNumber: Int): T = {
      // First construct a TaskContext, which tracks the whole lifecycle of the task
      context = new TaskContextImpl(
        stageId = stageId,
        partitionId = partitionId,        // partition index
        taskAttemptId = taskAttemptId,    // the taskId
        attemptNumber = attemptNumber,    // which attempt of this task this is, counted from 0
        taskMemoryManager = taskMemoryManager,
        runningLocally = false)
      TaskContext.setTaskContext(context) // store the context in a thread-local
      context.taskMetrics.setHostname(Utils.localHostName())
      taskThread = Thread.currentThread()
      if (_killed) {
        kill(interruptThread = false)
      }
      try {
        runTask(context) // dispatch to the concrete task type (ShuffleMapTask or ResultTask)
      } finally {
        context.markTaskCompleted()
        TaskContext.unset()
      }
    }
       
        
runTask is where ShuffleMapTask and ResultTask diverge. The SparkPi example I used earlier has no shuffle, so here I'll use a word count example to illustrate the shuffle path.
    import org.apache.spark._
    import SparkContext._

    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 3) {
          println("usage is org.test.WordCount <master> <input> <output>")
          return
        }
        val sc = new SparkContext(args(0), "WordCount",
          System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
        val textFile = sc.textFile(args(1))
        val result = textFile.flatMap(line => line.split("\\s+"))
          .map(word => (word, 1)).reduceByKey(_ + _)
        result.saveAsTextFile(args(2))
      }
    }
          
        
After sc.textFile(...).flatMap(...).map(...) we get a MapPartitionsRDD, which works just like in the earlier examples, so I won't go over it again.

Let's go straight to reduceByKey, which lives in PairRDDFunctions:
         /**
       * Merge the values for each key using an associative reduce function. This will also perform
       * the merging locally on each mapper before sending results to a reducer, similarly to a
       * "combiner" in MapReduce.
       */
      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
        combineByKey[V]((v: V) => v, func, func, partitioner)
      }
    
      /**
       * Merge the values for each key using an associative reduce function. This will also perform
       * the merging locally on each mapper before sending results to a reducer, similarly to a
       * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
       */
      def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
        reduceByKey(new HashPartitioner(numPartitions), func)
      }
    
      /**
       * Merge the values for each key using an associative reduce function. This will also perform
       * the merging locally on each mapper before sending results to a reducer, similarly to a
       * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
       * parallelism level.
       */
      def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        reduceByKey(defaultPartitioner(self), func)
      }
        
There are three reduceByKey overloads. We can set the number of reducers explicitly; if we don't, it is chosen for us.

If the number of reducers is not specified, the rules are:

1. If one of the RDDs involved already has a partitioner defined, that partitioner is reused.

2. If no partitioner is available but a reducer count is given, a HashPartitioner with that many partitions is created.

3. If neither is set, a HashPartitioner is created whose partition count is spark.default.parallelism when that property is set, and otherwise the largest partition count among the upstream RDDs (bySize.head.partitions.size).

The sketch after this list shows roughly how that decision is made.

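Here is a sketch of that decision, simplified from Partitioner.defaultPartitioner in Spark 1.x (I use the public getConf here instead of the internal conf field and skip some corner cases; rule 2 never reaches this code because reduceByKey(func, numPartitions) builds its HashPartitioner directly, as shown above):

    import org.apache.spark.{HashPartitioner, Partitioner}
    import org.apache.spark.rdd.RDD

    // Simplified sketch of how the default partitioner is chosen.
    def defaultPartitionerSketch(rdd: RDD[_], others: RDD[_]*): Partitioner = {
      val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
      // Rule 1: reuse an existing partitioner if any upstream RDD has one.
      for (r <- bySize if r.partitioner.isDefined) {
        return r.partitioner.get
      }
      if (rdd.context.getConf.contains("spark.default.parallelism")) {
        // Rule 3a: fall back to spark.default.parallelism when it is set.
        new HashPartitioner(rdd.context.defaultParallelism)
      } else {
        // Rule 3b: otherwise use the largest upstream partition count.
        new HashPartitioner(bySize.head.partitions.size)
      }
    }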
         
In our word count the key type K is String, the value type V is Int, and for this problem the combiner type C is also Int; reduceByKey(_ + _) therefore boils down to the combineByKey call sketched below.
         
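Concretely, using the first reduceByKey overload shown above, the call expands to roughly the following. This is only an illustration, not code from the Spark source; pairs is my shorthand for the textFile.flatMap(...).map(word => (word, 1)) RDD of the example:

    import org.apache.spark.Partitioner.defaultPartitioner

    // reduceByKey(_ + _) on pairs: RDD[(String, Int)] is equivalent to:
    val counts = pairs.combineByKey[Int](
      (v: Int) => v,                  // createCombiner: the first 1 seen for a key becomes the initial count
      (c: Int, v: Int) => c + v,      // mergeValue: fold further values into the per-key count (map side)
      (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners: merge partial counts coming from different map tasks
      defaultPartitioner(pairs))

combineByKey itself looks like this: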
          def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = self.withScope {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      // If the RDD is already partitioned with the requested partitioner, no shuffle is needed at all:
      // the combine logic is simply wrapped in a MapPartitionsRDD and returned.
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        // Otherwise a ShuffledRDD is returned. Note that no dependency argument is passed in here,
        // whereas the one-to-one dependencies mentioned earlier are passed to the constructor directly;
        // this will matter below.
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }
         
As you can see, reduceByKey itself looks deceptively simple. That is because it is only a transformation; the real work, and the real complexity, only show up once an action is triggered.

So let's look at the final result.saveAsTextFile(args(2)), which is exactly such an action.
As mentioned above, a ShuffledRDD's dependency is not passed in when it is constructed, yet stages have to be split according to dependencies. The answer is that ShuffledRDD overrides getDependencies:
        
         override def getDependencies: Seq[Dependency[_]] = {
      List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
    }
        
       
It is this overridden getDependencies that supplies the ShuffleDependency used when stages are split; the quick check below shows the difference in dependency types.
       
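A quick, informal way to see this from spark-shell (sc is the shell's SparkContext; the file path and the printed values are only illustrative):

    // flatMap/map create narrow (one-to-one) dependencies, so they stay in the same stage.
    val pairs = sc.textFile("input.txt").flatMap(_.split("\\s+")).map(word => (word, 1))
    println(pairs.dependencies)    // e.g. List(org.apache.spark.OneToOneDependency@...)

    // reduceByKey builds a ShuffledRDD; its getDependencies produces the ShuffleDependency
    // that the DAGScheduler treats as a stage boundary.
    val counts = pairs.reduceByKey(_ + _)
    println(counts.dependencies)   // e.g. List(org.apache.spark.ShuffleDependency@...)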
      
       
Next, let's jump to the two remaining runTask implementations: ShuffleMapTask's and ResultTask's.

1. ShuffleMapTask's runTask
       
        
         override def runTask(context: TaskContext): MapStatus = {
      // Deserialize the RDD using the broadcast variable.
      val deserializeStartTime = System.currentTimeMillis()
      val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize taskBinary into (rdd, dep). Note: rdd is the last RDD of this stage, and dep is the
  // ShuffleDependency connecting this stage to the next one.
      val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    
      metrics = Some(context.taskMetrics)
      var writer: ShuffleWriter[Any, Any] = null
      try {
    val manager = SparkEnv.get.shuffleManager  // SortShuffleManager by default
    // dep.shuffleHandle wraps the shuffleId, _rdd.partitions.size and the deserialized dep;
    // getWriter here creates a new SortShuffleWriter.
        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // The argument to write() calls the rdd's compute method, which returns an iterator over this
    // partition's data (see below). write() writes the data to local disk and combines the
    // blockManagerId and the block sizes into a MapStatus.
        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        return writer.stop(success = true).get
      } catch {
        case e: Exception =>
          try {
            if (writer != null) {
              writer.stop(success = false)
            }
          } catch {
            case e: Exception =>
              log.debug("Could not stop writer", e)
          }
          throw e
      }
    }
        
         
          
At its upper boundary, each stage either reads from external storage or reads the output of the previous stage; at its lower boundary, it either writes to the local file system for the next stage to read, or it is the ResultTask producing the final output.

Our example is split into two stages, linked by the shuffle dependency.

Look at the first stage: the deserialized rdd is the last RDD of that stage, i.e. the MapPartitionsRDD, and dep is the dependency connecting it to the next stage, i.e. the ShuffleDependency. This point is important.

Now, what happens when this rdd's compute method is called? Very little: the supplied function is applied to the data of this partition, returning an iterator over the computed records.

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))
Now let's look at write() on the default SortShuffleWriter.

The records are first inserted into an ExternalSorter, which keeps the partially combined results in an in-memory map. If that map grows past the memory threshold, its contents are spilled to disk, and each spill produces a separate file. Once all the data of the input partition has been processed, part of the result may still be in memory while the rest is spread over one or more spill files; a merge step then combines the in-memory data and the spill files into a single data file (see writePartitionedFile). Finally, the start and end offset of every partition within that data file is written to an index file.
           
          
         
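To make the index file concrete: the sort-based shuffle stores numPartitions + 1 cumulative offsets in it, one 8-byte Long each. The following is my own standalone sketch (not Spark's reader code) of how a reducer could use such an index to locate its slice of the data file:

    import java.io.{DataInputStream, FileInputStream}

    // Given an index file of numPartitions + 1 cumulative offsets (one Long each),
    // return the (offset, length) of reduce partition `reduceId` inside the data file.
    def partitionSlice(indexFile: String, reduceId: Int): (Long, Long) = {
      val in = new DataInputStream(new FileInputStream(indexFile))
      try {
        in.skipBytes(reduceId * 8)  // skip the offsets of the partitions before this one
        val start = in.readLong()   // where this partition's bytes begin in the data file
        val end   = in.readLong()   // where the next partition begins
        (start, end - start)
      } finally {
        in.close()
      }
    }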
          
           
At this point the first stage, which runs from the input source to the shuffle output, is finished.
           
            
2. ResultTask's runTask
            
             override def runTask(context: TaskContext): U = {
      // Deserialize the RDD and the func using the broadcast variables.
      val deserializeStartTime = System.currentTimeMillis()
      val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the final RDD and the user function
      val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    
      metrics = Some(context.taskMetrics)
  // Invoke the function we supplied on the iterator of this partition
      func(context, rdd.iterator(partition, context))
    }
            
Look at the argument passed to func: as before, rdd.iterator ends up calling the rdd's compute method. Since the rdd here was produced by a shuffle, it is a ShuffledRDD, whose compute method is:
          override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
        val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
        SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
          .read()
          .asInstanceOf[Iterator[(K, C)]]
      }
         
SparkEnv.get.shuffleManager here may be either the sort-based or the hash-based manager; in either case getReader creates a HashShuffleReader.

Let's look at its read method:
          
           /** Read the combined key-values for this reduce task */
    override def read(): Iterator[Product2[K, C]] = {
      val ser = Serializer.getSerializer(dep.serializer)
  // Actually fetch the map-output data this reducer needs; it is ultimately wrapped in an InterruptibleIterator and returned
      val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
    
      val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
        if (dep.mapSideCombine) {
          new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
        } else {
          new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
        }
      } else {
        require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    
        // Convert the Product2s to pairs since this is what downstream RDDs currently expect
        iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
      }
    
      // Sort the output if there is a sort ordering defined.
      dep.keyOrdering match {
        case Some(keyOrd: Ordering[K]) =>
          // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
          // the ExternalSorter won't spill to disk.
          val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
          sorter.insertAll(aggregatedIter)
          context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
          context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
          sorter.iterator
        case None =>
          aggregatedIter
      }
    }
          
After that, our func is invoked and its result returned.

Recall that Task.run returns a value of type T: the ShuffleMapTask subclass returns a MapStatus, while ResultTask returns whatever func produced.

So back in Executor.scala's run method, value is exactly this return value:
           val value = try {
              task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
            } finally {
             ...
             ...
            }
          
           
            
Afterwards, as described at the beginning, the result is serialized and execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) is called, which ends the executor-side flow. Under the hood this sends a StatusUpdate message to the driver carrying the executorId, the taskId, the task state and the serialized result of the computation. The driver-side CoarseGrainedSchedulerBackend handles it like this:
             case StatusUpdate(executorId, taskId, state, data) =>
      scheduler.statusUpdate(taskId, state, data.value)
      if (TaskState.isFinished(state)) {
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.freeCores += scheduler.CPUS_PER_TASK // a task has finished, so its cores are returned to the executor
            makeOffers(executorId) // try to launch more runnable tasks on this executor
          case None =>
            // Ignoring the update since we don't know about the executor.
            logWarning(s"Ignored task status update ($taskId state $state) " +
              s"from unknown executor with ID $executorId")
        }
      }
            
            
Now for the driver-side handling. TaskSchedulerImpl.statusUpdate is called first; it branches on the task state, which here is FINISHED. If the result is a DirectTaskResult, it is deserialized directly; if it is an IndirectTaskResult, the blockId obtained after deserialization is used to fetch the block remotely from the BlockManager. Either way, once the data is in hand, the taskId is mapped to its taskSetId, the taskSetId to its TaskSetManager, and that TaskSetManager's handleSuccessfulTask method is invoked. That method mainly calls DAGScheduler.taskEnded, which posts a CompletionEvent to the DAGScheduler event loop.

      
In handleTaskCompletion we mainly care about how the ResultTask and ShuffleMapTask cases are handled; see the comments:
       /**
     * Responds to a task finishing. This is called inside the event loop so it assumes that it can
     * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
     */
    private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
      val task = event.task
      val stageId = task.stageId
      val taskType = Utils.getFormattedClassName(task)
    
    
      outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
        event.taskInfo.attempt, event.reason)
    
    
      // The success case is dealt with separately below, since we need to compute accumulator
      // updates before posting.
      if (event.reason != Success) {
        val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
        listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
          event.taskInfo, event.taskMetrics))
      }
    
    
      if (!stageIdToStage.contains(task.stageId)) {
        // Skip all the actions if the stage has been cancelled.
        return
      }
    
    
      val stage = stageIdToStage(task.stageId)
      event.reason match {
        case Success =>
          listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
            event.reason, event.taskInfo, event.taskMetrics))
          stage.pendingTasks -= task
          task match {
            case rt: ResultTask[_, _] =>
              // Cast to ResultStage here because it's part of the ResultTask
              // TODO Refactor this out to a function that accepts a ResultStage
              val resultStage = stage.asInstanceOf[ResultStage]
              resultStage.resultOfJob match {
                case Some(job) =>
              // Handle the result only if this partition has not already been marked as finished
                  if (!job.finished(rt.outputId)) {
                    updateAccumulators(event)
                job.finished(rt.outputId) = true // mark this partition as finished
                    job.numFinished += 1
                    // If the whole job has finished, remove it
                // All partitions of the final stage have finished, i.e. the whole job is done
                    if (job.numFinished == job.numPartitions) {
                      markStageAsFinished(resultStage)
                  cleanupStateForJobAndIndependentStages(job) // clean up the in-memory bookkeeping for this job
                      listenerBus.post(
                        SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                    }
    
    
                    // taskSucceeded runs some user code that might throw an exception. Make sure
                    // we are resilient against that.
                    try {
                  // Call the listener's taskSucceeded; the listener here is the JobWaiter created when the job was submitted (analyzed below)
                      job.listener.taskSucceeded(rt.outputId, event.result)
                    } catch {
                      case e: Exception =>
                        // TODO: Perhaps we want to mark the resultStage as failed?
                        job.listener.jobFailed(new SparkDriverExecutionException(e))
                    }
                  }
                case None =>
                  logInfo("Ignoring result from " + rt + " because its job has finished")
              }
    
    
            case smt: ShuffleMapTask =>
              val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
              updateAccumulators(event)
          val status = event.result.asInstanceOf[MapStatus] // a ShuffleMapTask's result is a MapStatus
              val execId = status.location.executorId
              logDebug("ShuffleMapTask finished on " + execId)
              if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
                logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
              } else {
            // Mark this partition as finished by recording partitionId -> status in the stage's outputLocs map
                shuffleStage.addOutputLoc(smt.partitionId, status)
              }
          // If this ShuffleMapTask was the stage's last pending task, the whole stage has finished
              if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
                markStageAsFinished(shuffleStage)
                logInfo("looking for newly runnable stages")
                logInfo("running: " + runningStages)
                logInfo("waiting: " + waitingStages)
                logInfo("failed: " + failedStages)
    
    
                // We supply true to increment the epoch number here in case this is a
                // recomputation of the map outputs. In that case, some nodes may have cached
                // locations with holes (from when we detected the error) and will need the
                // epoch incremented to refetch them.
                // TODO: Only increment the epoch number if this is not the first time
                //       we registered these map outputs.
            // Register the shuffleId -> mapStatuses mapping
                mapOutputTracker.registerMapOutputs(
                  shuffleStage.shuffleDep.shuffleId,
              // list.head is used because a partition may have several task attempts; each completed attempt is prepended to the list
                  shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)
    
    
            // The output locations have been updated, so clear the cached preferred locations
            clearCacheLocs()
            // If any partition has an empty output location, the stage is incomplete and has to be resubmitted
                if (shuffleStage.outputLocs.contains(Nil)) {
                  // Some tasks had failed; let's resubmit this shuffleStage
                  // TODO: Lower-level scheduler should also deal with this
                  logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                    ") because some of its tasks had failed: " +
                    shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
                        .map(_._2).mkString(", "))
                  submitStage(shuffleStage)
                } else {
                  val newlyRunnable = new ArrayBuffer[Stage]
                  for (shuffleStage <- waitingStages) {
                    logInfo("Missing parents for " + shuffleStage + ": " +
                      getMissingParentStages(shuffleStage))
                  }
              // Prepare to submit the next stages: a stage becomes runnable once all of its parent stages have finished
                  for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
                  {
                    newlyRunnable += shuffleStage
                  }
                  waitingStages --= newlyRunnable
                  runningStages ++= newlyRunnable
                  for {
                    shuffleStage <- newlyRunnable.sortBy(_.id)
                    jobId <- activeJobForStage(shuffleStage)
                  } {
                    logInfo("Submitting " + shuffleStage + " (" +
                      shuffleStage.rdd + "), which is now runnable")
                // Submit the selected stage
                    submitMissingTasks(shuffleStage, jobId)
                  }
                }
              }
            }
        // ... remaining cases omitted
      }
      submitWaitingStages() // schedule stages that are still waiting
    }
      
       
        override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
      if (_jobFinished) {
        throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
      }
      resultHandler(index, result.asInstanceOf[T]) // invoke the previously registered result handler (see below)
      finishedTasks += 1
      if (finishedTasks == totalTasks) {
        _jobFinished = true
        jobResult = JobSucceeded
        this.notifyAll() // wake up the caller blocked in this job's awaitResult
      }
    }
       
The resultHandler simply stores each partition's result into an array, which is returned once the job completes:
        def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean
        ): Array[U] = {
      val results = new Array[U](partitions.size)
      runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
      results
    }
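As a concrete consumer of this path, RDD.collect is built on runJob: each partition's array comes back through the resultHandler above, and the per-partition arrays are then concatenated on the driver. Abbreviated from the Spark 1.x source (quoted from memory, so details may differ slightly):

    def collect(): Array[T] = withScope {
      // One Array per partition arrives via the resultHandler (results(index) = res above);
      // the per-partition arrays are then stitched into one result array.
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }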
       
That concludes the walkthrough of task execution.