spark从入门到放弃十: top N

文章地址:http://www.haha174.top/article/details/255827
项目源码:https://github.com/haha174/spark.git
这篇主要通过两个例子介绍一下top N.

1.对文本文件内的数字,取最大的前3个。

下面给出一个top文本
3
5
6
7
1
4
5
6
9
0
3
下面对这个文本取出最大的前三个。
下面给出java 示例:

public class TOP3 {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setAppName("TOP3").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        JavaRDD<String> lines=sc.textFile("C:\\Users\\haha174\\Desktop\\data\\sparkdata\\top3.txt");
        //  第一个是call 的参数类型  第二个  第三个是   Tuple2的类型。
        JavaPairRDD<Integer,String> numbers=lines.mapToPair(new PairFunction<String, Integer, String>() {

            public Tuple2<Integer, String> call(String s) throws Exception {
                return  new Tuple2<Integer, String>(Integer.valueOf(s),s);
            }
        });
        JavaPairRDD<Integer,String> softed=numbers.sortByKey(false);
        JavaRDD<Integer> resultRDD=softed.map(new Function<Tuple2<Integer, String>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return integerStringTuple2._1;
            }
        });
       List<Integer> result=resultRDD.take(3);
       for (Integer i:result){
           System.out.println(i);
       }
        sc.close();
    }
}

下面给出scala 示例

object TOP3 {
  def main(args: Array[String]): Unit = {
      val conf=new SparkConf().setMaster("local").setAppName("TOP3");
      val sc=new SparkContext(conf);
      var lines=sc.textFile("C:\\Users\\haha174\\Desktop\\data\\sparkdata\\top3.txt");
      var linesPair=lines.map{lines=>(Integer.valueOf(lines),lines)}
      var softed=linesPair.sortByKey(false);
      val resultRDD=softed.map(lines=>lines._1)
      val result=resultRDD.take(3)
      for ( i<-result)
    println(i)

    }
}

2.对每个班级内的学生成绩,取出前三名(分组取topN).

第二个demo 比第一个稍微难那么一丢丢。主要多了一个分组排序
数据示例如下
class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77
下面给出java 示例:

public class GroupTop3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TOP3").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("C:\\Users\\haha174\\Desktop\\data\\sparkdata\\score.txt");
        JavaPairRDD<String,Integer> numbers=lines.mapToPair(new PairFunction<String, String, Integer>() {

            public Tuple2<String, Integer> call(String s) throws Exception {
                String str[]=s.split(" ");
                return  new Tuple2<String, Integer>(str[0],Integer.valueOf(str[1]));
            }
        });
        JavaPairRDD<String,Iterable<Integer>> groupByKey=numbers.groupByKey();
        JavaPairRDD<String,Iterable<Integer>> top3Score=groupByKey.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {

            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
                Integer[] top3=new Integer[3];
                String name=stringIterableTuple2._1;
                Iterator<Integer> scores=stringIterableTuple2._2.iterator();
                while (scores.hasNext()){
                    Integer score=scores.next();
                    for(int i=0;i<3;i++){
                        if(top3[i]==null){
                            top3[i]=score;
                            break;
                        }else  if(score>top3[i]){
                            for(int j=2;j>i;j--){
                                top3[j]=top3[j-1];
                            }
                            top3[i]=score;
                            break;
                        }

                    }
                }
                return new Tuple2<String, Iterable<Integer>>(name, Arrays.asList(top3));
            }
        });
        top3Score.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
                System.out.print("name   "+stringIterableTuple2._1);
                Iterator<Integer> score=stringIterableTuple2._2.iterator();
                while (score.hasNext()){
                    System.out.print(score.next()+"   ");
                }
                System.out.println();
            }
        });
    }
}

下面给出scala 示例:

object TOP3 {
  def main(args: Array[String]): Unit = {
      val conf=new SparkConf().setMaster("local").setAppName("TOP3");
      val sc=new SparkContext(conf);
      var lines=sc.textFile("C:\\Users\\haha74\\Desktop\\data\\sparkdata\\score.txt");
    val pairs = lines.map { x =>
    {
      val splited = x.split(" ")
      (splited(0), splited(1).toInt)
    }
    }

    val groupedPairs = pairs.groupByKey();


    val top3Score = groupedPairs.map(classScores => {
      val top3 = Array[Int](-1, -1, -1)

      val className = classScores._1

      val scores = classScores._2

      for (score <- scores) {
     breakable{
          for (i <- 0 until 3) {
            if (top3(i) == -1) {
              top3(i) = score;
          break();
            } else if (score > top3(i)) {
              var j = 2
              while (j > i) {
                top3(j) = top3(j - 1);
                j = j - 1
              }
              top3(i) = score;
              break();
            }
          }
     }

      }
      (className, top3);
    })

    top3Score.foreach(x => {
      println(x._1)
      val res = x._2
      for (i <- res) {
        println(i)
      }
      println("==========================")
    })
}
}

欢迎关注,更多福利

《spark从入门到放弃十: top N》 这里写图片描述

    原文作者:意浅离殇
    原文地址: https://www.jianshu.com/p/484c38ee8ee6
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞