Spark分布式原理及碰到的三个坑

《Spark分布式原理及碰到的三个坑》 摄于玉泉

Spark 是继 MP 之后的分布式计算框架,采用分布式架构就好比投入了更多的人力,虽然每个人做事的效率一样,但人多了完成特定目标的时间也就短了。

一个项目要完成,除了每个人单独完成自己的事外还需要有人来协调,比如最初给每个人分配任务,又或者汇总每个人完成的结果。

Spark 里 executer 可以理解为一小群单独做事的人,而 driver 担任这个管理者的职务。driver 是提交任务的那台机器,负责协调 executer 之间的数据传输。

在配置 Spark 任务时,我们会指定四个参数:num-executors、executor-cores、executor-memory、driver-memory。num-executors 表示投入了几群人,executor-cores 表示每群人中有几个人干活,executor-memory 表示每群人分配到的最大数据量,driver-memory 表示管理者最多能操作的数据量。

也就是说,Spark 任务中在底下干活的人数是 num-executors * executor-cores。每个人当前只能处理一件事情,所以默认情况下 Spark 中 RDD 的分区数也是这么多个。

如果自己在代码中使用 repartition 操作,若多于干活的人数,多的部分也只能有人空出手来再处理。从这也可以看出,假设数据较均匀的话,repatition(n) 中 n 的选择最合理的是干活人数的整数倍。

由于大部分数据都是在 executer 而不是在 driver 上执行的,所以一些在本地执行程序的思维会给我们带来一些坑,我在开发 Spark 任务时就碰到过三次因此带来的坑。

一号坑

调试 Spark 任务时有时会需要打印 rdd 类型的数据,刚开始操作时很容易就通过以下这种方式:


rddData.map(x=> println(x)).collect()

当其实println(x)这个操作是在某台 executer 上完成的,所以打印操作也是在那台机器上输出。于是 driver 这台机器上自然就得不到想要的结果。

正确的方式就是使用 take 方法把部分数据收集到 driver 机器上,再使用 print 函数。

rddData.take(5).foreach(x => println(x))

二号坑

在上一篇文章《Spark 读取本地文件》中提到,有时我们需要从本地读取文件。

Spark 中有一个读取csv文件的函数

val df = spark.read.csv(fileName)

但同样的,read这个函数是从 executer 上读取相应文件,所以若你只是在 driver 上生成了文件程序就会报错。

要想在 driver 上读取本地文件可通过 scala.io.Source.FromFile 来完成,具体的可见上述文章。

三号坑

前面提到,每一个分区就会交由一个“人”来完成,所以一个分区中的数据量要与单个“人”的处理能力相匹配。

除此之外,比如使用 reduceByKey 时 driver 会协调各个分区中的数量,这就会造成 shuffle,若每个分区的数据都很少,shuffle 就会增多,这严重降低的效率。

比如最近我在开发一个算法时,需要用到一份商品的基础数据。整份数据其实也就几百兆大小,处理起来理因非常快。但在测试时却发现四五十分钟才能处理完。

后来发现保存为 hdfs 的源文件的分区数有3000来个,每个分区只有几十K的数据,所以造成 driver 协调时耗时严重。后来我在处理数据之前使用 repartition 命令将分区缩小到10,结果四分钟内就程序跑完了。

    原文作者:学习之术
    原文地址: https://www.jianshu.com/p/7e8ddca8aa3c
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞