使用scala
- 因为spark的源代码就是scala,想成为专家就必须会scala,尤其是RDD的代码大量的都是基于scala集合库的概念,并且immutable,lambda等scala相关的语言设计概念也是天生的体现在spark代码中。scala函数式语言可以让你更好的理解和书写spark代码。
- spark支持python,但是语言的转换意味着性能的损失。
Spark如何工作
- spark的角色:spark不是数据库,也不是数据存储方案,而是数据计算框架。spark的运行时建立在cluster manager(例如YARN)的调度之上,把最终的结果放入外部分布式存储(例如S3)之中。
- spark core:核心数据处理引擎,提供API来使用RDD处理数据。
- spark sql:与spark core并驾齐驱,提供多语言的API,最重要的是提供了半结构化的数据类型抽象DataFrames和Datasets。
- spark ml:机器学习和统计算法的库,最终会从RDD-based的API过渡到Dataframe-based的API。
- spark streaming:通过调度spark core来进行的micro-batch的流式处理。
- spark graphX:图处理的spark框架,尚不成熟。
- 其他:你会发现,读取csv文件,连接到JDBC数据库等等这些特性,spark都没有直接提供,这些你可以在spark-submit中包含。
RDD知识
尽管官方已经不推荐直接使用RDD,但是具备大概的RDD的知识可以帮助你更好的优化。
惰性求值。当一个action发生时,此前的一系列transformation操作都会转化成一个DAG,这个DAG会分布到不同的节点上计算。这与传统MR计算模型不同,不需要在节点之间传递大量的中间计算结果。当然,并不是所有的transform都是惰性求值的,sortByKey就是action和transform都有的操作。
故障冗余。fault tolerance。在分布式计算中,一个节点down掉,就得重新计算。传统的MR模型是通过log日志的机制对每一步的中间结果记录下来来恢复数据。spark中,由于采用了lazy evaluation,每一个partition的RDD都有依赖信息(也就是计算过程的依赖路径,execution plan),一个partition的数据丢失了,则重新计算即可,况且重新计算(而不是多次复制数据来防止故障,这和MR计算很不一样)丢失的节点也可以很容易并行加速。
故障排除。debugging。由于采用lazy evaluation,很多时候transform步骤所抛出的异常只会在action的那一步显式,这也是为什么大部分时候,一个bug在很早的transform步骤发生,但是只在driver节点才抛出。
内存持久化和内存管理。spark相比传统的MR计算在repeated computation上有巨大的优势,其优势在于每一步的executor的数据都是在内存中。RDD的persist函数则是提供了三种方式来存储数据:
- In memory as deseriazlied Java object. 以反序列化的原始java对象存储在内存中。速度最快,但是由于是原始的java对象,内存占用大。
- In memory as serialized data。以序列化的数据存在。spark object对象通过序列化成字节流,在网络间传输。CPU密集型。慢,但是内存占用小。用Kryo库,更省空间。
- On disk。当数据无法存在executor节点的内存中,直接写入到磁盘,最慢。但是省空间。
RDD的类型。spark定义了一个顶级的RDD抽象接口,然后有多个不同类型的RDD实现,比如NewHadoopRDD,ShuffleRDD。RDD定义的函数类型分为两类:action和transform。action通常有collect
,count
,collectAsMap
,sample
,reduce
, andtake
.这些都会触发计算。collect的扩展性很差,很容易引起driver节点的内存溢出。
RDD的依赖。分为宽依赖和窄依赖。wide dependency:子RDD依赖于无限多个父RDD(比如sort,groupByKey)。narrow dependency:子RDD只依赖于一个父RDD(比如filter,map),或者一个子RDD依赖于多个父RDD(只要是这个依赖和RDD本身的数值无关,比如coalesce)。
Spark 任务执行模型概述
一个spark Application是由一个driver进程和若干个executor进程组成。任务的逻辑运行在driver节点中,并向executor节点发送指令来完成操作。一个spark的cluster集群可以同时运行多个spark Applications,每个Application由一个SparkContext的抽象来表达,这些Applications有cluster manager调度。
一个spark application运行时,遇到一个action,比如collect,就会触发evaluation,这时会产生一个job,一个job包含了多个stage(这些stage在内部被抽象为DAG,决定了谁先被执行,谁后被执行)。stage的划分边界主要是由不同的action来决定,最常见的比如groupByKey,由于涉及到数据shuffle的传输,就会被划分为两个stage。每个stage都是在同一个executor上执行,因为数据不会被传输,其包含了多个task(可能并行计算),一个task就是真正干货的人,这些task都是在本地执行的,只有当所有的task都结束了,下一个stage才能继续。
SparkContext:是一个重要概念,它代表了一个上面提到的Application,运行在driver进程中。本身包含了运行期间的多个参数。
Spark SQL 和 DataFrames, Datasets
Spark SQL提供的DateFrames和Datesets是在RDD之上加入了schema的信息,从而更利于优化。Datesets提供了编译时类型检查。一般来说,是比RDD更好的选择。
聚合操作:如果是对df在多个列上进行不同的聚合操作,最好一次调用df.agg(func1(“col1”), func2(“col2”)),而避免多次调用。
集合操作:intersect,except,distinct是很慢的。
数据表示:Tungsten是Spark SQL中用来优化内存数据的一项技术,在前面提到过RDD在内存中会以3种方式来存储,而Tungsten可以更有效的表达数据,内存更紧凑,序列化更快。
数据存取:Spark SQL提供了和Spark core不同的API来存取数据,对于json数据,spark可以先去‘猜’schema。
数据创建:df和ds可以由local collection来创建,但是注意,这些local collection都是在driver进程中创建的,因此这些由local collection创建的df和ds一般都只是用于unit test,或者用于broadcast join大的数据集。
数据分区:partition函数可以促进filter push down
UDF和UDAF:如果自定义的(聚合)函数不是JVM语言编写,那么性能多半会折扣,因为数据的转换。为了避免性能损失,可以用scala来自定义函数,注册到sqlCtx.udf.register(),再使用python来调用。即使是用JVM语言来自定义函数,但是仍没有SQL表达式块。
查询优化
Spark SQL查询过程浅析:Spark Catalyst分析df上的transform,生成logical plan,然后优化,产生physical plan,cost-based和rule-based优化规则都会被使用。最重要的优化点就是在data source层的谓词下推(只在Spark SQL中):predicate push down。然后生成可执行的java代码。
对于大型的查询语句执行计划,或者一些迭代算法,Catalyst优化器并不能很好的处理。使用explain来debug。
Join优化
对于Core Spark来说,也就是RDD join,默认使用shuffle hash join,大部分情况下会引起数据的shuffle传输。
- 因此很简单的一个优化技术是在join之前,尽量缩小df的大小。
- 在join前,两方RDD均使用partitioner,这样就可以使两RDD落入到相同的partition。
- 一个大RDD,一个小RDD,那么小的RDD使用broadcast join到各个大的RDD节点上,避免shuffle。Spark core本身没有现成的broadcast join函数,你得自己实现。Spark SQL有现成的broadcast函数。
对于Spark SQL来说,优化器可以进行reorder,push down等技术来优化join。但是也意味着你无法人工控制shuffle。支持多种join类型。
- df
- self join需要重命名
- broadcast join支持
- ds:使用joinWith来实现join数据
Transformation优化
先讲RDD的变换,其中最重要的概念就是‘宽依赖’和‘窄依赖’。
性能的影响:前者通常意味着数据shuffle,后者则不会。因此在写transform函数的时候,要特别注意:collect,groupByKey之类是宽依赖,filter,map之类是窄依赖。
故障冗余的影响:前者在故障恢复时的代价比后者要高很多。因为‘窄依赖’的变换操作中,如果在一个partition上数据丢失了,只需要重新计算丢失的那个子RDD上。而‘宽依赖’的变换操作中,如果一个partition数据丢失,则最坏情况下,需要重新计算所有的子RDD,比如排序。
coalesce的特殊之处:当coalesce减小partition的数量时,是窄依赖。当coalesce增加partition的数量时,是宽依赖,会引起shuffle。
减小GC压力:一,尽量重用对象。二,尽量使用小的数据结构和primitive type,Array就很好。
变量共享:driver向worker发送变量,使用SparkContext的broadcast,这个变量必须是本地可序列化对象类型。worker向driver发送结果变量,使用Accumulator,做法是继承AccumulatorV2接口,在SparkContext的register注册即可。并且Spark内部,要使用Accumulator来记录task的指标metrics。
RDD的重用好处:最常见的方法就是persist和checking point,但是大多数情况下两者会降低性能。在少数情况下,却能提高性能:1 迭代计算,在相同的RDD上多次join计算,事先persist到内存中。2 多个action在相同的RDD上,那么最好在每一步action之后使用persist,这样就可以重用RDD。3 如果transform计算代价过高或过长,那么最好persist中间结果,这样就可以更容易快速恢复数据。
RDD的重用代价:1 persist会占用内存 2 要特别小心persist会破坏stage中的pipeline,尤其是‘窄依赖’中的装换操作,比如在rdd.filter()和rdd.map()中间你插入一个persist,中间结果会被记录备份下来,方便了故障恢复,但是性能下降。
RDD的重用类型:
- persist:有几个选项
- useDisk:数据无法放入内存中,则写入磁盘。一定要在计算代价和磁盘读取之间做权衡。
- useMemory:存入executor内存中,
- useOfHeap:存入Spark executor之外的内存中,比如Tachyon系统。
- deserialized:以反序列化的Java对象存储,内存空间利用率高,比如有Kryo
- replication:多次复制
- checkpointing:数据写入到外部存储系统中,比如S3,不会记录RDD的stage信息。checkingpoint的最大用处是降低故障的风险。
Key/Value Transformation优化
GroupByKey:每个key,对应一个iterator。官方不推荐的用法,会引起memory issue,关键是数据shuffle之后,一个key对应的iterator,都得加载进内存。默认使用HashPartitioner,但也可以自定义。因此直接在RDD上的GroupByKey很容易出问题,建议在df或ds上进行groupBy操作。如果非得在RDD上计算GroupByKey的类似操作,最好使用reduceByKey或者aggregateByKey来代替,因为后者的本质是MR中map side reduction,在数据shuffle之前就先‘压缩’了数据,数据量自然就降下来了,从而减少了网络IO和内存爆炸。
repartition和coalesce:前者一定会引起shuffle,后者在partition数量变大的情况下,会shuffle数据。因此,大多数情况下,性能调优的关键在于shuffle less和shuffle better。而解决的关键就是partitioner。Spark自带的有HashPartitioner和RangePartitioner。
使用Spark时对语言的考虑
Spark支持非JVM的语言,比如python,但是通常使用rpc通信方式,然后把逻辑代码发送给worker。非JVM语言的代码,Spark交互时,会产生copy数据从JVM到目标语言的runtime,这也是为什么速度会降低。有必要知道Java API的使用,R,python的绑定都是建立在Java API之上。
Spark参数调优
默认的spark参数只能满足最基本的要求,因此必须得考虑具体的cluster大小等等众多的情况。因此SparkConf是最需要关注的地方。spark.driver.memory: 默认1G,这也是在collect操作时得千万注意的地方。spark.executor.memory:默认1G。
driver的调参,一般情况下,增大driver的内存并不会加快计算,因为计算都是分布到executor节点上运行。但是,如果有collect操作,所有的数据都会发给driver,这时就需要加大driver的内存并且改变spark.driver.maxResultSize的设置。所以一般的调试过程是:先给driver尽可能少的内存,这样executor就有足够的内存,然后等问题出现了,逐渐增加。
executor的调参,是少量的big executor还是大量的small executor?
- 大量的small executor:这个很容易引起CPU资源的短缺和内存不够用,因为数据也是分布在不同的节点上的。所以如果资源宽松的话,内存4g起。
- 大量的big executor:这个很容易造成浪费,5个core对于executor来说就是上限。
- M和R区:一个executor中被划分为M区和R区,前者用于计算,后者用于cache数据。之间的比例可以调整。一般来说,如果是一般计算,不用调参,如果是在运行迭代算法,希望RDD能一直cache到内存R区中,那么可以增大R区的比例。
partition的调参,可以使用repartition或coalesce来调整数据的分布。需要找到一个平衡点,每个分区过大,executor的内存吃紧;分区过小,executor就是浪费。需要不断地调整。可以参看UI中的spill disk的情况。