CombineByKey: Aggregation by Key
combineByKey is the most basic per-key aggregation operation; many of the more convenient wrapper functions are built on top of it. Prefer those convenient functions whenever they are sufficient (see the reduceByKey comparison after the example below).
package cn.zb;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
@Slf4j
public class CombineByKey {
    /**
     * Accumulator holding a running total and an element count; it must be
     * Serializable so Spark can ship it between executors.
     */
    static class AvgCount implements Serializable {
        public int total;
        public int num;

        public AvgCount(int total, int num) {
            this.total = total;
            this.num = num;
        }

        public float avg() {
            return total / (float) num;
        }
    }
    public static void main(String[] args) {
        String inputFile = args[0];
        String outputFile = args[1];
        log.info("input file: {}, output file: {}", inputFile, outputFile);
        SparkConf conf = new SparkConf().setAppName("wordcount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(inputFile);
        JavaPairRDD<String, AvgCount> wordCountPair = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .combineByKey(
                        // createCombiner: build the combiner the first time a key is seen
                        // in a partition (one combiner per key per partition)
                        v -> new AvgCount(v, 1),
                        // mergeValue: fold each further value of the same key within this
                        // partition into that partition's combiner (the accumulator)
                        (avgCount, v) -> {
                            avgCount.total += v;
                            avgCount.num++;
                            return avgCount;
                        },
                        // mergeCombiners: merge the per-partition combiners across partitions
                        (a1, a2) -> {
                            a1.num += a2.num;
                            a1.total += a2.total;
                            return a1;
                        }
                );
        // Write out the total count per word (avg() is also available on AvgCount)
        wordCountPair.mapValues(avgCount -> avgCount.total)
                .saveAsTextFile(outputFile);
        sc.stop();
    }
}
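For comparison, the same per-word totals can be produced with reduceByKey, one of the convenience wrappers built on top of combineByKey. A minimal sketch, reusing the lines RDD and outputFile from the example above:
// reduceByKey handles the createCombiner/mergeValue/mergeCombiners steps internally
JavaPairRDD<String, Integer> wordCounts = lines
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((v1, v2) -> v1 + v2);
wordCounts.saveAsTextFile(outputFile);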
Join: Join Operations
There are join (inner join), leftOuterJoin (left outer join), and rightOuterJoin (right outer join); they work the same way as join queries in a database.
package cn.zb;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Join operations on pair RDDs
 */
@Slf4j
public class PairRDDJoin {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("pair-rdd-join");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, String>> shopAddress = Arrays.asList(
                new Tuple2<>("chaoshifa", "shangdi"),
                new Tuple2<>("hualian", "xierqi"),
                new Tuple2<>("woerma", "changping")
        );
        List<Tuple2<String, String>> shopRattings = Arrays.asList(
                new Tuple2<>("chaoshifa", "1"),
                new Tuple2<>("woerma", "3")
        );
        // Create two pair RDDs from the in-memory collections
        JavaPairRDD<String, String> addressPair = sc.parallelizePairs(shopAddress);
        JavaPairRDD<String, String> rattingPair = sc.parallelizePairs(shopRattings);

        // Inner join: only keys present in both RDDs survive
        JavaPairRDD<String, Tuple2<String, String>> joinResult = addressPair.join(rattingPair);
        Map<String, Tuple2<String, String>> map = joinResult.collectAsMap();
        log.info("joinResult, inner join result: {}", map);

        // Left outer join: every key from the left RDD is kept; missing right-side
        // values show up as Optional.empty
        Map<String, Tuple2<String, Optional<String>>> leftJoinResult =
                addressPair.leftOuterJoin(rattingPair).collectAsMap();
        log.info("leftJoin, left outer join: {}", leftJoinResult);

        // Right outer join: every key from the right RDD is kept
        Map<String, Tuple2<Optional<String>, String>> rightJoinResult =
                addressPair.rightOuterJoin(rattingPair).collectAsMap();
        log.info("rightJoin, right outer join: {}", rightJoinResult);

        sc.stop();
    }
}
Execution result:
18/01/23 19:27:10 INFO PairRDDJoin: joinResult, inner join result: {chaoshifa=(shangdi,1), woerma=(changping,3)}
18/01/23 19:27:10 INFO PairRDDJoin: leftJoin, left outer join: {hualian=(xierqi,Optional.empty), chaoshifa=(shangdi,Optional[1]), woerma=(changping,Optional[3])}
18/01/23 19:27:11 INFO PairRDDJoin: rightJoin, right outer join: {chaoshifa=(Optional[shangdi],1), woerma=(Optional[changping],3)}
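In the outer joins, the side that may be missing is wrapped in org.apache.spark.api.java.Optional (assuming Spark 2.x, which matches the Optional.empty / Optional[...] output above). A minimal sketch of substituting a default rating of "0", reusing addressPair and rattingPair from the example (the variable name ratedOrDefault is hypothetical):
// Replace a missing rating with a default value before further processing
JavaPairRDD<String, Tuple2<String, String>> ratedOrDefault =
        addressPair.leftOuterJoin(rattingPair)
                .mapValues(v -> new Tuple2<>(
                        v._1(),                                   // address, always present
                        v._2().isPresent() ? v._2().get() : "0"   // unwrap the Optional rating
                ));
log.info("leftJoin with default rating: {}", ratedOrDefault.collectAsMap());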
sortByKey: Sorting Data
package cn.zb;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class WordCount {
    public static void main(String[] args) {
        String inputFile = args[0];
        String outputFile = args[1];
        SparkConf sparkConf = new SparkConf().setAppName("wordcount");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        // Read the input file line by line
        JavaRDD<String> lines = sparkContext.textFile(inputFile);
        // Split each line into words
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Turn words into (word, 1) pairs and sum the counts per word
        JavaPairRDD<String, Integer> counts = words.mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((v1, v2) -> v1 + v2);
        // Sort by key (ascending by default)
        JavaPairRDD<String, Integer> sortedCount = counts.sortByKey();
        // Save the result
        sortedCount.saveAsTextFile(outputFile);
        sparkContext.stop();
    }
}
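sortByKey sorts ascending by default; it also takes an ascending flag and, if needed, a custom java.util.Comparator, which must be Serializable because it is shipped to the executors. A minimal sketch, reusing the counts RDD from the example above (imports of java.util.Comparator and java.io.Serializable assumed; the names descending, byLength, and LengthComparator are hypothetical):
// Descending order by key
JavaPairRDD<String, Integer> descending = counts.sortByKey(false);

// Custom ordering by key length; a named Serializable comparator is used because
// a plain lambda is not guaranteed to be serializable here
class LengthComparator implements Comparator<String>, Serializable {
    @Override
    public int compare(String a, String b) {
        return Integer.compare(a.length(), b.length());
    }
}
JavaPairRDD<String, Integer> byLength = counts.sortByKey(new LengthComparator(), true);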