有人会问:MapReduce 不是会自动按 key 排序和分组吗?为什么我们还需要自己写排序算法呢?
因为默认的排序只按 key 的自然顺序进行,很多时候无法满足我们的需求(例如按多列、且各列排序方向不同),所以我们需要自定义 key 及其排序规则!
需求1:
#首先按照第一列升序排列,当第一列相同时,第二列升序排列
3 3
3 2
3 1
2 2
2 1
1 1
—-结果—————
1 1
2 1
2 2
3 1
3 2
3 3
需求2:
#第一列降序排列,当第一列相同时,第二列升序排列
3 1
3 2
3 3
2 1
2 2
1 1
—-结果—————–
3 1
3 2
3 3
2 1
2 2
1 1
需求明确了,接下来我们来实现!
package sort;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * MapReduce driver that sorts two-column numeric input using the composite key {@link MyK2}.
 *
 * <p>Each input line is parsed into a {@code MyK2(first, second)}; the shuffle orders records
 * by {@code MyK2.compareTo}, and the single reducer writes them out in that order as
 * "first TAB second" lines.
 */
public class MySortApp {
    /** HDFS directory read as job input. */
    public static final String INPUT_PATH="hdfs://192.168.0.9:9000/data";
    /** HDFS directory for job output; deleted before the job runs if it already exists. */
    public static final String OUTPUT_PATH="hdfs://192.168.0.9:9000/sortData";

    /**
     * Configures and submits the sort job, then exits with 0 on success and 1 on failure.
     *
     * @param args unused
     * @throws Exception if job setup, HDFS access, or submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, MySortApp.class.getSimpleName());
        // Tell the framework which jar holds MyMapper/MyReduce/MyK2 so tasks can load them.
        job.setJarByClass(MySortApp.class);

        // Input path and format
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        job.setInputFormatClass(TextInputFormat.class);

        // Custom mapper and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Partitioning; a single reducer produces one globally ordered output file.
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // Sorting/grouping use MyK2.compareTo; no combiner is configured.

        // Custom reducer and final output types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // Remove a previous run's output. Resolve the FileSystem from the OUTPUT path,
        // since that is the path being deleted.
        Path outputPath = new Path(OUTPUT_PATH);
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Output path and format
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit, wait, and propagate the job's success/failure as the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses each line "first second" into a {@link MyK2} key, with the second column as value.
     */
    static class MyMapper extends Mapper<LongWritable, Text, MyK2, LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // Blank lines carry no record; skip instead of failing on parse.
                return;
            }
            // Split on any whitespace run so both tab- and space-separated input are accepted.
            String[] split = line.split("\\s+");
            long first = Long.parseLong(split[0]);
            long second = Long.parseLong(split[1]);
            context.write(new MyK2(first, second), new LongWritable(second));
        }
    }

    /**
     * Emits the records in the order the shuffle delivered them, as (first column, second column).
     */
    static class MyReduce extends Reducer<MyK2, LongWritable, LongWritable, LongWritable>{
        @Override
        protected void reduce(MyK2 myk2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            LongWritable first = new LongWritable(myk2.myk2);
            // Identical input rows share one key group; write once per value so a sort does
            // not silently drop duplicate rows.
            for (LongWritable v2 : v2s) {
                context.write(first, v2);
            }
        }
    }
}
package sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite map-output key holding both columns of an input line, so the shuffle sorts on
 * the pair rather than on a single value.
 *
 * <p>Ordering (see {@link #compareTo}): first column descending; when the first columns are
 * equal, second column ascending. For the all-ascending variant (first column ascending, then
 * second ascending), compare the first column with {@code Long.compare(this.myk2, my.myk2)}
 * instead.
 */
public class MyK2 implements WritableComparable<MyK2>{
    /** First column of the input line. */
    public long myk2;
    /** Second column of the input line. */
    public long myv2;

    /** Empty key, needed so the framework can create an instance and fill it via readFields. */
    public MyK2() {
    }

    /**
     * @param myk2 first column value
     * @param myv2 second column value
     */
    public MyK2(long myk2, long myv2) {
        this.myk2 = myk2;
        this.myv2 = myv2;
    }

    /** Serializes both columns, first then second. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(myk2);
        out.writeLong(myv2);
    }

    /** Deserializes both columns in the same order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.myk2 = in.readLong();
        this.myv2 = in.readLong();
    }

    /**
     * Orders keys by first column descending, then second column ascending.
     *
     * <p>Uses {@link Long#compare} rather than casting a long difference to int, which could
     * overflow or truncate (e.g. a difference of 2^32 would cast to 0 and wrongly report equality).
     *
     * @param my the key to compare against
     * @return negative, zero, or positive as this key sorts before, with, or after {@code my}
     */
    @Override
    public int compareTo(MyK2 my) {
        // Operands swapped to get descending order on the first column.
        int byFirst = Long.compare(my.myk2, this.myk2);
        if (byFirst != 0) {
            return byFirst;
        }
        return Long.compare(this.myv2, my.myv2);
    }

    /** Two keys are equal when both columns match, consistent with {@link #compareTo}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MyK2)) {
            return false;
        }
        MyK2 other = (MyK2) obj;
        return myk2 == other.myk2 && myv2 == other.myv2;
    }

    /**
     * Value-based hash consistent with {@link #equals}; HashPartitioner uses it to route equal
     * keys to the same reducer.
     */
    @Override
    public int hashCode() {
        int h = (int) (myk2 ^ (myk2 >>> 32));
        return 31 * h + (int) (myv2 ^ (myv2 >>> 32));
    }
}
呵呵,基本功能实现!(thanks 超哥)