Spark学习笔记

1.Spark简述

Spark通过内存计算能力,急剧的提高大数据处理速度。解决了Hadoop只适合于离线的高吞吐量、批量处理的业务场景的弊端,提出了实时计算的解决方法。

1.1 Spark特点

a.快速处理能力:Hadoop的MapReduce中间数据采用磁盘存储,而Spark优先使用内存避免大量的磁盘IO,极大的提高了计算速度;
b.支持性强:Spark支持Java、Scala、Python等;
c.可查询:Spark支持SQL;
d.支持流计算:Spark支持实时的流计算;
e.可用性高:Spark的Standalone模式支持多Master,避免了单点故障;
f.Spark支持的数据源多:Spark支持HDFS、Hbase、Hive、Cassandra等,方便数据迁移。

1.2 Spark术语

a.RDD(Resillient Distributed Dataset) :弹性分布式数据集,程序可以根据需要,将RDD的Partition的个数进行增加和减少(一个Partition对应一个task),以提高执行效率。而且RDD支持容错,如果有失效的RDD,那么Spark可以从父RDD重新生成子RDD;
b.Task:执行任务,有ShuffleMapTask(对应Hadoop的Map)和ResultTask(对应Hadoop的Reduce);
c.Job:程序提交的作业,由task构成;
d.Stage:Job的分段,一个Job划分成多个Stage;
e.Partition: 数据分区,一个RDD可以后多个分区构成;
f.Narrow依赖:即窄依赖,子RDD的Partition依赖父RDD中固定的一个Partition,例如map、filter、union等操作会产生窄依赖;
g.Shuffle依赖:即宽依赖,子RDD的Partition依赖父RDD中的所有Partition,例如groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle;
h.DAG(Directed Acycle Graph): 记录RDD之前依赖关系的有向无环图。
i.算子:是指对RDD的运算,可以理解为Spark处理RDD 的函数
算子的分类:

i.1:Value型Transformation算子:

针对处理的数据项是value,例如一对一型(map,flatMap,mapPartition,glom),多对一型(union,cartesian),多对多型(groupBy),输出分区为输入分区子集型(filter,distinct,subtract,sample,takeSample),Cache型(cache:将RDD从磁盘缓存到内存,persist:将RDD从进行缓存,可以缓存到内存或磁盘)。

i.2:Key-Value型Transformation算子:

针对处理的数据项是Key-Value形式的算子。例如一对一型(mapValues),聚集(combineByKey,reduceByKey,partitionBy,cogroup),连接(join,leftOutJoin,rigthOutJoin)。

i.3:Action算子:

Action算子是指通过SparkContext执行提交作业的runJob操作,触发RDD的DAG的执行。例如无输出(foreach),HDFS(saveAsTextFile,saveAsObjectFile),Scala集合型(collect,collectAsMap,reduceByKeyLocally,lookup,count,top,reduce,fold,aggregate)。

2.Spark的基本架构

《Spark学习笔记》 Spark基本架构.jpg

过程说明:
Client 提交应用(由Spark Action算子触发),Cluster Manager找到一个Worker启动Driver,Driver想Cluster Manager申请资源,然后将应用转换为RDD DAG,再有DAG Scheduler 将RDD DAG 转化为Stage DAG(一个Stage由一组相同的task集合构成),然后提交给TaskScheduler,由TaskScheduler将task(每个task对应一个Partition)交给Executor执行。

2.1 Spark的task提交过程

《Spark学习笔记》 Spark-task-提交过程.jpg

2.2 Job,Stage,task的划分

《Spark学习笔记》 Stage划分.jpg

说明:

每个Action算子是一个job,一个job由shuffle(宽依赖)分割成多个Stage,一个Stage内有多少个Partition就产生多少个task。故一个job中task的数量 = stage数量 * 每个stage的task数量。

由shuffle宽依赖划分Stage的原因:

shuffle宽依赖中子RDD的Partition会依赖父RDD的多个Partition,这样就会出现一些父Partition没有准备好数据,导致不能继续计算,直到父RDD的所有Partition都准备好了,才能够执行将父RDD转换为子RDD的计算,而且往往需要跨节点数据传输。而窄依赖是父RDD的一个Partition决定了子RDD的一个Partition,直接计算就可以了。另外,在数据恢复时,窄依赖只需要重新执行丢失子RDD的Partition的父RDD的Partition即可,而shuffle宽依赖需要考虑通过恢复所有父RDD的Partition,然后通过计算再获得子RDD的Partition。

3.Spark的Shuffle过程

3.1 我们先来看看Hadoop的Shuffle过程:

《Spark学习笔记》 Hadoop-Shuffle.jpg

说明:

Reduce获得Map的中间输出结果后,会对这些数据在磁盘上进行merge sort,需要大量的IO。

3.2 Spark基于Hash的Shuffle:

《Spark学习笔记》 Spark-Hash-Shuffle.jpg

说明:

在Hash Shuffle的时候,每个Mapper会根据Reduce的个数创建相应的bucket,故bucket的总个数是Mapper个数 * Reduce的个数。相比Hadoop的Shuffle,Hash Shuffle避免了不必要的排序。

缺点:

Mapper个数和Reduce的个数比较大时,该Shuffle会生成大量的bucket文件,不但对系统产生很大压力,也影响了IO吞吐量。另外,Map的中间结果是首先保存到内存中的,然后再写入磁盘,对内存容量要求比较高。

3.3 Spark基于Hash的Shuffle的优化

《Spark学习笔记》 Spark-Hash-Shuffle的优化.jpg

说明:

使用Consolidation机制,将一个core的所有MapTask的输出到一个ShuffleBlockFile文件中,不同的MapTask输出到不同的ShuffleBlockFile的Segment中。

3.4 Spark基于sort的Shuffle

《Spark学习笔记》 Spark-sort-shuffle.jpg

    原文作者:zhglance
    原文地址: https://www.jianshu.com/p/552f6c605dec
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注