Apache Spark 简介
Apache Spark 是什么
Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。下面的引用是Apache Spark自己的说明。
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Apache Spark 起源
Spark起源于加利福利亚大学伯克利分校的一个研究项目。学校当时关注分布式机器学习算法的应用情况。因此,Spark从一开始便为应对迭代式应用的高性能需求而设计。在这类应用中,相同的数据会被多次访问。该设计主要靠利用数据集内存缓存以及启动任务时的低延迟和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表明了其实用性。
Apache Spark 运行模式
Apache Spark共支持四种运行模式,每种模式各有其特点,为了方便起见,本文基于的运行模式为本地单机模式。
- 本地单机模式:所有Spark进程都运行在同一个Java虚拟机(Java Vitural Machine,JVM)中
- 集群单机模式:使用Spark自己内置的任务调度框架
- 基于Mesos:Mesos是一个流行的开源集群计算框架
- 基于YARN:即Hadoop 2,它是一个与Hadoop关联的集群计算和资源调度框架
Apache Spark 环境搭建
Spark能通过内置的单机集群调度器来在本地运行。此时,所有的Spark进程运行在同一个Java虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型设计、开发、调试及测试。同样,它也适应于在单机上进行多核并行计算的实际场景。
本地构建Spark环境的第一步是下载其版本包, 本文以spark-1.6.1-bin-hadoop2.4.tgz为例进行安装演示。下载完上述版本包后,解压,并在终端进入解压时新建的主目录。Spark的运行依赖Scala编程语言,好在预编译的二进制包中已包含Scala运行环境,我们不需要另外安装Scala便可运行Spark。但是,JRE(Java运行时环境)或JDK(Java开发套件)是要安装的。
>tar xfvz spark-1.6.1-bin-hadoop2.4.tgz
>cd spark-1.6.1-bin-hadoop2.4
用户运行Spark的脚本在该目录的bin目录下。我们可以运行Spark附带的一个示例程序来测试是否一切正常:
>./bin/run-example org.apache.spark.examples.SparkPi
该命令将在本地单机模式下执行SparkPi这个示例。在该模式下,所有的Spark进程均运行于同一个JVM中,而并行处理则通过多线程来实现。默认情况下,该示例会启用与本地系统的CPU核心数目相同的线程。示例运行完,应可在输出的结尾看到类似如下的提示,
Pi is roughly 3.14248
Apache Spark 基本概念
前一部分介绍了Apache Spark的安装过程,接下来我们一起体验下在Spark上编程的乐趣。就像之前介绍的,Spark支持多种编程语言,包括Java、Scala、Python 和 R等。接下来首先介绍下Spark的编程模型,然后通过使用这四种不同的语言来演示Spark的编程运行过程。
Spark 编程模型
任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的,SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。初始化后,我们便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。
Spark Shell
Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。
要想通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。
要想在Python shell中使用Spark,直接运行./bin/pyspark命令即可。与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。
弹性分布式数据集(RDD)
上文提到的分布式数据集其实就是指RDD。RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。
创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。
Apache Spark 编程入门
下面我们通过依次用Java、Python等种语言来编写一个简单的Spark数据处理程序。假设一存在一个名为UserPurchaseHistory.csv的文件,内容如下所示。文件的每一行对应一条购买记录,从左到右的各列值依次为客户名称、商品名以及商品价格。
John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49
Spark 程序实例(Java)
/**
* Created by hackx on 9/11/16.
*/
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/**
* 用Java编写的一个简单的Spark应用
*/
public class JavaApp {
public static void main(String[] args) {
/*正如在Scala项目中一样,我们首先需要初始化一个上下文对象。值得注意的是,
这里所使用的是JavaSparkContext类而不是之前的SparkContext。类似地,调用
JavaSparkContext对象,利用textFile函数来访问数据,然后将各行输入分割成
多个字段。请注意下面代码的高亮部分是如何使用匿名类来定义一个分割函数的。
该函数确定了如何对各行字符串进行分割。*/
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
// 将CSV格式的原始数据转化为(user,product,price)格式的记录集
JavaRDD data = sc.textFile("data/UserPurchaseHistory.csv").map(new Function<String, String[]>() {
public String[] call(String s) throws Exception {
return s.split(",");
}
});
/*现在可以算一下用Scala时计算过的指标。这里有两点值得注意的地方,一是
下面Java API中有些函数(比如distinct和count)实际上和在Scala API中
一样,二是我们定义了一个匿名类并将其传给map函数。匿名类的定义方式可参
见代码的高亮部分。*/
// 求总购买次数
long numPurchases = data.count();
// 求有多少个不同客户购买过商品
long uniqueUsers = data.map(new Function<String[], String>() {
public String call(String[] strings) throws Exception {
return strings[0];
}
}).distinct().count();
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
}
}
Spark 程序实例(Python)
"""用Python编写的一个简单Spark应用"""
from pyspark import SparkContext
sc = SparkContext("local[2]", "First Spark App")
# 将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:
line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求总购买次数
numPurchases = data.count()
# 求有多少不同客户购买过商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).
reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])
运行该脚本的最好方法是在脚本目录下运行如下命令:
>$SPARK_HOME/bin/spark-submit pythonapp.py
参考资料
Spark官网
Spark机器学习