Spark RDD 编程指南中文版(一)

寄语:接下来的一段时间,我会将 Spark 的官方英文文档翻译成中文,发布在这个专栏,欢迎大家一起来交流学习 Spark。

你可以点击这个链接查看所有已翻译的内容: 曾革:Spark 中文文档目录汇总

Overview(我直接从 Spark Programming Guide 这部分的 Overview 开始)

总的来说,每一 个Spark Application 都是由一个 driver program 组成,这个 driver program 会运行用户的 main 方法 ,且能在集群上执行并行操作。Spark 提供了一个主要的数据抽象,叫弹性分布式数据集(RDD),RDD 是一个能跨集群执行并行操作的数据集合,我们可以通过从 Hadoop file system 中读取数据来创建一个 RDD,亦可以将 driver program 中一个 Scala 集合转换而来。通常,我们可以让 Spark 将一个 RDD 持久化到内存,以方便我们后面对这个数据进行重复的操作(注:将数据持久化到内存后,读取速度会变得很快)。在一个分布式集群中,当一个节点挂掉之后,RDD 会自动的进行恢复。

Spark 中的另一个抽象是 shared variables (注:常见的中文资料会将其翻译成共享变量)。默认的,当 Spark 执行一个并行操作的时候,会将函数中使用到的变量复制到每一个 task 中,有些时候,一个变量需要在多个 tasks、或者是在 tasks 和 driver program 之间进行共享。Spark 支持两种类型的共享变量:一是 broadcast variables (广播变量),它将能一个变量缓存到所有节点的内存中,第二个是 accumulators (累加器),他只能被用作加法的变量,比如计数和求和。

这份指南会使用 Spark 支持 的所有语言来演示这些功能(注:Spark 支持 Java 、Scala 、Python 以及 R 语言进行开发,此文档我将暂时只使用 Scala 做演示 )。你可以非常容易的通过启动一个 spark 的交互式 shell 来进行下面的操作( bin/spark-shell )。

Linking with Spark

Spark 2.2.0 默认使用 Scala 2.11 ( Spark 也可以使用其他版本的 Scala )。如果你使用 Scala 来编写应用,也需要使用相同版本的 Scala。

开始写一个 application 之前,你需要添加 Maven dependency on Spark , Spark 的 Maven 仓库地址如下:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0 

特别的,如果你需要访问一个 HDFS 集群,需要添加如下依赖:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version> 

最后,你需要导入一些类到程序中:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf 

Initializing Spark

Spark Program 需要做的第一件事,就是创建一个 Spark Context 对象,他是 Spark 访问集群的入口,在创建 SparkContext 之前,你需要先创建一个包含你的应用程序的信息的配置文件 SparkConf 对象。

每一个 JVM 都只允许有一个active SparkContext,如果你想创建一个新的,你需要先停掉这个 active SparkContext .

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf) 

appName 参数是你为你 的application 取得名字,他会在 cluster UI 上展示,master 是 Spark、Mesos、 或者是 YARN 集群的URL,你也可以用一个特殊的 “local” 来在本地运行你的程序。在实际使用的时候,如果你打算将你的应用程序扔到集群上运行,那么,你肯定不想将 master 硬编码在代码里,你可以在你使用 spark-submit 提交应用程序的时候,将master 参数传递进去。尽管如此,在你进行本地测试的时候,你大可以用 “local” 模式。

Using the Shell

当你启动 spark-shell 的时候,会自动的创建一个 SparkContext 对象,叫 “sc”,如果你打算手动创建一个 SparkContext,它是不会起作用的。你可以用 –master 参数来告诉 Context 连接到哪个master上,你同样可以用 –jars 参数的形式将一些 jar 包导入到环境变量中去,你也可以用 — package 参数将 Spark 依赖的 jar 包添加到 session shell 中去,参数后面跟一系列的 Maven 坐标,每个坐标用逗号分隔符隔开。举个例子,你可以直接用四个核心来跑 bin/spark-shell ,就像这样:

$ ./bin/spark-shell --master local[4]

或者是用下面的方式将 code.jar 加到环境变量中去:

$ ./bin/spark-shell --master local[4] --jars code.jar

或者通过Maven 坐标添加一个 依赖:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" 

Resilient Distributed Datasets (RDDs)

Spark 中有一个叫做弹性分布式数据集(RDD)的概念,它是一个具有高容错性且能并行执行的数据集合。我们有两种途径来创建 RDDs:一是 parallelizing(并行化) 一个在 driver program 中已经存在的集合,或者引用一个外部的存储系统中的数据集,比如一个 shared filesystem , HDFS,HBase,或者Hadoop支持的任意的存储系统。

Parallelized Collections

并行集合可以通过对 driver program 中已经存在的集合调用 SparkContext 的 parallelize 方法来创建。 集合里面的元素会被 copy 到一个可以被并行执行的分布式数据集里面。举例,下面演示了怎么将一个 1 到 5 的 Array 转换成并行集合:

val testrdd = sc.parallelize(Arrary(1,2,3,4,5))

一旦创建,这个分布式数据集(testrdd) 就能被并行操作了。举例,我们可以调用 testrdd.reduce((a,b) => a + b) 来对 array 中所有的数据进行求和,我们会在后面进一步描述这些在分布式数据集上的操作。

对于一个并行数据集来说,有一个很重要的参数:将一个集合切成片( partitions )的数量。Spark 会在每一个 partition 上运行一个 task,典型的你可以在集群上为每一个 CPU 设置 2-4 个 partitions . 一般而言,Spark 会根据你的集群资源来自动设置 partitions 的数量。尽管如此,你可以通过手动设置 parallelize 方法的第二个参数来指定数量。

External Dataset

Spark 可以从任何 Hadoop 支持的存储系统来创建分布式数据集,包括 你的本地系统,HDFS,Cassandra ,HBase,Amazon S3等。Spark 支持文本文件(Text File),SequenceFiles( 序列化文件 ),和其他任何 Hadoop 的输入格式。

Text File RDDs 可以通过 使用 SparkContext 的 textFile 方法来创建,这个方法需要指定文件的 URI ( 无论这个文件是在你的本地,还是在 hdfs:// ,还是 s3n:// 等),然后将文件的内容按行读取成一个集合。这里有一个调用例子:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

一旦被创建,distFile 就可以使用各种 dataset operations. 举个例子,我们可以通过下面的计算方式来计算 distFile 中 所有 lines 的 sizes :

distFile.map(s => s.length).reduce((a,b) => a + b)

读取文件的时候,有这些需要注意的地方:

  1. 如果使用本地文件系统,必须确保每个节点都有这个路径且有访问权限, 所以在读取文件之前可以将文件复制到所有的 worker 上或者使用网络共享文件系统。
  2. Spark 所有读取文件的方法 ,包括textFile,同样支持对文件夹、压缩文件的读取,还允许读取的路径中带有通配符。 比如你可以使用textFile(“/my/directory”), textFile(“/my/directory/.txt”)和 textFile(“/my/directory/.gz”)。
  3. textFile 方法也可以通过输入一个可选的参数,来控制文件的分片数目。默认情况下,Spark为每一块文件创建一个分片(HDFS默认的块大小为128 MB),但是你也可以通过传入一个更大的值,来指定更多的 partitions。注意,你不能指定一个比块数更少的partitions 。

除了文本文件,Spark Scala API 也支持其它数据格式:

  1. SparkContext.wholeTextFiles() 允许你读取文件夹下所有的文件,比如多个小的文本文件, 然后以 ( filename,content) 的格式一个个返回他们。这个方法与 textFile 是冲突的,因为 textFile 会对每一个文件的每一行返回一个 record ,
  2. 对于 SequenceFiles,可以使用 SparkContext 的 sequenceFile[K, V] 方法创建。像IntWritable 和 Text 一样,它们必须是 Hadoop 的 Writable interface 的子类。另外,对于几种通用 Writable 类型,Spark 允许你指定原生类型来替代。如:sequencFile[Int, String] 将会自动读取 IntWritables 和 Texts。
  3. 对于其他类型的 Hadoop 输入格式,你可以使用 SparkContext.hadoopRDD 方法,它可以接收任意类型的 JobConf 和输入格式类、键类型和值类型。按照 Hadoop 作业一样的方法,来设置输入源就可以了。你也可以使用SparkContext.newAPIHadoopRDD,它基于新的MapReduce API(org.apache.hadoop.mapreduce).
  4. RDD.saveAsObjectFile 和 SparkContext.objectFile 支持将 RDD 保存为一个简单格式的数据, 比如序列化的 Java 对象,尽管这不是一个像 AVRO 一样高效的格式, 但是它提供了一个容易的方式来保存 RDD。

(未完待续——————–)

    原文作者:曾革
    原文地址: https://zhuanlan.zhihu.com/p/32694387
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞