Spark RDD aggregateByKey

aggregateByKey 这个RDD有点繁琐,整理一下使用示例,供参考

 

直接上代码

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/10/27.
  */
object AggregateByKey {
  def main(args: Array[String]) {
    val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey")
      .setMaster("local")
    val sc: SparkContext = new SparkContext(sparkConf)

    val data = List((1, 3), (1, 2), (1, 4), (2, 3))
    var rdd = sc.parallelize(data,2)//数据拆分成两个分区

    //合并在不同partition中的值,a,b的数据类型为zeroValue的数据类型
    def comb(a: String, b: String): String = {
      println("comb: " + a + "\t " + b)
      a + b
    }
    //合并在同一个partition中的值, a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
    def seq(a: String, b: Int): String = {
      println("seq: " + a + "\t " + b)
      a + b
    }

    rdd.foreach(println)
    
//
zeroValue 中立值,定义返回value的类型,并参与运算 //seqOp 用来在一个partition中合并值的 //comb 用来在不同partition中合并值的 val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq,comb) //打印输出 aggregateByKeyRDD.foreach(println) sc.stop() } }

 

输出结果说明:

 /*
将数据拆分成两个分区

//分区一数据
(1,3)
(1,2)
//分区二数据
(1,4)
(2,3)

//分区一相同key的数据进行合并
seq: 100     3   //(1,3)开始和中立值进行合并  合并结果为 1003
seq: 1003     2   //(1,2)再次合并 结果为 10032

//分区二相同key的数据进行合并
seq: 100     4  //(1,4) 开始和中立值进行合并 1004
seq: 100     3  //(2,3) 开始和中立值进行合并 1003

将两个分区的结果进行合并
//key为2的,只在一个分区存在,不需要合并 (2,1003)
(2,1003)

//key为1的, 在两个分区存在,并且数据类型一致,合并
comb: 10032     1004
(1,100321004)

* */

 

参考代码及下面的说明进行理解 

 

官网的说明

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

源码中函数的说明 

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
    原文作者:spark
    原文地址: https://www.cnblogs.com/one--way/p/6006296.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞