示例
数据(每行一条记录,时间与温度之间以制表符分隔):
1949-10-01 14:21:02	34c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1951-12-02 12:21:02	45c
1951-12-03 12:21:02	50c
1951-12-23 12:21:02	33c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c
要求:
将每年每月中的气温排名前三的数据找出来
实现:
1.每一年用一个reduce任务处理;
2.创建自定义数据类型,存储 [年-月-日-温度];
3.自己实现排序函数,根据 [年-月-温度] 排序(年、月升序,温度降序),也可以在自定义数据类型中实现排序;
4.自己实现分组函数,对 [年-月] 分组,reduce的key是分组结果,对同一分组内的value只需输出前三个值,因为数据已经按温度降序排好。
注意点:
1.自定义数据类型的使用;
2.自定义排序类的使用;
3.自定义分组类的使用,分组类对哪些数据进行分组;
4.自定义分区类,分区类与reduce task个数的关系;
示例代码:
RunJob.java
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.FileSystem; 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.io.LongWritable; 5 import org.apache.hadoop.io.NullWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 import java.io.IOException; 14 import java.text.ParseException; 15 import java.text.SimpleDateFormat; 16 import java.util.Calendar; 17 import java.util.Date; 18 19 /** 20 * weather 统计天气信息 21 * 22 * 数据: 23 * 1999-10-01 14:21:02 34c 24 * 1999-11-02 13:01:02 30c 25 * 26 * 要求: 27 * 将每年的每月中气温排名前三的数据找出来 28 * 29 * 实现: 30 * 1.每一年用一个reduce任务处理; 31 * 2.创建自定义数据类型,存储 [年-月-日-温度]; 32 * 2.自己实现排序函数 根据 [年-月-温度] 降序排列,也可以在定义数据类型中进行排序; 33 * 3.自己实现分组函数,对 [年-月] 分组,reduce的key是分组结果,根据相同的分组值,统计reduce的value值,只统计三个值就可以,因为已经实现了自排序函数。 34 * 35 * Created by Edward on 2016/7/11. 
36 */ 37 public class RunJob { 38 39 public static void main(String[] args) 40 { 41 //access hdfs's user 42 System.setProperty("HADOOP_USER_NAME","root"); 43 44 Configuration conf = new Configuration(); 45 conf.set("fs.defaultFS", "hdfs://node1:8020"); 46 47 48 try { 49 FileSystem fs = FileSystem.get(conf); 50 51 Job job = Job.getInstance(conf); 52 job.setJarByClass(RunJob.class); 53 job.setMapperClass(MyMapper.class); 54 job.setReducerClass(MyReducer.class); 55 56 //需要指定 map out 的 key 和 value 57 job.setOutputKeyClass(InfoWritable.class); 58 job.setOutputValueClass(Text.class); 59 60 //设置分区 继承 HashPartitioner 61 job.setPartitionerClass(YearPartition.class); 62 //根据年份创建指定数量的reduce task 63 job.setNumReduceTasks(3); 64 65 //设置排序 继承 WritableComparator 66 //job.setSortComparatorClass(SortComparator.class); 67 68 //设置分组 继承 WritableComparator 对reduce中的key进行分组 69 job.setGroupingComparatorClass(GroupComparator.class); 70 71 FileInputFormat.addInputPath(job, new Path("/test/weather/input")); 72 73 Path path = new Path("/test/weather/output"); 74 if(fs.exists(path))//如果目录存在,则删除目录 75 { 76 fs.delete(path,true); 77 } 78 FileOutputFormat.setOutputPath(job, path); 79 80 boolean b = job.waitForCompletion(true); 81 if(b) 82 { 83 System.out.println("OK"); 84 } 85 86 } catch (Exception e) { 87 e.printStackTrace(); 88 } 89 } 90 91 92 public static class MyMapper extends Mapper<LongWritable, Text, InfoWritable, Text > { 93 94 private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 95 96 @Override 97 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 98 String[] str = value.toString().split("\t"); 99 100 try { 101 Date date = sdf.parse(str[0]); 102 Calendar c = Calendar.getInstance(); 103 c.setTime(date); 104 int year = c.get(Calendar.YEAR); 105 int month = c.get(Calendar.MONTH)+1; 106 int day = c.get(Calendar.DAY_OF_MONTH); 107 108 double temperature = 
Double.parseDouble(str[1].substring(0,str[1].length()-1)); 109 110 InfoWritable info = new InfoWritable(); 111 info.setYear(year); 112 info.setMonth(month); 113 info.setDay(day); 114 info.setTemperature(temperature); 115 116 context.write(info, value); 117 118 } catch (ParseException e) { 119 e.printStackTrace(); 120 } 121 } 122 } 123 124 public static class MyReducer extends Reducer<InfoWritable, Text, Text, NullWritable> { 125 @Override 126 protected void reduce(InfoWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 127 int i=0; 128 for(Text t: values) 129 { 130 i++; 131 if(i>3) 132 break; 133 context.write(t, NullWritable.get()); 134 } 135 } 136 } 137 }
InfoWritable.java
1 import org.apache.hadoop.io.WritableComparable; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 /** 8 * 自定义数据类型 继承 WritableComparable 9 * 【年-月-日-温度】 10 * Created by Edward on 2016/7/11. 11 */ 12 public class InfoWritable implements WritableComparable<InfoWritable> { 13 14 private int year; 15 private int month; 16 private int day; 17 private double temperature; 18 19 public void setYear(int year) { 20 this.year = year; 21 } 22 23 public void setMonth(int month) { 24 this.month = month; 25 } 26 27 public void setDay(int day) { 28 this.day = day; 29 } 30 31 public void setTemperature(double temperature) { 32 this.temperature = temperature; 33 } 34 35 public int getYear() { 36 return year; 37 } 38 39 public int getMonth() { 40 return month; 41 } 42 43 public int getDay() { 44 return day; 45 } 46 47 public double getTemperature() { 48 return temperature; 49 } 50 51 /** 52 * 53 * 对象比较,对温度进行倒序排序 54 */ 55 @Override 56 public int compareTo(InfoWritable o) { 57 58 int result = Integer.compare(this.getYear(),o.getYear()); 59 if(result == 0) 60 { 61 result = Integer.compare(this.getMonth(),o.getMonth()); 62 if(result == 0) 63 { 64 return -Double.compare(this.getTemperature(), o.getTemperature()); 65 } 66 else 67 return result; 68 } 69 else 70 return result; 71 72 //return this==o?0:-1; 73 } 74 75 @Override 76 public void write(DataOutput dataOutput) throws IOException { 77 dataOutput.writeInt(this.year); 78 dataOutput.writeInt(this.month); 79 dataOutput.writeInt(this.day); 80 dataOutput.writeDouble(this.temperature); 81 } 82 83 @Override 84 public void readFields(DataInput dataInput) throws IOException { 85 this.year = dataInput.readInt(); 86 this.month = dataInput.readInt(); 87 this.day = dataInput.readInt(); 88 this.temperature = dataInput.readDouble(); 89 } 90 }
YearPartition.java
1 import org.apache.hadoop.io.Text; 2 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 3 4 /** 5 * 6 * 创建分区,通过key中的year来创建分区 7 * 8 * Created by Edward on 2016/7/11. 9 */ 10 public class YearPartition extends HashPartitioner <InfoWritable, Text>{ 11 @Override 12 public int getPartition(InfoWritable key, Text value, int numReduceTasks) { 13 return key.getYear()%numReduceTasks; 14 } 15 }
GroupComparator.java
1 import org.apache.hadoop.io.WritableComparable; 2 import org.apache.hadoop.io.WritableComparator; 3 4 /** 5 * 创建分组类,继承WritableComparator 6 * 【年-月】 7 * Created by Edward on 2016/7/11. 8 */ 9 public class GroupComparator extends WritableComparator { 10 11 GroupComparator() 12 { 13 super(InfoWritable.class, true); 14 } 15 16 @Override 17 public int compare(WritableComparable a, WritableComparable b) { 18 InfoWritable ia = (InfoWritable)a; 19 InfoWritable ib = (InfoWritable)b; 20 21 int result = Integer.compare(ia.getYear(),ib.getYear()); 22 if(result == 0) 23 { 24 return Integer.compare(ia.getMonth(),ib.getMonth()); 25 } 26 else 27 return result; 28 } 29 }
SortComparator.java
1 import org.apache.hadoop.io.WritableComparable; 2 import org.apache.hadoop.io.WritableComparator; 3 4 /** 5 * 排序类,继承WritableComparator 6 * 排序规则【年-月-温度】 温度降序 7 * Created by Edward on 2016/7/11. 8 */ 9 public class SortComparator extends WritableComparator { 10 11 /** 12 * 调用父类的构造函数 13 */ 14 SortComparator() 15 { 16 super(InfoWritable.class, true); 17 } 18 19 20 /** 21 * 比较两个对象的大小,使用降序排列 22 * @param a 23 * @param b 24 * @return 25 */ 26 @Override 27 public int compare(WritableComparable a, WritableComparable b) { 28 29 InfoWritable ia = (InfoWritable)a; 30 InfoWritable ib = (InfoWritable)b; 31 32 int result = Integer.compare(ia.getYear(),ib.getYear()); 33 if(result == 0) 34 { 35 result = Integer.compare(ia.getMonth(),ib.getMonth()); 36 if(result == 0) 37 { 38 return -Double.compare(ia.getTemperature(), ib.getTemperature()); 39 } 40 else 41 return result; 42 } 43 else 44 return result; 45 } 46 }