Spark shuffle writer源码解析

Shuffle分类

一个作业经过spark的DAGSchedule调度器划分为多个stage,同时有些下游的stage依赖上游的stage,这样会导致上游的stage做map的工作,下游的stage做reduce的工作。而上下游stage就是通过shuffle连接在一起的。shuffle分为shuffleWriter和ShulleReader,writer即为上游的stage,reader为下游的stage。
  在这里我们首先介绍一下ShuffleWriter的分类。如下:
  1.BypassMergeSortShuffleWriter。和Hash Shuffle的实现基本相同,区别在于map task输出汇总一个文件,同时还会产生一个index file。
  2.UnsafeShuffleWriter。使用Java Unsafe直接操作内存,避免Java对象多余的开销和GC 延迟,效率高。
  3.SortShullfleWriter。sortShulleWriter和shuffle不同的在于map端的sort。
  以上三种shuffleWriter都具有各自的应用场景。分别如下:
  1.没有map端聚合操作且RDD的partition分区数小于200个,使用BypassMergerSortShuffleWriter。
  2.没有map端聚合,RDD的partitions分区数小于16777216且Serializer支持relocation,使用UnsafeShuffleWriter。
  3.(1)和(2)不满足使用SortShuffle
上面介绍了ShullfleWriter的分类,下面将详细介绍SortShuffleWriter的具体工作原理。

SortShuffleWriter工作原理

预备知识

org.apache.spark.util.collection.AppendOnlyMap
org.apache.spark.util.collection.PartitionedPairBuffer
org.apache.spark.util.collection.TimSorter
org.apache.spark.shuffle.sort.SortShuffleWriter
  其中,AppendOnlyMap和PartitionedPairBuffer是shuffle过程中使用的两个数据结构,但是底层都是基于Array实现的。当需要map端的combine或者sort时,使用的是AppendOnlyMap结构;否则,使用的是PartitionedPairBuffer结构。而TimSorter主要是封装了归并排序,用于内存中的数据排序。
  首先我们看一下SortShullfleWriter源码,如下

/** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
    #需要map端的聚合
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
        #不需要map端的聚合或者排序
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)
    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }

接下来看一下ExternalSorter类,具体在org.apache.spark.util.collection.ExternalSorter。下面转到了ExternalSort的insert方法中,该方法做了shuffleWriter的绝大部分工作 。我们先看一下insert的源代码,然后介绍该方法所做的主要工作。

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined
    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
     #判断是否会发生溢出
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
       #判断是否会发生溢出
        maybeSpillCollection(usingMap = false)
      }
    }
  }

具体的工作如下:
(1)根据是否需要map端做shouldCombine,决定使用map还是buffer。由于使用的是map,在进行join的过程中,不停地遍历map,当发现相同的时候,直接更新map的value即可。在这里会存在一个问题,就是假如key存在不同的disk的文件上,该如何,这时候当调用Iterator时,会返回一个基于内存和disk的多路归并迭代器。这个迭代器,每次在调用next 方法的时候是基于优先级队列,也就是每个迭代器最小hash值作为比较对象的堆结构,寻找最小的hash值且key值相等的所有元素(因为我们每个map 都是排序过的,所以这总能实现),进行merge,将所有符合要求的元素merge完成后返回。这样便完成了最终的聚合操作。
(2)判断是否为溢出
那么,接下来的重点是看缓存是否会发生溢出,主要在maybeSpillCollection方法中。下面是该方法的源码。

 /**
   * Spill the current in-memory collection to disk if needed.
   *
   * @param usingMap whether we're using a map or buffer as our current in-memory collection
   */
  private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      estimatedSize = buffer.estimateSize()
      if (maybeSpill(buffer, estimatedSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }

在该方法中首先需要评估一下是否会发生OOM。由于PartitionedAppendOnlyMap或者PartitionedPairBuffer都继承了SizeTracker类,所以不管是buffer还是map都是使用了SizeTracker中的estimateSize方法。在处理每条record时,都会进行一次estimateSize,评估出已经使用了的缓存大小。下面是该方法

/**
   * Estimate the current size of the collection in bytes. O(1) time.
   */
  def estimateSize(): Long = {
    assert(samples.nonEmpty)
    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.size + extrapolatedDelta).toLong
  }

其中bytesPerUpdate是两次采样之间每次update这个集合,size增长了多少。也就是说每操作集合一下,集合的size增长了多少。

bytesPerUpdate=(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)

然后以bytePerUpdate作为最近平均每次更新时的bytePerUpdate,用当前的update次数减去最后一个Sample的update次数,然后乘以bytePerUpdate,结果加上最后一个Sample记录的大小。

val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.size + extrapolatedDelta).toLong

我们回到maybeSpillCollection方法中,当评估出map或者buffer的大小以后,直接调用方法maybeSpill,判断是否发生溢写操作。该方法主要存在org.apache.spark.util.collection.Spillable类中,方法如下

/**
   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
   * memory before spilling.
   *
   * @param collection collection to spill to disk
   * @param currentMemory estimated size of the collection in bytes
   * @return true if `collection` was spilled to disk; false otherwise
   */
  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

首先是传入这个集合以及该集合的使用大小,在这个方法中,我们每32个记录判断一次。当当前的集合大小大于阈值时,需要重新扩大能存的大小,同时把集合中的数据写入磁盘上。
  接下来是调用ExternalSort的spill方法,该方法的主要作用是集合写到磁盘,并且把添加到一个spills数组中。

/**
   * Spill our in-memory collection to a sorted file that we can merge later.
   * We add this file into `spilledFiles` to find it later.
   *
   * @param collection whichever collection we're using (map or buffer)
   */
  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    spills += spillFile
  }

接下来就是meger这些磁盘文件了,在这里我们把文件记录在spills数组中了。

merge过程

当我们请求一个iterator或者文件时,会将所有的SpilledFile和在内存当中未进行溢写的数据进行合并。
  从上面可以看出接下来执行的是ExternalSort的writePartitionedFile方法,其主要作用就是把数据写入到磁盘,并返回一个索引。

/**
   * Write all the data added into this ExternalSorter into a file in the disk store. This is
   * called by the SortShuffleWriter.
   *
   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
   */
  def writePartitionedFile(
      blockId: BlockId,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
      context.taskMetrics().shuffleWriteMetrics)
      #所有的数据都存储在内存中,直接写入文件
    if (spills.isEmpty) {
      // Case where we only have in-memory data
      val collection = if (aggregator.isDefined) map else buffer
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      while (it.hasNext) {
        val partitionId = it.nextPartition()
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer)
        }
        val segment = writer.commitAndGet()
        lengths(partitionId) = segment.length
      }
    } else {
      // We must perform merge-sort; get an iterator by partition and write everything directly.
    # 当发生了溢写操作
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          val segment = writer.commitAndGet()
          lengths(id) = segment.length
        }
      }
    }

    writer.close()
    context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

    lengths
  }

当发生了溢写操作,这时候需要合并多个小文件,这时候主要是通过调用了partitionedIterator方法,在partitionedIterator主要调用了是merge方法,下面看一下merge方法。

/**
   * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
   * inside each partition. This can be used to either write out a new file or return data to
   * the user.
   *
   * Returns an iterator over all the data written to this object, grouped by partition. For each
   * partition we then have an iterator over its contents, and these are expected to be accessed
   * in order (you can't "skip ahead" to one partition without reading the previous one).
   * Guaranteed to return a key-value pair for each partition, in order of partition ID.
   */
  private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
     #每一个spilledFile创建一个reader
    val readers = spills.map(new SpillReader(_))
    val inMemBuffered = inMemory.buffered
    (0 until numPartitions).iterator.map { p =>
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
        #合并内存与spill文件
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
      if (aggregator.isDefined) {
        ##聚合
        // Perform partial aggregation across partitions
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
      #排序
        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
        // sort the elements without trying to merge them
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

该方法将属于同一个reduce端的partition的内存数据和spill文件数据合并起来,再进行聚合排序(有需要的话),最后返回(reduce对应的partitionId,该分区数据迭代器)。将数据merge-sort后写入最终的文件后,需要将每个partition的偏移量持久化到文件以供后续每个reduce根据偏移量获取自己的数据,写偏移量的逻辑很简单,就是根据前面得到的partition长度的数组将偏移量写到index文件中。接下来看一下mergeSort方法。

 /**
   * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys.
   */
  private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
      : Iterator[Product2[K, C]] =
  {
    val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
    type Iter = BufferedIterator[Product2[K, C]]
    val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
      // Use the reverse of comparator.compare because PriorityQueue dequeues the max
      override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
    })
    heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
    new Iterator[Product2[K, C]] {
      override def hasNext: Boolean = !heap.isEmpty

      override def next(): Product2[K, C] = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        val firstBuf = heap.dequeue()
        val firstPair = firstBuf.next()
        if (firstBuf.hasNext) {
          heap.enqueue(firstBuf)
        }
        firstPair
      }
    }
  }

该方法主要是利用了java collection的优先队列,这样每次遍历的时候都是取最小或者最大的一个,这样就可以把spill文件按照(partitionid,key)排序了,最后生成一个文件,同时会生成一个索引文件,索引文件是由上面的writePartitionedFile方法生成的。
   下一节,我们将介绍基于sort shuffle的shufflereader。

    原文作者:幸福的小棉袄
    原文地址: https://www.jianshu.com/p/95308731ff08
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞