The aggregateByKey RDD operation is a bit fiddly, so here is a tidied-up usage example for reference.
Straight to the code:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/10/27.
  */
object AggregateByKey {
  def main(args: Array[String]) {

    val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey")
      .setMaster("local")
    val sc: SparkContext = new SparkContext(sparkConf)

    val data = List((1, 3), (1, 2), (1, 4), (2, 3))
    val rdd = sc.parallelize(data, 2) // split the data into two partitions

    // merges values from different partitions; a and b both have the type of zeroValue
    def comb(a: String, b: String): String = {
      println("comb: " + a + "\t " + b)
      a + b
    }

    // merges values within the same partition; a has the type of zeroValue, b has the type of the original value
    def seq(a: String, b: Int): String = {
      println("seq: " + a + "\t " + b)
      a + b
    }

    rdd.foreach(println)

    // zeroValue: the neutral value; it defines the result value type and takes part in the computation
    // seqOp: merges values within a single partition
    // comb: merges values across partitions
    val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq, comb)

    // print the result
    aggregateByKeyRDD.foreach(println)

    sc.stop()
  }
}
Explanation of the output:
/*
The data is split into two partitions.

// partition one
(1,3)
(1,2)
// partition two
(1,4)
(2,3)

// within partition one, values with the same key are merged
seq: 100   3   // (1,3) is merged with the zero value, giving 1003
seq: 1003  2   // (1,2) is merged next, giving 10032

// within partition two, values with the same key are merged
seq: 100   4   // (1,4) is merged with the zero value, giving 1004
seq: 100   3   // (2,3) is merged with the zero value, giving 1003

// the per-partition results are then merged across partitions
// key 2 exists in only one partition, so its partial (2,1003) needs no merge and is emitted as
(2,1003)
// key 1 exists in both partitions, and both partials have the zeroValue type, so they are combined
comb: 10032  1004
(1,100321004)
*/
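One thing the trace makes clear is that the zero value "100" is folded in once per key per partition, so the result changes with the partitioning whenever the zero value is not an identity for seq. Below is a minimal sketch to check that, reusing the same local SparkContext as above; the helper name checkPartitions is made up for illustration.

// hypothetical helper: run the same aggregation with a different partition count
def checkPartitions(sc: SparkContext, numPartitions: Int): Array[(Int, String)] = {
  val data = List((1, 3), (1, 2), (1, 4), (2, 3))
  sc.parallelize(data, numPartitions)
    .aggregateByKey("100")(_ + _, _ + _) // same seq/comb logic as above, written inline
    .collect()
}

// with 2 partitions, key 1 becomes "100321004": the zero value is applied once in each partition
// with 1 partition,  key 1 becomes "100324":    the zero value is applied only once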
Use the code above together with the notes below to understand what is happening.
Description from the official documentation
Description of the function in the source code:
/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
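The point that the result type U may differ from the value type V is the main reason to reach for aggregateByKey instead of reduceByKey. As a minimal sketch (assuming the rdd: RDD[(Int, Int)] from the example above is still in scope), the Int values can be folded into a (sum, count) accumulator and then turned into a per-key average; the sumCount and averages names are just for illustration.

// the zero value (0, 0) is a (sum, count) accumulator of type (Int, Int), not the value type Int
val sumCount: RDD[(Int, (Int, Int))] = rdd.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // seqOp: fold one Int value into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: merge two accumulators across partitions
)

// derive the per-key average from the (sum, count) pairs
val averages: RDD[(Int, Double)] = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
averages.foreach(println)
// key 1: (3 + 2 + 4) / 3 = 3.0, key 2: 3 / 1 = 3.0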