spark算子:combineByKey

假设我们有一组个人信息,我们针对人的性别进行分组统计,并进行统计每个分组中的记录数。

scala> val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
      people: List[(String, String)] = List((male,Mobin), (male,Kpop), (female,Lucy), (male,Lufei), (female,Amy))

scala> val rdd = sc.parallelize(people)
      rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val combinByKeyRDD = rdd.combineByKey(
           |   (x: String) => (List(x), 1),
           |   (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1),
           |   (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[String], Int))] = ShuffledRDD[1] at combineByKey at <console>:25

scala> combinByKeyRDD.foreach(println)
      (female,(List(Lucy, Amy),2))
      (male,(List(Mobin, Kpop, Lufei),3))
scala>

输出步骤:

Partition1:
K="male"  -->  ("male","Mobin")  --> createCombiner("Mobin") =>  peo1 = (  List("Mobin") , 1 )
K="male"  -->  ("male","Kpop")  --> mergeValue(peo1,"Kpop") =>  peo2 = (  "Kpop"  ::  peo1_1 , 1 + 1 )    //Key相同调用mergeValue函数对值进行合并
K="female"  -->  ("female","Lucy")  --> createCombiner("Lucy") =>  peo3 = (  List("Lucy") , 1 )

Partition2:
K="male"  -->  ("male","Lufei")  --> createCombiner("Lufei") =>  peo4 = (  List("Lufei") , 1 )
K="female"  -->  ("female","Amy")  --> createCombiner("Amy") =>  peo5 = (  List("Amy") , 1 )

Merger Partition:
K="male" --> mergeCombiners(peo2,peo4) => (List(Lufei,Kpop,Mobin))
K="female" --> mergeCombiners(peo3,peo5) => (List(Amy,Lucy))

上边的信息中,个人信息中只有一个值,如果value是元组的话,需要定义出一个type:

scala>       val people = List(("male", ("Mobin",89)),("male", ("Kpop",98)),("female", ("Lucy",99)),("male", ("Lufei",77)),("female", ("Amy",97)))
scala> val rdd = sc.parallelize(people) rdd: org.apache.spark.rdd.RDD[(String, (String, Int))] = ParallelCollectionRDD[2] at parallelize at <console>:23 scala> type MVType = (String, Int) defined type alias MVType scala> val combinByKeyRDD = rdd.combineByKey( | (x: MVType) => (List(x), 1), | (peo: (List[MVType], Int), x:MVType) => (x :: peo._1, peo._2 + 1), | (sex1: (List[MVType], Int), sex2: (List[MVType], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[(String, Int)], Int))] = ShuffledRDD[3] at combineByKey at <console>:27 scala> combinByKeyRDD.foreach(println) (male,(List((Mobin,89), (Kpop,98), (Lufei,77)),3)) (female,(List((Lucy,99), (Amy,97)),2))

 

    原文作者:spark
    原文地址: https://www.cnblogs.com/yy3b2007com/p/7806277.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞