MapReduce的自制Writable分组输出及组内排序

问题描述:

输入文件格式如下:

name1    2

name3    4

name1    6

name1    1

name3    3

name1    0

要求输出的文件格式如下:

name1    0,1,2,6

name3    3,4

要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序

思路:

常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。

但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值

先按照name分组,再在name中内部进行排序。

解决方法:

运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。

 由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。

setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。

对于组内的排序,可以利用setSortComparatorClass来实现,

这个方法主要用于定义key如何进行排序在它们传递给reducer之前,

这里就可以来进行组内排序。

具体代码:

     Hadoop版本号:hadoop1.1.2

自定义组合键

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package  whut; import  java.io.DataInput; import  java.io.DataOutput; import  java.io.IOException; import  org.apache.hadoop.io.IntWritable; import  org.apache.hadoop.io.Text; import  org.apache.hadoop.io.WritableComparable; //自定义组合键策略 //java基本类型数据 public  class  TextInt  implements  WritableComparable{      //直接利用java的基本数据类型      private  String firstKey;      private  int  secondKey;      //必须要有一个默认的构造函数      public  String getFirstKey() {          return  firstKey;      }      public  void  setFirstKey(String firstKey) {          this .firstKey = firstKey;      }      public  int  getSecondKey() {          return  secondKey;      }      public  void  setSecondKey( int  secondKey) {          this .secondKey = secondKey;      }                                                                                                                                                                                 @Override      public  void  write(DataOutput out)  throws  IOException {          // TODO Auto-generated method stub          out.writeUTF(firstKey);          out.writeInt(secondKey);      }      @Override      public  void  readFields(DataInput in)  throws  IOException {          // TODO Auto-generated method stub          firstKey=in.readUTF();          secondKey=in.readInt();      }      //map的键的比较就是根据这个方法来进行的      @Override      public  int  compareTo(Object o) {          // TODO Auto-generated method stub          TextInt ti=(TextInt)o;          //利用这个来控制升序或降序          //this本对象写在前面代表是升序          //this本对象写在后面代表是降序          return  this .getFirstKey().compareTo(ti.getFirstKey());      } }

分组策略

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package  whut; import  org.apache.hadoop.io.WritableComparable; import  org.apache.hadoop.io.WritableComparator; //主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 public  class  TextComparator  extends  WritableComparator {      //必须要调用父类的构造器      protected  TextComparator() {          super (TextInt. class , true ); //注册comparator      }      @Override      public  int  compare(WritableComparable a, WritableComparable b) {          // TODO Auto-generated method stub          TextInt ti1=(TextInt)a;          TextInt ti2=(TextInt)b;          return  ti1.getFirstKey().compareTo(ti2.getFirstKey());      } }

组内排序策略

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package  whut; import  org.apache.hadoop.io.WritableComparable; import  org.apache.hadoop.io.WritableComparator; //分组内部进行排序,按照第二个字段进行排序 public  class  TextIntComparator  extends  WritableComparator {      public  TextIntComparator()      {          super (TextInt. class , true );      }      //这里可以进行排序的方式管理      //必须保证是同一个分组的      //a与b进行比较      //如果a在前b在后,则会产生升序      //如果a在后b在前,则会产生降序      @Override      public  int  compare(WritableComparable a, WritableComparable b) {          // TODO Auto-generated method stub          TextInt ti1=(TextInt)a;          TextInt ti2=(TextInt)b;          //首先要保证是同一个组内,同一个组的标识就是第一个字段相同          if (!ti1.getFirstKey().equals(ti2.getFirstKey()))             return  ti1.getFirstKey().compareTo(ti2.getFirstKey());          else             return  ti2.getSecondKey()-ti1.getSecondKey(); //0,-1,1      }                                                                                                                                                            }

分区策略

1 2 3 4 5 6 7 8 9 10 11 package  whut; import  org.apache.hadoop.io.IntWritable; import  org.apache.hadoop.mapreduce.Partitioner; //参数为map的输出类型 public  class  KeyPartitioner  extends  Partitioner<TextInt, IntWritable> {      @Override      public  int  getPartition(TextInt key, IntWritable value,  int  numPartitions) {          // TODO Auto-generated method stub          return  (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;      } }

MapReduce策略

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 package  whut; import  java.io.IOException; import  org.apache.hadoop.conf.Configuration; import  org.apache.hadoop.conf.Configured; import  org.apache.hadoop.fs.Path; import  org.apache.hadoop.io.IntWritable; import  org.apache.hadoop.io.Text; import  org.apache.hadoop.mapreduce.Job; import  org.apache.hadoop.mapreduce.Mapper; import  org.apache.hadoop.mapreduce.Reducer; import  org.apache.hadoop.mapreduce.Mapper.Context; import  org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import  org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import  org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import  org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import  org.apache.hadoop.util.Tool; import  org.apache.hadoop.util.ToolRunner; //需要对数据进行分组以及组内排序的时候 public  class  SortMain  extends  Configured  implements  Tool{      //这里设置输入文格式为KeyValueTextInputFormat      //name1 5      //默认输入格式都是Text,Text      public  static  class  GroupMapper  extends         Mapper<Text, Text, TextInt, IntWritable>  {          public  IntWritable second= new  IntWritable();          public  TextInt tx= new  TextInt();          @Override          protected  void  map(Text key, Text value, Context context)                  throws  IOException, InterruptedException {              String lineKey=key.toString();              String lineValue=value.toString();              int  lineInt=Integer.parseInt(lineValue);              tx.setFirstKey(lineKey);              tx.setSecondKey(lineInt);              second.set(lineInt);              context.write(tx, second);          }      }      //设置reduce      public  static  class  GroupReduce  extends  Reducer<TextInt, IntWritable, Text, Text>      {          @Override          protected  void  reduce(TextInt key, Iterable<IntWritable> values,                 Context context)                  throws  IOException, InterruptedException {              StringBuffer sb= new  StringBuffer();              for (IntWritable val:values)              {                  sb.append(val+ "," );              }              if (sb.length()> 0 )              {                  sb.deleteCharAt(sb.length()- 1 );              }              context.write( new  Text(key.getFirstKey()),  new  Text(sb.toString()));          }      }                                                                                                                                               @Override      public  int  run(String[] args)  throws  Exception {          // TODO Auto-generated method stub          Configuration conf=getConf();          Job job= new  Job(conf, "SecondarySort" );          job.setJarByClass(SortMain. class );          // 设置输入文件的路径,已经上传在HDFS          FileInputFormat.addInputPath(job,  new  Path(args[ 0 ]));          // 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在          FileOutputFormat.setOutputPath(job,  new  Path(args[ 1 ]));                                                                                                                                                       job.setMapperClass(GroupMapper. class );          job.setReducerClass(GroupReduce. class );          //设置分区方法          job.setPartitionerClass(KeyPartitioner. class );                                                                                                                                                       //下面这两个都是针对map端的          //设置分组的策略,哪些key可以放置到一组中          job.setGroupingComparatorClass(TextComparator. class );          //设置key如何进行排序在传递给reducer之前.          //这里就可以设置对组内如何排序的方法          /*************关键点**********/          job.setSortComparatorClass(TextIntComparator. class );          //设置输入文件格式          job.setInputFormatClass(KeyValueTextInputFormat. class );          //使用默认的输出格式即TextInputFormat          //设置map的输出key和value类型          job.setMapOutputKeyClass(TextInt. class );          job.setMapOutputValueClass(IntWritable. class );          //设置reduce的输出key和value类型          //job.setOutputFormatClass(TextOutputFormat.class);          job.setOutputKeyClass(Text. class );          job.setOutputValueClass(Text. class );          job.waitForCompletion( true );          int  exitCode=job.isSuccessful()? 0 : 1 ;          return  exitCode;      }                                                                                                                                               public  static  void  main(String[] args)   throws  Exception      {         int  exitCode=ToolRunner.run( new  SortMain(), args);         System.exit(exitCode);      } }

注意事项

   1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明

   2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。

   3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。

   4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。

本文转自 zhao_xiao_long 51CTO博客,原文链接:http://blog.51cto.com/computerdragon/1287721

    原文作者:MapReduce
    原文地址: https://yq.aliyun.com/articles/475905
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞