Shuffle Classification
A job is divided by Spark's DAGScheduler into multiple stages, and downstream stages depend on upstream ones: the upstream stage performs the map-side work while the downstream stage performs the reduce-side work, and the two are connected by a shuffle. A shuffle has two halves, the ShuffleWriter and the ShuffleReader; the writer runs in the upstream (map) stage and the reader in the downstream (reduce) stage.
Let us first go through the ShuffleWriter implementations:
1. BypassMergeSortShuffleWriter. Essentially the same idea as the old hash-based shuffle, except that the map task's per-partition outputs are finally consolidated into a single data file, accompanied by an index file.
2. UnsafeShuffleWriter. Uses Java Unsafe to operate on serialized data directly in memory, avoiding the overhead of extra Java objects and the resulting GC pressure, which makes it efficient.
3. SortShuffleWriter. Differs from the hash-based shuffle in that data is sorted on the map side.
Each of the three writers has its own applicable scenario (a condensed sketch of the selection logic follows the list):
1. If there is no map-side aggregation and the number of partitions does not exceed 200 (the default of spark.shuffle.sort.bypassMergeThreshold), BypassMergeSortShuffleWriter is used.
2. If there is no map-side aggregation, the number of partitions does not exceed 16,777,216 (2^24), and the Serializer supports relocation of serialized objects, UnsafeShuffleWriter is used.
3. If neither (1) nor (2) applies, SortShuffleWriter is used.
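The following condensed sketch shows how these three conditions translate into a writer choice. It is illustrative only: the configuration name spark.shuffle.sort.bypassMergeThreshold is real, but the helper itself is hypothetical (Spark performs the equivalent checks inside SortShuffleManager.registerShuffle via shouldBypassMergeSort and canUseSerializedShuffle).

// Simplified, hypothetical selection helper, not Spark's actual code.
def chooseWriter(
    mapSideCombine: Boolean,
    numPartitions: Int,
    serializerSupportsRelocation: Boolean,
    bypassMergeThreshold: Int = 200): String = {  // spark.shuffle.sort.bypassMergeThreshold
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
    "BypassMergeSortShuffleWriter"
  } else if (!mapSideCombine && serializerSupportsRelocation && numPartitions <= (1 << 24)) {
    "UnsafeShuffleWriter"   // at most 16777216 reduce partitions
  } else {
    "SortShuffleWriter"
  }
}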
Having covered the ShuffleWriter types, the rest of this section explains in detail how SortShuffleWriter works.
How SortShuffleWriter Works
Prerequisites
org.apache.spark.util.collection.AppendOnlyMap
org.apache.spark.util.collection.PartitionedPairBuffer
org.apache.spark.util.collection.TimSort
org.apache.spark.shuffle.sort.SortShuffleWriter
AppendOnlyMap and PartitionedPairBuffer are the two in-memory data structures used during the shuffle write; both are backed by a plain Array. When map-side combine (or sorting) is required, the AppendOnlyMap structure is used; otherwise PartitionedPairBuffer is used. TimSort provides the in-memory sort (essentially a merge sort optimized with insertion sort on short runs).
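To make the map's interface concrete, here is a minimal, hypothetical stand-in (not Spark's code) for the changeValue contract that AppendOnlyMap exposes and that insertAll relies on below; Spark's real implementation stores keys and values in alternating slots of a flat, open-addressed Array rather than a HashMap.

import scala.collection.mutable

// Toy model of AppendOnlyMap.changeValue: look up the key and either create a
// fresh combiner (key unseen) or merge the new value into the existing one.
class ToyAppendOnlyMap[K, V] {
  private val data = mutable.HashMap.empty[K, V]
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = data.get(key) match {
      case Some(old) => updateFunc(true, old)
      case None      => updateFunc(false, null.asInstanceOf[V])
    }
    data(key) = newValue
    newValue
  }
}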
Let us start with the source of SortShuffleWriter.write:
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    // Map-side aggregation is required
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // No map-side aggregation or ordering is required.
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
Next comes the ExternalSorter class, located at org.apache.spark.util.collection.ExternalSorter. The write method above hands the records to ExternalSorter.insertAll, which performs most of the shuffle-write work. Let us first look at the source of insertAll and then summarize what it does.
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Check whether the in-memory map needs to be spilled to disk
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // Check whether the in-memory buffer needs to be spilled to disk
      maybeSpillCollection(usingMap = false)
    }
  }
}
In summary, insertAll does the following:
(1) Depending on whether map-side combine is needed (shouldCombine), records go into the map (PartitionedAppendOnlyMap) or into the buffer (PartitionedPairBuffer). With the map, each incoming record is looked up by its (partitionId, key); if the key is already present, its value is updated in place with the merge function, otherwise a new combiner is created. One question remains: what if records with the same key have already been spilled to different files on disk? When an iterator over the sorted data is requested, a multi-way merge iterator over the in-memory data and the spill files is returned. On every call to next(), this iterator uses a priority queue, a heap that compares the underlying iterators by the smallest key hash at their heads, to find all elements that share the smallest hash and have equal keys (this always works because each spilled collection is already sorted), merges them, and returns the merged result. This completes the final aggregation. (The aggregator functions that drive the combine are sketched after item (2).)
(2) Check whether the in-memory collection needs to be spilled.
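For intuition, this is roughly what the aggregator's functions look like for a reduceByKey(_ + _) job over (String, Int) records (an illustrative sketch, not copied from Spark's source):

// createCombiner is applied to the first value seen for a key, mergeValue folds
// further map-side values into the combiner, and mergeCombiners merges combiners
// coming from different spill files or the in-memory map during the final merge.
val createCombiner: Int => Int        = v => v
val mergeValue: (Int, Int) => Int     = (c, v) => c + v
val mergeCombiners: (Int, Int) => Int = (c1, c2) => c1 + c2
// insertAll wraps createCombiner and mergeValue into the `update` closure shown above.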
The key point, then, is deciding whether the in-memory collection is about to overflow. This is handled in maybeSpillCollection; here is its source:
/**
 * Spill the current in-memory collection to disk if needed.
 *
 * @param usingMap whether we're using a map or buffer as our current in-memory collection
 */
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
This method first estimates the collection's memory footprint so that it can avoid running out of memory. Since both PartitionedAppendOnlyMap and PartitionedPairBuffer extend SizeTracker, map and buffer alike use SizeTracker's estimateSize method. estimateSize is called once for every record processed and returns an estimate of the memory already used by the collection. Here is the method:
/**
 * Estimate the current size of the collection in bytes. O(1) time.
 */
def estimateSize(): Long = {
  assert(samples.nonEmpty)
  val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
  (samples.last.size + extrapolatedDelta).toLong
}
Here bytesPerUpdate is the average number of bytes by which the collection grew per update between the two most recent samples, i.e., how much the size grows each time the collection is touched:

bytesPerUpdate = (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)

Taking this as the recent per-update growth rate, estimateSize multiplies it by the number of updates since the last sample and adds the product to the size recorded in that sample:

val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
(samples.last.size + extrapolatedDelta).toLong
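A worked example of the extrapolation, with made-up numbers (not measured from Spark):

// Hypothetical samples: 1,000,000 bytes after 100 updates, 1,200,000 bytes after 200 updates.
val bytesPerUpdate = (1200000L - 1000000L).toDouble / (200 - 100)  // 2000.0 bytes per update
// Suppose the collection has now been updated 250 times in total:
val extrapolatedDelta = bytesPerUpdate * (250 - 200)               // 100000.0
val estimatedSize = (1200000L + extrapolatedDelta).toLong          // 1300000 bytes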
Returning to maybeSpillCollection: once the size of the map or buffer has been estimated, maybeSpill is called to decide whether to spill to disk. The method lives in org.apache.spark.util.collection.Spillable:
/**
 * Spills the current in-memory collection to disk if needed. Attempts to acquire more
 * memory before spilling.
 *
 * @param collection collection to spill to disk
 * @param currentMemory estimated size of the collection in bytes
 * @return true if `collection` was spilled to disk; false otherwise
 */
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
maybeSpill receives the collection and its estimated size. The check is performed only once every 32 records read, and only when the current size has reached the memory threshold. In that case the sorter first tries to claim more execution memory from the shuffle memory pool (enough to double its current footprint); only if the grant is too small to keep growing does it spill the collection's data to disk. A spill is also forced once the number of records read exceeds numElementsForceSpillThreshold.
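A small numeric sketch of this threshold logic, again with made-up numbers:

// Collection estimated at 6 MB while the current threshold is 5 MB.
val currentMemory = 6L * 1024 * 1024
var myMemoryThreshold = 5L * 1024 * 1024
val amountToRequest = 2 * currentMemory - myMemoryThreshold        // ask for 7 MB more
// If the memory manager grants only 1 MB, the threshold grows to 6 MB ...
myMemoryThreshold += 1L * 1024 * 1024
val shouldSpill = currentMemory >= myMemoryThreshold               // ... and 6 MB >= 6 MB, so spill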
Next comes ExternalSorter's spill method, which writes the in-memory collection to a sorted spill file on disk and appends that file to the spills array.
/**
 * Spill our in-memory collection to a sorted file that we can merge later.
 * We add this file into `spilledFiles` to find it later.
 *
 * @param collection whichever collection we're using (map or buffer)
 */
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills += spillFile
}
What remains is to merge these on-disk files, all of which have been recorded in the spills array.
The merge process
When an iterator or the final output file is requested, all SpilledFiles are merged with the in-memory data that has not been spilled.
As seen in SortShuffleWriter.write above, the next call is ExternalSorter.writePartitionedFile, which writes all the data into a single file in the disk store and returns the per-partition lengths later used to build the index.
/**
 * Write all the data added into this ExternalSorter into a file in the disk store. This is
 * called by the SortShuffleWriter.
 *
 * @param blockId block ID to write to. The index file will be blockId.name + ".index".
 * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
 */
def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {

  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
    context.taskMetrics().shuffleWriteMetrics)

  if (spills.isEmpty) {
    // Case where we only have in-memory data: nothing was spilled, write it out directly
    val collection = if (aggregator.isDefined) map else buffer
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
      val segment = writer.commitAndGet()
      lengths(partitionId) = segment.length
    }
  } else {
    // Spills have occurred, so we must perform merge-sort;
    // get an iterator by partition and write everything directly.
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) {
        for (elem <- elements) {
          writer.write(elem._1, elem._2)
        }
        val segment = writer.commitAndGet()
        lengths(id) = segment.length
      }
    }
  }

  writer.close()
  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

  lengths
}
When spills have occurred, the many small spill files must be merged. This is driven by partitionedIterator, which in turn calls merge. Here is the merge method:
/**
 * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
 * inside each partition. This can be used to either write out a new file or return data to
 * the user.
 *
 * Returns an iterator over all the data written to this object, grouped by partition. For each
 * partition we then have an iterator over its contents, and these are expected to be accessed
 * in order (you can't "skip ahead" to one partition without reading the previous one).
 * Guaranteed to return a key-value pair for each partition, in order of partition ID.
 */
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // Create one SpillReader per SpilledFile
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // Combine the in-memory data with the spill files
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}
For each reduce-side partition, merge combines the in-memory data with the data read back from the spill files, applies aggregation and sorting where required, and returns pairs of (reduce-side partitionId, iterator over that partition's data). After the merged and sorted data has been written to the final data file, the offset of each partition must be persisted so that every reducer can later fetch exactly its own byte range. Writing the offsets is straightforward: the partition-length array obtained above is turned into cumulative offsets and written to the index file (by IndexShuffleBlockResolver.writeIndexFileAndCommit, called from SortShuffleWriter.write as shown earlier).
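The offset computation the index file needs can be sketched as a running sum over the partition lengths (an illustrative snippet, not the code of IndexShuffleBlockResolver):

// lengths(i) is the number of bytes partition i occupies in the data file.
val lengths = Array[Long](10L, 0L, 25L, 7L)
// The index file stores numPartitions + 1 cumulative offsets; reducer i later
// reads the byte range [offsets(i), offsets(i + 1)) from the data file.
val offsets = lengths.scanLeft(0L)(_ + _)   // Array(0, 10, 10, 35, 42)

Next, let us look at the mergeSort method.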
/**
 * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys.
 */
private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
    : Iterator[Product2[K, C]] = {
  val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
  type Iter = BufferedIterator[Product2[K, C]]
  val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
    // Use the reverse of comparator.compare because PriorityQueue dequeues the max
    override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
  })
  heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
  new Iterator[Product2[K, C]] {
    override def hasNext: Boolean = !heap.isEmpty

    override def next(): Product2[K, C] = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val firstBuf = heap.dequeue()
      val firstPair = firstBuf.next()
      if (firstBuf.hasNext) {
        heap.enqueue(firstBuf)
      }
      firstPair
    }
  }
}
mergeSort relies on Scala's mutable PriorityQueue: the heads of the per-file iterators are compared with the reversed key comparator, so each dequeue yields the iterator whose head element is currently smallest. Because every spill file is already sorted by (partitionId, key), this k-way merge produces one fully sorted stream that ends up in a single data file; the accompanying index file is written from the partition lengths returned by writePartitionedFile, as shown above.
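The negated comparator is needed because scala.collection.mutable.PriorityQueue always dequeues the largest element under its Ordering; reversing the ordering turns it into the min-heap that a k-way merge requires. A tiny standalone demonstration:

import scala.collection.mutable

// With the reversed ordering the queue pops the smallest element first,
// mirroring the -comparator.compare trick in mergeSort above.
val minFirst = mutable.PriorityQueue(5, 1, 3)(Ordering.Int.reverse)
println(minFirst.dequeue())  // 1
println(minFirst.dequeue())  // 3
println(minFirst.dequeue())  // 5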
In the next section we will look at the shuffle reader used with sort-based shuffle.