package com.timger.tools
/**
* Created by timger on 15-1-26.
*/
import com.timger.etl.TokenizerMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, NullWritable, IntWritable, Text}
//import org.apache.hadoop.mapred.lib.{MultipleOutputFormat, MultipleTextOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.{Job,InputFormat,OutputFormat}
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat, FileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.GenericOptionsParser
import scala.collection.JavaConversions._
import org.apache.hadoop.mapreduce.lib.output;
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat}
/**
* https://hadoop.apache.org/docs/r2.2.0//api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
*/
// This class performs the map operation, translating raw input into the key-value
// pairs we will feed into our reduce operation.
// Map-only stage: for every configured split key contained in an input line,
// emits (key, line) both to the default output and to the per-key named
// output registered by the driver (see SplitFile / MultipleOutputs.addNamedOutput).
// Reference: https://github.com/rystsov/learning-hadoop/blob/master/src/main/java/com/twitter/rystsov/mr/MultipulOutputExample.java
class SplitMapper extends Mapper[Object, Text, Text, Text] {

  // Initialized in setup(); routes records to the per-key named outputs.
  private var multipleOutputs: MultipleOutputs[Text, Text] = null

  // Fallback split keys, used when the job configuration does not carry
  // "splitkeys". Preserves the original hard-coded behavior.
  private val DefaultKeys = "26/Jan/2015,27/Jan/2015"

  @throws(classOf[java.io.IOException])
  @throws(classOf[java.lang.InterruptedException])
  override protected def setup(context: Mapper[Object, Text, Text, Text]#Context) {
    multipleOutputs = new MultipleOutputs(context)
    println("SetUp Ok ......................")
  }

  @throws(classOf[java.io.IOException])
  @throws(classOf[java.lang.InterruptedException])
  override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, Text]#Context) = {
    // Prefer keys supplied by the driver via the "splitkeys" property;
    // fall back to the built-in defaults so behavior is unchanged when
    // the property is absent (Option(...) maps a null conf value to None).
    val keys = Option(context.getConfiguration.get("splitkeys"))
      .getOrElse(DefaultKeys)
      .split(",")
    val word = new Text
    val line = value.toString
    for (k <- keys if line.contains(k)) {
      word.set(k)
      // Named-output names must not contain '/'; strip it exactly as the
      // driver does when registering the named outputs.
      val namedOutput = k.replace("/", "")
      context.write(word, value)
      multipleOutputs.write(namedOutput, word, value)
    }
  }

  @throws(classOf[java.io.IOException])
  @throws(classOf[InterruptedException])
  override protected def cleanup(context: Mapper[Object, Text, Text, Text]#Context) {
    // Flush and close all named-output writers; without this, records
    // buffered by MultipleOutputs can be lost.
    multipleOutputs.close()
  }
}
/***
class MutiFIleReducer extends Reducer[Text,Text,Text,Text] {
override
def reduce(key:Text, values:java.lang.Iterable[Text], context:Reducer[Text,Text,Text,Text]#Context) = {
for (value <- values) {
context.write(key, value)
}
}
}
***/
// This class configures and runs the job with the map and reduce classes we've
// specified above.
// Driver: configures and submits the map-only split job. One named output is
// registered per split key; SplitMapper routes matching lines to them.
object SplitFile {

  private val Usage =
    """|Usage: FileSplit <inputdir> <outdir> <tmpdir> <keeporiginfile> [<key1> <key2> <key3> ...]
       |inputdir is a file or dir
       |outputdir is a dir (need not be blank)
       |tmpdir is used for temporary output
       |keeporiginfile is a bool
       |""".stripMargin

  // A JVM entry point must have signature (Array[String]) => Unit; the
  // original `main(...): Int` would never be recognized by the launcher.
  // The job status is reported through the process exit code instead.
  def main(args: Array[String]): Unit = System.exit(run(args))

  // Parses arguments, builds and runs the job.
  // Returns the exit code: 0 = success, 1 = job failed, 2 = bad arguments.
  private def run(args: Array[String]): Int = {
    val conf = new Configuration()
    val otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs

    // Arrays render as identity hash codes via toString; mkString shows the contents.
    println()
    println("args: " + args.mkString(" "))
    println("remaining args: " + otherArgs.mkString(" "))

    // Validate the argument count AFTER generic options were stripped.
    if (otherArgs.length < 5) {
      print(Usage)
      2
    } else {
      val inputdir = new Path(otherArgs(0))
      // NOTE(review): outputdir is parsed but the job writes to tmpdir below —
      // presumably a later step moves results; confirm against the caller.
      val outputdir = new Path(otherArgs(1))
      val tmpdir = new Path(otherArgs(2))
      // NOTE(review): keeporiginfile is parsed but not yet honored.
      val keeporiginfile: String = otherArgs(3)
      val splitkeys: Array[String] = otherArgs.slice(4, otherArgs.length)
      val keys = splitkeys.mkString(",")

      print(
        """|input: %s
           |output: %s
           |tmpdir: %s
           |keys: %s
           |""".stripMargin.format(inputdir, outputdir, tmpdir, keys))

      // Hand the keys to SplitMapper via the configuration (it falls back to
      // built-in defaults when this property is absent). Must be set before
      // the Job snapshots the conf.
      conf.set("splitkeys", keys)

      // Job.getInstance replaces the deprecated `new Job(conf, name)`.
      val job = Job.getInstance(conf, "com.timger.tools.SplitFile")
      job.setJarByClass(classOf[SplitMapper])
      job.setMapperClass(classOf[SplitMapper])
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[Text])
      job.setInputFormatClass(classOf[TextInputFormat])
      // Map-only job: mappers write directly to their named outputs.
      job.setNumReduceTasks(0)

      // Register one named output per key; '/' is illegal in named-output
      // names, so it is stripped (SplitMapper strips it identically).
      for (key <- splitkeys) {
        val mkey = key.replace("/", "")
        println("named output: " + mkey)
        MultipleOutputs.addNamedOutput(job, mkey,
          classOf[TextOutputFormat[Text, Text]],
          classOf[Text],
          classOf[Text])
      }

      FileInputFormat.addInputPath(job, inputdir)
      FileOutputFormat.setOutputPath(job, tmpdir)

      if (job.waitForCompletion(true)) 0 else 1
    }
  }
}
// Scala implementation of Hadoop multiple-file output.
// Original article (MapReduce): https://segmentfault.com/a/1190000002517336
// Adapted from the article above; shared for learning purposes — contact the
// maintainer for removal in case of copyright concerns.