(1)RecordReader决定以怎样的方式从分片(InputSplit)中读取一条记录,每读取一条记录都会调用一次RecordReader;
(2)系统默认的RecordReader是LineRecordReader,如TextInputFormat;而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader; (3)LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value; (4)应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名字而不是该行在文件中的偏移量。 自定义RecordReader: (1)继承抽象类RecordReader,实现RecordReader的一个实例; (2)实现自定义InputFormat类,重写InputFormat中createRecordReader()方法,返回值是自定义的RecordReader实例; (3)配置job.setInputFormatClass()设置自定义的InputFormat实例; 源码见org.apache.mapreduce.lib.input.TextInputFormat类; RecordReader例子: 应用场景: 数据: 1 2 3 4 5 6 7 …… 要求:分别计算奇数行与偶数行数据之和 奇数行综合:10+30+50+70=160 偶数行综合:20+40+60=120 新建项目TestRecordReader,包com.recordreader, 源代码MyMapper.java: package com.recordreader; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub context.write(key, value); } } 源代码MyPartitioner.java: package com.recordreader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartitioner extends Partitioner { @Override public int getPartition(LongWritable key, Text value, int numPartitions) { // TODO Auto-generated method stub if(key.get() % 2 == 0){ key.set(1); return 1; } else { key.set(0); return 0; } } } 源代码MyReducer.java: package com.recordreader; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer { @Override protected void reduce(LongWritable key, Iterable value,Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int sum = 0; for(Text val: value){ sum += Integer.parseInt(val.toString()); } Text write_key = 
new Text(); IntWritable write_value = new IntWritable(); if(key.get() == 0) write_key.set(“odd:”); else write_key.set(“even:”); write_value.set(sum); context.write(write_key, write_value); } } 源代码MyRecordReader.java: package com.recordreader; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader; public class MyRecordReader extends RecordReader { private long start; private long end; private long pos; private FSDataInputStream fin = null; private LongWritable key = null; private Text value = null; private LineReader reader = null; @Override public void close() throws IOException { // TODO Auto-generated method stub fin.close(); } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return value; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub FileSplit fileSplit = (FileSplit)inputSplit; start = fileSplit.getStart(); end = start + fileSplit.getLength(); Configuration conf = context.getConfiguration(); Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(conf); fin = fs.open(path); fin.seek(start); reader = new LineReader(fin); pos = 1; } @Override public 
boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub if(key == null) key = new LongWritable(); key.set(pos); if(value == null) value = new Text(); if(reader.readLine(value) == 0) return false; pos++; return true; } } 源代码MyFileInputFormat.java: package com.recordreader; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class MyFileInputFormat extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub return new MyRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } } 源代码TestRecordReader.java: package com.recordreader; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class TestRecordReader { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println(“Usage: wordcount “); System.exit(2); } Job job = new Job(conf, “word count”); job.setJarByClass(TestRecordReader.class); 
job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(2); job.setInputFormatClass(MyFileInputFormat.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }