之前的文章大量的内容在和大家探讨分布式存储,接下来的章节进入了分布式计算领域。坦白说,个人之前专业的重心侧重于存储,对许多计算的内容理解可能不是和确切,如果文章中的理解有所不妥,愿虚心赐教。本篇将和大家聊一聊分布式计算的一个子集:批处理。
批处理系统通常也叫脱机系统,需要大量的输入数据,运行一个作业来处理它,并产生一些输出数据。工作通常需要一段较长的时间(从几分钟到几天)。批处理作业通常是周期性地运行的(例如,一天一次)。批处理作业的主要性能度量通常是吞吐量。
1.MapReduce
批处理是我们构建可靠、可扩展和可维护应用程序的重要组成部分。而谷歌在2004年发布的批处理算法:MapReduce,是处理大规模数据集的重要模型,虽然与为数据仓库专门开发的并行处理系统相比,MapReduce是一种相当低级的编程模型,但它依然对批处理的模型理解有很大的帮助,所以我们以MapReduce作为起点,开启我们的批处理的计算之旅。
分布式存储系统与MapReduce
MapReduce是一种相当生硬,野蛮的工具,但却十分有效。单个MapReduce作业:可以有一个或多个输入,并生成一个或多个输出。 MapReduce作业是函数式编程的模型,不会修改输入,除了生成输出之外,不会产生任何副作用。输出文件按顺序编写一次(不修改已写入文件的任何现有部分)。
MapReduce作业需要读、写文件的分布式文件系统。如:HDFS,GFS,GlusterFS,Amazon S3 等等。之后我们使用HDFS作为运行环境,但这些原则适用于任何的分布式存储系统。HDFS是基于无共享的存储集群,而共享磁盘存储由集中式存储设备实现,通常使用定制硬件和特殊的网络基础设施(如光纤通道)。所以HDFS不需要特殊的硬件,只需要由传统的数据中心网络连接的计算机。HDFS的守护进程的每台计算机上运行,将允许其他节点访问存储在该计算机上文件的数据,而中央服务器NameNode跟踪哪些文件块存储在哪台机器。因此,创建一个大的文件HDFS上,可以使用集群之中的所有计算机。
为了容忍机器和磁盘故障,可以在集群的多台机器上复制文件块。所以多台机器上的同一数据的几个副本,当然这里也可以使用纠删码技术,可以允许丢失的数据以比完全复制更低的存储开销被存储。纠删码技术类似于RAID,它在同一台机器上的多个磁盘上提供冗余。不同之处在于,对纠删码的副本进行读写时需要额外的编解码计算。
MapReduce的工作流程
MapReduce与传统的UNIX命令管道的主要区别在于,MapReduce可以跨越多台计算机并行计算,手动编写的Mapper和Reducer不需要了解输入来自何处或输出的去处,由框架来处理机器之间移动数据的复杂性。
下图展示了一个MapReduce作业的工作流程,作业的输入是HDFS的一个目录,目录内每个文件块作为一个单独的分区,由一个单独的Map任务处理,每个输入文件的大小通常是数百兆字节(取决于HDFS的块大小)。MapReduce的调度器试图在存储输入文件副本块机器上运行Mapper,只要该机器有足够内存和CPU资源来运行Map任务。通过这样的方式节省了在网络上复制文件块的带宽,减少了网络负载,利用了分布式计算的局部性原理。
MapReduce的工作数据流
应用程序代码被打包成Jar文件,上传到分布式存储系统之上,对应的节点会下载应用程序的Jar文件,然后启动Map任务并开始读取输入文件,每次将一条记录传递给Mapper的回调函数,并输出Map处理过后的键值对。Map的任务的数量取决于输入文件块的数量,但是Reduce任务的数量由作业作者配置,为了确保同一个键的所有键值对都由同一个Reducer处理,框架使用一个散列键来确定键值对应该对应的Reduce任务。
MapReduce需要对键值对进行排序,但数据集可能太大,无法用一台机器上的常规排序算法进行排序。所以,每个Map任务根据散列将键值对输出到对应的Reducer的磁盘分区,并对键值对进行排序。每当Mapper完成工作时,MapReduce调度器通知Reducer,它们可以开始从Mapper获取输出文件。Reducer从Mapper端获取对应的输出的键值对文件,并进行归并排序,保持排序顺序,这个过程称之为Shuffle。
最后,Reducer调用Reduce函数来处理这些有序的键值对,并且可以生成任意数量的输出记录,并写入分布式存储系统。这便是一次完整的MapReduce任务的全过程。
MapReduce作业的链式调度
一个MapReduce作业可以解决的问题范围是有限的。因此,MapReduce的作业需要被链接到工作流中,这样一个作业的输出就成为下一个作业的输入。Hadoop的MapReduce框架,可以隐式的通过目录名来链接:第一个MapReduc的作业配置写输出到HDFS的指定的目录,第二个MapReduce作业读取相同的目录名作为输入。从MapReduce的框架来看,它们是两个独立的工作。
只有当前一个作业成功完成时,下一个作业的输入才会被认为是有效的(失败的MapReduce作业的结果会被丢弃)。所以不同的作业之前会产生依赖关系的有向无环图,来处理这些依赖关系的工作执行,目前Hadoop有许多对批处理的调度程序,如:Oozie,Azkaban, Luigi, Airflow,等。在一个大型公司之中,许多不同的团队可能运行不同的工作,它们读取彼此的输出,所以通过工具支持管理等复杂的数据流是很重要的。
2.MapReduce作业的业务场景
我们通过一个实例,来具体了解类MapReduce作业的业务场景。如下图所示:左边是一个由日志记录的行为描述,称为用户活动,右边是一个数据库的用户用户表。
用户活动日志与用户的信息表
数据分析人员的任务可能需要将用户活动与用户的信息关联起来:分析哪些页面最受年龄组的欢迎。但是用户活动日志之中,只包含了用户的ID,而不包含完整的用户信息。这时候就需要一个Join操作,最简单的实现思路是逐一检查用户活动,并对每个用户ID来查询用户数据库,显然,这样的实现会带来很糟糕的性能表现。任务吞吐量将受到数据库服务器往返时间的限制,并且本地缓存的有效性将非常依赖于数据的分布,并行运行海量的查询可能会超出数据服务器的处理能力。为了在作业过程之中有更大的吞吐量,计算必须(尽可能地)在一台机器上进行。通过网络上随机访问请求要处理的每一条记录是十分缓慢的。此外,查询远程数据库将意味着批处理作业变得不确定,因为远程数据库中的数据随时可能会更改。
因此,更好的方法是获取用户数据库的副本(使用ETL将数据库的数据中提取到“数据仓库”),并将其放入分布式存储系统之中。这样,我们可以使用MapReduce这样的工具来更加有效地处理。如下图所示:由MapReduce框架按键对Mapper输出进行分区,然后对键值对排序时,其效果是所有活动事件和具有相同用户ID的用户记录在同一个Reducer之中并且彼此相邻。之后,Reducer可以很容易地执行实际的Join逻辑:每个用户ID都调用一次Reduce函数,输出活动的URL和用户的年龄。随后可以启动一个新的MapReduce作业来计算每个URL的查看器年龄分布,并按年龄组分组。
通过MapReduce作业来处理的业务逻辑
接下来,我们来梳理一些业务层面的细节,以及用MapReduce框架的一些细节:
业务逻辑分离
在上述的业务场景之中,最重要的就是保证同一个用户ID的活动需要汇集到同一个Reducer来进行处理,这个就是前文我们聊到Shuffle的功能,所有键值相同的键值对都会被传递到相同的目的地。MapReduce编程模型将计算的通信协作与应用程序逻辑处理分离。这就是MapReduce框架的高明之处,由MapReduce的框架本身处理所有的网络通信,业务人员专注于应用程序代码的实现,如果在这个过程之中出现了节点的故障,MapReduce透明的失败重试来确保应用程序逻辑不受影响。数据分组
数据除了Join场景之外,通过键值对对数据进行分组也是数据系统常用的操作:对所有具有相同键的记录都形成一个组,之后对组内的数据进行操作。 现在问题来了?我们怎么样使用MapReduce来实现这样的分组操作呢?实现方式也很简单,通过在Map函数之中对键值对进行改造,插入使键值对产生预期分组的Key,之后分区和排序将相同的Key汇集到同一个Reducer之中。在MapReduce上实现时,分组和Join看起来非常相似。数据倾斜
如果同一个键相关的数据量非常大,对于MapReduce框架来说可能会成为一个挑战,因为相同键会汇集到同一个Reducer进行处理。例如,在社交网络中,少数名人可能有数以百万计的追随者。(第一章我们就举过这个例子)所以在MapReduce作业之中存在数据倾斜,如何来进行补偿呢?在Pig之中,会先运行一个采样任务来确定哪个键是热的,在作业实际执行时,Mapper会把出现数据倾斜的键值对通过随机选择分发个指定的多个Reducer。而Hive的倾斜连接优化采用了另一种方法。它需要在表元数据中显式指定热键,它将与这些键相关的记录存储在元数据之中,后续对表进行操作时,采用类似于Pig的优化思路。
3.批处理的意义
前文已经讨论了MapReduce作业的工作流程,现在我们回到一个问题来:所有处理的结果是什么?为什么我们一开始就要做所有这些工作? 批处理操作的核心是对数据系统之中的数据进行解析,这类操作需要扫描大量的记录,进行分组和聚合,并输出到数据库以报告的形式呈现,通过报告给消费者或分析师进行数据决策。
同样,批处理适合建立搜索索引。谷歌最初使用MapReduce是为它的搜索引擎构建索引,通过5到10个MapReduce作业的工作流来实现实现的。如果需要执行全文搜索一组文件中,通过批处理过程是一个非常有效的方法:由每个Map任务对数据分区,之后每个Reducer建立分区索引,将索引文件写入到分布式文件系统。因为通过关键字查询搜索索引是只读操作,这些索引文件在创建后是不可变的。 如果索引的文档集发生变化,一个选项是周期性地为整个文档集重新运行整个索引工作流程,并在完成新索引文件时将以前的索引文件替换为新的索引文件。(如果只是少量文件的变化,则不适用批处理任务进行处理)
批处理的作业的将输入视为不可变且避免副作用(如向外部数据库写入),不仅实现了良好的性能,而且变得更容易维护。如果您在代码中引入了一个bug,输出错误,可以简单地回滚到以前版本的代码并重新运行该作业,并且再次输出正确的结果。更简单的解决方案,可以将旧输出保存在不同的目录中,然后简单地进行切换。由于这种易于回滚的特性,功能开发可以比在不能回滚的环境中进行得更快。有利于敏捷的软件开发。批处理将逻辑处理代码与配置分离,这里便允许优雅地重用代码:一个团队可以专注于实现逻辑处理,而其他团队可以决定何时何地运行该作业。
小结:
本篇我们梳理了MapReduce的处理框架,并探讨了许多批处理作业的特点。除了MapReduce的模型,数据系统中仍然有许多处理数据的计算模型,接下来会和大家来继续探讨数据系统之中的计算模型…………..