Spark

什么是Spark

是一个大规模数据处理的统一分析引擎

Spark的四大特性

速度快

编程简单

通用性

到处运行

SparkConf对象

配置运行模式 local 本地。standalone spark自带的分布式计算框架。 Mesos 资源调度框架。

配置Spark Application的名称

配置Spark Application运行的资源

SparkContext对象

Spark中通往集群的唯一通道

RDD

Resilient Distributed Dateset 弹性分布式数据集

五大特性

RDD是由多个partition组成

算子(函数)作用在partition上

RDD之间有依赖关系

分区器是作用在K,V格式的RDD上

partition对外提供最佳的计算位置,利于数据处理的本地化

注意:什么是K,V格式的RDD:RDD的元素是二元组,那么这个RDD就是K,V格式

sc.textFile()底层调用的是MR读取hdfs上文件的方法 :mr是一个split一个block ,spark是一个split一个partition

RDD的分布式:体现在partition分布在多个节点上

RDD的的容错:RDD的之间的依赖关系,RDD的partition可多可少

Spark中的常用算子

Spark代码流程

1).创建SparkConf : val conf = new SparkConf() ; conf.setMaster.. ,conf.setAppName…

2).创建SparkContext :val sc = new SparkContext(conf)

3).获取RDD:val rdd = sc.textFile…

4).使用Transformation类算子对数据进行转换

5).使用Action算子触发Transformation类算子执行

6).sc.stop()

Transformation类算子:(转换型)【懒执行的,需要Action算子触发】

map,flatMap,reduceByKey,sortBy,SortByKey,filter,sample

join,leftOuterJoin,rightOuterJoin,fullOuterJoin 作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W)),join后的分区数与父RDD分区数多的那一个相同。

union 合并两个数据集。两个数据集的类型要一致 返回新的RDD的分区数是合并RDD分区数的总和

intersection 取两个数据集的交集,返回新的RDD与父RDD分区多的一致

subtract 取两个数据集的差集,结果RDD的分区数与subtract前面的RDD的分区数一致。

mapPartitions 与map类似,遍历的单位是每个partition上的数据。

mapPartitoinWithIndex 可以获取RDD每个分区索引

repartition : 重分区,可以将RDD的分区增大,也可以减少,会产生Shuffle(一般用来增加分区)

caolesce :重分区,可以将RDD的分区增大,也可以将RDD的分区减少,默认不产生shuffle. 由少的分区分到多的分区的时候,指定不产生shuffle,这种模式是不起作用的.(一般用来减少分区)要增加分区 要在分区数后加参数ture

groupByKey :按照key来分组

distinct(map+reduceByKey+map) 去重

zip & zipWithIndex :zip 两个集合组合,元素个数要一致 zipWithIndex 一个集合的元素和他的下标组合

cogroup 当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>)),子RDD的分区与父RDD多的一致。

Action类算子:【触发Transformation类算子执行,有一个Action算子,就有一个job】foreach,take,first,count,collect

foreachPartition 遍历的数据是每个partition的数据。

countByKey & countByValue:(value是把一个Tuple整体当做value)

持久化算子

cache,persist,checkPoint

cache:

默认将数据存储在内存中。cache() = persist() = persist(StorageLevel.MEMORY_ONLY)

persist:

可以手动指定持久化级别。

常用级别:

MEMORY_ONLY

MEMORY_ONLY_SER

MEMORY_AND_DISK

MEMORY_AND_DISK_SER

尽量少用DISK_ONLY和“_2”级别

1).cache和persist都是懒执行,持久化的单位是partition

2).对一个RDD进行 cache或者persist时 ,可以赋值给一个变量/也可以不赋值给一个变量,下个job中使用这个变量就是使用的持久化的数据。

3).如果使用第二种方式,对RDD持久化之后,后面不能紧跟action算子。紧跟:lines.persist().count()

4).持久化的数据是由spark自动存储,这个数据当Spark Application停止之后,自动清除。

checkPoint:

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。

执行原理

当RDD的job执行完毕后,会从finalRDD从后往前回溯。当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记

Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。

优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。

Spark集群搭建

三到四台

一台Master 两台Worker 一台客户端

环境 : jdk1.8 , hadoop-env-sh 的JAVA_HOME

解压 spark压缩包

配置salves

node2

node3

配置spark-evn-sh

export SPARK_MASTER_HOST=node1

export SPARK_MASTER_PORT=7077

export SPARK_WORKER_CORES=2

export SPARK_WORKER_MEMORY=2g

启动 在sbin目录下 ./start-all.sh

客服端 :需要配置spark-evn-sh

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

Spark任务的四种类型提交流程

《Spark》
《Spark》
《Spark》
《Spark》
《Spark》
《Spark》
《Spark》
《Spark》

术语

《Spark》
《Spark》

资源层面:Master ->Worker -> Executor -> ThreadPool

Master管理Worker,Woker启动Executor,Executor管理ThreadPool

任务层面:Application -> job -> stage -> tasks

Application有多个job依次执行,每个job按照RDD的宽窄依赖分成多个Stage,每个Stage中又有多组Task

RDD宽窄依赖

RDD宽依赖:

父RDD与子RDD partition之间的关系是一对多

RDD窄依赖:

父RDD与子RDD partition之间的关系是一对一

父RDD与子RDD partition之间的关系是多对一

stage划分

1.stage划分按照RDD的宽依赖划分,有宽依赖就有shuffle

2.stage是由多组并行的task组成

3.stage并行度是由stage中finalRDD的partition个数决定的

4.stage的计算模式 : pipeline计算模式【迭代器模式】

5.如何增大stage的并行度?reduceByKey(xx,numpartition)

Spark的资源调度和任务调度

《Spark》
《Spark》

1-6 资源申请

1.启动集群,Worker向Master会把奥中资源信息

2.Master掌握了集群资源信息

3.客服端提交任务(一个Application),

Master会启动Driver。(根据提交的模式,有的是在客服端,有的是在集群其他资源满足条件的节点启动Driver)

4.Driver端向Master启动申请资源

5.Master收到申请后,找到满足资源的Worker启动Executor

6.Executor反向注册向给Driver

7-11任务调度

7.Action算子触发任务执行 。Driver端创建两个对象:DAGScheduler和TaskScheduler

8.DAGScheduler的主要作用就是将DAG根据RDD之间的宽窄依赖关系划分为一个个的Stage,然后将这些Stage以TaskSet的形式提交给TaskScheduler

9.TaskSchedule会遍历TaskSet集合,TaskScheduler是任务调度的低层调度器,这里TaskSet其实就是一个集合,里面封装的就是一个个的task任务,也就是stage中的并行度task任务

10.TaskSchedule拿到每个task后会将task发送到计算节点Executor中去执行(其实就是发送到Executor中的线程池ThreadPool去执行)。

11.task在Executor线程池中的运行情况会向TaskScheduler反馈,当task执行失败时,则由TaskScheduler负责重试,将task重新发送给Executor去执行,默认重试3次。如果重试3次依然失败,那么这个task所在的stage就失败了。stage失败了则由DAGScheduler来负责重试,重新发送TaskSet到TaskSchdeuler,Stage默认重试4次。如果重试4次以后依然失败,那么这个job就失败了。job失败了,Application就失败了。

注意

TaskScheduler不仅能重试失败的task,还会重试straggling(落后,缓慢)task(也就是执行速度比其他task慢太多的task)。如果有运行缓慢的task那么TaskScheduler会启动一个新的task来与这个运行缓慢的task执行相同的处理逻辑。两个task哪个先执行完,就以哪个task的执行结果为准。这就是Spark的推测执行机制。在Spark中推测执行默认是关闭的。推测执行可以通过spark.speculation属性来配置。

另外

对于ETL类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。

如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。

粗粒度资源申请和细粒度资源申请

粗粒度资源申请(Spark)

在Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。

优点:在Application执行之前,所有的资源都申请完毕,每一个task直接使用资源就可以了,不需要task在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。

缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。

细粒度资源申请(MapReduce)

Application执行之前不需要先去申请资源,而是直接执行,让job中的每一个task在执行前自己去申请资源,task执行完成就释放资源。

优点:集群的资源可以充分利用。

缺点:task自己去申请资源,task启动变慢,Application的运行就相应的变慢了。

    原文作者:我是你的小星星
    原文地址: https://zhuanlan.zhihu.com/p/67062108
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞