RDD 操作一 基础 ,放入方法,闭包,输出元素,使用 Key-Value 工作
原文地址: http://spark.apache.org/docs/latest/programming-guide.html
仅限交流使用,转载请注明出处。如有错误,欢迎指出!
Henvealf/译
RDD 提供了两种类型的操作:
- transformations :从已经存在的 RDD 中创建出一个新的 RDD。
- actions: 在集群上运行了一个计算后,最终返回一个值给设备中的程序。
transformation 的一个例子就是map,对 RDD 中的每个元素进行相同的操作,返回一个新的 RDD。
action 的一个例子就是 reduce,使用相同的函数来聚合 RDD 中的元素。
在 Spark 中,所有的 transformation 都是懒惰的(lazy),以至于他不会立刻计算出他们结果。代替的是,他们仅仅记住这个 transformation 应用在哪些基础的数据集上(比如一个文件)。transformation 计算仅仅是在程序中的一个动作需要一个返回值的时候才开始。这个设计让能够让 Spark 更加高效。举个例子,我们能够意识到一个 map 生成的数据集只会用在一个 reduce 上,并且仅仅返回 reduce 的结果给设备,而不会是一个 map 后的很大的数据集给设备。
默认情况下,在你每次重新运行一个通过转换(transforme)得到的RDD的action 的时候,转换每次都可能重新再运行一次。然而,你也可以使用 persist(或者 cache)方法将一个 RDD 持久化在内存中,这样就能让 Spark 把这些元素维持在集群中,让下一次的存取速度变得飞快。这里也同样支持持久话 RDD 在磁盘中,或者备份在多个节点中。
基础
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
lineLengths.persist()
val totalLength = lineLengths.reduce((a, b) => a + b)
往 Spark 中放入方法
scala
Spark 的 API 最信任的就是在集群上运行的方法上放方法。下面有两个建议:
- 使用匿名方法语法,能够减少代码量。
- 静态化在全局的单例对象上的函数,就是定义一个object,可以把它理解为直接创建了一个对象,不需要 new 就可以使用。也可以把他理解为一个类,而其中的函数都默认为静态的。里面有你用到所有函数/方法。比如,你可以定义 object MyFunctions ,之后通过 MyFunxtions 来使用方法:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意你也可能传入一个引用给一个类(class) 中的函数(与单例 object 的做法是相反的),他需要向 Spark 中传入包含了要使用的方法的类的对象。比如下面:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
着这里,我们要 new 一个 MyClass 的对象才能使用 doSuff 方法。我们要将这一整个对象传送入集群中才可以,然后书写方式和 rdd.map(x => this.func1(x)) 很像。
用很相似的方式,外部的对象存取字段就会引用到整个对象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
可以发现这样其实就是 rdd.map(x => this.field + x), 这样在外部就得到了他的 this 引用,这样很不安全,容易出错,为了解决这个问题,下面有一个简单方式, 就是先将字段赋给一个局部变量中:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
理解闭包(closures)
在 Spark 中比较难理解的就是当在集群上运行代码的时候,变量和方法的作用域与生命周期。在其作用域之外修改变量的 RDD 操作可以是一个混淆的常用资源( can be a frequent source of confusion)。下面我们使用 foreach 来递增一个计数器,相同的问题也能同样出现在其他的操作上。
例子
考虑原生的 RDD 元素的加和操作,这个操作在不同的虚拟机上执行可能会呈现出不同的行为。这个普通的例子是将 Spark 运行在 local 模式下(–master = local[n])的情况与运行在集群上的情况做比较(通过 park-submit 给 YARN)。
Scala
var counter = 0;
var rdd = sc.parallelize(data)
// Wrong: 不会执行他
rdd.foreach(x => counter + x)
println("Counter: " + counter)
本地模式 Vs. 集群模式
在这之前先明确一下一些概念:
Driver: 驱动器,一个 job 只有一个,主要负责 job 的解析,与 task 的调度等。
Executor:执行器,实际运行 task 的地方,一个 job 有多个。
上面这段代码的行为是不确定的,可能不像预想中那样工作。为了执行 job ,Spark 会将处理 RDD 的操作拆分到许多 task 中,且每一个 task 被一个执行器执行。在执行之前, Spark 会计算 task 的闭包。闭包是一些必须让执行器可见的变量和方法,这样执行器才能执行他们在 RDD 上的操作(这里就是 foreach)。这个闭包是被序列化并传送到了每个执行器。
在集群上的变量会立刻被送到每个执行器中,事实上,当 counter 被引用使用在 foreach 方法里面时,他就不再是驱动(driver)节点上的 counter 了。也就是说在驱动节点的内存中也一直会有一个 counter ,可他对执行器来说,已经不可见了。执行器仅仅能看到序列化了的闭包中的拷贝。事实上, 驱动器上的 final 的 counter 的值在操作执行的时候一直都是0,执行器操作的只是引用的序列化的闭包中的值。
在 local 模式下,foreach 方法实际会运行在作为驱动器的 JVM 中,也就是说运行程序的 JVM 和运行驱动器的 JVM 是同一个。所以操作就会引用到原始的 counter, counter 的值就被改变了。
如果想要确保现在说的这种情况有确定的行为,一种就是使用一个 Accumulator(积累器)。Accumulator 常常使用于在执行被分片到不同的 worker 时需要安全的对变量进行更新的情况。 Accumulator 以后详细介绍。
一般情况下,闭包–构建循环或者局部函数,应该不要用于改变一些全局的状态。 Spark 不能确定或者保证修改闭包之外的的对象引用时的行为。一些代码在本地模式下运行的好好的,在放到集群上运行时就可能得不到期望的结果。如果需要使用全局的聚合,就使用一个 Accumulator 来代替他。
输出一个 RDD 的元素
另一个老事件就是试图使用 rdd.foreach(println) 或者 rdd.map(println) 打印出元素的值。在一个机器上,输出 RDD 所有的元素的将会生成你期望的输出。然而,在 cluster 模式下是,stdout 会由执行器来调用,写在了执行器的标准输出上,而不是驱动器上。所以在驱动器上你就看不到 stdout 的输出类了。
为了在驱动器上输出所有的元素,一个你可以使用 collect 方法,先把这个 RDD 带到驱动器节点上: rdd.collect().foreach(println)。不过这中方法容易造成内存不足。因为 collect() 会把 RDD 实体拿进一个单独的机器中;如果仅仅需要输出 RDD 的一小部分元素,最安全的方式是使用 take(): rdd.take().foreach(println).
使用 Key-Value 工作
Scala
RDDs 中包括任何类型的对象,有一些特殊的操作是能用于 RDDs 的键值对上。 最普遍的就是集群上的 “洗牌” 过程,就是使用 key 来进行分组和聚合。
在 Scala 中,这些操作在 包含 ** Tuple2** (二元组)对象的 RDDs 中是自动(直接?)可用的(在本语言中,之间写一个(a,b) 就能创建 tuples )。键值对操作可以在 PairRDDFunction 中得到,他是自动包装了一个元组RDD。
举个例子,下面的代码就在键值对上使用 reduceByKey 操作来计算当前文件的行数。
val lines = sc.textFile("data.txt")
val pairs = line.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
我们也可以使用 counts.sortByKey(),在这个例子中,机会按字母排序这些简直对,最后 count.collect() 就将他们带会驱动器程序,作为一个对象数值使用。
注意 :当你使用自定义的对象来作为键值对的键值,你必须保证这个自定义的该对象的 equals 方法和与之联合匹配的 hashCode() 方法。