1.Pair RDD的转化操作
以键值对集合{(1,2),(3,4),(3,6)}为例
1.reduceByKey(func) 合并具有相同键的值
rdd.reduceByKey((x,y) => x+y) 结果:{(1,2),(3,10)}
2.groupByKey() 对具有相同键的值进行分组
rdd.groupByKey() 结果:{(1,[2]),(3,[4,6])}
3.mapValues(func) 对pair RDD 中的每个值应用一个函数而不改变键
rdd.mapValues(x => x+1) 结果:{(1,3),(3,5),(3,7)}
**==》等价于rdd.map( case (x,y) :(x,func(y) )**
4.flatMapValues(func) 对pair RDD 中的每个值应用一个返回迭代器的函数,
然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化。
rdd.flatMapValues(x=> (x to 5) ) 结果:{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}
5.keys() 返回一个仅包含键的RDD
rdd.keys() 结果:{1,3,3}
6.values() 返回一个仅包含值的RDD
rdd.values() {2,4,6}
7.sortByKey() 返回一个根据键排序的RDD
rdd.sortByKey() 结果: {(1,2),(3,4),(3,6)}
8.filter(func) 对元素进行过滤 因为pairRDD 也是RDD,是Tuple2对象
pairs.filter{case (key,value) => value.length < 20}
9.**conbineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)
使用不同的返回类型合并具有相同键的值
例子:这个比较难
val result = input.combineByKey((v) => (v,1),
(acc: (Int,Int),v) =>acc._1+v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._2+acc2._2,acc1._2+acc2._2)
).map(key,value._1/value._2.toFloat)
result.collectAsMap().map(println(_))
注意:三个函数对应了三个过程
(1)def createCombiner(value): (value,1)
(2)def mergeValue(acc,value) :(acc[0]+value,acc[1]+1)
(3) mergeCombiners(acc1,acc2) :(acc1[0]+acc1[0],acc1[1]+acc2[1])
2.针对两个pair RDD的转化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
1.subtractByKey 删除掉RDD中键与other RDD中的键相同的元素
rdd.subtractByKey(other) 结果:{(1,2)}
2.join 对连个RDD进行内连接
rdd.join(other) 结果: {(3,(4,9)),(3,(4,9))}
3.rightOuterJoin 对另个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)
rdd.rightOuterJoin(other) 结果:{(3,(some(4),9)),(3,(some(6),9))}
4.leftOuterJoin 对两个RDD进行连接操作,确保第二个RDD键必须存在(左外连接)
rdd.leftOuterJoin(other) 结果:{ (1,(2,None)),(3,(4,some(9)),(3,(6,some(9))) }
5.cogroup 将两个RDD中拥有相同键的数据分组到一起
rdd.cogroup(other) 结果:{(1,([2],[])),(3,([4,6],[9]))}
可以进行并行对调优
val data = Seq( ("a",3),("b",4),("a",1) )
sc.parallelize(data).reduceByKey( (x,y) => x+y,10 ) 并行对为10
自定义排序
val data = List((1,4),(4,8),(0,4),(12,8))
val rdd=sc.parallelize(data)
implicit val sortIntegerByString = new Ordering[Int] {
override def compare(Integer a, Integer b) {
return String.valueOf(a).compareTo(String.valueOf(b))
}
}
rdd.sortByKey( sortIntegerByString )
3.pairRDD 的行动操作
以键值对集合{(1,2),(3,4),(3,6)}
1.countByKey() 对每个键对应的元素分别计数
rdd.countByKey() 结果:{(1,1),(3,2)}
2. collectAsMap() 将结果以映射表的形式返回,以便查询
rdd.collectAsMap() 结果:Map{(1,2),(3,6)}
3.lookup(key) 返回给定键对应的所有值
rdd.lookup(3) 结果:[4,6]