文章地址:http://www.haha174.top/article/details/255827
项目源码:https://github.com/haha174/spark.git
本文通过两个例子介绍 Spark 中的 Top N 计算。
1. 对文本文件中的数字，取出最大的前 3 个。
下面是示例文本（top3.txt），每行一个整数：
3
5
6
7
1
4
5
6
9
0
3
目标是从这个文本中取出最大的前三个数字。
下面给出 Java 示例：
/**
 * Top-N example: reads one integer per line from a text file and prints the
 * three largest values in descending order (duplicates are kept).
 *
 * <p>Usage: {@code TOP3 [inputPath]}. When no path is given, the original
 * sample path is used.
 */
public class TOP3 {
    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT =
            "C:\\Users\\haha174\\Desktop\\data\\sparkdata\\top3.txt";

    public static void main(String[] args) {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        SparkConf conf = new SparkConf().setAppName("TOP3").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile(input);
            // Skip blank lines: Integer.valueOf("") would throw NumberFormatException.
            JavaRDD<String> nonBlank = lines.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String s) throws Exception {
                    return !s.trim().isEmpty();
                }
            });
            // Key each line by its numeric value so sortByKey orders numerically,
            // not lexicographically. Type params: <call input, key type, value type>.
            JavaPairRDD<Integer, String> numbers = nonBlank.mapToPair(new PairFunction<String, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(String s) throws Exception {
                    String trimmed = s.trim();
                    return new Tuple2<Integer, String>(Integer.valueOf(trimmed), trimmed);
                }
            });
            // false = descending, so the largest values come first.
            // (For large inputs, JavaRDD.top(3) avoids sorting the whole dataset.)
            JavaPairRDD<Integer, String> sorted = numbers.sortByKey(false);
            List<Integer> result = sorted.keys().take(3);
            for (Integer i : result) {
                System.out.println(i);
            }
        } finally {
            // Release the context even if reading or parsing fails.
            sc.close();
        }
    }
}
下面给出 Scala 示例：
/**
 * Top-N example: reads one integer per line and prints the three largest
 * values in descending order (duplicates are kept).
 *
 * Usage: `TOP3 [inputPath]`; falls back to the original sample path.
 */
object TOP3 {
  private val DefaultInput = "C:\\Users\\haha174\\Desktop\\data\\sparkdata\\top3.txt"

  def main(args: Array[String]): Unit = {
    val input = args.headOption.getOrElse(DefaultInput)
    val conf = new SparkConf().setMaster("local").setAppName("TOP3")
    val sc = new SparkContext(conf)
    try {
      val numbers = sc.textFile(input)
        .map(_.trim)
        .filter(_.nonEmpty) // blank lines would make toInt throw
        .map(line => (line.toInt, line))
      // Descending sort on the numeric key; keys drops the original text.
      // (For large inputs, rdd.top(3) avoids sorting the whole dataset.)
      val result = numbers.sortByKey(false).keys.take(3)
      result.foreach(println)
    } finally {
      // Release the context even if reading or parsing fails.
      sc.stop()
    }
  }
}
2. 对每个班级内的学生成绩，取出前三名（分组取 Top N）。
第二个例子比第一个稍难一些，主要是多了一步分组：先按班级分组，再在组内排序取前三。
数据示例如下（score.txt，每行为“班级 成绩”，以空格分隔）：
class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77
下面给出 Java 示例：
public class GroupTop3 {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("TOP3").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("C:\\Users\\haha174\\Desktop\\data\\sparkdata\\score.txt");
JavaPairRDD<String,Integer> numbers=lines.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
String str[]=s.split(" ");
return new Tuple2<String, Integer>(str[0],Integer.valueOf(str[1]));
}
});
JavaPairRDD<String,Iterable<Integer>> groupByKey=numbers.groupByKey();
JavaPairRDD<String,Iterable<Integer>> top3Score=groupByKey.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
Integer[] top3=new Integer[3];
String name=stringIterableTuple2._1;
Iterator<Integer> scores=stringIterableTuple2._2.iterator();
while (scores.hasNext()){
Integer score=scores.next();
for(int i=0;i<3;i++){
if(top3[i]==null){
top3[i]=score;
break;
}else if(score>top3[i]){
for(int j=2;j>i;j--){
top3[j]=top3[j-1];
}
top3[i]=score;
break;
}
}
}
return new Tuple2<String, Iterable<Integer>>(name, Arrays.asList(top3));
}
});
top3Score.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
@Override
public void call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
System.out.print("name "+stringIterableTuple2._1);
Iterator<Integer> score=stringIterableTuple2._2.iterator();
while (score.hasNext()){
System.out.print(score.next()+" ");
}
System.out.println();
}
});
}
}
下面给出 Scala 示例：
/**
 * Grouped Top-N example: reads "className score" lines and prints, for each
 * class, the class name and up to three of its highest scores (one per line,
 * descending), followed by a separator line.
 *
 * Usage: `TOP3 [inputPath]`; falls back to the sample path.
 */
object TOP3 {
  // Fixed: the original path read "haha74" instead of "haha174".
  private val DefaultInput = "C:\\Users\\haha174\\Desktop\\data\\sparkdata\\score.txt"

  /** Number of top scores kept per class. */
  private val TopN = 3

  def main(args: Array[String]): Unit = {
    val input = args.headOption.getOrElse(DefaultInput)
    val conf = new SparkConf().setMaster("local").setAppName("TOP3")
    val sc = new SparkContext(conf)
    try {
      val topN = TopN // local copy so the closure captures a plain Int
      val pairs = sc.textFile(input)
        .map(_.trim)
        .filter(_.nonEmpty) // blank lines would fail to parse
        .map { line =>
          val fields = line.split("\\s+")
          (fields(0), fields(1).toInt)
        }
      // Sorting each group replaces the -1 sentinel approach, which misreported
      // negative scores and printed -1 for classes with fewer than three scores.
      val top3Score = pairs.groupByKey().mapValues { scores =>
        scores.toList.sorted(Ordering[Int].reverse).take(topN)
      }
      // collect() so the output appears on the driver, not in executor logs.
      top3Score.collect().foreach { case (className, top3) =>
        println(className)
        top3.foreach(println)
        println("==========================")
      }
    } finally {
      // Release the context even if reading or parsing fails.
      sc.stop()
    }
  }
}
欢迎关注，获取更多福利。
这里写图片描述