Blog: http://blog.csdn.net/yueqian_zhu/


    The shuffle read flow, like the write flow, starts from the compute method:

      override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
        val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
        SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
          .read()
          .asInstanceOf[Iterator[(K, C)]]
      }
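
    For context, here is a minimal, self-contained driver program (the object name and app name are made up for illustration) showing where this compute ends up running: reduceByKey produces a ShuffledRDD, and reading its partitions goes through shuffleManager.getReader(...).read().

      import org.apache.spark.{SparkConf, SparkContext}

      object ShuffleReadDemo {
        def main(args: Array[String]): Unit = {
          val sc = new SparkContext(
            new SparkConf().setAppName("shuffle-read-demo").setMaster("local[2]"))
          // reduceByKey builds a ShuffledRDD[String, Int, Int] with an aggregator
          // and mapSideCombine = true
          val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
            .reduceByKey(_ + _)
          // collect() runs the reduce-side tasks, i.e. the compute() shown above
          counts.collect().foreach(println)
          sc.stop()
        }
      }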
     

    At this point, regardless of whether the SortShuffleManager or the HashShuffleManager is in use, getReader returns a HashShuffleReader.
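
    For reference, in this version of Spark both managers' getReader is essentially the following (paraphrased, so it may differ slightly from the exact source): it simply wraps the handle in a HashShuffleReader for the requested partition range.

      override def getReader[K, C](
          handle: ShuffleHandle,
          startPartition: Int,
          endPartition: Int,
          context: TaskContext): ShuffleReader[K, C] = {
        // Both the hash-based and the sort-based manager read shuffle output the same way
        new HashShuffleReader(
          handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
      }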

    Next, its read method is called, as follows:

      /** Read the combined key-values for this reduce task */
      override def read(): Iterator[Product2[K, C]] = {
        val ser = Serializer.getSerializer(dep.serializer)
        val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
    
        val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
          if (dep.mapSideCombine) {
            new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
          } else {
            new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
          }
        } else {
          require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    
          // Convert the Product2s to pairs since this is what downstream RDDs currently expect
          iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
        }
    
        // Sort the output if there is a sort ordering defined.
        dep.keyOrdering match {
          case Some(keyOrd: Ordering[K]) =>
            // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
            // the ExternalSorter won't spill to disk.
            val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
            sorter.insertAll(aggregatedIter)
            context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
            context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
            sorter.iterator
          case None =>
            aggregatedIter
        }
      }
     

    read first calls the fetch method. A brief walkthrough of fetch (a small sketch follows this list):

    1. As covered in the section on task execution, when a ShuffleMapTask finishes, the mapping from shuffleId to the MapStatus of its output is registered with the MapOutputTracker.

    2. fetch first checks the local mapStatuses cache for this shuffleId and uses it if present; otherwise it asks the master's MapOutputTracker and fetches the statuses, which give each map output's block manager address and the file length of the corresponding partition.

    3. With the shuffleId and these locations, the blocks are then read remotely or locally over netty/nio, and an iterator over the fetched data is returned.

    4. The data behind the returned iterator is not all in memory at once; fetch requests are issued only up to a configured maximum of in-flight memory, and once that limit is reached the remaining blocks wait to be read.
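
    The following is a small, self-contained sketch of step 2 above. The case classes are simplified stand-ins, not Spark's real MapStatus/BlockManagerId, and the in-flight limit from steps 3-4 is only noted in comments.

      object FetchSketch {
        case class BlockManagerId(host: String, port: Int)
        // sizeForReduce(r) = bytes this map task wrote for reduce partition r
        case class MapStatus(location: BlockManagerId, sizeForReduce: Array[Long])

        // Step 2: group the blocks this reduce task needs by the block manager holding them,
        // keeping each block's size so steps 3-4 can cap the bytes in flight.
        def blocksByAddress(
            statuses: Seq[MapStatus],
            reduceId: Int): Map[BlockManagerId, Seq[(Int, Long)]] =
          statuses.zipWithIndex
            .map { case (status, mapId) => (status.location, (mapId, status.sizeForReduce(reduceId))) }
            .groupBy(_._1)
            .map { case (addr, blocks) => addr -> blocks.map(_._2) }

        def main(args: Array[String]): Unit = {
          val bmA = BlockManagerId("host-a", 7337)
          val bmB = BlockManagerId("host-b", 7337)
          val statuses = Seq(
            MapStatus(bmA, Array(10L, 20L)),
            MapStatus(bmB, Array(5L, 0L)),
            MapStatus(bmA, Array(7L, 3L)))
          // Reduce partition 0 fetches map outputs 0 and 2 from host-a, and 1 from host-b
          println(blocksByAddress(statuses, reduceId = 0))
        }
      }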


    After fetch returns the iterator, whether mapSideCombine was performed determines how the fetched data is aggregated: combineCombinersByKey if the map side already combined, combineValuesByKey otherwise. The aggregation works much like the write flow: when the data no longer fits in memory, it is spilled to local disk.
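
    To make the two aggregation cases concrete, here is a standalone sketch (not Spark's Aggregator, and it keeps everything in an in-memory Map instead of spilling): with map-side combine, the fetched records are already (K, C) partial combiners; otherwise they are raw (K, V) values.

      object CombineSketch {
        // No map-side combine: fetched records are (K, V); build combiners on the reduce side
        def combineValuesByKey[K, V, C](
            records: Iterator[(K, V)],
            createCombiner: V => C,
            mergeValue: (C, V) => C): Map[K, C] =
          records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
            acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
          }

        // Map-side combine already done: fetched records are (K, C); just merge combiners
        def combineCombinersByKey[K, C](
            records: Iterator[(K, C)],
            mergeCombiners: (C, C) => C): Map[K, C] =
          records.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
            acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
          }

        def main(args: Array[String]): Unit = {
          // Word-count style: the combiner C is a running Int sum
          println(combineValuesByKey[String, Int, Int](
            Iterator(("a", 1), ("b", 1), ("a", 1)), identity, _ + _))   // Map(a -> 2, b -> 1)
          println(combineCombinersByKey[String, Int](
            Iterator(("a", 2), ("b", 1), ("a", 1)), _ + _))             // Map(a -> 3, b -> 1)
        }
      }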

    If a keyOrdering is also required, a new ExternalSorter is created to perform an external sort; the insertAll it uses is the same as in the shuffle write flow.
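
    As a hypothetical usage example (object and app names made up): in this version of Spark, sortByKey builds a ShuffledRDD with a key ordering, so the read side takes the `case Some(keyOrd)` branch above and feeds the fetched data through an ExternalSorter, spilling to disk if it does not fit in memory.

      import org.apache.spark.{SparkConf, SparkContext}

      object SortedReadDemo {
        def main(args: Array[String]): Unit = {
          val sc = new SparkContext(
            new SparkConf().setAppName("sorted-read-demo").setMaster("local[2]"))
          // sortByKey sets keyOrdering = Some(Ordering[String]) on the shuffle dependency,
          // so read() sorts the fetched records with an ExternalSorter before returning them
          val sorted = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)), 2).sortByKey()
          sorted.collect().foreach(println)
          sc.stop()
        }
      }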