Some questions:
1. For a total (global) sort, the final job should call sortJob.setNumReduceTasks(1); (a partitioner-based alternative is sketched just below).
2. If several reduce tasks all modified one static IntWritable, wouldn't the numbering come out wrong?
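On question 1: with a single reduce task the shuffle sort already produces a total order, which is why setNumReduceTasks(1) works here. If more reducers were wanted, the keys would have to be partitioned by range so that reducer 0 receives the smallest numbers, reducer 1 the next range, and so on. The sketch below is only an illustration with made-up cut points (100 and 1000), not part of the original program; Hadoop also ships a TotalOrderPartitioner that samples the input to pick such cut points automatically.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical range partitioner: the cut points 100 and 1000 are illustrative assumptions.
public class RangePartitioner extends Partitioner<IntWritable, NullWritable> {
    @Override
    public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
        int v = key.get();
        if (numPartitions == 1 || v < 100) {
            return 0;                                   // smallest values -> first reducer
        } else if (v < 1000) {
            return Math.min(1, numPartitions - 1);      // middle range -> second reducer
        } else {
            return Math.min(2, numPartitions - 1);      // largest values -> last reducer
        }
    }
}

With sortJob.setPartitionerClass(RangePartitioner.class) and three reduce tasks, concatenating part-r-00000, part-r-00001 and part-r-00002 in order would still give one globally sorted file. On question 2: each reduce task normally runs in its own JVM, so a static IntWritable is not shared between tasks at all; every task would restart its own numbering, which is one more reason the line-numbering version below needs a single reducer.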
Input data:
file1
2
32
654
32
15
756
65223
file2
5956
22
650
92
file3
26
54
6
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MySort {

    // Mapper: parse each text line into an IntWritable key; the value carries nothing,
    // so the shuffle phase does the actual sorting by key.
    public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {
        private IntWritable val = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            val.set(Integer.parseInt(line));
            context.write(val, NullWritable.get());
        }
    }

    // Reducer: keys arrive in sorted order; write one line per input record,
    // with the constant 1 as the output key and the sorted number as the output value.
    public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
        private IntWritable k = new IntWritable();

        public void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            k.set(1);
            for (NullWritable value : values) {
                context.write(k, key);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String dir_in = "hdfs://localhost:9000/in_sort";
        String dir_out = "hdfs://localhost:9000/out_sort";
        Path in = new Path(dir_in);
        Path out = new Path(dir_out);

        Configuration conf = new Configuration();
        Job sortJob = new Job(conf, "my_sort");

        sortJob.setJarByClass(MySort.class);
        sortJob.setInputFormatClass(TextInputFormat.class);
        sortJob.setMapperClass(IntSortMapper.class);
        //sortJob.setCombinerClass(SortReducer.class);
        //countJob.setPartitionerClass(HashPartitioner.class);
        sortJob.setMapOutputKeyClass(IntWritable.class);
        sortJob.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(sortJob, in);

        sortJob.setReducerClass(IntSortReducer.class);
        sortJob.setNumReduceTasks(1);          // single reducer => total order
        sortJob.setOutputKeyClass(IntWritable.class);
        sortJob.setOutputValueClass(IntWritable.class);
        //countJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(sortJob, out);

        sortJob.waitForCompletion(true);
    }
}
Result:
1 2
1 6
1 15
1 22
1 26
1 32
1 32
1 54
1 92
1 650
1 654
1 756
1 5956
1 65223
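The 1 in every output line is just k, which is set to 1 once and reused, and 32 appears twice because the loop writes one line per mapped record. If only the sorted numbers were wanted, a variant (my own naming, not part of the original program) could drop the constant column by making the output value NullWritable:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical variant: emit only the sorted numbers, duplicates preserved.
public class PlainSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());   // keys already arrive in sorted order
        }
    }
}

Using it would also mean calling sortJob.setOutputValueClass(NullWritable.class) instead of IntWritable.class.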
Modified reduce function (not using an Iterable):

public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
    private IntWritable k = new IntWritable();

    public void reduce(IntWritable key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        k.set(1);
        //for (NullWritable value : values) {
        context.write(k, key);
        //}
    }
}
Result (I don't quite understand this: after dropping the Iterable, why is only a single value printed per line, and where did the key go?):
2
6
15
22
26
32
32
54
92
650
654
756
5956
65223
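A likely explanation: reduce(IntWritable key, NullWritable value, Context context) does not match the framework's reduce(KEYIN key, Iterable<VALUEIN> values, Context context), so it never overrides anything and is never called. The base Reducer then runs its default identity reduce, which copies every (key, value) pair straight to the output; the value is NullWritable, which the text output format does not print, so only the sorted numbers remain, one per original record (that is also why the duplicate 32 survives). Marking the method with @Override lets the compiler catch this kind of signature mistake; a sketch of the correctly-declared version:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
    private final IntWritable k = new IntWritable(1);

    // @Override only compiles when the signature really matches Reducer.reduce;
    // on the (IntWritable, NullWritable, Context) variant it would be a compile error.
    @Override
    public void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable value : values) {
            context.write(k, key);   // one line per original record, constant key 1
        }
    }
}

The second full program below goes back to the Iterable signature and numbers each value instead of always writing 1.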
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MySort {

    // Mapper: same as before, parse each line into an IntWritable key.
    public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {
        private IntWritable val = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            val.set(Integer.parseInt(line));
            context.write(val, NullWritable.get());
        }
    }

    // Reducer: number each sorted value, writing the running count as the output key.
    public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
        private static IntWritable num = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            for (NullWritable value : values) {
                context.write(num, key);                  // current rank, then the sorted number
                num = new IntWritable(num.get() + 1);     // allocate a new counter object each time
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String dir_in = "hdfs://localhost:9000/in_sort";
        String dir_out = "hdfs://localhost:9000/out_sort";
        Path in = new Path(dir_in);
        Path out = new Path(dir_out);

        Configuration conf = new Configuration();
        Job sortJob = new Job(conf, "my_sort");

        sortJob.setJarByClass(MySort.class);
        sortJob.setInputFormatClass(TextInputFormat.class);
        sortJob.setMapperClass(IntSortMapper.class);
        //sortJob.setCombinerClass(SortReducer.class);
        //countJob.setPartitionerClass(HashPartitioner.class);
        sortJob.setMapOutputKeyClass(IntWritable.class);
        sortJob.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(sortJob, in);

        sortJob.setReducerClass(IntSortReducer.class);
        sortJob.setNumReduceTasks(1);          // single reducer, so the numbering stays global
        sortJob.setOutputKeyClass(IntWritable.class);
        sortJob.setOutputValueClass(IntWritable.class);
        //countJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(sortJob, out);

        sortJob.waitForCompletion(true);
    }
}
Result:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
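With a single reducer the numbering comes out right, but allocating a new IntWritable for every record is unnecessary, and the static field only looks safe because there is exactly one reduce task. A simpler sketch (my own variant, same output for this job) keeps a plain int counter and reuses one IntWritable:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class RankSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
    private final IntWritable rank = new IntWritable();
    private int count = 0;   // per-task counter, so this still assumes a single reducer

    @Override
    public void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable value : values) {
            rank.set(++count);          // 1-based position in the total order
            context.write(rank, key);
        }
    }
}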