Sorting data by two fields in MapReduce

MapReduce sorts records by k2, so k2 must be comparable, i.e. it must implement the WritableComparable interface.

But what if other fields (for example, some fields from v2) should also take part in the sort?

Then k2 has to be redefined: put every field that should participate in the sort into k2, as a composite key.

Here is how to implement this in code.

Suppose the input data currently looks like this (two tab-separated integer columns):

3	3
3	2
3	1
2	2
2	1
1	1
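
Sorted ascending by the first column and then by the second, the job should produce:

1	1
2	1
2	2
3	1
3	2
3	3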

The code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIntSortApp {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), TwoIntSortApp.class.getSimpleName());
        job.setJarByClass(TwoIntSortApp.class);
        FileInputFormat.setInputPaths(job, args[0]);

        job.setMapperClass(TwoIntSortMapper.class);
        job.setMapOutputKeyClass(TwoInt.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(TwoIntSortReducer.class);
        job.setOutputKeyClass(TwoInt.class);
        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    public static class TwoIntSortMapper extends Mapper<LongWritable, Text, TwoInt, NullWritable> {
        TwoInt k2 = new TwoInt();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            k2.set(splited[0], splited[1]);
            context.write(k2, NullWritable.get());
            System.out.println("Mapper----- first: " + k2.first + " second: " + k2.second);
        }
    }

    public static class TwoIntSortReducer extends Reducer<TwoInt, NullWritable, TwoInt, NullWritable> {
        int i = 1;

        @Override
        protected void reduce(TwoInt k2, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(k2, NullWritable.get());
            System.out.println("reduce() call #" + (i++));
            System.out.println("Reducer----- first: " + k2.first + " second: " + k2.second);
        }
    }

    // Composite key: both columns live in k2, so both take part in the shuffle sort.
    public static class TwoInt implements WritableComparable<TwoInt> {
        int first;
        int second;

        public void set(String s1, String s2) {
            this.first = Integer.parseInt(s1);
            this.second = Integer.parseInt(s2);
        }

        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        public void readFields(DataInput in) throws IOException {
            this.first = in.readInt();
            this.second = in.readInt();
        }

        // Sort by the first column, then by the second.
        // Integer.compare avoids the overflow risk of subtracting two ints.
        public int compareTo(TwoInt o) {
            int r1 = Integer.compare(this.first, o.first);
            if (r1 != 0) {
                return r1;
            }
            return Integer.compare(this.second, o.second);
        }

        // hashCode()/equals() matter once more than one reducer is used:
        // the default HashPartitioner partitions map output by hashCode().
        @Override
        public int hashCode() {
            return first * 31 + second;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof TwoInt)) {
                return false;
            }
            TwoInt other = (TwoInt) obj;
            return this.first == other.first && this.second == other.second;
        }

        @Override
        public String toString() {
            return this.first + "\t" + this.second;
        }
    }
}

 //==============================================================

Set the Combiner class and a custom grouping comparator on the job. These two calls go into main() of TwoIntSortApp, anywhere before waitForCompletion(true); the grouping comparator class below also needs an import of org.apache.hadoop.io.WritableComparator.

        
        job.setCombinerClass(TwoIntSortReducer.class);              // set the Combiner class
        job.setGroupingComparatorClass(MyGroupingCompartor.class);  // set the custom grouping comparator

  

    public static class MyGroupingCompartor extends WritableComparator {

        // Register the key class (and allow instance creation) so WritableComparator
        // can deserialize keys when comparing; without this constructor the
        // comparator fails at runtime.
        protected MyGroupingCompartor() {
            super(TwoInt.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TwoInt aa = (TwoInt) a;
            TwoInt bb = (TwoInt) b;
            // Records with the same first column are treated as one group:
            //   1 1
            //   2 1
            //   2 2
            //   3 1
            //   3 2
            //   3 3
            // falls into three groups.
            return Integer.compare(aa.first, bb.first);
        }
    }
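
A point worth noting that the post does not spell out: with this grouping comparator, TwoIntSortReducer writes only one line per group, the key of the first record in each group, so the final output here is just 1 1, 2 1 and 3 1. If every record in a group should still be written, the reducer must iterate the values, because Hadoop re-reads the next record into the same key object as the value iterator advances. A minimal sketch, assuming it is added as another static nested class in TwoIntSortApp (the name GroupAwareReducer is made up; it reuses the imports already at the top of the file):

    // Hypothetical reducer (not from the original post): emits every record in a group.
    public static class GroupAwareReducer extends Reducer<TwoInt, NullWritable, TwoInt, NullWritable> {
        @Override
        protected void reduce(TwoInt k2, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // As the iterator advances, the framework deserializes the next record
            // into k2, so each write emits the current (first, second) pair.
            for (NullWritable ignored : values) {
                context.write(k2, NullWritable.get());
            }
        }
    }

To try it, swap it in with job.setReducerClass(GroupAwareReducer.class); dropping the combiner at the same time keeps the experiment simple.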

 

    Original author: MapReduce
    Original source: https://www.cnblogs.com/DreamDrive/p/5678175.html