MapReduce Key Component: RecordReader [repost]

(1) The RecordReader determines how a record is read from an input split; it is invoked once for every record that is read.

(2) The default RecordReader is LineRecordReader, which is what TextInputFormat uses; SequenceFileInputFormat's RecordReader is SequenceFileRecordReader.

(3) LineRecordReader uses each line's byte offset in the file as the map key and the line's content as the map value.

(4) Typical reasons for a custom RecordReader: you want to control how each record is read, or you want a different key type, for example the file path or file name rather than the line's offset within the file.

Defining a custom RecordReader:

(1) Extend the abstract class RecordReader to implement your own RecordReader;
(2) Implement a custom InputFormat class and override its createRecordReader() method to return an instance of the custom RecordReader;
(3) Call job.setInputFormatClass() to configure the job with the custom InputFormat.

For a reference implementation, see the source of the org.apache.hadoop.mapreduce.lib.input.TextInputFormat class.

RecordReader example:

Requirement: given a file with one number per line, compute the sum of the odd-numbered lines and the sum of the even-numbered lines separately.

Data:
10
20
30
40
50
60
70

Sum of odd-numbered lines: 10 + 30 + 50 + 70 = 160
Sum of even-numbered lines: 20 + 40 + 60 = 120

Create a new project TestRecordReader with the package com.recordreader.

Source code MyMapper.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Identity mapper: passes the (line number, line content) pairs straight through.
public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

Source code MyPartitioner.java:

package com.recordreader;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sends odd line numbers to partition 0 (key reset to 0) and even line numbers
// to partition 1 (key reset to 1), so each reducer receives exactly one group.
public class MyPartitioner extends Partitioner<LongWritable, Text> {

    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        if (key.get() % 2 == 0) {
            key.set(1);
            return 1;
        } else {
            key.set(0);
            return 0;
        }
    }
}

Source code MyReducer.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the values of each group; key 0 is labeled "odd:", key 1 is labeled "even:".
public class MyReducer extends Reducer<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (Text val : value) {
            sum += Integer.parseInt(val.toString());
        }
        Text write_key = new Text();
        IntWritable write_value = new IntWritable();
        if (key.get() == 0)
            write_key.set("odd:");
        else
            write_key.set("even:");
        write_value.set(sum);
        context.write(write_key, write_value);
    }
}
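Taken together, MyPartitioner and MyReducer split the records by the parity of the key, which the custom MyRecordReader (shown next) sets to the 1-based line number, and then sum each group. The standalone snippet below is an illustrative sketch, not part of the original post or of the MapReduce job: it replays that grouping logic on the sample data in plain Java so the expected sums (160 and 120) can be checked without a cluster. The class name OddEvenSumCheck is hypothetical.

package com.recordreader;

// Hypothetical sanity check: replays the odd/even grouping performed by
// MyPartitioner and MyReducer on the sample data, outside of Hadoop.
public class OddEvenSumCheck {
    public static void main(String[] args) {
        int[] data = {10, 20, 30, 40, 50, 60, 70}; // sample input, one value per line
        int oddSum = 0;   // lines 1, 3, 5, 7 (1-based line numbers, like MyRecordReader's key)
        int evenSum = 0;  // lines 2, 4, 6
        for (int lineNo = 1; lineNo <= data.length; lineNo++) {
            if (lineNo % 2 == 0) {
                evenSum += data[lineNo - 1];
            } else {
                oddSum += data[lineNo - 1];
            }
        }
        System.out.println("odd: " + oddSum);   // expected: 160
        System.out.println("even: " + evenSum); // expected: 120
    }
}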
Source code MyRecordReader.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

// Emits (line number, line content) pairs: the key is the 1-based line number
// instead of the byte offset that LineRecordReader would use.
public class MyRecordReader extends RecordReader<LongWritable, Text> {
    private long start;
    private long end;
    private long pos;
    private FSDataInputStream fin = null;
    private LongWritable key = null;
    private Text value = null;
    private LineReader reader = null;

    @Override
    public void close() throws IOException {
        fin.close();
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) inputSplit;
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        fin = fs.open(path);
        fin.seek(start);
        reader = new LineReader(fin);
        pos = 1; // line numbers start at 1
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null)
            key = new LongWritable();
        key.set(pos);
        if (value == null)
            value = new Text();
        if (reader.readLine(value) == 0)
            return false;
        pos++;
        return true;
    }
}

Source code MyFileInputFormat.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Keep each file in a single split so the line numbering stays global.
        return false;
    }
}

Source code TestRecordReader.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestRecordReader {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(TestRecordReader.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2); // one reducer for odd lines, one for even lines
        job.setInputFormatClass(MyFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
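One limitation of MyRecordReader above is that getProgress() always returns 0, so the framework cannot report how far the map task has read. Below is a minimal sketch of a more informative implementation, assuming the fields already defined in that class (start, end, fin); it is a hypothetical drop-in replacement for the stub, not a standalone file and not part of the original post.

    // Hypothetical sketch: report the fraction of the split's bytes consumed so far,
    // using the split boundaries (start, end) and the current stream position.
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f; // empty split: nothing to report
        }
        return Math.min(1.0f, (fin.getPos() - start) / (float) (end - start));
    }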

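The driver above uses the Job(Configuration, String) constructor, which is deprecated in Hadoop 2.x. A minimal alternative sketch using Job.getInstance() is shown below; the class name TestRecordReader2 and the job name are illustrative, not from the original post, and the job wiring is otherwise the same.

package com.recordreader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical Hadoop 2.x-style driver for the same job.
public class TestRecordReader2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "odd even sum");
        job.setJarByClass(TestRecordReader2.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2); // partition 0 = odd lines, partition 1 = even lines
        job.setInputFormatClass(MyFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory (must not exist)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

With the seven-line sample input, the job should produce part-r-00000 containing "odd:" 160 and part-r-00001 containing "even:" 120 (tab-separated by the default TextOutputFormat).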
    Original author: MapReduce
    Original article: https://www.cnblogs.com/whtydn/p/4463529.html
    This article is reposted from the web solely to share knowledge; if it infringes any rights, please contact the blogger to have it removed.