MapReduce 多文件输出的两种方法

// Two approaches to multi-file output in MapReduce.
// Approach 1 (OldMulOutput) uses the old org.apache.hadoop.mapred API with named outputs;
// Approach 2 (MulOutput) uses the new org.apache.hadoop.mapreduce API with path-based outputs.

// ===== File 1: OldMulOutput.java — old "mapred" API, named outputs =====
package duogemap;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Map-only job that splits each CSV input record into two named outputs:
 * "chrono" (columns 1,2) and "geo" (columns 4,5).
 */
public class OldMulOutput {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, NullWritable, Text> {

        // Old-API MultipleOutputs is not generic; raw type is unavoidable here.
        private MultipleOutputs mos;
        private OutputCollector<NullWritable, Text> collector;

        /**
         * BUG FIX: the original declared {@code public void Configured(JobConf)},
         * which never overrides {@link MapReduceBase#configure(JobConf)}; as a
         * result {@code mos} stayed null and map() threw NullPointerException.
         */
        @Override
        public void configure(JobConf conf) {
            mos = new MultipleOutputs(conf);
        }

        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            // BUG FIX: replaced typographic quotes (the original had split(“,”, -1))
            // with ASCII quotes so the string literals compile.
            // -1 keeps trailing empty fields.
            String[] arr = value.toString().split(",", -1);
            String chrono = arr[1] + "," + arr[2];
            String geo = arr[4] + "," + arr[5];
            collector = mos.getCollector("chrono", reporter);
            collector.collect(NullWritable.get(), new Text(chrono));
            collector = mos.getCollector("geo", reporter);
            collector.collect(NullWritable.get(), new Text(geo));
        }

        @Override
        public void close() throws IOException {
            // Flush and close all named-output writers.
            mos.close();
        }
    }

    // Moved out of MapClass: the driver belongs on the outer class, not the mapper.
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (remainingArgs.length != 2) {
            System.err.println("Error!");
            System.exit(1);
        }

        // JobConf(conf, OldMulOutput.class) also sets the job jar by class.
        JobConf job = new JobConf(conf, OldMulOutput.class);
        Path in = new Path(remainingArgs[0]);
        Path out = new Path(remainingArgs[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("Multifile");
        job.setMapperClass(MapClass.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setNumReduceTasks(0); // map-only job

        // Declare the two named outputs referenced by MapClass.map().
        MultipleOutputs.addNamedOutput(job, "chrono", TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "geo", TextOutputFormat.class,
                NullWritable.class, Text.class);
        JobClient.runJob(job);
    }
}

// ===== File 2: MulOutput.java — new "mapreduce" API, path-based outputs =====
package duogemap;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;
// BUG FIX: removed "import duogemap.OldMulOutput.MapClass;" — it was unused and
// conflicted with this class's own nested MapClass.

/**
 * Map-only job that routes each record into a per-country output directory,
 * derived from characters 1-2 of the fifth CSV column.
 */
public class MulOutput {

    public static class MapClass extends Mapper<LongWritable, Text, NullWritable, Text> {

        // Parameterized (the original used the raw type).
        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<>(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // baseOutputPath ends with "/" so each country gets its own directory.
            mos.write(NullWritable.get(), value, generateFileName(value));
        }

        /** Builds "XX/" from chars 1-2 of column 4 (skipping a leading quote char). */
        private String generateFileName(Text value) {
            // BUG FIX: ASCII quotes instead of typographic ones.
            String[] split = value.toString().split(",", -1);
            String country = split[4].substring(1, 3);
            return country + "/";
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close MultipleOutputs, or the extra output files are never flushed.
            mos.close();
        }
    }

    // Moved out of MapClass: the driver belongs on the outer class, not the mapper.
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Muloutput");
        String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (remainingArgs.length != 2) {
            System.err.println("Error!");
            System.exit(1);
        }

        Path in = new Path(remainingArgs[0]);
        Path out = new Path(remainingArgs[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJarByClass(MulOutput.class); // added: required when running from a jar
        job.setMapperClass(MapClass.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setNumReduceTasks(0); // map-only job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

    原文作者:MapReduce
    原文地址: https://www.cnblogs.com/liquan-anran/p/6253087.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞