RDD是什么
RDD(Resilient Distributed Datasets)可扩展的弹性分布式数据集,rdd是spark最基本的数据抽象,是整个spark生态的基石。rdd表示一个只读、分区且不变的数据集合。一个rdd可以有多个分区,所谓的分区就是表示可以把一个rdd上的数据划分成不同的几个相互独立的子集。显而易见的好处就是可以在rdd上并行化。
RDD的属性
因为RDD是一个抽象类,具体位置在org.apache.spark.rdd包下,同时我们可以发现该包下存在多种rdd。比如从hdfs上生成HadoopRDD;从Mysql上生成的JdbcRDD;以及从一个集合生成的ParallelCollectionRDD。总之,rdd具有多样性。下面将着重介绍rdd的六个重要属性:
分区性
所谓分区性,指得是rdd上的数据可以可以划分为几个独立的子集。显而易见带来的好处是并行化,使得基于rdd模型的执行时间明显缩短。比如存在两个rdd,分别是rdd1,rdd2,第一个rdd有10个分区,第二个rdd只有一个分区,假设两个rdd上的数据相同且计算资源充足。理论上第一个rdd执行的时间明显小于第二个rdd,因为第一个rdd可以10个分区并行执行。
接下来,我们通过源码级别的看一下分区性在类RDD中的体现。在org.apache.spark.rdd.RDD类中,protected def getPartitions: Array[Partition] 表示获取rdd的分区,通过这个函数就可以把一个rdd上的数据进行分区并返回一个数组。由于Partition是一个trait,下面是Partition的源码
package org.apache.spark
/**
* An identifier for a partition in an RDD.
*/
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
为了阐述问题,我们使用HadoopRDD作为demon,HadoopRDD中包含了三个类,分别是
HadoopPartition
HadoopRDD
HadoopRDD(object)
其中HadoopPartition继承了Partition,源码如下
private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = 31 * (31 + rddId) + index
override def equals(other: Any): Boolean = super.equals(other)
/**
* Get any environment variables that should be added to the users environment when running pipes
* @return a Map with the environment variables and corresponding values, it could be empty
*/
def getPipeEnvVars(): Map[String, String] = {
val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
// map_input_file is deprecated in favor of mapreduce_map_input_file but set both
// since it's not removed yet
Map("map_input_file" -> is.getPath().toString(),
"mapreduce_map_input_file" -> is.getPath().toString())
} else {
Map()
}
envVars
}
}
HadoopRDD在构建分区时,主要是借助hadoop的InputSplit类构建的,也就是说一个InputSplit是一个分区;HadoopRDD的本地性也是借助了InputSplit这个类的实现的,这个类会记录相应数据与ip之间的映射
接下来,我们看一下HadoopRDD中的getPartition方法的实现。
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
可知,HadoopRDD在处理hdfs相关的数据时,主要是借助了Hadoop中已有的InputFormat、InputSplit类。数据在hdfs上存储的时候,每个split的大小已经确定了,所以HadoopRDD在划分分区的时候直接使用每个split上的数据作为一个独立的partition
函数性
该属性我们可以理解为每个都会有一个函数。换句话我们的逻辑就是由这些函数组成的;在这里我们强调一下,由于RDD是天生不可变性,父RDD正是通过该属性生成子RDD。接下来展示的将是该属性在源码级别的体现(仍然使用HadoopRdd作为说明),函数性主要是指RDD类中的
def compute(split: Partition, context: TaskContext): Iterator[T]
该函数输入是Partition,输出是一个迭代器。下面是HadoopRdd类中对该函数的具体实现(只展示了部分核心代码)
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
Partition转化为HadoopPartition
private val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
private val jobConf = getJobConf()
private val inputMetrics = context.taskMetrics().inputMetrics
private val existingBytesRead = inputMetrics.bytesRead
// Sets the thread local variable for the file's name
split.inputSplit.value match {
case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
case _ => InputFileNameHolder.unsetInputFileName()
}
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
// For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
private def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
}
}
通过InputSplit获取一个Reader
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
reader =
try {
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
} catch {
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
null
}
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
重写Iterator中的getNext方法
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
}
(key, value)
}
}
new InterruptibleIterator[(K, V)](context, iter)
}
通过上面的源码,可以得出两个点:1.Partition转化为HadoopPartition,并且通过InputSplit创建RecordReader;2.重写Iterator中的两个方法,getNext和close,通过创建的reader读取分片的数据。
依懒性
首先介绍一下rdd依赖的概念:rdds通过transformation算子之间的操作,转化得到新的rdd,新的rdd包含了从其它rdd衍生的必需信息,我们称之为依赖。众所周知,rdd天生具有不变性,每次Transformation都会产生一个新的rdd,那么在这里会有一个问题。即为什么RDD设计为具有依赖性以及他的好处?我认为主要体现为以下两点:
1.生具有容错性。很多书籍上都会介绍该特性,而spark正是利用了这个特性用做数据容灾。当一个rdd数据丢失时,由于spark保存了rdd之间的依赖关系,那么就会利用该性能重新生成丢失的rdd。
2.调度性能更优。窄依赖可以以pipeline形式执行多条命令,同时多个之间可以并行化执行。
rdd之间的依赖性主要分为以下两大类,分别是
1.窄依赖:每一个parent rdd的partition最多被子rdd的一个partition使用。常见的窄依赖操作算子有map,filter,union等
2.宽依赖:多个子rdd的partition会使用同一个parent的partition。常见的宽依赖算子有groupByKey,reduceByKey,join,distinct,repartition等等
在这里很多人会考虑一个问题,“为什么会存在宽依赖和窄依赖?前面已经介绍了窄依赖可以使用pipeline,为什么不全部使用窄依赖?”。为了解释这个问题,我们以mr作为例子。很多人都认为spark性能优于hadoop的原因是内存计算,不需要落盘。其实不然,spark优于hadoop,不仅仅是内存计算,他的作业调度也要优于hadoop,同时spark也是需要落盘的。说到这里,我们也明白了,归根结底是因为spark计算的过程中也需要落盘,这也就是rdd宽依赖存在的根本了。
下面将通过源码来介绍rdd的依赖关系。宽依赖和窄依赖都继承了Dependency类。
/**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
其中rdd就是依赖parent rdd。并且窄依赖又分为两种,分别是一对一(OneToOneDependency)和多对一(RangeDependency)两种依赖。这两种依赖的基类是NarrowDependency
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
其中一对一依赖的类是OneToOneDependency,实现源码如下
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
这个类主要重写了getParents方法,输入的参数是子partition的id,我们知道该id是唯一的,返回的是父partitionid
而多对一的依赖的实现类是RangeDependency,源码如下
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
这种依赖主要出现在rdd的union过程中,多个rdd的partition经过union,变成一个partition。
不同于窄依赖,宽依赖的实现只有一种形式,即ShuffleDependency。子rdd的partition依赖于父parent rdd的所有partition,所以该过程使用到了shuffle。
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
* explicitly then the default serializer, as specified by `spark.serializer`
* config option, will be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
在这里我们就不详细介绍shuffle了,后续我将会使用一个专题去介绍spark的shuffle过程。
下面我们看一下rdd依赖在抽象类RDD中的实现,主要是RDD类中定义了一个
protected def getDependencies: Seq[Dependency[_]] = deps
其中在抽象类RDD中,默认的是一对一窄依赖
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
前面使用的HadoopRDD中默认的也是一对一窄依赖,在这里我们看一下ShuffledRDD,该rdd重写了getDependencies方法
override def getDependencies: Seq[Dependency[_]] = {
val serializer = userSpecifiedSerializer.getOrElse {
val serializerManager = SparkEnv.get.serializerManager
if (mapSideCombine) {
serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
} else {
serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
}
}
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
在这里强调一下,所有的rdd创建都是在SparkContext中实现的,SparkContext中使用不同的算子创建不同的rdd,在不同的rdd中会存在不同的依赖,这也进一步体现了rdd的多样性。
分区器(Optional)
该特性是可选项,主要针对key-value格式的rdd,需要提供一个partitioner。在抽象类RDD中,默认返回为None。
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
而在ShuffleRdd中,对该变量进行了实现。
本地性(Optional)
本地性是一个可选项,有些子rdd中并没有实现,比如JdbcRdd,但是绝大部分的rdd都实现了。该特性在spark中使用十分普遍,极大地提高了spark执行的性能。比如我们知道spark的Data Locality分为以下五大类。
1.PROCESS_LOCAL:执行的task和数据在同一个jvm中,即同一个executor中。
2.NODE_LOCAL:task和数据在同一个节点的不同jvm中,即不同的executor中。
3.NO_PREF:数据从哪儿访问都一样快。
4.RACK_LOCAL:数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢。
5.ANY:数据在非同一机架的网络上,速度最慢。
在这里我们思考一个问题,即spark的data locality是如何实现的?在这里我们直接给出答案,即通过spark的本地性实现的,也就是rdd记录了数据存放的最近位置。在抽象类RDD中的源码如下
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
由于RDD没有实现该方法,在这里我们看一下RDD的子类HadoopRdd对其的实现
override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
case Some(c) =>
try {
val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e: Exception =>
logDebug("Failed to use InputSplitWithLocations.", e)
None
}
case None => None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
主要是借助了hadoop中的inputSplit类,该类记录了每个split所在的host信息
缓存
在这里我把缓存也看做rdd的一个属性。为什么我们总说spark在迭代计算明显优于hadoop,其中rdd的缓存特性就是其中的决定性因素。在spark中,我们可以把需要重复使用的数据进行缓存,这样可以极大地提高执行性能。主要是利用了RDD的persist函数,源码如下:
/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
上面的程序首先判断该rdd是否已经缓存过了,如果没有缓存,直接调用了SparkContext类中的persistRDD方法,该方法使用了一个Map结构,其中key用rdd的id表示,value表示rdd的数据。
private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
map.asScala
}
下一章我将重点介绍shuffle在spark中的实现。