Spark RDD Partition 算子

这里介绍了一些 spark RDD 中比较难理解的算子,方便以后回忆。

mapPartitions

transformation转换

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

val a = sc.parallelize(List(1,2,3))
val b = a.map(_*2)
//结果List[Int] = List(2,4,6)

该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多。

mapPartitions 最终返回一个iterator

看一个例子,生成一个List型的RDD 放到两个分区中,计算每个分区的和,放到一个List中,并返回一个迭代器。

//分到两个分区,第一个分区为1、2,第二个分区为 3、4、5
val rdd1 = sc.parallelize(List(1 to 5), 2) //1到5的list,分到两个分区
var rdd2 = rdd1.mapPartitions{
    x => {
        var result = List[Int]()
        var i = 0
        while(x.hasNext()) {
            i += x.next()
        }
        //::往集合的头部追加一个元素,这里有两个分区,即result里有两个元素
        result.::(i).iterator 
    }
}

//调用rdd的 collect方法将每个分区的数值累加
rdd2.collect
//Array[Int] = Array(3, 12)

mapPartitionsWithIndex

transformation转换

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

var rdd3 = sc.makeRDD(1 to 5,2)
//rdd3有两个分区
var rdd4 = rdd1.mapPartitionsWithIndex{
    (x, iter) => {
        var result = List[String]()
        var i = 0
        while(iter.hasNext){
            i += iter.next()
        }
        result.::(x + " | " + i).iterator
    }
}
//rdd3将rdd4中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
rdd3.collect
//res13: Array[String] = Array(0 | 3, 1 | 12)

aggregate

action行动,聚合操作

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

有点复杂,U表示初始值,seqOp 是以U 为起始,聚合每一个分区中的元素,第二个参数对每个分区seqOp 聚合后的结果,再聚合。
直接看个例子,就能理解了:

val rdd = sc.parallelize(List(1 to 5), 2)
//起始值为0,每一个分区的元素进行相加,然后对每个分区的结果进行相加
rdd.aggregate(0)(_+_, _+_)
//res14: Int 15

aggregateByKey

和 aggregate 相比,是针对键值对的操作。
对每一个分区,将key值相同的,先聚合;再对每个分区的结果进行聚合。

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5),("dog", 12), ("mouse", 4),("cat", 12),("mouse", 2)), 2)
//对每个分区,取出相同key的最大值;并对每个分区的最大值进行相加
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
//第一个分区("cat",2), ("cat", 5),("dog", 12),聚合后: ("cat",5), ("dog", 12)
//第二个分区("mouse", 4),("cat", 12),("mouse", 2),聚合后: ("mouse",4), ("cat", 12)
//最终结果:Array((dog,12), (cat,17), (mouse,4))

combineByKey

transformation转换,和reduceByKey是相同的效果。

def combineByKey[C](createCombiner: (V) => C,  
                    mergeValue: (C, V) => C,   
                    mergeCombiners: (C, C) => C): RD
  • createCombiner
    combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建那个键对应的累加器的初始值。
  • mergeValue
    如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并。
  • mergeCombiners
    由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的
    mergeCombiners() 方法将各个分区的结果进行合并。
例子一

统计所有单词的个数

val userScores = Array(("hello", 1), ("hello", 2), ("world", 2), ("world", 1), ("hello", 3), ("hello", 2), ("world", 2), ("world", 3))
val rdd = sc.parallelize(userScores, 2) //两个分区
val rdd2 = rdd.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

下面对上面的代码做下分析:

  • x => x
    第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算。
    x 是每个分区中每个key中 value中 的第一个值, (hello,1) (hello,2) (world,2) (world,1) –> (hello(1,2),world(2,1)) –> x就相当于hello的第一个1, world中的第一个2,并以此作为初始值。

  • (a: Int, b: Int) => a + b
    类似于 reduceByKey,把每个分区中 相同Key的,以上面的值作为初始值,进行相加,即 分区一:hello -> 1+2=3,world -> 2+1=3;分区二:hello -> 3+2=5,world -> 2+3=5,得到分区一:(hello,3) (world,3) ,分区二:(hello,5) (world,5)

  • (m: Int, n: Int) => m + n)
    把每个分区的结果,进行相加合并,即(hello,8) (world,8)

如果把 x => x 变成 x => x + 10,会是什么效果?
初始值会相加10,拿分区一举例:hello –> x值为1,1+10=11 –> hello -> 11+2=13,每个分区都会 加10,最终统计所有分区,就会加20

例子二

统计每种动物,把个数为1的动物 放在一个笼子,把个数为2的动物 统一放到另外个笼子。

val rdd1 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd2 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
//zip函数用于将两个RDD组合成Key/Value形式的RDD
val rdd3 = rdd1.zip(rdd2)
//:+  追加一个元素到 list 尾部,++  合并两个list
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)

上面通过 zip 合并后的新List为:
List( (1,"dog"), (1,"cat"), (2,"gnu"), (2,"salmon"), (2,"rabbit"), (1,"turkey"), (2,"wolf"), (2,"bear"), (2,"bee"))
我们的目的是最终产生 这样格式的数据:
(1, (dog, cat, turkey)), (2, (gnu, salmon, rabbit, wolf, bear, bee))

所以首先要产生一个列表,然后把key相同的value 追加到列表中

  • List(_)
    List(_) 其实就是 x => List(x),(1,”dog”) 中 1作为key 第一次出现,x 即是 dog,这里把x转化为 list,以便于在后面把相同key的 value 追加进去

  • (x: List[String], y: String) => x :+ y
    合并分区中 所有相同key 的value值到列表中
    我们的分析都是以第一个元素为例子,上面x中存放的是 (“dog”),y代表 所有相同k的 每个value 值,x :+ y 即把 y追加到 x 列表中,后面一个元素是 (1,”cat”),追加后的形式是 (“dog”, “cat”)

  • (m: List[String], n: List[String]) => m ++ n
    统计所有的分区,把每个分区的 相同key 的List,合并到一个List中

combineByKey 和 aggregateByKey 的区别

这两者有什么区别呢?细细观察,这两者是很相似的,都是先对每个分区相同的 K 进行聚合,然后把每个分区聚合的结果再聚合

不同点是初始值不一样,aggregateByKey 是手动指定一个初始值;而 combineByKey 是把第一个K出现的 value 指定一个 func,转换后的数据作为初始值。

foldByKey

transformation转换
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

对于每个 K 对应的 V,先把 zeroValue 和 V 作为 func 的参数,进行转换,得到 V’

再对 所有 相同 K 的 V’ 再进行 func 转换。看个例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.foldByKey(0)(_+_).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 
//将rdd1中每个key对应的V进行累加之前,先把 V 和 zeroValue = 0    进行映射函数为   +   操作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
//("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)

再看:

scala> rdd1.foldByKey(2)(_+_).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先将 zeroValue=2 应用于每个V, 得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)
    原文作者:博弈史密斯
    原文地址: https://www.jianshu.com/p/80423529c59e
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞