==[原理]从RDDs到Spark

从RDDs到Spark – Blueve 湛蓝
http://blueve.me/archives/1437

《==[原理]从RDDs到Spark》 QQ20160212-2@2x

Spark是近年来非常火爆的分布式计算框架,可以说它紧跟Hadoop的脚步,并且在很多方面实现了超越。在Spark官方的宣传中我们也可以看到,Hadoop能做到的事情,Spark也可以做,而且通常可以做得更好。事实上,越来越多的业内公司都开始试水Spark,因为其核心卖点内存计算加速以及简洁优雅的编程接口不光可以为某些分布式批处理应用提速,还可以降低分布式计算应用的编写门槛。由于我不是数据工程师,所以我并不太关心分布式应用的编写环节(例如如何利用Spark进行数据挖掘、机器学习……),我更关心Spark这个计算框架以及这个分布式系统的体系结构。现在有很多书都在写Spark源码,但是很遗憾的是,在我看来,这些源码分析做得很“工程”,要知道很少有人会因为某某软件开源,就去死磕它的代码,人们往往需要带着目的去研究他们的代码,从而补足书面文献中所缺少的部分,其次才是看他们的实现是否够“优雅”,够“软件”。

希望本文能做做成一个系列,记录我学习Spark的点滴,同时也将我的学习思路分享给大家。

研究一个软件,相对于“逆向”,倘若我们可以直接获得Spark本身的设计图纸,那岂不是更加有效率?事实上,Spark官网已经在一个相对醒目的位置给出了Spark架构设计的
相关文献,最下方是Spark在设计与实现过程当中产出的学术论文。学术论文由于其体裁本身的特点,使得他们具备严谨的科学素质以及最为凝练的阐述,因此从这些文献入手,也应是能够最大限度排除噪音,找到我们所关心的核心问题。其中标记为Best Paper Award的Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing[1]就是本文的核心。

RDDs – Resilient distributed datasets,翻译过来大概就是弹性分布式数据集,可以说它就是Spark的灵魂,Spark最大的创新之处就在于使用RDDs来抽象分布式环境下的数据及其转换,并尽可能的让数据保留在内存中进行计算,减少I/O。

在Hadoop MapReduce计算框架中,面对一个需要多个MapReduce组合的大型任务,在每个MR任务之间,数据的交互都是通过HDFS来进行的,打个比方,现在有一个玩具,在中国制造,但这个玩具的牌子是美国的,并且该品牌在中国并没有开放销售,因此想要买这个玩具的话你就必须从国外买,于是这个玩具就经历了
中国制造 -> 中国仓库 -> 美国仓库 -> 中国仓库 -> 中国买家 这一过程,相信有过类似购物经历的人已经非常苦恼,既然是在中国制造的,那为什么不在中国的工厂制造完毕之后直接发给我呢?是的,其实这就是MapReduce计算模型的缺陷,
Map -> Reduce -> HDFS -> Map -> Reudce…,人们很快就意识到了这个问题,于是DAG运算得以发展面世。我很喜欢Apache Tez主页上的这张图:

《==[原理]从RDDs到Spark》 QQ20160212-0@2x

显而易见,将前一项作业的中间结果直接复用,将会带来巨大的性能提升。回到我们之前的例子中,仅仅这个改变就相当于变成了中国制造 -> 中国仓库 -> 中国买家,不仅转运费少了,物流的时间还大大缩短。实际上,Spark做得还要更为出色一些。
RDDs在上面的图中可以被认为是一个Map阶段或Reduce阶段的产物,在论文中,RDD被定义为(1)只读的(2)分区记录的集合,它的创建方式非常有限(1)从数据源生成(2)从其他RDD转化,因为这个性质,文献中也一再强调,对RDD的操作是粗粒度的。其实和Map Reduce编程模型类似,在MR中,我们对数据进行的转化也就是两类:Map和Reduce。而RDDs提供转化方式更为丰富,例如join,filter之类的。
貌似上面说得都有些复杂,大多数时候,其实你只要把RDD想成一般的ReadOnlyList<XXX>对象就可以了,只不过这个List中的数据是可以分散在好多台机器上的。开发者看到这个特性一定是笑嘻嘻的,想象一下,你想统计一个年龄List中>=18岁的数目,在传统编程语言中你可以这么写(C#):

1

ages.Where(age => age >= 18).Count();

在Spark中你仍然可以这么写!

1

ages.filter(_ >= 18).count()

写法几乎是一样的,但是后者的ages的数据集可能是10TB的大小,filter可以运行在成百上千台机器上,尽管你可以对此一无所知。这就是RDDs API带来的编程快感。
然而RDDs的内部就不是看上去这样简单了。RDDs会记录它自身的生成轨迹(lineage),相当于每个RDD都自备家谱,这又有什么好处呢?这里就涉及到错误恢复机制。我们都知道,分布式系统中,一个非常常见的问题就是,计算节点可能随时都会失效,倘若这个计算节点恰好正在承担一些计算任务,如果不能妥善的进行错误恢复,单个节点的子任务失败,就可能会导致整个大任务的崩溃。而lineage的存在,就意味着当某些RDD失效的时候,我们可以寻根问祖,按照他的生成轨迹重新生成就是了。好比做咖啡,磨好了豆子备好了咖啡壶,结果发现备好的水洒了,难道你会把之前所有的工序都重复一遍么?当然不,你只需要知道饮水机->水这样一个lineage,再去找饮水机接一杯水就可以了。
当然,说归说,RDDs在设计上其实还有一个非常有意义的地方:延迟调用。以之前的代码为例,ages在调用filter方法过后,并不会立即就对ages刷刷刷得过滤,它只会定义一个新的RDD,这个RDD的lineage或者说配方是filter(_ >= 18)。这就是所谓的转换。与转换对应的则是动作,例如count(),只有当一个RDD执行一个动作的时候,它的那套lineage才会被真正的执行。这样说来,其实RDDs本身就是个菜谱,所有的转换操作只是在为这个菜谱添砖加瓦,而一声令下开伙做饭的是动作。你可能会问,这样有什么好处?一个显而易见的好处就在于,在动作执行之前,我们可以有更大的优化空间,开发者定义的RDD的转换真的是最佳方案的么?两个filter可不可以合并?是先筛选还是先join?……延迟调用为这些优化提供了巨大的优化空间,执行引擎得到的只是一个最终的RDD,它可以顺着这个RDD的lineage,优化整个RDD的执行计划,并最终执行计算。
说到这里,其实我们已经渐渐地离开了RDDs的“表”,开始探讨它的“里”。RDD之间的依赖关系其实可以按照对数据的操作性质分为两类:宽/窄:

《==[原理]从RDDs到Spark》 QQ20160212-1@2x

显然,“窄”是非常受欢迎的,假如右侧有某个块(节点)失效了,那么只需要顺着它那根线往回找那个对应的块重新计算一遍补上就可以了,而宽依赖的错误恢复就要麻烦得多,右边的任何一个块失效,都必须把左边所有的块都找出来再算一遍。由于这个性质,在实际计算的调度当中,Spark会将RDDs的计算划分为多个Stage,而划分标准就是Shuffle操作,或者说是出现宽依赖的地方:

《==[原理]从RDDs到Spark》 QQ20160212-2@2x

Stage内包含了尽可能多的窄依赖,这令他们可以被以流水线的形式并行化执行。黑色的块表示它被缓存了,缓存的存在使得RDD的lineage更加简化。比如说G倘若出了什么意外,Stage 1是不用再计算一遍的,执行引擎只用去跑Stage 2和3。宽依赖的父层(通常为某个Stage的最后一层)会被考虑物化(我更喜欢称之为实体化),这样发生错误的时候,这个Stage的这个尾巴只要还留着,就不用计算前面那么多的步骤了。

在这一基础上,计算调度器会考虑数据本地化,尽可能把task分配给那些不需要数据通讯或移动的节点(例如B中的块1数据在节点A,那么这个块到G中块1的映射计算,就会被尽可能的分配给节点A来做)。为了均衡节点间的通讯效率以及节点计算资源的等待,调度策略选用的是延迟调度,有兴趣的读者可以单独去搜搜。

对于某些迭代类型的作业,某些RDD的lineage可能会超长(迭代100次可能就会有100层依赖),如果在某次迭代的时候发生了错误,那么回溯的代价是十分昂贵的,对于这种情况,RDD也提供了传统的“检查点”式的错误恢复机制,某些单机游戏就有这样的机制,如果你死了,并不会让你从头开始游戏,而是从最近的一个记录点重生(例如:Bad land、Limbo)。在文中提到,目前的Check Point必须要靠编程人员手工来设置,未来可能会做成自动的,毕竟Spark比你更了解你程序运行的时间代价。

Spark采用LRU作为RDD的内存轮替策略,让有限的内存中可以尽可能存储那些利用率更高的RDD。在这篇论文发布的时候,作者提到了Spark的每个实例之间的内存是相互隔离的,也就是说实例A中的RDD,在实例B中是读取不到的,尽管他们可能是同一个东西,近年来,关于Shared RDD也有不少研究,不过近期发布的最新版
Spark 1.6中,统一的内存管理已经被实现
SPARK-10000,这无疑是个利好消息, 当RDD可以在多个实例内奔跑的情况下,我们就可以使用更多的复用手段来优化不同实例的计算。2016.2.27更新: 实际上这个统一内存管理和RDDs这篇论文上所说的还是有一定差别的,SPARK-10000主要实现的是“执行用内存”和“计算用内存”的共享,从而提高内存的利用率。

本篇博文基本上算是把这篇论文用大白话翻译了一遍,但我们必须注意到,这篇文章已经发布很多年了,Spark也有了长足的发展,DataFrame伴随着SparkSQL被提出,Dataset的概念也伴着1.6版本诞生,这些RDD的升级版无疑可以更灵活、更有效地解决分布式计算中的关键问题。我想,未来的文章中,我们将会顺着1.6版本的源代码,看一看RDD、DataFrame、Dataset到底是什么样的东西,以及他们到底相对于RDD有哪些改进。

[1]Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing[J]. In-Memory Cluster Computing”. USENIX Symposium on Networked Systems Design and Implementation (NSDI, 2012, 70(2):141-146.

    原文作者:葡萄喃喃呓语
    原文地址: https://www.jianshu.com/p/816be95f3000
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞