Spark Core - 高效的使用 RDD join

Spark 作为分布式的计算框架,最为影响其执行效率的地方就是频繁的网络传输。所以一般的,在不存在数据倾斜的情况下,想要提高 Spark job 的执行效率,就尽量减少 job 的 shuffle 过程(减少 job 的 stage),或者退而减小 shuffle 带来的影响,join 操作也不例外。

所以,针对 spark RDD 的 join 操作的使用,提供一下几条建议:

  1. 尽量减少参与 join 的 RDD 的数据量。
  2. 尽量避免参与 join 的 RDD 都具有重复的key。
  3. 尽量避免或者减少 shuffle 过程。
  4. 条件允许的情况下,使用 map-join 完成 join。

我们来举个例子,现在一共有两个 RDD,一个是元素为 (String, Double) 类型的 userScoresRDD, 其 key 代表用户 id,其 value 代表用户游戏的历史分数,id 与分数为一对多的关系。另一个为元素为 (String, String) 的 userMobileRDD, 其 key 代表用户的id,value 代表用户的手机号码。我们现在需要得到每个用户的最高分以及其手机号,使得可以使用短信的方式向每个用户告知其最高的游戏记录(发短信有些浪费了)。

尽量减少参与 join 的 RDD 的数据量

按照套路,先举个反例,如下:

代码 1

  def joinGetUserBestScoreWithMobile1(userScoresRDD: RDD[(String, Double)],
                                     userMobileRDD: RDD[(String, String)])
                                        : RDD[(String, (Double, String))] = {
    val userScoreAndMobile = userScoresRDD.join(userMobileRDD)
    userScoreAndMobile.reduceByKey((x, y) => if (x._1 > y._1) x else y)
  }

在上面的例子中,先进行的 join 操作,在用户的每条游戏记录上都添加了一枚手机号,然后在带着手机号的 RDD 上通过 reduceByKey 得到每个用户最高分已经手机号。

这样做明显会影响效率,我们明显可以先算出每个用户的最高分,然后在去得到他的手机号:

代码 2

  def joinGetUserBestScoreWithMobile2(userScoresRDD: RDD[(String, Double)],
                                      userMobilesRDD: RDD[(String, String)])
                                        : RDD[(String, (Double, String))] = {
    val userBestScore = userScoresRDD.reduceByKey((x, y) => if (x > y) x else y)
    userBestScore.join(userMobilesRDD)
  }

两种都使用的reduceByKey,但后者会明显减少参与 join 操作的数据量,即减少了shuffle 的时间,又减少了计算的时间,增加效率,降低了数据的冗余。

尽量避免参与 join 的 RDD 都具有重复的key

此条建议是为了避免发生两个RDD full join 而笛卡尔积的情况。

在我们的例子中,假如每个用户都拥有多个手机号,为了避免 full join 而使数据暴增,我们可以在代码2的基础上,先对 userMobilesRDD 使用 combileByKey 进行处理,减少重复的 key。

尽量避免或者减少 shuffle 过程

Join 怎么才能避免或减少 shuffle 操作呢? 我们知道只有父子RDD的依赖关系为宽依赖的时候,才会发生shuffle,所以关键就是控制父子RDD的依赖关系。join 操作有两个父RDD(即被join的RDD),一个子RDD(join后的结果),首先需要了解一下join操作时依赖的判断过程。下面即为过程源代码:

Spark 源代码 org.apache.spark.rdd.CoGroupedRDD

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_] =>
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

其中 part 为 join 所使用的分区器,rdds 为参加 join 的RDD。通过代码,我们就可以了解到,当父RDD与 join操作 使用相同的分区器的时候,父子RDD才会建立窄依赖(OneToOneDependency)关系,否则就使用宽依赖关系,并且 shuffle 使用join操作的分区器来进行分区。

所以最差情况下,如下图一,两个父RDD的分区器与 join 使用的分区都不相同(一般是父RDD的分区器都为 None),两个父RDD到子RDD,都会进行shuffle操作:

《Spark Core - 高效的使用 RDD join》 图一

好一点的情况,如下图二,即只有一个父RDD的分区器与 join操作 所使用的相同。这样只会在一个RDD上发生 shuffle。

《Spark Core - 高效的使用 RDD join》 图二

最后就是最完美的情况,两个父RDD的分区器都与join操作使用的分区器相同。如下图三,不会发生任何shuffle操作:

《Spark Core - 高效的使用 RDD join》 图三

所以,我们可以实际情况,至少减少一次不必要的 shuffle 操作。

下一步我们要做的就是指定父RDD与 join 操作的的分区器为相同的。我们知道,许多的宽依赖操作都可以为其指定分区器,以决定其生成的RDD所使用的分区器,比如 reduceByKey。当然 join 操作也例外,所以我们可以在 join 的时候传入指定的分区器,这样来达到我们想要减少 shuffle 的目的。但是,当我们为两个父RDD指定了相同的分区器的时候,就不需要再为 join 操作传入指定的分区器,这是因为join操作会拿到两个父RDD的中分区器中分区数多的那个分区器作为默认分区器。

关于 join 操作获取默认分区器的详细,具体请看源代码(org.apache.spark.Partitioner 的 defaultPartitioner)

实践一下

让我们回到我们的例子,可以发现我们在使用 reduceByKey 生成 userBestScoreRDD 的时候,使用的是 userMobilesRDD 的分区器(或者在 join 时将要被使用的分区器)。

  def joinGetUserBestScoreWithMobile4(userScoresRDD: RDD[(String, Double)],
                                      userMobilesRDD: RDD[(String, String)]): RDD[(String, (Double, Option[String]))] = {
    // 如果 userMobilesRDD 存在已知的 partitioner,就直接获取
    // 没有就构建返回 userMobilesRDD 将要默认使用的 HashPartitioner.
    val mobileRDDPartitioner = userMobilesRDD.partitioner match {
      case (Some(p)) => p
      case (None) => new HashPartitioner(userMobilesRDD.partitions.length)
    }
    //
    val userBestScoreRDD = userScoresRDD.reduceByKey(mobileRDDPartitioner,
                                               (x,y) => if (x > y) x else y)
    // 在做 join 的时候。至少省去了一次 shuffle 的所带来的代价。
    userBestScoreRDD.join(userMobilesRDD)
  }

仔细分析的话,在整个joinGetUserBestScoreWithMobile4方法里,相比于之前的代码示例,我们至少减少了一次shuffle操作。这取决于userMobilesRDD的分区器情况。如果userMobilesRDD没有分区器(为None),则userMobilesRDD在参与join的时候会进行 shuffle 操作,而userBestScoreRDD则不会发生shuffle操作。则一共的shuffle次数为2(加上 reduceByKey 一次).这也就是我们所说的“好一点的情况”。

如果userMobilesRDD已经有了分区器,则 userMobilesRDD 与 userBestScoreRDD 在join的时候都不需要shuffle,所以仅仅 reduceByKey 进行了一次shuffle.这也就是我们所说的“完美情况”。

两个分区器怎样才叫做相同,具体要看分区器 equals 方法的实现,以HashPartitioner为例,分区数相同,分区器就相同。

条件允许的情况下,使用 map-join 完成 join

Map join 想必都很熟悉,就不在写介绍了。Spark core 没有提供 map-join 的实现,具体的实现方案就是将小的 RDD 持久化到driver中后,广播到大RDD的各个分区中,自己实现 join 操作。较为通用的代码如下:

  def manualBroadCastHashJoin[K: ClassTag, V1: ClassTag, V2: ClassTag](
                                      smallRDD: RDD[(K, V1)],
                                      bigRDD: RDD[(K, V2)],
                                      sc: SparkContext): RDD[(K, (V1, V2))] = {
    val smallDataLocaled: Map[K, V1] = smallRDD.collectAsMap()
    bigRDD.sparkContext.broadcast(smallDataLocaled)

    bigRDD.mapPartitions(p => {
      p.flatMap {
        case (k, v2) =>
          smallDataLocaled.get(k) match {
            case None => Seq.empty[(K, (V1, V2))]
            case Some(v1) => Seq((k, (v1, v2)))
          }
      }
    }, preservesPartitioning = true)
  }

End!!

    原文作者:Henvealf
    原文地址: https://www.jianshu.com/p/d3138ef01498
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞