import java.io.File; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class GetMinMaxKeyMapReduce { public static class GetMinMaxKeyMap extends Mapper<Object, Text, Text,Text> { private Text min = new Text(); private Text max = new Text(); private Long i = new Long(0); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] strs = value.toString().split("\t"); if (strs!=null && strs.length>5 &&strs[3].length() > 20 && strs[3].indexOf(" ") == -1 && strs[3].indexOf("=") == -1) { if(i==0){ min= new Text(strs[3]); max= new Text(strs[3]); } if(strs[3].compareTo(min.toString())<0){ min=new Text(strs[3]); } if(strs[3].compareTo(max.toString())>0){ max=new Text(strs[3]); } i++; } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { context.write(new Text("min"), min); context.write(new Text("max"), max); } } public static class GetMinMaxKeyReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String result =""; for (Text value : values) { if(result.equals("")){ result = value.toString(); } if (("min").equals(key.toString())) { if(value.toString().compareTo(result)<0){ result=value.toString(); } } else if (("max").equals(key.toString())) { if(value.toString().compareTo(result)>0){ result=value.toString(); } } else { System.err.println("未知reduce 输入key:" + key.toString()); } } context.write(key, new Text(result)); } } public static void main(String[] args) throws Exception { File jarFile = EJob.createTempJar("bin"); ClassLoader classLoader = EJob.getClassLoader(); Thread.currentThread().setContextClassLoader(classLoader); //Hadoop 运行环境 Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "bfdbjc1:12001");; //任务参数设置 Job job = new Job(conf, "GetMinMaxKey"); job.setJarByClass(GetMinMaxKeyMapReduce.class); job.setMapperClass(GetMinMaxKeyMap.class); job.setReducerClass(GetMinMaxKeyReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/tables2/raw_kafka/l_date=2013-09-15")); FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/minmaxkey/")); //Eclipse 本地提交 ((JobConf) job.getConfiguration()).setJar(jarFile.toString()); //等待任务运行完成 System.exit(job.waitForCompletion(true) ? 0 : 1); } }