Spark RDD键值对操作

1.Pair RDD的转化操作

以键值对集合{(1,2),(3,4),(3,6)}为例

1.reduceByKey(func)    合并具有相同键的值
   rdd.reduceByKey((x,y) => x+y) 结果:{(1,2),(3,10)}
2.groupByKey()    对具有相同键的值进行分组 
  rdd.groupByKey()  结果:{(1,[2]),(3,[4,6])}
3.mapValues(func) 对pair RDD 中的每个值应用一个函数而不改变键
  rdd.mapValues(x => x+1) 结果:{(1,3),(3,5),(3,7)}
 **==》等价于rdd.map( case (x,y) :(x,func(y) )**
4.flatMapValues(func) 对pair RDD 中的每个值应用一个返回迭代器的函数,
然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化。
  rdd.flatMapValues(x=> (x to 5) ) 结果:{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}
5.keys()  返回一个仅包含键的RDD  
    rdd.keys() 结果:{1,3,3}
6.values()   返回一个仅包含值的RDD 
    rdd.values()  {2,4,6}
7.sortByKey() 返回一个根据键排序的RDD
   rdd.sortByKey() 结果: {(1,2),(3,4),(3,6)}
8.filter(func)  对元素进行过滤 因为pairRDD 也是RDD,是Tuple2对象
  pairs.filter{case (key,value) => value.length < 20}
9.**conbineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)  
    使用不同的返回类型合并具有相同键的值
例子:这个比较难
val result = input.combineByKey((v) => (v,1),
          (acc: (Int,Int),v) =>acc._1+v,acc._2+1),
          (acc1:(Int,Int),acc2:(Int,Int)) => (acc1._2+acc2._2,acc1._2+acc2._2)
          ).map(key,value._1/value._2.toFloat)
result.collectAsMap().map(println(_))
注意:三个函数对应了三个过程
(1)def createCombiner(value): (value,1)
(2)def mergeValue(acc,value) :(acc[0]+value,acc[1]+1)
(3) mergeCombiners(acc1,acc2) :(acc1[0]+acc1[0],acc1[1]+acc2[1])

2.针对两个pair RDD的转化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}

1.subtractByKey 删除掉RDD中键与other RDD中的键相同的元素 
  rdd.subtractByKey(other)  结果:{(1,2)}
2.join  对连个RDD进行内连接
 rdd.join(other)   结果: {(3,(4,9)),(3,(4,9))}  
3.rightOuterJoin   对另个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)
  rdd.rightOuterJoin(other)  结果:{(3,(some(4),9)),(3,(some(6),9))}
4.leftOuterJoin  对两个RDD进行连接操作,确保第二个RDD键必须存在(左外连接)
 rdd.leftOuterJoin(other)  结果:{ (1,(2,None)),(3,(4,some(9)),(3,(6,some(9))) }
5.cogroup  将两个RDD中拥有相同键的数据分组到一起
    rdd.cogroup(other)   结果:{(1,([2],[])),(3,([4,6],[9]))}

可以进行并行对调优

val data = Seq( ("a",3),("b",4),("a",1) ) 
sc.parallelize(data).reduceByKey( (x,y) => x+y,10 ) 并行对为10

自定义排序

    val data = List((1,4),(4,8),(0,4),(12,8))
    val rdd=sc.parallelize(data)
    implicit val sortIntegerByString = new Ordering[Int] {
         override def compare(Integer a, Integer b) {
             return String.valueOf(a).compareTo(String.valueOf(b))
         } 
   }
  rdd.sortByKey( sortIntegerByString  )

3.pairRDD 的行动操作

以键值对集合{(1,2),(3,4),(3,6)}

  1.countByKey()  对每个键对应的元素分别计数
    rdd.countByKey()  结果:{(1,1),(3,2)}
  2. collectAsMap()  将结果以映射表的形式返回,以便查询 
    rdd.collectAsMap()  结果:Map{(1,2),(3,6)}
  3.lookup(key)  返回给定键对应的所有值
    rdd.lookup(3)  结果:[4,6]
    原文作者:lmem
    原文地址: https://www.jianshu.com/p/b3caf825a4b9
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞