The program's execution flow is as follows:

map() -> getPartition() (partitioning) -> write() (serialization; these three methods run in sequence for every input line) -> readFields() -> compareTo() (sorting) -> readFields() -> grouping compare() -> reduce()

For each input line, map() -> getPartition() -> write() are executed in turn (serialization; a temporary directory is created under the specified output directory). Only after all records have been serialized is readFields() called, followed by sorting; only once all sorting has finished is readFields() called again, after which grouping takes place. Each time a group is separated out, reduce() runs once and writes out the data.
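To make the flow concrete, here is a hypothetical tab-separated input (columns: orderId, productId, price; the sample values are made up for illustration, and the productId column is ignored by the mapper below):

Order_0000001   Pdt_01   222.8
Order_0000001   Pdt_05   25.8
Order_0000002   Pdt_03   522.8
Order_0000002   Pdt_04   122.4
Order_0000003   Pdt_06   232.8

Assuming each orderId lands in its own partition, the records in a partition are sorted by price descending, the grouping comparator groups them by orderId, and the reducer writes only the first key of each group, so the reducers' output files together contain the highest price per order, e.g. "Order_0000002   522.8".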
1. OrderBean (once partitioning is in place there is no need to sort on orderId, because within a partition the orderId values are the same)
package mapreduce.order;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private Double price;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public OrderBean() {
        super();
    }

    /**
     * Serialization
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    /**
     * Deserialization
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readUTF();
        price = in.readDouble();
    }

    @Override
    public String toString() {
        return orderId + "\t" + price;
    }

    /**
     * Secondary sort (since partitioning is used, we can sort on price directly;
     * no secondary sort is needed)
     * @param o
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        // First compare orderId (a String)
        // int result = this.getOrderId().compareTo(o.getOrderId());

        // If equal, then compare price
        // if (result == 0) {
        // Descending order; for numeric compareTo(), -1 means "less than"
        int result = this.getPrice() > o.getPrice() ? -1 : 1;
        // }
        return result;
    }
}
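If the job were run without the orderId-based partitioner (for example with a single reduce task), compareTo() would need the full secondary sort that is commented out above: first orderId ascending, then price descending. A minimal sketch of that variant of OrderBean.compareTo():

    @Override
    public int compareTo(OrderBean o) {
        // Primary sort: orderId ascending, so records of the same order end up adjacent
        int result = this.getOrderId().compareTo(o.getOrderId());
        if (result == 0) {
            // Secondary sort: price descending, so the highest price comes first in each group
            result = this.getPrice() > o.getPrice() ? -1 : 1;
        }
        return result;
    }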
2. OrderMapper
package mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    // A single reusable bean; safe because context.write() serializes it immediately
    OrderBean order = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
            throws IOException, InterruptedException {

        // 1. Read one line
        String line = value.toString();

        // 2. Split the line and populate the bean
        String[] split = line.split("\t");
        order.setOrderId(split[0]);
        order.setPrice(Double.parseDouble(split[2]));

        // 3. Emit
        context.write(order, NullWritable.get());
    }
}
3. OrderPartioner
package mapreduce.order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner
 * @author tele
 */
public class OrderPartioner extends Partitioner<OrderBean, NullWritable> {
    /**
     * By default partitioning is based on the OrderBean's hashCode.
     * Changed here to partition on the OrderBean's orderId, so that records
     * with the same orderId go to the same partition.
     */
    @Override
    public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {
        return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
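For reference, the default HashPartitioner that this class replaces does essentially the same arithmetic, but on the hashCode() of the whole key; a rough sketch of that default (OrderBean does not override hashCode(), so the default would not keep an order's records together):

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
        // Hadoop's default behaviour (sketch): hash the entire key object
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }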
4. OrderGroupComparator
package mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for the reduce side.
 *
 * @author tele
 */
public class OrderGroupComparator extends WritableComparator {
    /**
     * Pass true so that key instances are created and the casts below succeed;
     * with false the created instances would be null.
     */
    protected OrderGroupComparator() {
        super(OrderBean.class, true);
    }

    /**
     * By default reduce groups by the key, i.e. the OrderBean here. After this
     * override, grouping is done by the OrderBean's orderId, so OrderBeans with
     * the same orderId fall into the same group.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        return bean1.getOrderId().compareTo(bean2.getOrderId());
    }
}
5. OrderReducer (reduce() reads one group at a time)
package mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> value,
            Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
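Because the keys of a group arrive sorted by price descending, the reducer could also emit the top N prices per order instead of only the highest. A hypothetical sketch of such a reduce() method (not part of the original code), relying on the fact that Hadoop re-fills the key object as the values are iterated:

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values,
            Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        int n = 0;
        for (NullWritable v : values) {
            // key now holds the OrderBean belonging to the current value
            context.write(key, NullWritable.get());
            if (++n == 2) {   // keep the two highest prices per order
                break;
            }
        }
    }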
6. OrderDriver (the grouping comparator class must be set)
package mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author tele
 */
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the driver class
        job.setJarByClass(OrderDriver.class);
        // Mapper class
        job.setMapperClass(OrderMapper.class);
        // Reducer class
        job.setReducerClass(OrderReducer.class);

        // 3. Map output key/value types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 4. Final output key/value types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 5. Set the partitioner class and the number of reduce tasks
        job.setPartitionerClass(OrderPartioner.class);
        job.setNumReduceTasks(3);

        // 6. Set the grouping comparator class
        job.setGroupingComparatorClass(OrderGroupComparator.class);

        // 7. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8. Submit and exit
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
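To run the job, package the classes into a jar and submit it with the input and output paths as arguments, for example (the jar name is hypothetical): hadoop jar order.jar mapreduce.order.OrderDriver /order/input /order/output. Note that the output directory must not already exist, otherwise the job will fail at startup.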