3.5 容错机制及依赖

3.5 容错机制及依赖

一般而言,对于分布式系统,数据集的容错性通常有两种方式:

1)数据检查点(在Spark中对应Checkpoint机制)。

2)记录数据的更新(在Spark中对应Lineage血统机制)。

对于大数据分析而言,数据检查点操作成本较高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低,同时会消耗大量存储资源。

Spark选择记录更新的方式。但更新粒度过细时,记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列记录下来,以便恢复丢失的分区。

3.5.1 Lineage(血统)机制

每个RDD除了包含分区信息外,还包含它从父辈RDD变换过来的步骤,以及如何重建某一块数据的信息,因此RDD的这种容错机制又称“血统”(Lineage)容错。Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做以便恢复数据。

相比其他系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据Transformation操作(如filter、map、join等)。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新计算和恢复丢失的数据分区。但这种数据模型粒度较粗,因此限制了Spark的应用场景。所以可以说Spark并不适用于所有高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能方面的提升。

RDD在Lineage容错方面采用如下两种依赖来保证容错方面的性能:

❑ 窄依赖(Narrow Dependeny):窄依赖是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区。也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。其中,1个父RDD分区对应1个子RDD分区,可以分为如下两种情况:

■ 子RDD分区与父RDD分区一一对应(如map、filter等算子)。

■ 一个子RDD分区对应N个父RDD分区(如co-paritioned(协同划分)过的Join)。

❑ 宽依赖(Wide Dependency,源码中称为Shuffle Dependency):

❑ 宽依赖是指一个父RDD分区对应多个子RDD分区,可以分为如下两种情况:

■ 一个父RDD对应所有子RDD分区(未经协同划分的Join)。

■ 一个父RDD对应多个RDD分区(非全部分区)(如groupByKey)。

窄依赖与宽依赖关系如图3-10所示。

插图

图3-10 两种依赖关系

从图3-10可以看出对依赖类型的划分:根据父RDD分区是对应一个还是多个子RDD分区来区分窄依赖(父分区对应一个子分区)和宽依赖(父分区对应多个子分区)。如果对应多个,则当容错重算分区时,对于需要重新计算的子分区而言,只需要父分区的一部分数据,因此其余数据的重算就导致了冗余计算。

对于宽依赖,Stage计算的输入和输出在不同的节点上,对于输入节点完好,而输出节点死机的情况,在通过重新计算恢复数据的情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上追溯其祖先看是否可以重试(这就是lineage,血统的意思),窄依赖对于数据的重算开销要远小于宽依赖的数据重算开销。

窄依赖和宽依赖的概念主要用在两个地方:一个是容错中相当于Redo日志的功能;另一个是在调度中构建DAG作为不同Stage的划分点(前面调度机制中已讲过)。

依赖关系在lineage容错中的应用总结如下:

1)窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成,并且父RDD的计算结果进行hash并传到对应节点上之后,才能计算子RDD。

2)数据丢失时,对于窄依赖,只需要重新计算丢失的那一块数据来恢复;对于宽依赖,则要将祖先RDD中的所有数据块全部重新计算来恢复。所以在长“血统”链特别是有宽依赖时,需要在适当的时机设置数据检查点(checkpoint机制在下节讲述)。可见Spark在容错性方面要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。

在Spark容错机制中,如果一个节点宕机了,而且运算属于窄依赖,则只要重算丢失的父RDD分区即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。更深入地来说:在窄依赖关系中,当子RDD的分区丢失,重算其父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,因此不存在冗余计算。而在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区使用,其中有一部分数据对应的是其他不需要重新计算的子RDD分区中的数据,因此在宽依赖关系下,这样计算就会产生冗余开销,这也是宽依赖开销更大的原因。为了减少这种冗余开销,通常在Lineage血统链比较长,并且含有宽依赖关系的容错中使用Checkpoint机制设置检查点。

3.5.2 Checkpoint(检查点)机制

通过上述分析可以看出Checkpoint的本质是将RDD写入Disk来作为检查点。这种做法是为了通过lineage血统做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。

下面从代码层面介绍Checkpoint的实现。

1.设置检查点数据的存取路径[SparkContext.scala]

2.设置检查点的具体实现

  [RDD.scala]

        /* 设置检查点入口 */

private[spark] def doCheckpoint(): Unit = {

    原文作者:Albert陈凯
    原文地址: https://www.jianshu.com/p/4f40ecfb5f04
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞