MapReduce 订单分组案例

程序执行流程如下:

map() → getPartition()(分区)→ write()(序列化;每一行都依次执行这三个方法)→ readFields() → compareTo()(排序)→ readFields() → 分组 compare() → reduce()

对于每一行的内容,依次执行 map() → getPartition() → write()(序列化,会在指定的输出目录下生成 temporary 目录);全部序列化完成之后执行 readFields(),然后进行排序;全部排序完毕后才会再次执行 readFields(),

之后进行分组,分出一组,执行一次reduce()写出数据

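1.OrderBean(只有当每个分区内只出现一个OrderId时,才可以省略对OrderId的排序而只按price排序;一旦不同的OrderId可能落入同一分区——例如OrderId的个数多于reduce任务数,或hash取模后发生冲突——compareTo()就必须先按OrderId排序、再按price降序排序,否则同一OrderId的记录不相邻,分组结果会出错)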

 1 package mapreduce.order;  2 
 3 import java.io.DataInput;  4 import java.io.DataOutput;  5 import java.io.IOException;  6 import org.apache.hadoop.io.WritableComparable;  7 
 8 public class OrderBean implements WritableComparable<OrderBean>{  9     private String orderId; 10     private Double price; 11     
12     public String getOrderId() { 13         return orderId; 14  } 15 
16     public void setOrderId(String orderId) { 17         this.orderId = orderId; 18  } 19 
20     public Double getPrice() { 21         return price; 22  } 23 
24     public void setPrice(Double price) { 25         this.price = price; 26  } 27 
28     public OrderBean() { 29  super(); 30  } 31 
32     /** 33  * 序列化 34  * @param out 35  * @throws IOException 36      */
37  @Override 38     public void write(DataOutput out) throws IOException { 39         out.writeUTF(orderId); 40         out.writeDouble(price); 41  } 42 
43     /** 44  * 反序列化 45  * @param in 46  * @throws IOException 47      */
48  @Override 49     public void readFields(DataInput in) throws IOException { 50         orderId = in.readUTF(); 51         price   = in.readDouble(); 52  } 53 
54     
55  @Override 56     public String toString() { 57         return orderId + "\t" + price; 58  } 59 
60     /** 61  * 二次排序(如果进行了分区,可以直接对price排序,不再需要二次排序) 62  * @param o 63  * @return 64      */
65  @Override 66     public int compareTo(OrderBean o) { 67         //先比较orderId(字符串) 68     // int result = this.getOrderId().compareTo(o.getOrderId()); 69         
70         //相同再比较price 71     // if(result == 0) { 72             //降序,数字的compareTo() -1表示小于
73         int    result = this.getPrice() > o.getPrice()?-1:1; 74     // }
75         return result; 76  } 77 }

2.OrderMapper

 1 package mapreduce.order;  2 
 3 import java.io.IOException;  4 import org.apache.hadoop.io.LongWritable;  5 import org.apache.hadoop.io.NullWritable;  6 import org.apache.hadoop.io.Text;  7 import org.apache.hadoop.mapreduce.Mapper;  8 
 9 public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> { 10     OrderBean order = new OrderBean(); 11 
12  @Override 13     protected void map(LongWritable key, Text value, 14             Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) 15  throws IOException, InterruptedException { 16 
17         // 1.读取一行
18         String line = value.toString(); 19 
20         // 2.切割并封装对象
21         String[] split = line.split("\t"); 22         order.setOrderId(split[0]); 23         order.setPrice(Double.parseDouble(split[2])); 24 
25         // 3.输出
26         context.write(order, NullWritable.get()); 27  } 28 }

3.OrderPartioner

 1 package mapreduce.order;  2 
 3 import org.apache.hadoop.io.NullWritable;  4 import org.apache.hadoop.mapreduce.Partitioner;  5 
 6 /**  7  * 分区  8  * @author tele  9  * 10  */
11 public class OrderPartioner extends Partitioner<OrderBean, NullWritable> { 12     /** 13  * 默认是根据orderBean的hashcode进行分区 14  * 现在更改为orderBean对象的orderId进行分区,这样相同orderId的即可进入同一分区 15      */
16  @Override 17     public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) { 18         return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks; 19  } 20 }

4.OrderGroupComparator

 1 package mapreduce.order;  2 
 3 import org.apache.hadoop.io.WritableComparable;  4 import org.apache.hadoop.io.WritableComparator;  5 
 6 /**  7  * reduce分组的方法  8  *  9  * @author tele 10  * 11  */
12 public class OrderGroupComparator extends WritableComparator { 13     /** 14  * 确保类型转换成功,能够创建实例,如果是false,创建的实例为null 15      */
16     protected OrderGroupComparator() { 17         super(OrderBean.class, true); 18  } 19 
20     /** 21  * reduce默认是根据key进行分组,即此处的orderBean,重写之后 22  * 根据orderBean的orderId进行分组,相同的orderId的orderBean将会分到同一组 23      */
24  @Override 25     public int compare(WritableComparable a, WritableComparable b) { 26         OrderBean bean1 = (OrderBean) a; 27         OrderBean bean2 = (OrderBean) b; 28         return bean1.getOrderId().compareTo(bean2.getOrderId()); 29  } 30 }

5.OrderReducer(reduce()每次读入一组)

 1 package mapreduce.order;  2 
 3 import java.io.IOException;  4 import org.apache.hadoop.io.NullWritable;  5 import org.apache.hadoop.mapreduce.Reducer;  6 
 7 public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {  8  @Override  9     protected void reduce(OrderBean key, Iterable<NullWritable> value, 10             Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context) 11  throws IOException, InterruptedException { 12         context.write(key, NullWritable.get()); 13  } 14 }

6.OrderDriver(必须设置分组类)

 1 package mapreduce.order;  2 
 3 import java.io.IOException;  4 import org.apache.hadoop.conf.Configuration;  5 import org.apache.hadoop.fs.Path;  6 import org.apache.hadoop.io.NullWritable;  7 import org.apache.hadoop.mapreduce.Job;  8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 10 
11 /** 12  * 13  * @author tele 14  * 15  */
16 public class OrderDriver { 17     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 18         // 1.获得job实例
19         Configuration conf = new Configuration(); 20         Job job = Job.getInstance(conf); 21 
22         // 2.设置driverclass
23         job.setJarByClass(OrderDriver.class); 24         // mapclass
25         job.setMapperClass(OrderMapper.class); 26         // reduceclass
27         job.setReducerClass(OrderReducer.class); 28 
29         // 3.map输入输出数据类型
30         job.setMapOutputKeyClass(OrderBean.class); 31         job.setMapOutputValueClass(NullWritable.class); 32 
33         // 4.最终输出数据类型
34         job.setOutputKeyClass(OrderBean.class); 35         job.setOutputValueClass(NullWritable.class); 36 
37         // 7.设置分区类
38         job.setPartitionerClass(OrderPartioner.class); 39         job.setNumReduceTasks(3); 40 
41         // 8.设置分组类
42         job.setGroupingComparatorClass(OrderGroupComparator.class); 43 
44         // 5.输入与输出路径
45         FileInputFormat.setInputPaths(job, new Path(args[0])); 46         FileOutputFormat.setOutputPath(job, new Path(args[1])); 47 
48         // 6.提交与退出
49         boolean result = job.waitForCompletion(true); 50         System.exit(result ? 0 : 1); 51 
52  } 53 }

 

    原文作者:MapReduce
    原文地址: https://www.cnblogs.com/tele-share/p/9648772.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞