Spark PairRDD key-value operations

combineByKey: aggregating by key

combineByKey is the most fundamental per-key aggregation operation; most of the more convenient wrappers (reduceByKey, aggregateByKey, and so on) are built on top of it. When one of those wrappers can express what you need, prefer it over calling combineByKey directly.

package cn.zb;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;

@Slf4j
public class CombineByKey {


    static class AvgCount implements Serializable {
        public int total;
        public int num;

        public AvgCount(int total, int num) {
            this.total = total;
            this.num = num;
        }

        public float avg() {
            return total / (float) num;
        }
    }

    public static void main(String[] args) {
        String inputFile = args[0];
        String outputFile = args[1];

        log.info("输入的文件是:{}, 输出文件:{}", inputFile, outputFile);

        SparkConf conf = new SparkConf().setAppName("wordcount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile(inputFile);
        JavaPairRDD<String, AvgCount> wordCountPair = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .combineByKey(
                        // createCombiner: build the initial accumulator for a key, called once per key per partition
                        v -> new AvgCount(v, 1),

                        // mergeValue: fold another value for the same key (within one partition) into the accumulator
                        (avgCount, v) -> {
                            avgCount.total += v;
                            avgCount.num++;
                            return avgCount;
                        },

                        // mergeCombiners: merge the per-partition accumulators across partitions
                        (a1, a2) -> {
                            a1.num += a2.num;
                            a1.total += a2.total;
                            return a1;
                        }
                );

        // total gives the per-word count; avgCount.avg() would yield the average value per key instead
        wordCountPair.mapValues(avgCount -> avgCount.total)
                .saveAsTextFile(outputFile);

        sc.stop();
    }
}
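
As noted above, the more convenient wrappers are built on top of combineByKey and are usually preferable. As a rough sketch (not taken from the original program), the same AvgCount accumulation could be expressed with aggregateByKey, reusing the AvgCount class and the lines RDD from the example above; the variable name avgPair is only illustrative:

JavaPairRDD<String, AvgCount> avgPair = lines
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .aggregateByKey(
                // zero value used to initialise each key's accumulator
                new AvgCount(0, 0),
                // fold one value into the accumulator within a partition
                (acc, v) -> {
                    acc.total += v;
                    acc.num++;
                    return acc;
                },
                // merge the per-partition accumulators
                (a1, a2) -> {
                    a1.total += a2.total;
                    a1.num += a2.num;
                    return a1;
                }
        );

The three arguments line up with combineByKey's createCombiner, mergeValue and mergeCombiners, except that the first one is a plain zero value rather than a function.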

join: joining pair RDDs

There is join for inner joins, plus leftOuterJoin and rightOuterJoin for left and right outer joins; they work on the same principle as join queries in a database.

package cn.zb;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * 连接操作
 */
@Slf4j
public class PairRDDJoin {


    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("wordcount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, String>> shopAddress = Arrays.asList(
                new Tuple2<>("chaoshifa", "shangdi"),
                new Tuple2<>("hualian", "xierqi"),
                new Tuple2<>("woerma", "changping")
        );
        List<Tuple2<String, String>> shopRatings = Arrays.asList(
                new Tuple2<>("chaoshifa", "1"),
                new Tuple2<>("woerma", "3")
        );

        // create two pair RDDs from in-memory collections
        JavaPairRDD<String, String> addressPair = sc.parallelizePairs(shopAddress);
        JavaPairRDD<String, String> ratingPair = sc.parallelizePairs(shopRatings);

        JavaPairRDD<String, Tuple2<String, String>> joinResult = addressPair.join(ratingPair);
        Map<String, Tuple2<String, String>> map = joinResult.collectAsMap();
        log.info("joinResult, inner join result: {}", map);

        Map<String, Tuple2<String, Optional<String>>> leftJoinResult = addressPair.leftOuterJoin(ratingPair).collectAsMap();
        log.info("leftJoin, left outer join: {}", leftJoinResult);

        Map<String, Tuple2<Optional<String>, String>> rightJoinResult = addressPair.rightOuterJoin(ratingPair).collectAsMap();
        log.info("rightJoin, right outer join: {}", rightJoinResult);

        sc.stop();
    }
}

Execution output:

18/01/23 19:27:10 INFO PairRDDJoin: joinResult, inner join result: {chaoshifa=(shangdi,1), woerma=(changping,3)}

18/01/23 19:27:10 INFO PairRDDJoin: leftJoin, left outer join: {hualian=(xierqi,Optional.empty), chaoshifa=(shangdi,Optional[1]), woerma=(changping,Optional[3])}

18/01/23 19:27:11 INFO PairRDDJoin: rightJoin, right outer join: {chaoshifa=(Optional[shangdi],1), woerma=(Optional[changping],3)}
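
As the output shows, the side that may be missing in an outer join comes back wrapped in Spark's Optional (org.apache.spark.api.java.Optional). A minimal sketch of unwrapping it with a default value, reusing addressPair and ratingPair from the example above; the "no-rating" placeholder is only illustrative:

// turn the Tuple2<String, Optional<String>> values into plain string pairs
JavaPairRDD<String, Tuple2<String, String>> withDefaults = addressPair
        .leftOuterJoin(ratingPair)
        .mapValues(pair -> new Tuple2<>(pair._1(), pair._2().orElse("no-rating")));
log.info("leftJoin with default ratings: {}", withDefaults.collectAsMap());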

sortByKey: sorting data by key

package cn.zb;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount {

    public static void main(String[] args) {
        String inputFile = args[0];
        String outputFile = args[1];

        SparkConf sparkConf = new SparkConf().setAppName("wordcount");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        // read the input file as lines
        JavaRDD<String> lines = sparkContext.textFile(inputFile);
        // split each line into words
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // convert to (word, 1) pairs and sum the counts per key
        JavaPairRDD<String, Integer> counts = words.mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((v1, v2) -> v1 + v2);
        // sort by key
        JavaPairRDD<String, Integer> sortedCount = counts.sortByKey();
        // save the result
        sortedCount.saveAsTextFile(outputFile);
        sparkContext.stop();
    }
}
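
sortByKey() with no arguments sorts in ascending key order. A small sketch of two common variations, reusing the counts RDD from the example above (these lines are illustrative and not part of the original program):

// descending order by key: pass false for the ascending flag
JavaPairRDD<String, Integer> descByWord = counts.sortByKey(false);

// to order by count instead of by word, swap key and value first, then sort on the numeric key
JavaPairRDD<Integer, String> descByCount = counts
        .mapToPair(t -> new Tuple2<>(t._2(), t._1()))
        .sortByKey(false);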