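There are several ways to implement this:
1. Extend the MultipleOutputFormat class (old API). MultipleTextOutputFormat and MultipleSequenceFileOutputFormat are its two concrete implementations.
Write your own subclass of MultipleOutputFormat and override the generateFileNameForKeyValue method to choose the output file for each record.
2. Because MultipleOutputFormat (old API) is marked as deprecated and the new API does not include it, write a new MultipleOutputFormat class modeled on the old one.
3. Use MultipleOutputs (available in both the old and the new API).
4. What I call the Writable method (quoted from another source).
Note: the old API contains both MultipleOutputFormat and MultipleOutputs. The new API contains only MultipleOutputs, but the new MultipleOutputs class covers the functionality of both old-API classes.
Even so, the new MultipleOutputs class does not offer flexible control over output file names, which is why a new MultipleOutputFormat class is implemented here.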
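Implementation examples:
1. Extend the MultipleOutputFormat class. MultipleTextOutputFormat and MultipleSequenceFileOutputFormat are its two concrete implementations.
Write your own subclass of MultipleOutputFormat and override the generateFileNameForKeyValue method to choose the output file for each record.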
package com.zxw.hadoop.driver;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;

public class WordCount_Old {

    public static class TokenizerMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable count = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, count);
            }
        }
    }

    public static class IntSumReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static class WordCountOutputFormat
            extends MultipleOutputFormat<Text, IntWritable> {
        private TextOutputFormat<Text, IntWritable> output = null;

        @Override
        protected RecordWriter<Text, IntWritable> getBaseRecordWriter(
                FileSystem fs, JobConf job, String name, Progressable progressable)
                throws IOException {
            if (output == null) {
                output = new TextOutputFormat<Text, IntWritable>();
            }
            return output.getRecordWriter(fs, job, name, progressable);
        }

        // Route each record to a file named after the first letter of its key.
        @Override
        protected String generateFileNameForKeyValue(Text key, IntWritable value,
                String name) {
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            }
            return "result.txt";
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(WordCount_Old.class);
        job.setJobName("WordCount_Old");
        String[] otherArgs = new GenericOptionsParser(job, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        job.setJarByClass(WordCount_Old.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormat(WordCountOutputFormat.class); // set the output format
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        RunningJob runningJob = JobClient.runJob(job);
        runningJob.waitForCompletion();
    }
}
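To see what the naming rule in generateFileNameForKeyValue does without running a Hadoop job, here is a minimal plain-Java sketch. It uses no Hadoop classes; the class name FileNameRuleSketch and the helper group are hypothetical, and the empty-key guard is an addition that the listing above does not have (StringTokenizer never produces empty tokens, so the original does not need it).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch; not part of the Hadoop API.
class FileNameRuleSketch {

    // Same rule as the listing above: keys starting with a-z (case-insensitive)
    // go to "<letter>.txt", everything else goes to "result.txt".
    static String fileNameFor(String key) {
        if (key.isEmpty()) {
            return "result.txt"; // added guard, an assumption of this sketch
        }
        char c = Character.toLowerCase(key.charAt(0));
        if (c >= 'a' && c <= 'z') {
            return c + ".txt";
        }
        return "result.txt";
    }

    // Groups keys by the file they would be written to.
    static Map<String, List<String>> group(List<String> keys) {
        Map<String, List<String>> files = new TreeMap<String, List<String>>();
        for (String key : keys) {
            String name = fileNameFor(key);
            if (!files.containsKey(name)) {
                files.put(name, new ArrayList<String>());
            }
            files.get(name).add(key);
        }
        return files;
    }

    public static void main(String[] args) {
        // prints {a.txt=[Apple, avocado], b.txt=[banana], result.txt=[42]}
        System.out.println(group(Arrays.asList("Apple", "avocado", "banana", "42")));
    }
}
```

With the real job, each reducer writes such files into the job output directory, so a word count over English text produces up to 26 letter files plus result.txt.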
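2. Because MultipleOutputFormat is marked as deprecated and the new API does not include it, write a new MultipleOutputFormat class modeled on the old one.
About the code: it defines two classes, LineRecordWriter and MultipleOutputFormat, and uses the wordcount example as the test.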
package com.zxw.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Originally an inner class of TextOutputFormat; extracted here so that
 * other classes can use it.
 * @author connor
 *
 * @param <K>
 * @param <V>
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
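The null handling in LineRecordWriter.write is easy to misread, so here is a minimal plain-Java sketch of just the line layout it produces. The class LineFormatSketch and the method formatLine are hypothetical names, and plain null stands in for both null and NullWritable; nothing here touches the Hadoop API.

```java
// Hypothetical stand-alone sketch of the line layout used by LineRecordWriter.
class LineFormatSketch {

    // Returns the text of one output line, or null when nothing is written.
    static String formatLine(Object key, Object value, String separator) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return null; // both missing: the record is skipped entirely
        }
        StringBuilder line = new StringBuilder();
        if (!nullKey) {
            line.append(key);
        }
        if (!nullKey && !nullValue) {
            line.append(separator); // separator only when both sides exist
        }
        if (!nullValue) {
            line.append(value);
        }
        return line.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(formatLine("hadoop", 3, ","));   // prints "hadoop,3"
        System.out.print(formatLine(null, 3, ","));       // prints "3"
        System.out.print(formatLine("hadoop", null, ",")); // prints "hadoop"
    }
}
```

The point of the design is that a record with only a key or only a value still produces a clean line with no dangling separator.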
package com.zxw.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K, V> extends FileOutputFormat<K, V> {
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Determine the output file name (including extension) from key, value, and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value,
            Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, keyed by output file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory. */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Get the output file name for this record.
            String baseName = generateFileNameForKeyValue(key, value,
                    job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        @Override
        public void close(TaskAttemptContext context)
                throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job,
                String baseName) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(
                        codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
package com.zxw.hadoop.driver;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.zxw.hadoop.mapreduce.lib.output.MultipleOutputFormat;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class AlphabetOutputFormat
            extends MultipleOutputFormat<Text, IntWritable> {
        @Override
        protected String generateFileNameForKeyValue(Text key, IntWritable value,
                Configuration conf) {
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            }
            return "other.txt";
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(AlphabetOutputFormat.class); // set the output format
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
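The core of the custom MultipleOutputFormat is the writer cache in MultiRecordWriter: one writer per file name, created lazily on first use and closed together at the end. The following is a minimal plain-Java sketch of that pattern, with in-memory StringWriter objects standing in for HDFS files. WriterCacheSketch and its methods are hypothetical names; the "," separator and the "other.txt" fallback follow the listings above.

```java
import java.io.StringWriter;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch of the lazy writer cache; no Hadoop classes.
class WriterCacheSketch {
    // One in-memory "file" per output name, created on first use.
    private final Map<String, StringWriter> writers = new TreeMap<String, StringWriter>();

    // Same naming rule as AlphabetOutputFormat above.
    static String fileNameFor(String key) {
        char c = Character.toLowerCase(key.charAt(0));
        return (c >= 'a' && c <= 'z') ? c + ".txt" : "other.txt";
    }

    // Look up the writer for this record's file, creating it if needed.
    void write(String key, int value) {
        String name = fileNameFor(key);
        StringWriter w = writers.get(name);
        if (w == null) {
            w = new StringWriter();
            writers.put(name, w);
        }
        w.write(key + "," + value + "\n");
    }

    // "Closes" all writers and returns the contents of each file by name.
    Map<String, String> close() {
        Map<String, String> contents = new TreeMap<String, String>();
        for (Map.Entry<String, StringWriter> e : writers.entrySet()) {
            contents.put(e.getKey(), e.getValue().toString());
        }
        writers.clear();
        return contents;
    }

    public static void main(String[] args) {
        WriterCacheSketch sketch = new WriterCacheSketch();
        sketch.write("apple", 2);
        sketch.write("avocado", 1);
        sketch.write("7up", 5);
        System.out.println(sketch.close().keySet()); // prints [a.txt, other.txt]
    }
}
```

One consequence worth keeping in mind: every distinct file name keeps a writer open until close, so a naming rule with very many distinct names keeps that many streams open per task.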
3. Use MultipleOutputs
MultipleOutputs adds extra outputs on top of the output specified by the job. Compared with MultipleOutputFormat, it is multi-file output in the true sense.
package com.zxw.hadoop.driver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {

    public static class MapClass
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;

        protected void setup(Context context)
                throws IOException, InterruptedException {
            this.mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        // Each input line is expected to have three fields separated by "-",
        // with an integer in the second field.
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] tokens = line.split("-");
            // 1 => MOSInt-m-00000
            mos.write("MOSInt", new Text(tokens[0]),
                    new IntWritable(Integer.parseInt(tokens[1])));
            // 2 => MOSText-m-00000
            mos.write("MOSText", new Text(tokens[0]), new Text(tokens[2]));
            // 3 => tokens[0]/-m-00000
            mos.write("MOSText", new Text(tokens[0]), new Text(line), tokens[0] + "/");
        }

        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            mos.close();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "word count with MultipleOutputs");
        job.setJarByClass(TestwithMultipleOutputs.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);
        MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class,
                Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "MOSText", TextOutputFormat.class,
                Text.class, Text.class);
        // Return the status and let main() call System.exit.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
        System.exit(res);
    }
}
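The comments in the map method above show the file names that each mos.write call ends up in. As a minimal sketch of that naming pattern, the helper below builds the name from a base (the named output or the baseOutputPath argument), a task-type letter, and a partition number. The class NamedOutputFileNameSketch and the method outputFileName are hypothetical; the "<base>-<type>-<5-digit partition>" layout is inferred from those comments, not taken from the Hadoop source.

```java
// Hypothetical helper that reproduces the file-name pattern shown in the
// comments of the listing above; it does not call the Hadoop API.
class NamedOutputFileNameSketch {

    // base: named output or baseOutputPath; taskType: "m" for map, "r" for reduce.
    static String outputFileName(String base, String taskType, int partition) {
        return String.format("%s-%s-%05d", base, taskType, partition);
    }

    public static void main(String[] args) {
        System.out.println(outputFileName("MOSInt", "m", 0));  // prints MOSInt-m-00000
        System.out.println(outputFileName("MOSText", "m", 0)); // prints MOSText-m-00000
        // A baseOutputPath ending in "/" puts the file in a subdirectory.
        System.out.println(outputFileName("k1/", "m", 0));     // prints k1/-m-00000
    }
}
```

The third call illustrates why the third mos.write in the listing creates one subdirectory per distinct tokens[0] value under the job output directory.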
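If you do not need the job's regular output and only need the named outputs, set the job's OutputFormat to NullOutputFormat when defining the job, remove the output.collect call from the reducer, and keep only the OutputCollector of the named output.
MultipleOutputs is usually described for the reducer, but it can be used in the mapper in much the same way, as the example above does. In the mapper, however, only the regular output is sent on to the reducer stage as the reducer's input; the named outputs do not take part.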
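4. What I call the Writable method
I mentioned this method in the article "Several special points of MapReduce usage in Nutch" in the second issue of Hadoop Developer (《hadoop 开发者第二期》). It is the approach that Nutch's FetcherOutputFormat demonstrates.
FetcherOutputFormat needs to store three different data structures separately after the reducer: Content, CrawlDatum, and ParseImpl. An ordinary MapReduce job output cannot do this, so Nutch uses NutchWritable as a single unifying object that wraps the three types above: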
public class NutchWritable extends GenericWritableConfigurable {

    private static Class<? extends Writable>[] CLASSES = null;

    static {
        CLASSES = (Class<? extends Writable>[]) new Class[] {
            org.apache.hadoop.io.NullWritable.class,
            ……
            org.apache.nutch.crawl.CrawlDatum.class,
            ……
            org.apache.nutch.parse.ParseImpl.class,
            ……
        };
    }

    public NutchWritable() { }

    public NutchWritable(Writable instance) {
        set(instance);
    }
}
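When the actual RecordWriter writes the data, it unwraps the NutchWritable back into the original Content, CrawlDatum, or ParseImpl object and writes each object to a different file according to its type, which yields multi-file output.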
public void write(Text key, NutchWritable value) throws IOException {
    Writable w = value.get();
    if (w instanceof CrawlDatum)
        fetchOut.append(key, w);
    else if (w instanceof Content)
        contentOut.append(key, w);
    else if (w instanceof Parse)
        parseOut.write(key, (Parse)w);
}
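The same wrap-then-dispatch idea can be tried out in plain Java. The sketch below is only an illustration under stated assumptions: Datum, Page, Wrapper, and WrapperDispatchSketch are hypothetical stand-ins for CrawlDatum, Content, NutchWritable, and the RecordWriter, and in-memory lists stand in for the separate output files. It does not use the Nutch or Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch of the "Writable method": one wrapper type
// travels through the job, and the writer routes by the wrapped object's type.
class WrapperDispatchSketch {

    // Stand-ins for two different value types.
    static final class Datum {
        final String status;
        Datum(String status) { this.status = status; }
    }

    static final class Page {
        final String html;
        Page(String html) { this.html = html; }
    }

    // Stand-in for the unifying wrapper object.
    static final class Wrapper {
        private final Object instance;
        Wrapper(Object instance) { this.instance = instance; }
        Object get() { return instance; }
    }

    // Each list plays the role of one output file.
    final List<String> datumFile = new ArrayList<String>();
    final List<String> pageFile = new ArrayList<String>();

    // Unwrap the value and append it to the file that matches its type.
    void write(String key, Wrapper value) {
        Object w = value.get();
        if (w instanceof Datum) {
            datumFile.add(key + "\t" + ((Datum) w).status);
        } else if (w instanceof Page) {
            pageFile.add(key + "\t" + ((Page) w).html);
        }
    }

    public static void main(String[] args) {
        WrapperDispatchSketch writer = new WrapperDispatchSketch();
        writer.write("url1", new Wrapper(new Datum("fetched")));
        writer.write("url1", new Wrapper(new Page("<html/>")));
        System.out.println(writer.datumFile.size() + " " + writer.pageFile.size()); // prints 1 1
    }
}
```

The trade-off of this approach is that the job's declared value type is the wrapper, so every producer has to wrap its values and the writer has to know every wrapped type in advance.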
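Because the input data in the examples earlier in this article has a single structure, the Writable method is not a good fit for it, so there are no experimental results for this method.
Finally, we borrow a table from "Hadoop: The Definitive Guide" to distinguish the three methods: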
Feature | MultipleOutputFormat | MultipleOutputs | Writable |
Output file names can be set flexibly | yes | no | yes |
Output types may differ | no | yes | yes |
Can be used in the mapper or the reducer | no | yes | no |
Supports various OutputFormats | no; apart from TextOutputFormat and SequenceFileOutputFormat, others must be custom-built | yes | yes |
Multiple outputs per record | no; it actually partitions the records | yes | no |