Spark RDD的基本特征以及源码解析

RDD是什么

RDD(Resilient Distributed Datasets)可扩展的弹性分布式数据集,rdd是spark最基本的数据抽象,是整个spark生态的基石。rdd表示一个只读、分区且不变的数据集合。一个rdd可以有多个分区,所谓的分区就是表示可以把一个rdd上的数据划分成不同的几个相互独立的子集。显而易见的好处就是可以在rdd上并行化。

RDD的属性

因为RDD是一个抽象类,具体位置在org.apache.spark.rdd包下,同时我们可以发现该包下存在多种rdd。比如从hdfs上生成HadoopRDD;从Mysql上生成的JdbcRDD;以及从一个集合生成的ParallelCollectionRDD。总之,rdd具有多样性。下面将着重介绍rdd的六个重要属性:

分区性

所谓分区性,指得是rdd上的数据可以可以划分为几个独立的子集。显而易见带来的好处是并行化,使得基于rdd模型的执行时间明显缩短。比如存在两个rdd,分别是rdd1,rdd2,第一个rdd有10个分区,第二个rdd只有一个分区,假设两个rdd上的数据相同且计算资源充足。理论上第一个rdd执行的时间明显小于第二个rdd,因为第一个rdd可以10个分区并行执行。
接下来,我们通过源码级别的看一下分区性在类RDD中的体现。在org.apache.spark.rdd.RDD类中,protected def getPartitions: Array[Partition] 表示获取rdd的分区,通过这个函数就可以把一个rdd上的数据进行分区并返回一个数组。由于Partition是一个trait,下面是Partition的源码

package org.apache.spark

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}

为了阐述问题,我们使用HadoopRDD作为demon,HadoopRDD中包含了三个类,分别是
HadoopPartition
HadoopRDD
HadoopRDD(object)
其中HadoopPartition继承了Partition,源码如下

private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
  extends Partition {

  val inputSplit = new SerializableWritable[InputSplit](s)

  override def hashCode(): Int = 31 * (31 + rddId) + index

  override def equals(other: Any): Boolean = super.equals(other)

  /**
   * Get any environment variables that should be added to the users environment when running pipes
   * @return a Map with the environment variables and corresponding values, it could be empty
   */
  def getPipeEnvVars(): Map[String, String] = {
    val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
      val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
      // map_input_file is deprecated in favor of mapreduce_map_input_file but set both
      // since it's not removed yet
      Map("map_input_file" -> is.getPath().toString(),
        "mapreduce_map_input_file" -> is.getPath().toString())
    } else {
      Map()
    }
    envVars
  }
}

HadoopRDD在构建分区时,主要是借助hadoop的InputSplit类构建的,也就是说一个InputSplit是一个分区;HadoopRDD的本地性也是借助了InputSplit这个类的实现的,这个类会记录相应数据与ip之间的映射
接下来,我们看一下HadoopRDD中的getPartition方法的实现。

override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    val inputFormat = getInputFormat(jobConf)
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  }

可知,HadoopRDD在处理hdfs相关的数据时,主要是借助了Hadoop中已有的InputFormat、InputSplit类。数据在hdfs上存储的时候,每个split的大小已经确定了,所以HadoopRDD在划分分区的时候直接使用每个split上的数据作为一个独立的partition

函数性

该属性我们可以理解为每个都会有一个函数。换句话我们的逻辑就是由这些函数组成的;在这里我们强调一下,由于RDD是天生不可变性,父RDD正是通过该属性生成子RDD。接下来展示的将是该属性在源码级别的体现(仍然使用HadoopRdd作为说明),函数性主要是指RDD类中的
def compute(split: Partition, context: TaskContext): Iterator[T]
该函数输入是Partition,输出是一个迭代器。下面是HadoopRdd类中对该函数的具体实现(只展示了部分核心代码)

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {
       Partition转化为HadoopPartition
      private val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      private val jobConf = getJobConf()

      private val inputMetrics = context.taskMetrics().inputMetrics
      private val existingBytesRead = inputMetrics.bytesRead

      // Sets the thread local variable for the file's name
      split.inputSplit.value match {
        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
        case _ => InputFileNameHolder.unsetInputFileName()
      }

      // Find a function that will return the FileSystem bytes read by this thread. Do this before
      // creating RecordReader, because RecordReader's constructor might read some bytes
      private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
        case _: FileSplit | _: CombineFileSplit =>
          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
        case _ => None
      }

      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
      // task and in the same thread, in which case we need to avoid override values written by
      // previous partitions (SPARK-13071).
      private def updateBytesRead(): Unit = {
        getBytesReadCallback.foreach { getBytesRead =>
          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
        }
      }

     通过InputSplit获取一个Reader
      private var reader: RecordReader[K, V] = null
      private val inputFormat = getInputFormat(jobConf)
      HadoopRDD.addLocalConfiguration(
        new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
        context.stageId, theSplit.index, context.attemptNumber, jobConf)

      reader =
        try {
          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
        } catch {
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
            null
        }
      // Register an on-task-completion callback to close the input stream.
      context.addTaskCompletionListener{ context => closeIfNeeded() }
      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
      
       重写Iterator中的getNext方法
      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
          updateBytesRead()
        }
        (key, value)
      }
    }
    new InterruptibleIterator[(K, V)](context, iter)
  }

通过上面的源码,可以得出两个点:1.Partition转化为HadoopPartition,并且通过InputSplit创建RecordReader;2.重写Iterator中的两个方法,getNext和close,通过创建的reader读取分片的数据。

依懒性

首先介绍一下rdd依赖的概念:rdds通过transformation算子之间的操作,转化得到新的rdd,新的rdd包含了从其它rdd衍生的必需信息,我们称之为依赖。众所周知,rdd天生具有不变性,每次Transformation都会产生一个新的rdd,那么在这里会有一个问题。即为什么RDD设计为具有依赖性以及他的好处?我认为主要体现为以下两点:
1.生具有容错性。很多书籍上都会介绍该特性,而spark正是利用了这个特性用做数据容灾。当一个rdd数据丢失时,由于spark保存了rdd之间的依赖关系,那么就会利用该性能重新生成丢失的rdd。
2.调度性能更优。窄依赖可以以pipeline形式执行多条命令,同时多个之间可以并行化执行。
rdd之间的依赖性主要分为以下两大类,分别是
1.窄依赖:每一个parent rdd的partition最多被子rdd的一个partition使用。常见的窄依赖操作算子有map,filter,union等
2.宽依赖:多个子rdd的partition会使用同一个parent的partition。常见的宽依赖算子有groupByKey,reduceByKey,join,distinct,repartition等等
在这里很多人会考虑一个问题,“为什么会存在宽依赖和窄依赖?前面已经介绍了窄依赖可以使用pipeline,为什么不全部使用窄依赖?”。为了解释这个问题,我们以mr作为例子。很多人都认为spark性能优于hadoop的原因是内存计算,不需要落盘。其实不然,spark优于hadoop,不仅仅是内存计算,他的作业调度也要优于hadoop,同时spark也是需要落盘的。说到这里,我们也明白了,归根结底是因为spark计算的过程中也需要落盘,这也就是rdd宽依赖存在的根本了。
下面将通过源码来介绍rdd的依赖关系。宽依赖和窄依赖都继承了Dependency类。

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

其中rdd就是依赖parent rdd。并且窄依赖又分为两种,分别是一对一(OneToOneDependency)和多对一(RangeDependency)两种依赖。这两种依赖的基类是NarrowDependency

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

其中一对一依赖的类是OneToOneDependency,实现源码如下

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

这个类主要重写了getParents方法,输入的参数是子partition的id,我们知道该id是唯一的,返回的是父partitionid
而多对一的依赖的实现类是RangeDependency,源码如下

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

这种依赖主要出现在rdd的union过程中,多个rdd的partition经过union,变成一个partition。
不同于窄依赖,宽依赖的实现只有一种形式,即ShuffleDependency。子rdd的partition依赖于父parent rdd的所有partition,所以该过程使用到了shuffle。

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

在这里我们就不详细介绍shuffle了,后续我将会使用一个专题去介绍spark的shuffle过程。
下面我们看一下rdd依赖在抽象类RDD中的实现,主要是RDD类中定义了一个

protected def getDependencies: Seq[Dependency[_]] = deps

其中在抽象类RDD中,默认的是一对一窄依赖

/** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

前面使用的HadoopRDD中默认的也是一对一窄依赖,在这里我们看一下ShuffledRDD,该rdd重写了getDependencies方法

  override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

在这里强调一下,所有的rdd创建都是在SparkContext中实现的,SparkContext中使用不同的算子创建不同的rdd,在不同的rdd中会存在不同的依赖,这也进一步体现了rdd的多样性

分区器(Optional)

该特性是可选项,主要针对key-value格式的rdd,需要提供一个partitioner。在抽象类RDD中,默认返回为None。

/** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

而在ShuffleRdd中,对该变量进行了实现。

本地性(Optional)

本地性是一个可选项,有些子rdd中并没有实现,比如JdbcRdd,但是绝大部分的rdd都实现了。该特性在spark中使用十分普遍,极大地提高了spark执行的性能。比如我们知道spark的Data Locality分为以下五大类。
1.PROCESS_LOCAL:执行的task和数据在同一个jvm中,即同一个executor中。
2.NODE_LOCAL:task和数据在同一个节点的不同jvm中,即不同的executor中。
3.NO_PREF:数据从哪儿访问都一样快。
4.RACK_LOCAL:数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢。
5.ANY:数据在非同一机架的网络上,速度最慢。
在这里我们思考一个问题,即spark的data locality是如何实现的?在这里我们直接给出答案,即通过spark的本地性实现的,也就是rdd记录了数据存放的最近位置。在抽象类RDD中的源码如下

/**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

由于RDD没有实现该方法,在这里我们看一下RDD的子类HadoopRdd对其的实现

override def getPreferredLocations(split: Partition): Seq[String] = {
    val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
    val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
      case Some(c) =>
        try {
          val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
          val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
          HadoopRDD.convertSplitLocationInfo(infos)
        } catch {
          case e: Exception =>
            logDebug("Failed to use InputSplitWithLocations.", e)
            None
        }
      case None => None
    }
    locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
  }

主要是借助了hadoop中的inputSplit类,该类记录了每个split所在的host信息

缓存

在这里我把缓存也看做rdd的一个属性。为什么我们总说spark在迭代计算明显优于hadoop,其中rdd的缓存特性就是其中的决定性因素。在spark中,我们可以把需要重复使用的数据进行缓存,这样可以极大地提高执行性能。主要是利用了RDD的persist函数,源码如下:

  /**
   * Mark this RDD for persisting using the specified level.
   *
   * @param newLevel the target storage level
   * @param allowOverride whether to override any existing level with the new one
   */
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

上面的程序首先判断该rdd是否已经缓存过了,如果没有缓存,直接调用了SparkContext类中的persistRDD方法,该方法使用了一个Map结构,其中key用rdd的id表示,value表示rdd的数据。

private[spark] def persistRDD(rdd: RDD[_]) {
    persistentRdds(rdd.id) = rdd
  }

// Keeps track of all persisted RDDs
  private[spark] val persistentRdds = {
    val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
    map.asScala
  }

下一章我将重点介绍shuffle在spark中的实现。

    原文作者:幸福的小棉袄
    原文地址: https://www.jianshu.com/p/1c26ba24bdc7
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞