mapreduce多文件输出的两方法 package duogemap; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.MultipleOutputs; import org.apache.hadoop.util.GenericOptionsParser; public class OldMulOutput { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, NullWritable, Text>{ private MultipleOutputs mos; private OutputCollector<NullWritable, Text> collector; public void Configured(JobConf conf){ mos=new MultipleOutputs(conf); } public void map(LongWritable key, Text value, OutputCollector<NullWritable, Text> output,Reporter reporter) throws IOException{ String[] arr=value.toString().split(“,”, -1); String chrono=arr[1]+”,”+arr[2]; String geo=arr[4]+”,”+arr[5]; collector=mos.getCollector(“chrono”, reporter); collector.collect(NullWritable.get(),new Text(chrono)); collector=mos.getCollector(“geo”, reporter); collector.collect(NullWritable.get(),new Text(geo)); } public void close() throws IOException{ mos.close(); } public static void main(String[] args) throws IOException { Configuration conf=new Configuration(); String[] remainingArgs=new GenericOptionsParser(conf, args).getRemainingArgs(); if (remainingArgs.length !=2) { System.err.println(“Error!”); System.exit(1); } JobConf job=new JobConf(conf,OldMulOutput.class); Path in=new Path(remainingArgs[0]); Path out=new Path(remainingArgs[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName(“Multifile”); job.setMapperClass(MapClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); MultipleOutputs.addNamedOutput(job, “chrono”, TextOutputFormat.class, NullWritable.class, Text.class); MultipleOutputs.addNamedOutput(job, “geo”, TextOutputFormat.class, NullWritable.class, Text.class); JobClient.runJob(job); } } } package duogemap; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.GenericOptionsParser; import duogemap.OldMulOutput.MapClass; public class MulOutput { public static class MapClass extends Mapper<LongWritable, Text, NullWritable, Text>{ private MultipleOutputs mos; @Override protected void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.setup(context); mos=new MultipleOutputs(context); } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { mos.write(NullWritable.get(),value,generateFileName(value)); } private String generateFileName(Text value) { // TODO Auto-generated method stub String[] split=value.toString().split(“,”, -1); String country=split[4].substring(1, 3); return country+”/”; } @Override protected void cleanup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.cleanup(context); mos.close(); } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf=new Configuration(); Job job=Job.getInstance(conf, “Muloutput”); String[] remainingArgs=new GenericOptionsParser(conf, args).getRemainingArgs(); if (remainingArgs.length !=2) { System.err.println(“Error!”); System.exit(1); } Path in=new Path(remainingArgs[0]); Path out=new Path(remainingArgs[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setMapperClass(MapClass.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true)?0:1); } } }
mapreduce多文件输出的两方法
原文作者:MapReduce
原文地址: https://www.cnblogs.com/liquan-anran/p/6253087.html
本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
原文地址: https://www.cnblogs.com/liquan-anran/p/6253087.html
本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。