mapreduce之自定义排序算法

有人说mapreduce中不是有一个自动排序和分组(按key排序和分组)的嘛,我们为什么还需要自己写排序算法呢?

因为很多时候这种自动排序无法满足我们的需求,所以我们需要自定义排序算法!

需求1:

#首先按照第一列升序排列,当第一列相同时,第二列升序排列
3 3
3 2
3 1
2 2
2 1
1 1
—-结果—————
1 1
2 1
2 2
3 1
3 2
3 3

需求2:
#:第一列降序排列,第一列相同时,第二列升序排列
3 1
3 2
3 3
2 1
2 2
1 1

—-结果—————–

3       1
3       2
3       3
2       1
2       2
1       1

需求已出来,接下来我们实现!

package sort;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MySortApp {
	
	public static final String INPUT_PATH="hdfs://192.168.0.9:9000/data";
	public static final String OUTPUT_PATH="hdfs://192.168.0.9:9000/sortData";
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf,MySortApp.class.getSimpleName());
		
		//设置输入路径
		FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
		//设置输入格式化
		job.setInputFormatClass(TextInputFormat.class);
		
		//设置自定义map
		job.setMapperClass(MyMapper.class);
		//设置map输出类型
		job.setMapOutputKeyClass(MyK2.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		//分区
		job.setPartitionerClass(HashPartitioner.class);
		//设置reduce任务
		job.setNumReduceTasks(1);
		
		//排序、分组
		
		//规约
		
		//设置自定义reduce类
		job.setReducerClass(MyReduce.class);
		//设置reduce输出类型
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		
		//删除已存在的路径
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		Path path=new Path(OUTPUT_PATH);
		if(fileSystem.exists(path)){
			fileSystem.delete(path,true);
		}
		
		//设置输出路径
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		//设置输出格式化类
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//提交任务
		job.waitForCompletion(true);
	}
	
	static class MyMapper extends Mapper<LongWritable, Text, MyK2, LongWritable>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("\t");
			MyK2 myK2 = new MyK2(Long.parseLong(split[0]), Long.parseLong(split[1]));
			context.write(myK2, new LongWritable(Long.parseLong(split[1])));
		}
	}
	
	static class MyReduce extends Reducer<MyK2, LongWritable,LongWritable, LongWritable>{
		@Override
		protected void reduce(MyK2 myk2, Iterable<LongWritable> v2s,Context context)
				throws IOException, InterruptedException {
			context.write(new LongWritable(myk2.myk2), new LongWritable(myk2.myv2));
		}
	}
}

package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyK2 implements WritableComparable<MyK2>{

	public long myk2;
	public long myv2;
	
	public MyK2() {
		// TODO Auto-generated constructor stub
	}

	public MyK2(long myk2, long myv2) {
		this.myk2 = myk2;
		this.myv2 = myv2;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(myk2);
		out.writeLong(myv2);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.myk2=in.readLong();
		this.myv2=in.readLong();
	}
	
	/**
	 * #首先按照第一列升序排列,当第一列相同时,第二列升序排列
	 * 当k2进行排序时,会调用该方法.
	 * 当第一列不同时,升序;当第一列相同时,第二列升序
	 */
	/*
	@Override
	public int compareTo(MyK2 my) {
		long temp=this.myk2-my.myk2;
		if(temp!=0){
			return (int) temp;
		}
		return (int) (this.myv2-my.myv2);
	}
	*/
	/**
	 * ###作业:第一列降序排列,第一列相同时,第二列升序排列
	 * 当k2进行排序时,会调用该方法
	 * 第一列:降序,当第一列相同时,第二列:升序
	 */
	@Override
	public int compareTo(MyK2 my) {
		long temp=this.myk2-my.myk2;
		if(temp>0){
			temp=-1;
			return (int) temp;
		}else if(temp<0){
			temp=1;
			return (int) temp;
		}
		return (int) (this.myv2-my.myv2);
	}

	/* (非 Javadoc) 
	 *  
	 *  
	 * @param obj
	 * @return 
	 * @see java.lang.Object#equals(java.lang.Object) 
	 */ 
	@Override
	public boolean equals(Object obj) {
		// TODO Auto-generated method stub
		return super.equals(obj);
	}

	/* (非 Javadoc) 
	 *  
	 *  
	 * @return 
	 * @see java.lang.Object#hashCode() 
	 */ 
	@Override
	public int hashCode() {
		// TODO Auto-generated method stub
		return super.hashCode();
	}
	
	

}

《mapreduce之自定义排序算法》

呵呵,基本功能实现!(thanks 超哥)

    原文作者:排序算法
    原文地址: https://blog.csdn.net/zwx19921215/article/details/20712839
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞