Spark RDD 编程指南中文版(二)

接上一章 曾革:Spark RDD 编程指南中文版(一)继续翻译 Spark 的官方英文文档。

你可以点击这个链接查看所有已翻译的内容: 曾革:Spark 中文文档目录汇总

RDD Operations

RDDs 支持两种形式的操作:transformations (转换)和 actions(行动),transformations 能将一个已经存在的数据集转换成一个新的数据集。actions 能对一个数据集进行计算后返回给 driver program 一个值。举个例子,map 就是一个转换操作,他能将数据集里面的每一个元素传递给一个函数并返回一个新的 RDD 。再比如,reduce 就是一个行动操作,他能将一个集合里的元素进行聚合(aggregates)操作并返回给 driver program 一个最终结果(还有一个 reduceByKey 的行动操作可以返回一个 分布式的数据集)。

所有的转换操作都是懒加载(lazy),也就是说它们不会立即计算结果,仅仅只是记住了那些在数据集上的一系列操作。这些转换操作只会在一个行动操作需要向 driver program 返回一个结果的时候进行计算。这种设计使得 Spark 的运行变得更加高效。举个例子,我们可以对一个数据集使用map 方法,它不会立即计算,只会返回一个新的 RDD ,我们再对这个新的 RDD 使用reduce 方法时,它才会计算出一个结果返回给 driver program ,而不是将 map 计算后返回一个大数据集给 driver program。

默认情况下,每一被转换后的 RDD 在你对它进行行动操作时都会被重新计算一次,不过你可以将 RDD 通过使用 persist(or cache )方法持久化到内存中,这会使你在下一次访问这些数据的时候变得异常快速。同样的,你也可以将 RDD 持久化到硬盘,或者是在集群之间进行复制。

Basics

下面的事例代码演示了基本的 RDD 操作:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b) 

第一行代码定义了一个 RDD,这个 RDD 通过读取一个扩展的文件而来,这个数据并不会真正的被加载到内存,除非对它执行了一个行动操作:变量 lines 仅仅只是指向了这个文件。第二行代码对 lines 进行了一个转换操作 map ,将新的 RDD 赋值给了变量 lineLengths ,同样由于懒加载的特性,他并不会立刻计算出一个结果。最后一行代码中,我们调用了一个行动操作 reduce 。在这个时候,Spark 会将这些计算任务拆分成任务分给集群中的机器,每一个机器只会执行自己的那部分 map 和 reduce 操作,然后返回自己的结果给 driver program。

如果后面我们想再次使用 lineLengths ,我们可以这样做:

lineLengths.persist() 

在执行 reduce 之前,这个 lineLength 会在执行第一次计算后才会被缓存到内存中。

Passing Functions to Spark

Spark 非常依赖在集群之间传递 driver program 中的函数。下面有两个建议:

  1. 匿名函数,可以用在简单的代码片段。
  2. 全局单列对象里的静态方法。

例如,你可以定义一个 MyFunction 对象,然后将 MyFunction.func1 传递给 map ,就像这样:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1) 

需要注意的是,有时候我们会传递一个类实例中的方法的引用(这和单列对象相反),这需要传递整个对象,包括对象里面的类以及这些类的方法,比如下面的示例:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
} 

有时候,如果我们创建了一个新的 MyClass 实例,然后调用了 doStuff 方法,那么 map 方法会指向这个 MyClass 实例里面的fun1 方法,所以这种情况下我们需要将整个对象传递给集群。相似的写法是:rdd.map(x => this.fun1(x)).

同样的,访问外部对象的字段变量也需要传递整个对象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
} 

上面代码的等价写法是:rdd.map(x => this.field + x ) ,这个引用整个对象。不过为了避免这个问题,最简单的方法是将 field 赋值给一个本地变量,而不是从外部来访问它。

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
} 

Understanding closures

Spark 一个比较难以理解的东西是:当跨集群执行代码的时候,变量和方法的生命周期和作用范围。 在执行 RDD 的操作时,若对变量和方法在其作用域之外进行修改,经常会导致冲突。下面我们演示一下在一个累加器上使用 foreach() 方法时候的表现,不过类似的问题也经常出现在其他操作上。

Example

下面是一个简单的 RDD 求和,注意这个结果可能会不同,因为结果取决于是否在同一个 JVM 上执行这个代码。你可以将这个例子以本地模式提交到 Spark 上,或者是部署到分布式的 Spark 集群上。

var counter = 0 var rdd = sc.parallelize(data) 
// Wrong: Don't do this!! 
rdd.foreach(x => counter += x) 
println("Counter value: " + counter)

Local vs. cluster modes

上面代码的运行结果是不确定的,并且可能不会按照原本的意图工作。当执行这个 job 的时候,Spark 将RDD 操作打散成一个个 tasks,每一个 task 会由一个执行器( executor) 执行。在执行之前,Spark 会计算任务的闭包( closure ),闭包是那些当执行器开始执行计算的时候(在此执行的方法是 foreach() ),必须对执行器可见的变量和方法,这个闭包会被序列化并发送到每一个执行器。

闭包里面的变量会被复制并发送到执行器,当 counter 被 foreach 方法引用的时候,它将不再是 driver node 上的 counter 。这个时候,仍然会有一个 counter 在 driver node 的内存中但是对执行器并不可见。执行器只看得到序列化的闭包的一个副本。所以最终 counter 的值还是 0 ,因为对 counter 的所有操作都是在对闭包里面引用的那个 counter 进行操作。

当在本地模式运行的时候,某些情况下 foreach 方法实际上是在同一个 JVM 驱动器中执行,他们会引用同一个 counter ,所以有可能会更新 counter 。

所以为了保证这些场景最后能给一个一致的结果,我们要使用累加器( Accumulator )。Spark 的累加器能保证当一个执行任务被分配到集群中各节点时,安全的更新变量,我们会在累加器的部分专门讨论此事。

In general, closures – constructs like loops or locally defined methods,should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation
is needed.

Printing elements of an RDD

另一个用于打印 RDD 所有元素的常见语法是 rdd.foreach( println ) 或者是 rdd.map( println )。在单台机器上,这将会产生预期中的输入,打印 RDD 中所有的元素。不过在集群模式下,标准输出会被正在执行任务的执行器的标准输出所替代,而不是在一个driver 上,所以driver 上的标准输出不会打印所有元素。如果要打印所有元素,你需要使用 rdd.collect.foreach( println ) 方法,不过这有可能导致driver 的内存耗尽,应为它会将一个 RDD 的所有数据都集中到一台机器上,如果你只想打印一些元素,一个安全的做法是使用 take 方法:rdd.take( 100 ).foreach( println )。

Working with Key-Value Pairs

虽然大多数的 Spark 的操作都适合任意对象类型的 RDD ,不过仍有少数的一些操作只能用在

key-value pairs 的 RDD上,最常见的是分布式的 “shuffle” 操作,他能通过 key 来对元素进行聚合或者是分组。

在 Scala 里,这些操作可以自动的用在包含有元组(Tuple)的RDD上(元组是 Scala语言内置的对象,简单写法是(a,b))。key-value pair 的操作也适合用在 PairRDDFunctions 类上。

下面的代码示例使用 key-value pairs 的 reduceByKey 方法来统计文本文件中每一行出现了多少次 :

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b) 

我们也可以使用 counts.sortByKey( ) 来对结果按字母进行排序,最后再用 counts.collect( ) 来向 driver program 返回一个数组对象。

(未完待续————————————————————————–)

    原文作者:曾革
    原文地址: https://zhuanlan.zhihu.com/p/32704971
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞