【Spark】Spark 存储原理--shuffle 过程

本篇结构:

  • Spark Shuffle 的发展
  • Spark Shuffle 中数据结构
  • Spark Shuffle 原理
  • 后记

Spark Shuffle 是 spark job 中某些算子触发的操作。当 rdd 依赖中出现宽依赖的时候,就会触发 Shuffle 操作,Shuffle 操作通常会伴随着不同 executor/host 之间数据的传输。

Shuffle 操作可能涉及的过程包括数据的排序,聚合,溢写,合并,传输,磁盘IO,网络的 IO 等等。Shuffle 是连接 MapTask 和 ReduceTask 之间的桥梁,Map 的输出到 Reduce 中须经过 Shuffle 环节,Shuffle 的性能高低直接影响了整个程序的性能和吞吐量。

通常 Shuffle 分为两部分:Map 阶段的数据准备( ShuffleMapTask )和Reduce(ShuffleReduceTask) 阶段的数据拷贝处理。一般将在 Map 端的 Shuffle 称之为 Shuffle Write,在 Reduce 端的 Shuffle 称之为 Shuffle Read。

一、Spark Shuffle 的发展

Spark Shuffle 机制总共有三种:

1.1、未优化的 HashShuffle

每一个 ShuffleMapTask 都会为每一个 ReducerTask 创建一个单独的文件,总的文件数是 M * R,其中 M 是 ShuffleMapTask 的数量,R 是 ShuffleReduceTask 的数量。

见下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

在处理大数据时,ShuffleMapTask 和 ShuffleReduceTask 的数量很多,创建的磁盘文件数量 M*R 也越多,大量的文件要写磁盘,再从磁盘读出来,不仅会占用大量的时间,而且每个磁盘文件记录的句柄都会保存在内存中(每个人大约 100k),因此也会占用很大的内存空间,频繁的打开和关闭文件,会导致频繁的GC操作,很容易出现 OOM 的情况。

也正是上述原因,该 HashShuffle 如今已退出历史舞台。

1.2、优化后 HashShuffle

在 Spark 0.8.1 版本中,引入了 Consolidation 机制,该机制是对 HashShuffle 的一种优化。

如下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。

先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i’,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。

这样,每个 worker 持有的文件数降为 cores * R。cores 代表核数,R 是 ShuffleReduceTask 数。

1.3、Sort-Based Shuffle

由于 HashShuffle 会产生很多的磁盘文件,引入 Consolidation 机制虽然在一定程度上减少了磁盘文件数量,但是不足以有效提高 Shuffle 的性能,适合中小型数据规模的大数据处理。

为了让 Spark 在更大规模的集群上更高性能处理更大规模的数据,因此在 Spark 1.1版本中,引入了 SortShuffle。

如下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

该机制每一个 ShuffleMapTask 都只创建一个文件,将所有的 ShuffleReduceTask 的输入都写入同一个文件,并且对应生成一个索引文件。

以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer 缓存所占用的内存大小,而且同时避免 GC 的风险和频率。

但对于 Rueducer 数比较少的情况,Hash Shuffle 要比 Sort Shuffle 快,因此 Sort Shuffle 有个 “fallback” 计划,对于 Reducers 数少于 “spark.shuffle.sort.bypassMergeThreshold” (200 by default),将使用 fallback 计划,hashing 相关数据到分开的文件,然后合并这些文件为一个。

二、Spark Shuffle 中数据结构

2.1、AppendOnlyMap

AppendOnlyMap 单从命名上来看,是一个只能追加元素的 Map 结构。的确,它是只支持追加的 map,可以修改某个 key 对应的 value,但是不能删除已经存在的 key。

底层是由数组结构实现的,当需要对 Key-Value 进行聚合时,会使用AppendOnlyMap 作为 buffer。在插入或者修改元素的时候,会判断是否扩容,如果达到扩容标准,将会对数组 2 倍容量进行扩容,扩容过程中原有元素并不是直接拷贝,而是进行原有元素的重新定位存储,如果集合中存在的数据量大,那么这里的操作将会耗时又耗资源。

存储级别是 Memory-Only ,在 shuffle reduce 数据不会溢写,在数据量不大的情况下可以,但是数据量大时,会极易出现OOM。

2.2、ExternalAppendOnlyMap

继承于AppendOnlyMap ,但是存储级别是 Memory and Disk,即在数据量达到一个阈值的时候,会把数据溢写到磁盘,达到释放内存空间,降低 OOM 的风险的作用。

2.3、PartitionedAppendOnlyMap

是 SortShuffleWriter 中用到的一种数据结构,在 Map 端需要聚合的时候,采用这种数据结构,这种结构也是一种 Hash Table,能够根据 Key,通过 hash(Key),把数据插入到相应的位置。

PartitionedAppendOnlyMap 支持 aggregation,它继承了 SizeTrackingAppendOnlyMap,还实现了 WritablePartitionedPairCollection 接口中的 partitionedDestructiveSortedIterator 抽象方法,在该方法中调用了AppendOnlyMap 的 destructiveSortedIterator 对底层数组进行整理和排序后获得迭代器,数据有可能不会连续存放。

2.4、PartitionedPairBuffer

底层实现和 PartitionedAppendOnlyMap 一样都是 ArrayBuffer,主要用于SortShuffleWriter 中 Map 端不采用聚合和排序时使用。

不支持 aggregation,主要功能是插入值是有顺序的,主要起缓冲作用,只有顺序插入,没有 changeValue(聚合)和 update(更新)操作,数据连续存放在尾部。

三、Spark Shuffle 原理

因为 hash based shuffle 已经退出历史舞台,所以以 spark 2.3 的 sort based shuffle 为例,看 Spark Shuffle 的原理。

Shuffle 的整个生命周期由 ShuffleManager 来管理,Spark 2.3中,唯一的支持方式为 SortShuffleManager,SortShuffleManager 中定义了 writer 和 reader 对应shuffle 的 map 和 reduce 阶段。writer 有三种运行模式:

  • BypassMergeSortShuffleWriter:当前 shuffle 没有聚合, 并且分区数小于 spark.shuffle.sort.bypassMergeThreshold(默认200)
  • UnsafeShuffleWriter:当条件不满足 BypassMergeSortShuffleWriter 时, 并且当前 rdd 的数据支持序列化(即 UnsafeRowSerializer),也不需要聚合, 分区数小于 2^24
  • SortShuffleWriter:其余

3.1、BypassMergeSortShuffleWriter

首先,BypassMergeSortShuffleWriter 的运行机制的触发条件如下:

  • shuffle reduce task(即partition)数量小于spark.shuffle.sort.bypassMergeThreshold 参数的值。
  • 没有map side aggregations。
    note: map side aggregations是指在 map 端的聚合操作,通常来说一些聚合类的算子都会都 map 端的 aggregation。不过对于 groupByKey 和combineByKey, 如果设定 mapSideCombine 为false,就不会有 map side aggregations。

图片来源网络(正确的应该有三个 ReduceTask):

《【Spark】Spark 存储原理--shuffle 过程》 image

BypassMergeSortShuffleHandle 算法适用于没有聚合,数据量不大的场景。它为 reduce 端的每个分区,创建一个 DiskBlockObjectWriter。根据 Key 判断分区索引,然后添加到对应的 DiskBlockObjectWriter,写入到对应的临时文件。

《【Spark】Spark 存储原理--shuffle 过程》 image

因为写入磁盘文件是通过 Java的 BufferedOutputStream 实现的,BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。所以图中会有内存缓冲的概念。

最后,会将所有临时文件合并成一个磁盘文件,并创建一个索引文件标识下游各个 reduce task 的数据在文件中的 start offset与 end offset。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一样的,也会创建很多的临时文件(所以触发条件中会有 reduce task 数量限制),只是在最后会做一个磁盘文件的合并,对于 shuffle reader 会更友好一些。

public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  // DiskBlockObjectWriter 数组,索引是 reduce 端的分区索引
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  // FileSegment数组,索引是 reduce 端的分区索引
  partitionWriterSegments = new FileSegment[numPartitions];
  // 为每个 reduce 端的分区,创建临时 Block 和文件
  for (int i = 0; i < numPartitions; i++) {
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

  // 遍历数据,根据key找到分区索引,存到对应的文件中
  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    // 获取数据的key
    final K key = record._1();
 // 根据reduce端的分区器,判断该条数据应该存在reduce端的哪个分区
 // 并且通过DiskBlockObjectWriter,存到对应的文件中
  partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }

  for (int i = 0; i < numPartitions; i++) {
    final DiskBlockObjectWriter writer = partitionWriters[i];
    // 调用DiskBlockObjectWriter的commitAndGet方法,获取FileSegment,包含写入的数据信息
    partitionWriterSegments[i] = writer.commitAndGet();
    writer.close();
  }

  // 获取最终结果的文件名
  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  // 根据output文件名,生成临时文件。临时文件的名称只是在output文件名后面添加了一个uuid
  File tmp = Utils.tempFileWith(output);
  try {
    // 将所有的文件都合并到tmp文件中,返回每个数据段的长度
    partitionLengths = writePartitionedFile(tmp);
    // 这里writeIndexFileAndCommit会将tmp文件重命名,并且会创建索引文件。
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

BypassMergeSortShuffleWriter 所有的中间数据都是在磁盘里,并没有利用内存。而且它只保证分区索引的排序,而并不保证数据的排序。

3.2、SortShuffleWriter

图片来源网络:

《【Spark】Spark 存储原理--shuffle 过程》 image

该模式下,数据首先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。有些 shuffle 操作涉及到聚合,对于这种需要聚合的操作,使用 PartitionedAppendOnlyMap 来排序。对于不需要聚合的,则使用 PartitionedPairBuffer 排序。

在进行 shuffle 之前,map 端会先将数据进行排序。排序的规则,根据不同的场景,会分为两种。首先会根据 Key 将元素分成不同的 partition。第一种只需要保证元素的 partitionId 排序,但不会保证同一个 partitionId 的内部排序。第二种是既保证元素的 partitionId 排序,也会保证同一个 partitionId 的内部排序

接着,往内存写入数据,每隔一段时间,当向 MemoryManager 申请不到足够的内存时,或者数据量超过 spark.shuffle.spill.numElementsForceSpillThreshold 这个阈值时 (默认是 Long 的最大值,不起作用),就会进行 Spill 内存数据到文件,然后清空内存数据结构。假设可以源源不断的申请到内存,那么 Write 阶段的所有数据将一直保存在内存中,由此可见,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比较吃内存的。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件也是通过 Java 的 BufferedOutputStream 实现的。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。在将最终排序结果写入到数据文件之前,需要将内存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已经 spill 到磁盘的 SpillFiles 进行合并。

此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

BypassMergeSortShuffleWriter 与该机制相比:

第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用 BypassMerge 机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销,当然需要满足那两个触发条件。

override def write(records: Iterator[Product2[K, V]]): Unit = {
  // 根据是否在map端进行数据合并初始化 ExternalSorter
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    // 不进行聚合,也不进行排序,reduce端再进行排序,只会根据 key 值获取对应的分区 id,来划分数据,不会在分区内排序,如果结果需要排序,例如sortByKey,会在 reduce 端获取 shuffle 数据后进行
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // shuffle输出文件
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    // sorter 中的数据写出到该文件中   
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  // 写出对应的index文件,纪录每个Partition对应的偏移量 
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    // shuffleWriter的返回结果  
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}

3.3、UnsafeShuffleWriter

触发条件有三个:

  • 原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要 Serializer 支持 relocation,在指定位置读取对应数据。 KryoSerializer 和 spark sql 自定义的序列化器 支持这个特性。

  • 没有指定 aggregation 或者key排序, 因为 key 没有编码到排序指针中,所以只有 partition 级别的排序。

  • 分区数目必须小于 16777216 ,因为 partition number 使用24bit 表示的。

下面内容来自 Spark ShuffleWriter 原理

UnsafeShuffleWriter 首先将数据序列化,保存在 MemoryBlock 中。然后将该数据的地址和对应的分区索引,保存在 ShuffleInMemorySorter 内存中,利用ShuffleInMemorySorter 根据分区排序。当内存不足时,会触发 spill 操作,生成spill 文件。最后会将所有的 spill文 件合并在同一个文件里。

整个过程可以想象成归并排序。ShuffleExternalSorter 负责分片的读取数据到内存,然后利用 ShuffleInMemorySorter 进行排序。排序之后会将结果存储到磁盘文件中。这样就会有很多个已排序的文件, UnsafeShuffleWriter 会将所有的文件合并。

下图来自 Spark ShuffleWriter 原理,表示了map端一个分区的shuffle过程:

《【Spark】Spark 存储原理--shuffle 过程》

UnsafeShuffleWriter 是对 SortShuffleWriter 的优化,大体上也和 SortShuffleWriter 差不多。从内存使用角度看,主要差异在以下两点:

  • 一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中,存储的是键值或者值的具体类型,也就是 Java 对象,是反序列化过后的数据。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中数据是序列化以后存储到实际的 Page 中,而且在写入数据过程中会额外写入长度信息。总体而言,序列化以后数据大小是远远小于序列化之前的数据。

  • 另一方面,UnsafeShuffleWriter 中需要额外的存储记录(LongArray),它保存着分区信息和实际指向序列化后数据的指针(经过编码的Page num 以及 Offset)。相对于 SortShuffleWriter, UnsafeShuffleWriter 中这部分存储的开销是额外的。

四、后记

这篇文章东拼西凑,大多不是自己的东西,惭愧。

详见:

https://zhangchenchen.github.io/2018/09/26/deep-in-spark-shuffle/

Spark ShuffleWriter 原理

    原文作者:w1992wishes
    原文地址: https://www.jianshu.com/p/286173f03a0b
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞