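Suppose we have a set of personal records. We group them by gender, collect the names in each group, and count the number of records per group.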
scala> val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
people: List[(String, String)] = List((male,Mobin), (male,Kpop), (female,Lucy), (male,Lufei), (female,Amy))

scala> val rdd = sc.parallelize(people)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val combinByKeyRDD = rdd.combineByKey(
     |   (x: String) => (List(x), 1),
     |   (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1),
     |   (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[String], Int))] = ShuffledRDD[1] at combineByKey at <console>:25

scala> combinByKeyRDD.foreach(println)
(female,(List(Lucy, Amy),2))
(male,(List(Mobin, Kpop, Lufei),3))
Step-by-step trace (assuming the records are split across two partitions as shown; the order of names in the final lists depends on the actual partitioning):

Partition1:
K="male" --> ("male","Mobin") --> createCombiner("Mobin") => peo1 = ( List("Mobin") , 1 )
K="male" --> ("male","Kpop") --> mergeValue(peo1,"Kpop") => peo2 = ( "Kpop" :: peo1._1 , 1 + 1 ) // same key seen again in this partition: mergeValue folds the new value into the existing combiner
K="female" --> ("female","Lucy") --> createCombiner("Lucy") => peo3 = ( List("Lucy") , 1 )

Partition2:
K="male" --> ("male","Lufei") --> createCombiner("Lufei") => peo4 = ( List("Lufei") , 1 )
K="female" --> ("female","Amy") --> createCombiner("Amy") => peo5 = ( List("Amy") , 1 )

Merge partitions:
K="male" --> mergeCombiners(peo2,peo4) => ( List("Kpop","Mobin","Lufei") , 3 )
K="female" --> mergeCombiners(peo3,peo5) => ( List("Lucy","Amy") , 2 )
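The trace can be reproduced without a cluster. The following is a minimal plain-Scala sketch of the same two-phase logic (combine within each partition, then merge across partitions). The helper `simulateCombineByKey` and the object name are hypothetical and are not part of the Spark API; the two-partition split is an assumption made to mirror the trace, and real Spark decides partitioning and merge order itself.

```scala
// Plain-Scala sketch of combineByKey semantics (no Spark required).
// `simulateCombineByKey` is a hypothetical helper, not a Spark function.
object CombineByKeyTrace {
  def simulateCombineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Phase 1: build one combiner per key inside each partition.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))  // key already seen in this partition
          case None    => acc.updated(k, createCombiner(v)) // first value for this key in this partition
        }
      }
    }
    // Phase 2: merge the per-partition combiners key by key.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, partResult) =>
      partResult.foldLeft(merged) { case (acc, (k, c)) =>
        acc.get(k) match {
          case Some(prev) => acc.updated(k, mergeCombiners(prev, c))
          case None       => acc.updated(k, c)
        }
      }
    }
  }

  // Assumed partition layout, matching the trace.
  val partition1 = Seq(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"))
  val partition2 = Seq(("male", "Lufei"), ("female", "Amy"))

  // Same three functions as in the spark-shell session.
  val result: Map[String, (List[String], Int)] =
    simulateCombineByKey[String, String, (List[String], Int)](
      Seq(partition1, partition2),
      x => (List(x), 1),
      (peo, x) => (x :: peo._1, peo._2 + 1),
      (sex1, sex2) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

Because mergeValue prepends with `::`, names merged inside one partition appear in reverse arrival order, while mergeCombiners concatenates with `:::` and keeps each partition's list intact.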
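In the example above, each record's value is a single string. If the value is a tuple, it is convenient to define a type alias for it so that the function signatures stay readable: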
scala> val people = List(("male", ("Mobin",89)),("male", ("Kpop",98)),("female", ("Lucy",99)),("male", ("Lufei",77)),("female", ("Amy",97)))
scala> val rdd = sc.parallelize(people)
rdd: org.apache.spark.rdd.RDD[(String, (String, Int))] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> type MVType = (String, Int)
defined type alias MVType

scala> val combinByKeyRDD = rdd.combineByKey(
     |   (x: MVType) => (List(x), 1),
     |   (peo: (List[MVType], Int), x:MVType) => (x :: peo._1, peo._2 + 1),
     |   (sex1: (List[MVType], Int), sex2: (List[MVType], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[(String, Int)], Int))] = ShuffledRDD[3] at combineByKey at <console>:27

scala> combinByKeyRDD.foreach(println)
(male,(List((Mobin,89), (Kpop,98), (Lufei,77)),3))
(female,(List((Lucy,99), (Amy,97)),2))
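With a tuple value, the combiner does not have to be a list. As an illustration only, here is a sketch that uses the same data to compute the average of the integer field per gender by keeping a running (sum, count) pair. Several things here are assumptions not present in the original: that the integer is a score, the `SumCount` alias, the object name, the `local[2]` master, and the explicit two-partition split. It needs Spark on the classpath to run.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: per-key average via combineByKey with a (sum, count) combiner.
object AverageScoreByGender {
  type MVType = (String, Int)  // (name, score); "score" is an assumed meaning of the Int
  type SumCount = (Int, Int)   // (sum of scores, number of records); illustrative alias

  // The three functions combineByKey expects, named for readability.
  val createCombiner: MVType => SumCount =
    x => (x._2, 1)
  val mergeValue: (SumCount, MVType) => SumCount =
    (acc, x) => (acc._1 + x._2, acc._2 + 1)
  val mergeCombiners: (SumCount, SumCount) => SumCount =
    (a, b) => (a._1 + b._1, a._2 + b._2)

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("AverageScoreByGender")
    val sc = new SparkContext(conf)
    try {
      val people = List(
        ("male", ("Mobin", 89)), ("male", ("Kpop", 98)), ("female", ("Lucy", 99)),
        ("male", ("Lufei", 77)), ("female", ("Amy", 97)))
      val rdd = sc.parallelize(people, 2)

      val sumCount = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
      // Convert each (sum, count) pair into an average.
      val averages = sumCount.mapValues { case (sum, n) => sum.toDouble / n }

      // collect() brings the small result to the driver before printing.
      averages.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

For this data the averages are 88.0 for male (264 / 3) and 98.0 for female (196 / 2); the order in which the two lines are printed is not guaranteed.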