sortByKey
官方文档描述:
Sort the RDD by key, so that each partition contains a sorted range of the elements in ascending order.
Calling `collect` or `save` on the resulting RDD will return or output an ordered list of records (in the `save` case,
they will be written to multiple `part-X` files in the filesystem, in order of the keys).
函数原型:
def sortByKey(): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
源码分析:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
**
sortByKey() 将 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的数据依赖很简单,先使用 shuffle 将 records 聚集在一起(放到对应的 partition 里面),然后将 partition 内的所有 records 按 key 排序,最后得到的 MapPartitionsRDD 中的 records 就有序了。目前 sortByKey() 先使用 Array 来保存 partition 中所有的 records,再排序。
**
实例:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random random = new Random(100);
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<Integer, Integer>(integer,random.nextInt(10));
}
});
JavaPairRDD<Integer,Integer> sortByKeyRDD = javaPairRDD.sortByKey();
System.out.println(sortByKeyRDD.collect());
repartitionAndSortWithinPartitions
官方文档描述:
Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys.This is more efficient than calling `repartition`
and then sorting within each partition because it can push the sorting down into the shuffle machinery.
函数原型:
def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V]
def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K]) : JavaPairRDD[K, V]
源码分析:
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
**
从源码中可以看出,该方法依据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序;通过对比sortByKey发现,这种方式比先分区,然后在每个分区中进行排序效率高,这是因为它可以将排序融入到shuffle阶段。
**
实例:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random random = new Random();JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<Integer, Integer>(integer,random.nextInt(10));
}
});
JavaPairRDD<Integer,Integer> RepartitionAndSortWithPartitionsRDD = javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
@Override
public int numPartitions() { return 2; }
@Override
public int getPartition(Object key) { return key.toString().hashCode() % numPartitions();
}
});
System.out.println(RepartitionAndSortWithPartitionsRDD.collect());