Depth-First Search over Spark RDD Dependencies

1 Overview

I've been working through algorithm problems lately and ran into the classic tree-search algorithms. I remembered that the Spark RDD code has a spot that uses DFS to determine the dependency relationships between RDDs, so I'm pulling it out here for a closer look.

2 Code

/**
 * Return the ancestors of the given RDD that are related to it only through a sequence of
 * narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
 * no ordering on the RDDs returned.
 */
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
    val ancestors = new mutable.HashSet[RDD[_]]

    def visit(rdd: RDD[_]): Unit = {
      val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
      val narrowParents = narrowDependencies.map(_.rdd)
      val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
      narrowParentsNotVisited.foreach { parent =>
        ancestors.add(parent)
        visit(parent)
      }
    }

    visit(this)

    // In case there is a cycle, do not include the root itself
    ancestors.filterNot(_ == this).toSeq
}

3 Analysis

The code is clear: it recursively walks the dependency tree to collect an RDD's narrow ancestors.

val ancestors = new mutable.HashSet[RDD[_]]

ancestors is a Set used to hold the parent RDDs that have already been visited; it serves both as the result accumulator and as the DFS visited set.
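To see why the visited set matters, here is a minimal standalone sketch (a toy Node type, not Spark code): in a diamond-shaped DAG the shared ancestor is reachable along two paths, but the set ensures it is visited only once.

import scala.collection.mutable

// A toy DAG node: "parents" plays the role of narrow dependencies.
case class Node(name: String, parents: Seq[Node] = Nil)

val a = Node("a")
val b = Node("b", Seq(a))
val c = Node("c", Seq(a))
val d = Node("d", Seq(b, c))   // diamond: "a" is reachable via both b and c

val visited = new mutable.HashSet[Node]
def visit(n: Node): Unit = {
  n.parents.filterNot(visited.contains).foreach { p =>
    visited.add(p)
    visit(p)
  }
}

visit(d)
println(visited.map(_.name))   // a, b, c -- "a" is visited exactly once

Without the filterNot(visited.contains) guard, "a" would be visited twice here, and the traversal would do redundant work on larger DAGs.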

The three variables narrowDependencies, narrowParents, and narrowParentsNotVisited are easy to understand from their names: the RDD's narrow dependencies, the parent RDDs those dependencies point to, and the narrow parents that have not been visited yet.
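As an aside, the first two steps could be collapsed into a single collect; the following is just a sketch of an equivalent formulation, not the code Spark actually ships:

// Equivalent to the filter + map pair above: collect keeps only the
// NarrowDependency cases and extracts their parent RDDs in one pass.
val narrowParentsNotVisited = rdd.dependencies
  .collect { case dep: NarrowDependency[_] => dep.rdd }
  .filterNot(ancestors.contains)

collect pattern-matches and maps in one pass, avoiding the two intermediate collections.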

In the final part, each narrow parent that has not been visited yet is added to ancestors, marking it as visited, and then visited recursively:

narrowParentsNotVisited.foreach { parent =>
    ancestors.add(parent)
    visit(parent)
}
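One thing the recursive form implies: a sufficiently deep lineage (say, thousands of chained map calls) could in principle exhaust the JVM call stack. Below is a sketch of a non-recursive variant with an explicit stack; narrowAncestorsIterative is a hypothetical name, not something in Spark:

import scala.collection.mutable
import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Iterative DFS over narrow dependencies, using an explicit stack
// instead of recursion to handle very long lineages.
def narrowAncestorsIterative(root: RDD[_]): Seq[RDD[_]] = {
  val ancestors = new mutable.HashSet[RDD[_]]
  val stack = mutable.Stack[RDD[_]](root)
  while (stack.nonEmpty) {
    val rdd = stack.pop()
    rdd.dependencies
      .collect { case dep: NarrowDependency[_] => dep.rdd }
      .filterNot(ancestors.contains)
      .foreach { parent =>
        ancestors.add(parent)
        stack.push(parent)
      }
  }
  // Same cycle guard as the original: never report the root as its own ancestor.
  ancestors.filterNot(_ == root).toSeq
}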

Attentive readers will notice the comment just before the final line:

In case there is a cycle, do not include the root itself

The gist: if the dependency graph happens to contain a cycle, the traversal can loop back around and add the root RDD itself to ancestors. Note that it is the visited set that guarantees termination (an RDD already in ancestors is never visited again); the final filterNot(_ == this) merely ensures the root is not reported as one of its own ancestors.
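A minimal standalone sketch (again a toy Node type, not Spark code) makes this concrete: with a cycle leading back to the root, the root lands in the visited set during traversal, and the final filterNot strips it from the result.

import scala.collection.mutable

// A toy node with mutable parents so that a cycle can be wired up.
class Node(val name: String) { var parents: Seq[Node] = Nil }

val root = new Node("root")
val p = new Node("p")
root.parents = Seq(p)
p.parents = Seq(root)   // cycle: root -> p -> root

val visited = new mutable.HashSet[Node]
def visit(n: Node): Unit = {
  n.parents.filterNot(visited.contains).foreach { q =>
    visited.add(q)
    visit(q)
  }
}

visit(root)
println(visited.map(_.name))                        // root, p
println(visited.filterNot(_ == root).map(_.name))   // p -- the root is stripped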

4 Test Case

// org/apache/spark/rdd/RDDSuite.scala
test("getNarrowAncestors") {
    val rdd1 = sc.parallelize(1 to 100, 4)
    val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
    val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.mapValues(_ + 1).mapValues(_ + 2).mapValues(_ + 3)
    val ancestors1 = rdd1.getNarrowAncestors
    val ancestors2 = rdd2.getNarrowAncestors
    val ancestors3 = rdd3.getNarrowAncestors
    val ancestors4 = rdd4.getNarrowAncestors
    val ancestors5 = rdd5.getNarrowAncestors

    // Simple dependency tree with a single branch
    assert(ancestors1.size === 0)
    assert(ancestors2.size === 2)
    assert(ancestors2.count(_ === rdd1) === 1)
    assert(ancestors2.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
    assert(ancestors3.size === 5)
    assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)

    // Any ancestors before the shuffle are not considered
    assert(ancestors4.size === 0)
    assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0)
    assert(ancestors5.size === 3)
    assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
    assert(ancestors5.count(_ === rdd3) === 0)
    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 2)
}

I recommend running the getNarrowAncestors test in RDDSuite.scala. The second half of the test makes the behavior clear: narrow ancestors are only traced back as far as the most recent shuffle. Once an RDD's lineage crosses a shuffle, the chain of narrow dependencies starts over from that point: rdd4 has only a shuffle dependency, so ancestors4 is empty, while ancestors5 reaches back exactly to the ShuffledRDD rdd4 and no further.
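Note that getNarrowAncestors is private[spark], so it cannot be called from user code directly; still, the same structure can be inspected in a spark-shell session via the public dependencies and toDebugString APIs. A sketch, assuming sc is the shell's SparkContext:

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_ + 1)                           // narrow dependency
val rdd4 = rdd2.map(i => (i, i)).reduceByKey(_ + _)  // shuffle dependency

// Narrow vs. wide shows up in the dependency class names:
rdd2.dependencies.foreach(d => println(d.getClass.getSimpleName)) // OneToOneDependency
rdd4.dependencies.foreach(d => println(d.getClass.getSimpleName)) // ShuffleDependency

// toDebugString prints the lineage; the shuffle appears as a new
// indentation level, which is where the narrow-ancestor chain is cut.
println(rdd4.toDebugString)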

    Original author: runzhliu
    Original post: https://zhuanlan.zhihu.com/p/61572439