repartitionAndSortWithinPartitions算是一个高效的算子,是因为它要比使用repartition And sortByKey 效率高,这是由于它的排序是在shuffle过程中进行,一边shuffle,一边排序;具体见spark shuffle的读操作;
关于为什么比repartition And sortByKey 效率高,首先简要分析repartition 和sortbykey’的流程:
(1)rePartition
(2)sortByKey
repartitionAndSortWithinPartitions的使用
(1)使用repartitionAndSortWithinPartitions时,需要自己传入一个分区器参数,这个分区器 可以是系统提供的,也可以是自定义的:例如以下Demo中使用的KeyBasePartitioner,同时需要自定义一个排序的隐式变量,当我们使用repartitionAndSortWithinPartitions时,我们自定义的my_self_Ordering 排序规则就会传入到def implicitly[T](implicit e: T) = e
(2)二次排序
排序规则都需要在自定义的隐式变量my_self_Ordering中实现
private val ordering = implicitly[Ordering[K]]
//这里是使用了上下文界定,这个T就是Ordering[K]
def implicitly[T](implicit e: T) = e
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
Demo案例
val sparkConf = new SparkConf().setAppName("test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val wordsRDD: RDD[String] = sc.textFile("D:\\Spark_数据\\numbers_data.txt")
val resultRDD = wordsRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(each => (each._2, each._1))
/**
* key怎么排序,在这里定义
* 为什么在这里声明一个隐式变量呢,是因为在源码中,方法中有一个隐式参数;不设置是按照默认的排序规则进行排序;
*/
implicit val my_self_Ordering = new Ordering[String] {
override def compare(a: String, b: String): Int = {
val a_b: Array[String] = a.split("_")
val a_1 = a_b(0).toInt
val a_2 = a_b(1).toInt
val b_b = b.split("_")
val b_1 = b_b(0).toInt
val b_2 = b_b(1).toInt
if (a_1 == b_1) {
a_2 - b_2
} else {
a_1 - b_1
}
}
}
val rdd = resultRDD.map(x => (x._1 + "_" + x._2, x._2)).repartitionAndSortWithinPartitions(new KeyBasePartitioner(2))
/**
* 自定义分区器
*
* @param partitions
*/
class KeyBasePartitioner(partitions: Int) extends Partitioner {
//分区数
override def numPartitions: Int = partitions
//该方法决定了你的数据被分到那个分区里面
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[String]
Math.abs(k.hashCode() % numPartitions)
}
}