Spark优化
- worker 的资源分配:cpu, memroy, executors
- spark.yarn.executor.memoryOverhead, 0.1 * spark.executor.memory
- YARN AM needs a core
- HDFS Throughput. Best is to keep under 5 cores per executor, else can lead to bad HDFS I/O throughput
- shuffle的block size不能大于2G
Spark 使用一个叫 ByteBuffer 的数据结构来作为 shuffle 数据的缓存,但这个 ByteBuffer 默认分配的内存是 2g,所以一旦 shuffle 的数据超过 2g 的时候,shuflle 过程会出错。影响 shuffle 数据大小的因素有以下常见的几个:
- partition 的数量,partition 越多,分布到每个 partition 上的数据越少,越不容易导致 shuffle 数据过大;
- 数据分布不均匀,一般是 groupByKey 后,存在某几个 key 包含的数据过大,导致该 key 所在的 partition 上数据过大;
- 在运行 spark sql 的时候,默认的 shuffle 的 partition 的数量是200,容易导致 shuffle 数据过大;
一般的解决办法是:
- 增加 partition 的数量;
- 在代码中可以使用 rdd.repartition() or rdd.coalesce() 改变 partition 数量;
- 在 Spark SQL, 设置较大的 spark.sql.shuffle.partitions;
- 一般一个partition的大小为 128M 为宜;
如果 partition 小于 2000 和大于 2000 的两种场景下,Spark 使用不同的数据结构来在 shuffle 时记录相关信息,在 partition 大于 2000 时,会有另一种更高效 [压缩] 的数据结构来存储信息。所以如果你的 partition 没到 2000,但是很接近 2000,可以放心的把 partition 设置为 2000 以上。
- 合理优化 DAG
- 尽量避免 shuffle
- 尽量避免 cartesian
- reduceByKey/aggregateByKey 代替 groupByKey
- treeReduce 代替 reduce
- foreachPartitions 代替 foreach
- repartitionAndSortWithinPartitions 代替 repartition 与 sort 类操作
- 数据倾斜以及设置合理的partition
- Scheduler Delay: spark 分配 task 所花费的时间
- Executor Computing Time: executor 执行 task 所花费的时间
- Getting Result Time: 获取 task 执行结果所花费的时间
- Result Serialization Time: task 执行结果序列化时间
- Task Deserialization Time: task 反序列化时间
- Shuffle Write Time: shuffle 写数据时间
- Shuffle Read Time: shuffle 读数据所花费时间
如果 partition 数据不均匀的时候,上面的各个时间耗费都不太稳定,导致最终的耗费时间很长。
场景1:如果HDFS上一个文件夹的文件个数为100个,每个文件大小约150M,那么每个文件在HDFS上就会有两个block(一个128M,一个22M),spark在读取这个文件夹的时候,默认就会分配200个partition。但是问题来了,由于每个block的数据不平均,每个partition的数据就会有倾斜,CPU就不能充分利用,有些计算很快完成的就会空闲,导致最终的计算所花费的时间较长。所以,这种情况下,在读取完所有的数据之后,应该进行repartition一次,partition的个数以总的cores或者cores的倍数为宜。
场景2:如果数据是 (key, value) 类型,key 的分布不均匀,比较常见的方法是把 key 进行 salt 处理,比如说原来有 2 个 key (key1, key2),并且 key1 对应的数据集很大,而 key2 对应的数据集相对较小,可以把 key 扩张成多个 key (key1-1, key1-2, …, key1-n, key2-1, key2-2, …, key2-m) ,并且保证 key1-* 对应的数据都是原始 key1 对应的数据集上划分而来的,key2-* 上对应的数据都是原始的 key2 对应的数据集上划分而来。这样之后,我们有 m+n 个 key,而且每个 key 对应的数据集都相对较小,并行度增加,每个并行程序处理的数据集大小差别不大,可以大大提速并行处理效率。
- 数据格式
- 如果自己能够决定数据的存储格式的时候,尽量使用二进制的数据格式,比如 Avro, Thrift, Protobuf, Parquet, ORC。最差的就是用Json格式了。
- 在对 RDD 进行 shuffle 和 cache 时,数据都是需要被序列化才可以存储的,此时也可以指定序列化的格式,比如Kryo。
Kryo序列化机制,一旦启用以后,会生效的几个地方:
- 算子函数中使用到的外部变量,在传输的时候肯定要序列化了
- 算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗,算子函数中用到了外部变量,会序列化,使用Kryo。
- 持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
- 持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。当使用了序列化的持久化级别时,在将每个RDD partition序列化成一个大的字节数组时,就会使用Kryo进一步优化序列化的效率和性能。
- shuffle
- shuffle的时候可以优化网络传输的性能,在进行stage间的task的shuffle操作时,节点与节点之间的task会互相大量通过网络拉取和传输文件,此时,这些数据既然通过网络传输,也是可能要序列化的,就会使用Kryo。
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Kryo之所以没有被作为默认的序列化类库的原因,主要是因为Kryo要求,如果要达到它的最佳性能的话,那么就一定要注册你自定义的类(比如,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,否则Kryo达不到最佳性能)。注册你使用到的,需要通过Kryo序列化的,一些自定义类,SparkConf.registerKryoClasses()
- map VS mapPartitions
- map 是每个 RDD 的元素都会调用一次 map 方法,mapPartitions 是在一个 partition 里面只会调用一次 map 方法,省去了每次调用函数时候的 setup 和 cleanup 的开销
- map 的每次结果不会缓存在内存中,mapPartitions 会把计算结果都缓存起来,一起返回给新的 RDD
如果最后的结果需要取 topN, 建议在 mapPartitions 里面就返回该 partition 的 topN,在最后计算 topN 的时候,数据量就会小很多。
多次使用的 RDD 合理使用 cache 或者 persist
spark streaming
- 设置合理的批处理时间(batchDuration)
- 设置合理的Kafka拉取量(maxRatePerPartition重要)
- 如果在一个批时间内处理不完所有数据,还可以设置并行job数,spark.streaming.concurrentJobs
- 其它
- numpy的ndarray的转化(在RDD cache之前就将array转成ndarray);
- 如果已经归一化的矩阵,计算cosine_similarity可以用dot代替;
- 优化 cpu 计算,使用 avx 或者 sse3 指令集,比如把暂存器XMM 128bit提升至YMM 256bit,以增加一倍的运算效率;
- 优化 ndarray 的矩阵计算,使用 ndarray 的 mmul 方法进行大矩阵运行;