spark大数据架构初学入门基础详解

Spark是什么

a) 是一种通用的大数据计算框架

b) Spark Core 离线计算

Spark SQL 交互式查询

Spark Streaming 实时流式计算

Spark MLlib 机器学习

Spark GraphX 图计算

c) 特点:

i. 一站式:一个技术堆栈解决大数据领域的计算问题

ii. 基于内存

d) Spark2009年诞生于伯克利大学的AMPLab实验室

2010年正式开源了Spark项目

2013年Spark成为Apache下的项目

2014年飞速发展,成为Apache的顶级项目

2015年在国内兴起,代替mr,hive,storm等

作者:辛湜(shi)

e) Spark和Hive:

Spark优点:

i. 速度快

ii. Spark SQL支持大量不同的数据源

f) Spark 和Storm

i. 计算模型不一样

ii. Spark吞吐量大

g) 特点:快,易用,通用,兼容性

h) spark运行模式

i. local(本地)

ii. standalone(集群)

iii. on yarn(由 yarn作为资源调度Spark负责任务调度和计算)

iv. on mesos(由mesos作为资源调度S)

v. on cloud()

i) 配置步骤

=======================on yarn====================

【说明】

1. spark任务运行在yarn上,由yarn来进行资源调度和管理,spark只负责任务的调度 和计算

2.不需要配置和启动spark集群

3.只需要在提交任务的节点上安装并配置spark on yarn 模式

4.必须找一台节点安装spark

5. 步骤:

i.安装配置JDK

ii. vi spark-env.sh

1. export  JAVA_HOME=/opt/modules/jdk1.7_6.0

2. export  HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop

iii.测试spark on yarn 模式是否安装成功

iv.网络测试:http://hadoop-yarn1.beicai.com:8088

=====================sdandalone模式==============

【说明】

1.  spark运行在spark 集群上,由spark进行资源调度管理,同时还负责任务的调度和 计算

2.需要配置和启动spark集群

3. 步骤:

i.安装配置JDK

ii.上传并解压Spark

iii.建立软连接 ln -s spark spark 或者修改名称

iv. 配置环境变量

v.安装配置Spark,修改spark配置文件(spark-env.sh, slaves)

1. vi spark-env.sh

a) export  JAVA_HOME=/opt/modules/jdk(jdk位置)

b) export SPARK_MASTER_IP=hadoop-yarn1.beicai.com

c) export SPARK_MASTER_PORT=7077

2.  vi slaves(用于指定在哪些节点上启动worker)

a) hadoop-yarn2.beicai.com

hadoop-yarn3.beicai.com

vi.将spark发送给其他主机

vii. 启动

/opt/modules/spark/bin/start-all.sh

vii. 查看SparkUI界面:http://hadoop-yarn1.beicai.com:8080

4. 

j) 

一、Spark原理

1、Spark的运行原理

i、分布式

Ii、主要基于内存(少数情况基于磁盘)

Iii、迭代式计算

2、Spark 计算模式 VS  MapReduce  计算模式对比

Mr这种计算模型比较固定,只有两种阶段,map阶段和reduce阶段,两个阶段结束 后,任务就结束了,这意味着我们的操作很有限,只能在map阶段和reduce阶段, 也同时意味着可能需要多个mr任务才能处理完这个job

Spark 是迭代式计算,一个阶段结束后,后面可以有多个阶段,直至任务计算完 成,也就意味着我们可以做很多的操作,这就是Spark计算模型比mr 强大的地方

三、什么是Spark RDD?

1、什么是RDD?

弹性的,分布式的,数据集

(RDD在逻辑上可以看出来是代表一个HDFS上的文件,他分为多个分区,散落 在Spark的多个节点上)

3、RDD—-弹性

当RDD的某个分区的数据保存到某个节点上,当这个节点的内存有限,保存不了这个 分区的全部数据时,Spark就会有选择性的将部分数据保存到硬盘上,例如:当worker 的内存只能保存20w条数据时,但是RDD的这个分区有30w条数据,这时候Spark就 会将多余的10w条数据,保存到硬盘上去。Spark的这种有选择性的在内存和硬盘之间的权衡机制就是RDD的弹性特点所在

4、Spark的容错性

RDD最重要的特性就是,提供了容错性,可以自动的从失败的节点上恢复过来,即如 果某个节点上的RDD partition(数据),因为节点的故障丢了,那么RDD会自动的通过 自己的数据来源重新计算该partition,这一切对使用者来说是透明的

2、Spark的开发类型

(1)、核心开发:离线批处理 / 演示性的交互式数据处理

(2)、SQL查询:底层都是RDD和计算操作

(3)、底层都是RDD和计算操作

(4)、机器学习

(5)、图计算

3、Spark 核心开发(Spark-core == Spark-RDD)步骤

(1)、创建初始的RDD

(2)、对初始的RDD进行转换操作形成新的RDD,然后对新的RDD再进行操作,直 至操作计算完成

(3)、将最后的RDD的数据保存到某种介质中(hive、hdfs,MySQL、hbase…)

五、Spark原理

Driver,Master,Worker,Executor,Task各个节点之间的联系

Spark中的各节点的作用:

1、driver的作用:

(1)、 向master进行任务的注册

(2)、构建运行任务的基本环境

(3)、接受该任务的executor的反向注册

(4)、向属于该任务的executor分配任务

2、什么是driver?

我们编写的程序打成jar包后,然后找一台能够连接spark集群的节点做任务的driver,具体的表现为SparkSubmit

3、Master的作用:

(1)、监控集群;

(2)、动态感知worker的上下线;

(3)、接受driver端注册请求;

(4)、任务资源的调度

4、Worker的作用:

(1)、定时向master汇报状态;

(2)、接受master资源调度命令,进行资源的调度

(3)、启动任务的容器Executor

5、Executor的作用:

(1)、保存计算的RDD分区数据;

(2)、向Driver反向注册;

(3)、接受Driver端发送来的任务Task,作用在RDD上进行执行

Spark 编程的流程:

1、我们编写的程序打包成jar包,然后调用Spark-Submit 脚本做任务的提交

2、启动driver做任务的初始化

3、Driver会将任务极其参数(core,memory,driver相关的参数)进行封装成ApplicationDescript通过taskSchedulerImpl 提交给Master

4、Master接受到driver端注册任务请求时,会将请求参数进行解析,并封装成APP,然后进行持久化,并且加入到其任务队列中的waitingAPPs

5、当轮到咱们提交的任务运行时,master会调用schedule()这个方法,做任务资源调度

6、Master将调度好的资源封装成launchExecutor,发送给指定的worker

7、Worker接收到发送来的launchExecutor时,会将其解析并封装成ExecutorRunner,然后调用start方法,启动Executor

8、Executor启动后,会向任务的Driver进行反向注册

9、当属于这个任务的所有executor启动成功并反向注册完之后,driver会结束SparkContext对象的初始化

10、当sc 初始化成功后,意味着运行任务的基本环境已经准备好了,driver会继续运行我们编写好的代码

11、开始注册初始的RDD,并且不断的进行转换操作,当触发了一个action算子时,意味着触发了一个job,此时driver就会将RDD之间的依赖关系划分成一个一个的stage,并将stage封装成taskset,然后将taskset中的每个task进行序列化,封装成launchtask,发送给指定的executor执行

12、Executor接受到driver发送过来的任务task,会对task进行反序列化,然后将对应的算子(flatmap,map,reduceByKey。。。。)作用在RDD分区上

六、RDD详解

1、什么是RDD?

RDD(Resilient Disttibuted Dataset)叫做弹性的分布式的数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可并行计算的集合

2、RDD的特点:

自动容错

位置感知性调度

伸缩性

3、RDD的属性:

(1)、一组分片(partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的数目

(2)、一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现computer函数以达到这个目的。Computer函数会对迭代器进行复合,不需要保存每次计算的结果。

(3)、RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

(4)、一个partition,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于hashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了partition RDD Shuffle输出时的分片数量。

(5)、一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFD文件来说。这个列表保存的就是每个Partition所在的快的位置。按照“移动数据不如移动计算”的理念。Spark在进行任务调度的时候,会尽可能的将计算任务分配到所要处理数据块的存储位置。

4、RDD的创建:

进行Spark核心编程时,首先要做的事就是创建一个初始的RDD。Spark Core提供了三种创建RDD的方式:

(1)、使用程序中的集合创建RDD (调用parallelize()方法)

(2)、使用本地文件创建RDD(调用textFile()方法)

(3)、使用HDFD文件创建RDD  (调用textFile()方法)

七、算子

1、什么是算子?

是RDD中定义的作用在每一个RDD分片上的函数,可以对RDD中的数据进行转换 和操作

2、RDD算子的分类

(1)、Transformation算子,这类算子变换不触发提交作业(特点就是lazy特性)

返回的是一个RDD

(2)、Action算子,这类算子会触发SparkContext提交作业(触发一个spark job的运行,从而触发这个action之前所有的transformation的执行)

返回的是一个spark对象

3、常用的Transformation算子

八、RDD分区排序

I、分区

两种实现方式:coalesce和 repartition(底层调用coalesce)

coalesce(numPartitons,isShuffle)

第一个参数是重分区后的数量,第二个参数是是否进行shuffle

如果原来有N个分区,重分区后有M个分区

如果 M > N ,必须将第二参数设置为true(也就是进行shuffle),等价于 repartition(numPartitons)    如果是false将不起作用  

如果M < N

100–>10 重分区后的分区数比原来的小的多,那么久需要使用shuffle,也即是设置为true

100–>90 重分区后的分区数和原来的差不多的,那么就不需要使用shuffle,也就是设置为false

II、排序

sortBy(x => x)这个算子中带有隐式转换参数

x 能够排序(比较大小),那么这个类就必须有比较大小的功能,也就是实现了compareTo 或者compare

实现二次排序有两种方法:

1、继承Comparable 接口 或者 Ordered

2、隐式转换:可以定义隐式转换函数(Ordered)或者隐式转换值(Ordering)

九、自定义分区

自定义分区

要求:按照key将对应的value输出到指定的分区中

解释:自定义一个自定义分区类,继承partitioner,实现他的两个方法

1、numPartitions

2、getPartition

具体的功能根据项目的要求自定义实现,然后调用partitionBy方法,new出自定义的类,传入参数即可

九、RDD持久化原理

1、持久化场景:对于一个rdd会被多次引用到,并且这个rdd计算过程复杂,计算时间特变耗时

2、如何进行持久化,调用rdd.persist方法或cache方法,cache方法底层就是调用persist方法

******************persist(StorageLevel.MEMORY_ONLY)*******************

如果对RDD做持久化,默认持久化级别是storageLevel.MEMORY_ONLY ,也就是持久化到内存中去,这种持久化级别是效率最快的,但是由于是纯Java 对象,保存到内存中,那么内存可能保存的数量就会较少

***************persist(StorageLevel.MEMORY_ONLY_SER)****************

如果当我们集群资源有限时,那么我们可以采用MEMORY_ONLY_SER,也就是将Java对象进行序列化之后持久到内存中去,这种持久化的好处是能够持久化更多的数据到内存中,但是由于在持久化时需要序列化,取出来之后又需要反序列化这一过程,这个过程会消耗CPU计算资源,性能相对于MEMORY_ONLY 这种持久化级别来说稍微弱点,但是还是比较高效的

3、如何选择RDD持久化策略?

Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍,下面是一些通用的持久化级别的选择建议:

1)、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略,因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作

2)、如果MEMORY_ONLY策略,无法存储所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快的,只是要消耗CPU进行反序列化

3)、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了

4、能不使用DISK相关的策略,就不要使用,有的时候,从磁盘读取数据,还不如重新计算一次

十一、共享变量

1、共享变量分为两种:广播变量 和 累加器

广播变量(broadcast)

2、日常所遇问题

因为每个task都需要拷贝这样的一个副本到executor去执行,那么我们可以想象一下,如果有1000 个task在某个worker上执行,而这个副本有100M,那么意味着我们需要拷贝100G的数据都到某个worker上执行,这样的话会大大消耗我们的网络流量,同时会加大executor的内存消耗,从而增加了我们spark作业的运行时间,大大降低了spark作业的运行效率,增加了作业失败的概率

3、如何解决以上问题,也就是说什么时候使用广播变量?

当RDD引用到了一个外部变量并且这个外部变量数据量不小,同时这个RDD对应的task数量特别多,那么此时使用广播共享变量再合适不过了

我们可以将这种大的外部变量做成广播变量,外部变量做成广播变量的时候,那么每个executor的内存中只会有一个外部变量,而这个副本针对所有的task都是共享的,这样的话就减少了网络流量消耗,降低了executor的内存消耗,提高了spark作业运行效率和缩短了运行时间,同时降低了作业失败的概率

4、广播变量的使用流程:

1)、某个executor的第一个task先执行,首先会从自己的blockManager中查找外部变量,如果没有就从邻居的executor的blockManager的内存中获取这个外部变量,如果还是获取不到,就从driver端获取,拷贝这个外部变量到本地的executor的blockManager

2)、当这个executor的其他task执行时,就不需要从外面获取这个外部变量的副本,直接从本地的blockManager中获取即可

5、如何获取广播变量的值?

可以直接调用广播变量的value() 这个方法即可

【注意】广播变量是只读的,不可写

累加器(Accumulator)

Spark提供的Accumulator ,主要用于多个节点对一个变量进行共享性的操作,Accumulator只是提供了累加的功能。但是却给我们提供了多个task对一个变量并行操作的功能,但是task只能对Accumulator进行累加操作

【注意】task只能对Accumulator进行类加操作,只有Driver程序可以读取Accumulator的值

RDD分区和容错机制讲解

1、RDD 的Lineage血统

RDD只支持粗粒度转换,即在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来。以便恢复丢失的分区

2、RDD的依赖关系

RDD和它的父RDD的关系有两种不同的类型:

1)、窄依赖(一对一,多对一)

形象的比喻:独生子女

2)、宽依赖(多对多)

形象的比喻:超生

注释:划分stage的依据就是宽依赖,也就是RDD之间是否有shuffle,shuffle过程就是一个宽依赖过程,shuffle之前的tasks就属于一个stage,shuffle之后的也属于一个stage,shuffle之前和之后的操作都是窄依赖

【注意】shuffle过程分为:shuffle Write过程 和 shuffle read过程

4、DAG的生成(有向无环图)和任务的划分

DAG(Directed Acyclic Graph)叫做有向无环图(有方向无循环的图)

5、一个wordCount过程会产生多少个RDD?

至少会产生五个RDD,

第一个,从HDFS中加载后得到一个RDD(即使用sc.textFile()算子),即HadoopRDD

在sc.textFile()过程中还会产生一个RDD(调用map算子),产生一个MapPartitionRDD

第二个,使用flatMap算子,得到一个MapPartitionRDD

第三个,使用map算子,得到一个MapPartitionRDD

第四个,使用reduceByKey算子,也就是在经过了shuffle过程后又会得到一个shuffledRDD

第五个,使用saveAsTextFile算子,再产生一个MapPartitionRDD 

spark程序提交流程讲解

Spark任务简介:

Spark-submit—>SparkSubmit–>main–>submit–>doRunMain–>RunMain–>通过反射创建我们编写的主类的实例对象,调用main方法–>开始执行我们编写的代码–>初始化SparkContext对象–>创建初始的RDD–>触发action算子–>提交job–>worker执行任务–>任务结束

Spark任务详解:

1)、将我们编写的程序打成jar包

2)、调用spark-submit脚本提交任务到集群上运行

3)、运行sparkSubmit的main方法,在这个方法中通过反射的方式创建我们编写的主类的实例对象,然后调用main方法,开始执行我们的代码(注意,我们的spark程序中的driver就运行在sparkSubmit进程中)

4)、当代码运行到创建SparkContext对象时,那就开始初始化SparkContext对象了

5)、在初始化SparkContext对象的时候,会创建两个特别重要的对象,分别是:DAGScheduler

和TaskScheduler

【DAGScheduler的作用】将RDD的依赖切分成一个一个的stage,然后将stage作为taskSet提交给DriverActor

6)、在构建taskScheduler的同时,会创建两个非常重要的对象,分别是DriverActor和ClientActor

【clientActor的作用】向master注册用户提交的任务

【DriverActor的作用】接受executor的反向注册,将任务提交给executor

7)、当clientActor启动后,会将用户提交的任务和相关的参数封装到ApplicationDescription对象中,然后提交给master进行任务的注册

8)、当master接受到clientActor提交的任务请求时,会将请求参数进行解析,并封装成Application,然后将其持久化,然后将其加入到任务队列waitingApps中

9)、当轮到我们提交的任务运行时,就开始调用schedule(),进行任务资源的调度

10)、master将调度好的资源封装到launchExecutor中发送给指定的worker

11)、worker接受到Maseter发送来的launchExecutor时,会将其解压并封装到ExecutorRunner中,然后调用这个对象的start(), 启动Executor

12)、Executor启动后会向DriverActor进行反向注册

13)、driverActor会发送注册成功的消息给Executor

14)、Executor接受到DriverActor注册成功的消息后会创建一个线程池,用于执行DriverActor发送过来的task任务

15)、当属于这个任务的所有的Executor启动并反向注册成功后,就意味着运行这个任务的环境已经准备好了,driver会结束SparkContext对象的初始化,也就意味着new SparkContext这句代码运行完成

16)、当初始化sc成功后,driver端就会继续运行我们编写的代码,然后开始创建初始的RDD,然后进行一系列转换操作,当遇到一个action算子时,也就意味着触发了一个job

17)、driver会将这个job提交给DAGScheduler

18)、DAGScheduler将接受到的job,从最后一个算子向前推导,将DAG依据宽依赖划分成一个一个的stage,然后将stage封装成taskSet,并将taskSet中的task提交给DriverActor

19)、DriverActor接受到DAGScheduler发送过来的task,会拿到一个序列化器,对task进行序列化,然后将序列化好的task封装到launchTask中,然后将launchTask发送给指定的Executor

20)、Executor接受到了DriverActor发送过来的launchTask时,会拿到一个反序列化器,对launchTask进行反序列化,封装到TaskRunner中,然后从Executor这个线程池中获取一个线程,将反序列化好的任务中的算子作用在RDD对应的分区上

【注意】

Spark的任务分为为两种:

a、shuffleMapTask:shuffle之前的任务

b、resultTask:shuffle之后的任务

Spark任务的本质:

将RDD的依赖关系切分成一个一个的stage,然后将stage作为TaskSet分批次的发送到Executor上执行

十三、Checkpoint

1、使用checkpoint的场景:

某个RDD会被多次引用,计算特别复杂,计算特别耗时

担心中间某些关键的,在后面会反复几次使用的RDD,可能会因为节点的故障,导致持久化数据的丢失

2、如何对RDD进行checkpoint?

1)、设置还原点目录,设置checkpoint目录

2)、调用RDD的checkpoint的方法对该RDD进行checkpoint

3、checkpoint的原理

1)、RDD调用了checkpoint方法之后,就接受RDDCheckpointData对象的管理

2)、RDDCheckpointData对象会负责将调用了checkpoint的RDD 的状态设置为MarkedForCheckpoint

3)、当这个RDD所在的job运行结束后,会调用最后一个RDD的doCheckpoint,根据其血统向上查找,查找到被标注为MarkedForCheckpoint状态的RDD,将其状态改变为checkpointingInProgress

4)、启动一个单独的job,将血统中标记为checkpointingInProgress的RDD进行checkpoint,也就是将RDD的数据写入到checkpoint的目录中去

5)、当某个节点发生故障,导致包括持久化的数据全部丢失,此时会从还原点目录还原RDD的每个分区的数据,这样就不需要从头开始计算一次

4、checkpoint需要注意的地方

因为RDD在做checkpoint的时候,会单独启动一个job对需要进行checkpoint的RDD进行重新计算,这样就会增加spark作业运行时间,所以spark强烈建议在做checkpoint之前,应该对需要进行checkpoint的RDD进行持久化(即调用 .cache)

5、checkpoint 和持久化的区别

1)、是否改变血统:

持久化(.cache):不会改变RDD的依赖关系,也就是不会改变其血统

Checkpoint:会改变RDD的血统,做了checkpoint的RDD会清除其所有的依赖关系,并将其父RDD强制设置为checkpointRDD,并且将RDD的状态更改为checkpointed

2)、RDD的数据的可靠性:

持久化:只是将RDD的数据持久化到内存或磁盘中,但是如果节点发生故障,那么持久化的数据还是会丢失

Checkpoint:checkpoint的数据保存在第三方高可靠的分布式的文件系统中,机试节点发生故障,数据也不会丢失,所以checkpoint比持久化可靠性更高

6、后续

我们实现了checkpoint 之后,在某个task 又调用了该RDD的iterator() 方法时,就实现了高容错机制,即使RDD的持久化数据丢失,或者压根儿就没有持久化,但是还是可以通过readCheckpointOrComputer() 方法,优先从父RDD—–checkpointRDD中读取,HDFS(外部文件系统)的数据

第二部分 spark-sql

一、Spark-SQL前世今生

1、Spark SQL的特点

1)、支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。

2)、多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。

3)、组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展

2、Spark SQL的性能优化技术简介

1)、内存列存储(in-memory columnar storage)

2)、字节码生成技术(byte-code generation)

3)、Scala代码编写的优化

3、Spark SQL and DataFrame

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。

DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

二、Spark-sql的使用

1、RDD转换为DataFrame(两种)

1)、使用反射的方式来推断包含了特定数据类型的RDD的元数据

2)、通过编程接口来创建DataFrame

2、UDF自定义函数和UDAF自定义聚合函数

UDF,其实更多的是针对单行输入,返回一个输出

UDAF,则可以针对多行输入,进行聚合计算,返回一个输出,功能更加强大

3、Spark-SQL工作原理

SqlParse  ———>解析器

Analyser  ———>分析器

Optimizer  ———>优化器

SparkPlan  ———>物理计划

流程:

1)、自己编写的SQL语句

大家要知道,只要在数据库类型的技术里面,比如:最传统的MySQL,Oracle等,包括现在大数据领域的数据仓库,比如hive,他的基本的SQL执行的模型,都是类似的,首先都要生成一条SQL语句的执行计划

2)、通过SqlParser(解析器)生成未解析的逻辑计划(unresolved LogicalPlan)

3)、通过Analyzer(分析器)生成解析后的逻辑计划(resolved LogicalPlan)

4)、通过Optimizer(优化器)生成优化后的逻辑计划(optimized LogicalPlan)

实际上,比如传统的Oracle等数据库,通常都会生成多个执行计划,然后呢,最后有一个优化器,针对多个计划,选择一个最好的计划,而SparkSql这儿的优化指的是,比如说,刚生成的执行计划中,有些地方的性能是显而易见的,不太好,举例说明:

比如说,我们有一个SQL语句,select name from (select … from …) where ..=..;

此时,在执行计划解析出来的时候,其实就是按照他原封不动的样子,来解析成可以执行的计划,但是呢,Optimizer 在这里其实就会对执行计划进行优化,比如说,发现where 条件,其实可以放在子查询中,这样,子查询的数量大大变小,可以优化执行速度,此时,可能就会变成如下这样:select name from (select name from …where ..=..)

5)、通过SparkPlan,生成最后的物理计划(PhysicalPlan)

到物理计划这里,那么其实就是非常“接地气”的计划了。就是说,已经很明朗了,从那几个文件读取什么数据,从那几个文件中读取,如何进行关联等等

6)、在executor中执行物理计划

逻辑的执行计划,更多的是偏向于逻辑,比如说吧,大致就是这种样子的,

From table students=>filter … => select name …

这里基本上,逻辑计划都是采用Tree ,树形结构

7)、生成RDD

Select  name  from  students => 解析,从哪里去查询,students表,在哪个文件里,从哪个文件中查询哪些数据,比如说是name这个列,此外,复杂的SQL,还有,比如说查询时,是否对表中的数据进行过滤和筛选,更不用说,复杂时,需要有多表的JOIN(咋传统数据库中,比如MySQL,执行计划还涉及到如何扫描和利用索引)

4、spark-SQL性能优化

1)、设置shuffle过程的并行度:spark.sql.shuffle.partitions(SQLContext.setConf())

2)、在hive数据仓库建设过程中,合理设置数据类型,比如能设置为int的,就不要设置为bigInt,减少数据类型导致不必要的内存开销

3)、编写SQL时,尽量给出明确的列名,比如select name from students。不要写select * 的方式。

4)、并行处理查询结果:对于spark-SQL查询的结果,如果数据量比较大,比如超过1000条,那么就不要一次性的collect()到driver再处理,使用foreach()算子,并行处理查询结果

5)、缓存表:对于一条SQL语句可能对此使用到的表,可以对其进行缓存,使用 sqlContext.cacheTable(tableName),或者DataFrame.cache()即可,spark-SQL会用内存列存储的格式进行表的缓存,然后spark-sql就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,用SQLContext。setConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以设置列存储的单位

6)、广播join表:spark.sql.autoBroadcastJoinThreshold,默认10485760 (10 MB)。在内存够用的情况下,可以增加其大小,参数设置了一个表在join的时候,最大在多大以内,可以被广播出去优化性能

5、Hive on Spark配置

1)、安转配置好Hive和Spark

2)、Set hive.execution.engine=spark;

3)、set spark.master=spark://mini1:7077

第三部分spark-streaming

1,  Dstream

 

Dstream是sparkStreaming的数据模型,本质就是一连串不间断的RDD,但是它是一个时间段的RDD.这些时间段的RDD源源不断的连接在一起。

这个时间可以自己设置,时间设置的越短,实时性越高,但是性能消耗也越大。

2,  spark streaming从kafka获取数据,有哪几种方式?

 

有两种方式:

1.通过receiver的方式,

2,通过direct的方式,dirrect的方式需要自己来管理偏移量。

 

3,  sparkStreaming和storm的区别

sparkStreaming是spark里面的一个做流式准实时计算的组件,它使用的数据结构是Dstream,Dstream里面是一连串时间片的rdd。

相比于storm,sparkStreaming在实时性,保证数据不丢失方面都不占用优势,spark streaming在spark支持者眼中的优势是spark Streaming具有高吞吐性,最本质来说,sparkStreaming相比于storm的优势是sparkStreaming可以和spark core,spark SQL无缝整合。

4.对于需要多次引用的,并且这个dstream计算时间特别耗时,数据特别重要,那么我们就需要对dstream进行checkpoint,(只有多次引用的,进行持久化就可以了),因为即使对这个dstream进行持久化,数据也可能会丢失,而checkpoint数据丢失的可能性小,但是这样会影响spark-streaming的数据吞吐量,因为在做计算的同时,还需要将数据写入到外部存储系统中,会降低spark性能,影响吞吐量,非必要情况下不建议使用

5.如何对dstream做checkpoint

 

首先设置还原点目录,其次调用dstream的checkpoint方法

【注意】:dstream的checkpoint的周期一定要是产生batch时间的整数倍,同时spark官方建议将checkpoint的时间设置为至少10秒。通常来说,将checkpoint间隔设置为窗口操作的滑动间隔的5-10倍

6.spark程序在启动时,会去这个checkpointPath目录下查看是否有保存的driver的元数据(1.dstream的操作转换关系,2.未处理完的batch)信息,当spark-streaming程序在二次启动后就会去checkpointPath目录下还原这个程序,加载未处理的batch元数据信息在内存中恢复,继续进行任务处理

7.为了保证spark-streaming程序7*24小时运行,那么我们程序应该具备高可靠性,怎样具备高可靠性?

 

a.程序出现故障,driver死掉了,流式程序应该具备自动重启的功能

b.没有计算完成的rdd在程序异常停止后,下次启动后还会将未处理的rdd进行处理

【注意】:要在spark_submit中,添加–deploy-mode参数,默认其值为client,即在提交应用的机器上启动driver,但是要能够自动重启driver,就必须将其值设置为cluster;此外,需要添加–supervise参数,失败后自动重启

//spark_submit –executor-memory 1g –total-execute-cores 5 –deploy-model cluster –supervise

8.启用预写机制

a.预写日志机制,简写为WAL,全称为Write Ahead Log,从spark1.2版本开始,就引入了基于容错的文件系统的WAL机制。如果启用该机制,Receiver接收到的所有数据都会写入配置的checkpoint目录中的预写日志。这中机制可以让driver在恢复的时候,避免数据丢失,并且可以确保整个实时计算过程中零数据丢失

b.要配置该机制,首先调用StreamingContext的checkpoint()方法设置一个checkpoint目录,然后需要将spark.streaming.receiver.writeAheadLog.enable参数设置为true

然而,这种极强的可靠性机制,会导致Receiver的吞吐量大幅度下降,因为单位时间内,有相当一部分时间需要将数据写入预写日志。如果又希望开启预写日志机制,确保数据零损失,又不希望影响系统的吞吐量,那么可以创建多个输入DStream,启动多个Receiver

此外,在启用了预写日志机制之后,推荐将复制持久化机制禁用掉,因为所有数据已经保存在容错的文件系统中,不需要在用复制机制进行持久化,保存一份副本,只要将输入的DStream的持久化机制设置一下即可,persist(StorageLevel.MEMORY_AND_DISK_SER)。

9.spark-Streaming checkpoint概述

每一个spark-streaming应用,正常来说,都是7*24小时运转的,这就是实时计算程序的特点,因为要持续不断的对数据进行计算,因此,对实时计算应用的要求,应该是必须要能够对与应用程序逻辑无关的失败,进行容错

如果要实现这个目标,Spark Streaming程序就必须将足够的信息checkpoint到容错的存储系统上,从而让它能够从失败中进行恢复

有两种数据需要被进行checkpoint

1.元数据checkpoint-将定义了流式计算逻辑的信息,保存到容错的存储系统上,比如HDFS,当运行spark Streaming应用程序的Driver进程所在的节点失败时,该信息可以用于进行恢复,元数据信息包括:

a.配置信息—创建spark Streaming应用程序的配置信息,比如sparkConf中的信息

b.DStream的操作信息—定义了spark Stream应用程序的计算逻辑的DStream操作信息

c.未处理的batch信息—那些job正在排队,还没处理的batch信息

2.数据checkpoint—将实时计算过程中产生的RDD的数据保存到可靠的存储系统中。对于一些将多个batch的数据进行聚合,有状态的transformation操作,这是非常有用的,在这种transformation操作中,生成的RDD是依赖于之前batch的RDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长,要避免由于依赖链条越来越长,导致的一起变得越来越长的失败恢复时间,有状态的transformation操作执行过程中中间产生的RDD,会定期被checkpoint到可靠的存储系统上,比如HDFS,从而削减RDD的依赖链条进而缩短失败恢复时,RDD的恢复时间。一句话概括,元数据checkpoint主要是为了从driver失败中进行恢复;而RDD checkpoint主要是为了使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复

10.何时启用checkpoint机制?

a.使用了有状态的transformation操作—-比如updateStateByKey,或者reduceByKeyAndWindow操作被使用了,那么checkpoint目录要求必须提供的,也就是必须开启checkpoint机制,从而进行周期性的RDD checkpoint

b.要保证可以从Driver失败中进行恢复—–元数据checkpoint需要启用,来进行这种情况的恢复

【注意】并不是说,所有的spark streaming应用程序,都要启用checkpoint机制,如果既不强制要求从Driver失败中自动进行恢复,又没使用有状态的transformation操作,那么就不需要启用checkpoint,事实上,这么做反而是有助于提升性能的

11.如何自动从Driver失败中恢复过来

 

要能够自动从Driver失败中恢复过来运行spark Streaming应用程序的集群,就必须监控Driver运行的过程,并且在他失败时将他重启,对于spark自身的standalone模式,需要进行一些配置去supervise driver,在他失败时将其重启

首先,要在spark-submit中,添加–deploy-mode参数,默认其值为client,即在提交应用的机器上启动Driver,但是,要能够自动重启Driver,就必须将其值设置为cluster,此外,需要添加–supervise参数

 

部分原理图稍后上传。 

    原文作者:三万_chenbing
    原文地址: https://www.jianshu.com/p/0ba43c42100a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞