第一次接触Spark,自己整理了(从网络,书籍,同事那里)一些Spark的相关内容当做笔记。路过的朋友仅供参考,不能保证说得都对。
什么是 Spark
简单来说,Spark是一种面向对象、函数式编程语言。Spark能够像操作本地集合对象一样轻松地操作分布式数据集。它具有运行速度快、易用性好、通用性强和随处运行等特点。
Spark提供了支持Java、scala、Python以及R语言的API。还支持更高级的工具如:Spark Sql、Spark Streaming、MLlib、GraphX等。
官方介绍 :Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
百度百科:Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎
Spark 有什么特点
以下摘自百度百科
更快的速度。内存中计算, 比 Hadoop 快100倍。
易用性。Spark 提供了80多个高级运算符。
通用性。Spark 提供了大量的库,包括SQL、DataFrames、MLlib、GraphX、Spark Streaming。 开发者可以在同一个应用程序中无缝组合使用这些库。
支持多种资源管理器。Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理器
更多详细介绍可参考:https://blog.csdn.net/xwc35047/article/details/51072145
什么是 RDD
在我使用Spark的过程中,用到最多的对象就是RDD,比如JavaRDD、JavaPairRDD。然后RDD之间又可以互相转化。这个RDD是个啥?
RDD全文是 Resilient Distributed DataSet(弹性·分布式·数据集)。
RDD是一个只读的、可分区的、支持多种来源 、有容错机制 、可以被缓存 、支持并行操作的分布式数据集,可以装载任何你想装载的数据。他的弹性特点体现在RDD的数据可以在内存与磁盘(外存)灵活交换。
Spark 模型
再来认识一下下面几个重要概念
Application。也就是我们编写完Spark程序,负责生成SparkContext。
Job。所谓 job,就是由一个 rdd 的 action算子(后面再说action) 触发的动作,可以简单的理解为,当你需要执行一个 rdd 的 action 的时候,会生成一个 job。
Stage。stage 是一个 job 的组成单位,就是说,一个 job 会被切分成 1 个或多个 stage,然后各个 stage 会按照执行顺序依次执行。
Task。stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition(分区,后面再说partition),就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。
简单来说就是以RDD为基准,每触发一个action操作,就会生成一个job。job内部有一个或多个stage顺序执行,组成stage的是一系列task,即任务执行单元。
关于Partition分区。Spark RDD主要由Dependency、Partition、Partitioner组成,Partition是其中之一。一份待处理的原始数据会被按照相应的逻辑切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度。
关于Stage。Stage以shuffle和result这两种类型来划分。Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。
那么刚刚提到的action又是什么?我们再来了解一下RDD操作算子。
什么是 RDD 操作算子
RDD有两种操作算子:Transformation(转换) 和 Action(执行)
Transformation。即一个rdd数据集经过数据转换变成一个新的rdd数据集。常用的Transformation操作有:map、filter、union、distinct、groupByKey 等。Transformation 属于延迟计算,当触发Transformation算子时rdd并没有立即进行转换,仅仅是记住了数据集的逻辑操作。
Action。触发Spark作业的运行,真正触发转换算子的计算。常用的操作有:reduce、collect、count、countByKey等等。
什么是 Shuffle
以下摘自官网
Shuffle 即洗牌。以reduceByKey 操作来说,reduceByKey操作生成一个新的RDD,其中相同key的所有值都组合为一个元组——key和reduce函数的结果。但是,并非所有相同key的值都必须位于同一个分区甚至是同一台计算机上,然而它们必须位于同一位置才能计算结果。
在Spark中,数据通常不会根据特定的操作在必要位置进行跨分区分布。在计算过程中,单个任务在单个分区上运行。因此,要执行reduceByKey任务的所有数据,Spark需要执行全部操作。它必须从所有分区中读取数据以找到单个key的所有值,然后将各分区中的值汇总以计算每个key的最终结果 – 这称为洗牌。
什么是窄依赖、宽依赖
窄依赖。指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD 的分区。
宽依赖。指子RDD的分区依赖于父RDD的所有分区。
举例
下面通过一个例子,尽量把我所理解的那部分通过这个小例子表达出来,不保证说的都对。
统计某高中今年参加高考的男生人数
首先我们需要将数据源读取到Spark RDD中(先不管如何读取),一个数据源只生成一个rdd。
rdd内部会按照一定的逻辑分割成n个partition分区,分区数也可以自己指定,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目,每个分区由一个task 执行。
rdd先执行filter 操作即Transformation算子。将参加高考的,性别为男性的对象过滤出来。但Transformation 是惰性的,不会立刻触发spark 作业。
过滤后的rdd 需要进行reduce操作即Action算子。此时触发spark 作业。每个action将生成一个Job。
Job 包含stage,stage有两种:shuffle和result,取决于算子的执行逻辑。如果一个job中有宽依赖,即有shuffle操作,shuffle之前的生成一个shuffle stage。shuffle之后的生成一个result stage。
每个stage 都是一组task 在执行,task 取决于分区数。
reduce 过程将符合条件的学生数计数并返回。
代码示例
package com.yzy.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SparkDemo {
private static String appName = "spark.demo";
private static String master = "local[*]";
public static void main(String[] args) {
JavaSparkContext sc = null;
try {
//初始化JavaSparkContext
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
sc = new JavaSparkContext(conf);
// 生成数据源
List<Student> data = getList();
//生成rdd
JavaRDD<Student> rdd = sc.parallelize(data);
//过滤符合条件的数据
rdd = rdd.filter(new Function<Student, Boolean>() {
public Boolean call(Student s) throws Exception {
return s.isGaoKao() && s.getSex().equals("男");
}
});
// map && reduce
Student result = rdd.map(new Function<Student, Student>() {
public Student call(Student s) throws Exception {
s.setCount(1);
return s;
}
}).reduce(new Function2<Student, Student, Student>() {
public Student call(Student s1, Student s2) throws Exception {
s1.setCount(s1.getCount() + s2.getCount());
return s1;
}
});
System.out.println("执行结果:" + result.getCount());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (sc != null) {
sc.close();
}
}
}
public static List<Student> getList(){
List<Student> data = new ArrayList<Student>();
data.add(new Student(true,"男", "A"));
data.add(new Student(false,"女", "B"));
data.add(new Student(false,"男", "C"));
data.add(new Student(true,"女", "D"));
data.add(new Student(true,"男", "E"));
data.add(new Student(false,"女", "F"));
data.add(new Student(true,"男", "G"));
return data;
}
static class Student implements Serializable{
private String name;
private boolean gaoKao;
private String sex;
private int count;
public Student(boolean gaoKao, String sex, String name) {
this.gaoKao = gaoKao;
this.sex = sex;
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isGaoKao() {
return gaoKao;
}
public void setGaoKao(boolean gaoKao) {
this.gaoKao = gaoKao;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
}
控制台输出
//省略若干行
18/06/22 18:42:25 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 41 ms on localhost (executor driver) (1/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 58 ms on localhost (executor driver) (2/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 44 ms on localhost (executor driver) (3/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 44 ms on localhost (executor driver) (4/4)
18/06/22 18:42:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/06/22 18:42:25 INFO DAGScheduler: ResultStage 0 (reduce at SparkDemo.java:44) finished in 0.219 s
执行结果:3
请注意:实例中的Student 类必须序列化,否则会报错!