开 始 体 验 Apache Spark是为大规模数据处理而设计的快速通用的运算框架,最初由AMPLab所开发,使用了内存运算技术。相对于Hadoop的MapReduce会在运行完工作后将中介数据存放到磁盘中,Spark能在数据尚未写入硬盘时即在存储器内分析运算。Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍。
Spark 快速入门教程
1. Spark 是什么
Apache Spark 是个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。Spark 正如其名,最大的特点就是快(Lightning-fast),可比 Hadoop MapReduce 的处理速度快 100 倍。如果你熟悉 Hadoop,那么你知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop 使用 HDFS 来解决分布式数据问题,MapReduce 计算范式提供有效的分布式计算。类似的,Spark 拥有多种语言的函数式编程 API,提供了除 map 和 reduce 之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。
2. Spark 核心组件
Spark 库本身包含很多应用元素,这些元素可以用到大部分大数据应用中,其中包括对大数据进行类似 SQL 查询的支持,机器学习和图算法,对实时流数据的支持。具体核心组件如下:
Spark Core:包含 Spark 的基本功能;尤其是定义 RDD 的 API、操作以及这两者上的动作。其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的。
Spark SQL:提供通过 Apache Hive 的 SQL 变体 Hive 查询语言(HiveQL)与 Spark 进行交互的 API。每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作。对熟悉 Hive 和 HiveQL 的人,Spar k可以拿来就用。
Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据。
MLlib:一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库 Mahout,将会转到 Spark,并在未来实现。
GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。
由于这些组件满足了很多大数据需求,也满足了很多数据科学任务的算法和计算上的需要,Spark 快速流行起来。不仅如此,Spark 也提供了使用 Scala、Java 和Python 编写的 API;满足了不同团体的需求,允许更多数据科学家简便地采用 Spark 作为他们的大数据解决方案。
3. Spark 体系架构
Spark体系架构包括如下三个主要组件:
- 数据存储
- API
- 管理框架
数据存储:Spark 用 HDFS 文件系统存储数据。它可用于存储任何兼容于 Hadoop 的数据源,包括HDFS,Hbase,Cassandra等。
API:利用 API,应用开发者可以用标准的 API 接口创建基于 Spark 的应用。Spark 提供 Scala,Java 和 Python 三种程序设计语言的 API。
下面是三种语言 Spark API 的网站链接。
Scala API
spark.apache.org/docs/latest…
Java
spark.apache.org/docs/latest…
Python
spark.apache.org/docs/latest…
Spark基本概念:
Application:
用户自己写的 Spark 应用程序,批处理作业的集合。Application 的 main 方法为应用程序的入口,用户通过 Spark 的 API,定义了 RDD 和对 RDD 的操作。SparkContext:
Spark 最重要的 API,用户逻辑与 Spark 集群主要的交互接口,它会和 Cluster Master 交互,包括向它申请计算资源等。Driver 和 Executor:
Spark 在执行每个 Application 的过程中会启动 Driver 和 Executor 两种 JVM 进程。Driver 进程为主控进程,负责执行用户 Application 中的 main 方法,提交 Job,并将 Job 转化为 Task,在各个 Executor 进程间协调 Task 的调度。运行在Worker上 的 Executor 进程负责执行 Task,并将结果返回给 Driver,同时为需要缓存的 RDD 提供存储功能。
资源管理:
一组计算机的集合,每个计算机节点作为独立的计算资源,又可以虚拟出多个具备计算能力的虚拟机,这些虚拟机是集群中的计算单元。Spark 的核心模块专注于调度和管理虚拟机之上分布式计算任务的执行,集群中的计算资源则交给 Cluster Manager 这个角色来管理,Cluster Manager 可以为自带的Standalone、或第三方的 Yarn和 Mesos。
Cluster Manager 一般采用 Master-Slave 结构。以 Yarn 为例,部署 ResourceManager 服务的节点为 Master,负责集群中所有计算资源的统一管理和分配;部署 NodeManager 服务的节点为Slave,负责在当前节点创建一个或多个具备独立计算能力的 JVM 实例,在 Spark 中,这些节点也叫做 Worker。
另外还有一个 Client 节点的概念,是指用户提交Spark Application 时所在的节点。
弹性分布式数据集(RDD):
弹性分布式数据集(RDD)是 Spark 框架中的核心概念。可以将 RDD 视作数据库中的一张表。其中可以保存任何类型的数据。Spark 将数据存储在不同分区上的 RDD 之中。
RDD 可以帮助重新安排计算并优化数据处理过程。
此外,它还具有容错性,因为RDD知道如何重新创建和重新计算数据集。
RDD 是不可变的。你可以用变换(Transformation)修改 RDD,但是这个变换所返回的是一个全新的RDD,而原有的 RDD 仍然保持不变。
RDD 支持两种类型的操作:
变换(Transformation)
变换的返回值是一个新的 RDD 集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个 RDD 作为参数,然后返回一个新的 RDD。 变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。行动(Action)
行动操作计算并返回一个新的值。当在一个 RDD 对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。 行动操作包括:reduce,collect,count,first,take,countByKey 以及 foreach。
4. 安装部署
本文档是对 Spark 的一个快速入门。首先,我们通过 Spark 的交互式 shell 介绍一下 API(主要是 Scala),
更完整参考 programming guide:
spark.apache.org/docs/latest…
(1). 安装前设置
注意:本小节部分,实验楼环境已经配置好,无需配置,可以跳过此步骤。
在安装 Hadoop 之前,需要进入 Linux/Ubuntu 环境下,连接 Linux/Ubuntu 使用 SSH (安全 Shell)。按照仅给出简要步骤设立 Linux/Ubuntu 环境,对于本小节您可以百度。
- 修改主机名
- 修改 IP
- 修改主机名和IP的映射关系
- 关闭防火墙
- 重启 Linux/Ubuntu
- 安装JDK
(2). 安装 Spark
官网下载地址:
spark.apache.org/downloads.h…
本文档选择的是 spark-1.6.1-bin-hadoop2.6
版本,下载地址:
d3kbcqa49mib13.cloudfront.net/spark-1.6.1…
>> cd /home/shiyanlou
>>sudo wget https://d3kbcqa49mib13.cloudfront.net/spark-1.6.1-bin-hadoop2.6.tgz
>> sudo tar zxvf spark-1.6.1-bin-hadoop2.6.tgz
(3). 通过 Spark Shell 进行交互分析
Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python,本文档选择使用 Scala 来进行介绍。
Scala:
是一门现代的多范式编程语言,以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。
1). 启动 spark shell
>>cd spark-1.6.1-bin-hadoop2.6
>>./bin/spark-shell.sh
Spark 最主要的抽象概念是个分布式集合,也叫作弹性分布式数据集(Resilient Distributed Dataset – RDD)。它可被分发到集群各个节点上,进行并行操作。RDD 可以由 Hadoop InputFormats 读取 HDFS 文件创建得来,或者从其他 RDD 转换得到。下面我们就先利用 Spark 源代码目录下的 README 文件来新建一个 RDD:
我们从 /home/shiyanlou/README.md
文件新建一个 RDD,代码如下:
>>`scala> val textFile = sc.textFile("/home/shiyanlou/README.md")`
> textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:1
下面我们就来演示 count() 和 first() 操作:
>> `scala> textFile.count()`
>
> res0: Long = 95
>>`scala> textFile.first()`
>
> res1: String = # Apache Spark
>
接着演示 transformation,通过 filter transformation 来返回一个新的 RDD,代码如下:
>> `scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))` // 统计包含 Spark 的行数
> linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af39
>
>> `scala> linesWithSpark.count()` // 统计行数
> res2: Long = 17
可以看到一共有 17 行内容包含 Spark,这与通过 Linux 命令 cat /home/shiyanlou/README.md | grep “Spark” -c 得到的结果一致,说明是正确的。action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:
>>`scala> textFile.filter(line => line.contains("Spark")).count()` // 统计包含 Spark 的行数
> res3: Long = 17
RDD 的 actions 和 transformations 可用在更复杂的计算中,例如通过如下代码可以找到包含单词最多的那一行内容共有几个单词:
>>`scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)`
> res4: Int = 14
代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。例如,通过使用 Math.max() 函数(需要导入 Java 的 Math 库),可以使上述代码更容易理解:
>>`scala> import java.lang.Math`
> import java.lang.Math
>>`scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))`
>
> res5: Int = 16
Hadoop 上的 MapReduce 是大家耳熟能详的一种通用数据流模式。在 Spark 中同样可以实现(下面这个例子也就是 WordCount):
>> `scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) ` // 实现单词统计
> wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:2
>> `scala> wordCounts.collect()` // 输出单词统计结果
> res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)
2). RDD 缓存
Spark 也支持在分布式的环境下基于内存的缓存,这样当数据需要重复使用的时候就很有帮助。比如当需要查找一个很小的 hot 数据集,或者运行一个类似 PageRank 的算法。
举个简单的例子,对 linesWithSpark RDD 数据集进行缓存,然后再调用 count() 会触发算子操作进行真正的计算,之后再次调用 count() 就不会再重复的计算,直接使用上一次计算的结果的 RDD 了:
>> `scala> linesWithSpark.cache() `
> res8: spark.RDD[String] = spark.FilteredRDD@17e51082
>> `scala> linesWithSpark.count()`
> res9: Long = 17
>> `scala> linesWithSpark.count()`
> res10: Long = 17
看起来缓存一个100行左右的文件很愚蠢,但是如果再非常大的数据集下就非常有用了,尤其是在成百上千的节点中传输 RDD 计算的结果。
3). Spark SQL 和 DataFrames
Spark SQL 是 Spark 内嵌的模块,用于结构化数据。在 Spark 程序中可以使用 SQL 查询语句或 DataFrame API。DataFrames 和 SQL 提供了通用的方式来连接多种数据源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,并且可以在多种数据源之间执行 join 操作。
下面仍在 Spark shell 中演示一下 Spark SQL 的基本操作,该部分内容主要参考了 Spark SQL、DataFrames 和 Datasets 指南
spark.apache.org/docs/latest…
Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。在 Spark shell 启动时,输出日志的最后有这么几条信息:
17/05/25 15:16:34 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
17/05/25 15:16:34 INFO repl.SparkILoop: Created sql context..
SQL context available as sqlContext.
这些信息表明 SparkContent 和 SQLContext 都已经初始化好了,可通过对应的 sc、sqlContext 变量直接进行访问。
使用 SQLContext 可以从现有的 RDD 或数据源创建 DataFrames。作为示例,我们通过 Spark 提供的 JSON 格式的数据源文件 spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json 来进行演示,该数据源内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
执行如下命令导入数据源,并输出内容:
>>`scala> val df = sqlContext.read.json("home/shiyanlou/spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json")`
> df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
>> `scala> df.show()` // 输出数据源内容
> +----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
接着,我们来演示 DataFrames 处理结构化数据的一些基本操作:
>> `scala> df.select("name").show() ` // 只显示 "name" 列
> +-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
>> `scala> df.select(df("name"), df("age") + 1).show() ` // 将 "age" 加 1
> +-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
>> `scala> df.filter(df("age") > 21).show() ` //条件语句
> +---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
>> `scala> df.groupBy("age").count().show() ` // groupBy 操作
> +----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
当然,我们也可以使用 SQL 语句来进行操作:
>>`scala> .registerTempTable("people") ` // 将 DataFrame 注册为临时表 people
>> `scala> val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")` // 执行 SQL 查询
>> `scala> result.show() ` //输出结果
> +----+-------+
| name | age|
+----+-------+
|Justin| 19|
+----+-------+
更多的功能可以查看完整的 DataFrames API:
spark.apache.org/docs/latest…
此外 DataFrames 也包含了丰富的 DataFrames Function:
spark.apache.org/docs/latest…
5. 总结
Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark 拥有多种语言的函数式编程 API,提供了除 map 和 reduce 之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。这篇文档的目的是帮助你快速入门,完成单机上的 Spark 安装与使用,本文档是 Spark 快速入门,主要介绍了 Spark shell 、RDD、Spark SQL 等,希望对您有所帮助。
更多学习资料请参考:
Spark 集群部署:
spark.apache.org/docs/latest…
Spark 编程指南:
spark.apache.org/docs/latest…
Spark SQL、DataFrames 和 Datasets 指南: