RDD是Spark的基础,是对大数据的抽象,所以先破解Spark,首先从RDD开始。
* RDD 是什么?有什么特点?
* RDD 包含什么?
* RDD 能做什么?
RDD 的注释
org.apache.spark.rdd.RDD
类源代码中有详细的注释:
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
翻译:弹性的 分布式 数据集是 Spark 基础的抽象。
解释:弹性的(可复原的),说明数据集具有容错性、可修复性。
分布式,说明数据集可以分布在不同的机器上Represents an immutable, partitioned collection of elements that can be operated on in parallel.
翻译:RDD 是不可变的 分区的 可并行处理的 元素集合
解释:不可变的,这和 Scala 的设计理念相同,数据集一旦构建完成,就不能再修改,这样能轻松解决多个线程读数据的一致性问题。
分区的=可并行处理的=分布式This class contains the basic operations available on all RDDs, such as
map
,filter
, andpersist
.
翻译:这个抽象类包含了所有 RDD 都应该有的基本操作,比如map
、filter
、persist
等
解释:这三个操作分别是:批量转换、筛选、持久化In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
pairs, such asgroupByKey
andjoin
;
翻译:另外PairRDDFunctions
对象中包含了 键值对型(KV型) RDD 的操作,例如groupByKey
和join
;
解释:KV 型可以支持按 Key 分组、关联等操作[[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
Doubles;
翻译:DoubleRDDFunctions
提供可 double 数据集的操作;
解释:数值型数据集有求和、平均、分布图等统计性操作and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
can be saved as SequenceFiles.
翻译:SequenceFileRDDFunctions
提供了顺序存储操作All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit.
翻译:所有的的类通过隐式转换自动地用于RDD实例中
解释:RDD 伴生对象里包含了隐式转换函数,用implicit
修饰。隐式转换是 Scala 的语法特性。Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
翻译:在 RDD 中,包含这样的5个属性(也就说要实现抽象方法或给空对象赋值):
- 一个分区的列表(getPartitions)
- 一个用于计算分区中数据的函数(compute)
- 一个对其他 RDD 的依赖列表(getDependencies)
- 可选:KV 型 RDD 应该有一个分区器,例如 hash-分区器(partitioner)
- 可选:分区数据计算完后优先存储的位置,例如 HDFS 的某个块(getPreferredLocations)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions.
翻译: Spark 中所有的任务调度、任务执行都依赖于这些方法。RDD 可以覆盖这些方法,实现有自己的计算方法。例如从一个新的存储系统中读取数据。Please refer to the http://101.96.8.165/people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf for more details on RDD internals.
翻译:更多细节,可以 Spark 的论文
一段示例代码
是的,我们从HelloWorld开始,官方推荐的第一个程序是计算π的近似值:
import scala.math.random
import org.apache.spark.sql.SparkSession
object SparkPi {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("Spark Pi")
.getOrCreate()
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / (n - 1))
spark.stop()
}
}
😱什么,RDD 在哪里?
获得 Spark 会话 和 Spark 上下文
val spark = SparkSession.builder.appName("xx").getOrCreate()
这4行,只能算是1行代码。调用内部的构造器产生了一个Spark会话对象
,赋值给spark
。后面spark.sparkContext
是获得 Spark 的上下文对象
。至于会话对象和上下文对象,以后再分析。创建一个 RDD
spark.sparkContext.parallelize(1 until n, slices)
SparkContext 对象中有一个parallelize
函数,创建了一个RDD对象。
RDD是抽象类。进入源码我们可以看到 创建的 RDD 是ParallelCollectionRDD
(字面翻译为并行容器 RDD
)。这个RDD是最简单的RDD了。如果是我,我会将它命名为SimpleRDD。
这句话创建了一个包含slices个分区的 RDD,RDD 的内容是1到 n,这 n+1 个数。数据存在内存中,从内存读分区的数据。看看这个 RDD 中的细节
还记得前一节翻译的文字吗?RDD 应该实现5个方法。这个并行容器 RDD 是怎么实现的呢?- 一个分区的列表
将数据分成slices份,放在slices个容器中。每个容器就是一个分区,所有容器构成了分区列表 - 一个用于计算分区中数据的函数
什么都没做,返回分区的迭代器 - 一个依赖列表
依赖列表为Nil
空列表。即,这个 RDD 不依赖别的 RDD - 一个分区器
不是 KV 型的,不需要 - 一个运算存储优先位置
SparkContext传入了一个 Map,Map 有slices个key,对应slices个容器。可见,SparkContext希望结果存在内存中。
- 一个分区的列表
map
map是将分组中每一个元素映射成另一个元素的操作。我们说过,RDD是不可变的,map这个操作产生新MapPartitionsRDD对象。
那MapPartitionsRDD的5个方法呢?- 一个依赖列表:只依赖于上游的 RDD,本例中依赖于上游的ParallelCollectionRDD。
- 一个分区列表:就是上游分区列表,直接读取上游数据
- 一个计算:计算过程就是“映射关系”,由外部传入一个函数对象表达映射关系
- 一个分区器:上游 RDD 的分区器,直接读上游的分区
- 一个优先存储位置:上游 RDD 的优先位置,本例中直接写到SparkContext传入的 Map
reduce
reduce 也是一个操作,是多对一的聚合操作,聚合前后类型必须一致。本例中是求和操作。
过程可以简述成,先计算每个分区的聚合结果,再将多个分区的结果再聚合。过程比较复杂,以后再深入。如何计算π?
random 取随机数,范围是 [0, 1),那么x 和 y 是 [-1, 1)范围内的随机数。
计算xx+yy,这是点(x, y)到(0, 0) 的距离,当距离不大1(点落在r=1的圆内)时,取1,否则取0。那么随机取 N 个点,点落圆内的几率等于圆的面积/边长为2的正方形的面积。所以:
圆的面积 ≌ 正方形面积 * 落在圆内的点数 / 所有的点数
圆的面积=π,正方形面积=4
根据大数定理和中心极限定理,取的点越多,π的估值越近似于正态分布;取得的点越多,正态分布的标准差越小;取得的点越多,正态分布的均值越接近π的真值。所以,随着取点的增加,π估值约精确。
Scala 语法
1 until n
用到了三个 Scala 语法:
- 一切皆对象
在 Java 中,1
会被认为是一个基本类型int,可以装包成对象,在 Scala 中,1
就是一个对象。 - 隐式转换
util
是调用RichInt.util
方法。Int 转换成 RichInt 是隐式的,定义在scala.Predef
对象中的intWrapper
方法。scala.Predef
类似于宏。
参考: scala source implicit conversion from Int to RichInt – Stack Overflow - 函数调用的写法
1 until n
等价于1.until(n)
,也就是说,如果对象方法若只有一个参数,可以省略掉点和括号,这样代码更接近自然语言。
OK,那么1 until n
这句话写全了应该是什么样的呢?
答:scala.this.Predef.intWrapper(1).until(n);
疑问列表
我将阅读过程中的未解内容记录下来,留待以后阅读代码时解答。疑问一个一个划掉,就是成长的过程。
- reduce 等 RDD 操作是如何执行的?
总结
- RDD 是数据集
- RDD 的特点是有弹性、分布式、不可变。
- RDD应该包含5个部分:一个分区集、一个依赖集、一个运算、[一个分区器、一个优先结果存储位置]。
- RDD 有一系列的操作,包括映射、过滤、聚合、存储等。
本文源码
RDD spark/core/RDD/RDD.scala at master · apache/spark · GitHub
map spark/core/RDD/MapPartitionsRDD at master · apache/spark · GitHub
计算π spark/examples/SparkPi.scala at master · apache/spark · GitHub