Mapper类4个函数的解析
Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()一般是用来进行一些map()前的准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。
在MapReduce中,Mapper从一个输入分片中读取数据,然后经过Shuffle and Sort阶段,分发数据给Reducer,在Map端和Reduce端我们可能使用设置的Combiner进行合并,这在Reduce前进行。Partitioner控制每个K-V对应该被分发到哪个reducer[我们的Job可能有多个reducer],Hadoop默认使用HashPartitioner,HashPartitioner使用key的hashCode对reducer的数量取模得来。
protected void setup(Mapper.Context context) throws IOException,InterruptedException //Called once at the beginning of the task
protected void cleanup(Mapper.Context context)throws IOException,InterruptedException //Called once at the end of the task.
protected void map(KEYIN key, VALUEIN value Mapper.Context context)throws IOException,InterruptedException
{
context.write((KEYOUT) key,(VALUEOUT) value);
}
//Called once for each key/value pair in the input split. Most applications should override this, but the default is the identity function.
public void run(Mapper.Context context)throws IOException,InterruptedException
{
setup(context);
while(context.nextKeyValue())
{
map(context.getCurrentKey(),context.getCurrentValue(),context)
}
cleanup(context);
}
//Expert users can override this method for more complete control over the execution of the Mapper.
执行顺序:setup ---> map/run ----> cleanup
Mapper的三个子类,它们位于src\mapred\org\apache\hadoop\mapreduce\lib\map中(详解http://blog.csdn.net/posa88/article/details/7901304)
1、TokenCounterMapper
2、InverseMapper
3、MultithreadedMapper
同理在Reducer类中也存在4个函数
protected void setup(Mapper.Context context) throws IOException,InterruptedException //Called once at the beginning of the task
protected void cleanup(Mapper.Context context)throws IOException,InterruptedException //Called once at the end of the task.
protected void reduce(KEYIN key, VALUEIN value Reducer.Context context)throws IOException,InterruptedException
{
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
//This method is called once for each key. Most applications will define their reduce class by overriding this method. The default implementation is an identity function.
public void run(Reducer.Context context)throws IOException,InterruptedException
{
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
((ReduceContext.ValueIterator)
(context.getValues().iterator())).resetBackupStore();
}
cleanup(context);
}
}
//Advanced application writers can use therun(org.apache.hadoop.mapreduce.Reducer.Context)
method to control how the reduce task works
执行顺序:setup ---> map/run ----> cleanup