WordCount是一个入门的MapReduce程序(从src\examples\org\apache\hadoop\examples粘贴过来的):
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MapReduce即将一个计算任务分为两个阶段:Map、Reduce。为什么要这么分解?
为了理解其含义,我们先不管MapReduce这一套框架,从一个简单的问题来看,如果对于100T的日志文件,需要统计其中出现的"ERROR"这个单词的次数,怎么办?
最简单的方法:单机处理,逐行读入每一行文本,统计并累加,则得到其值。问题是:因为数据量太大,速度太慢,怎么办?自然,多机并行处理就是一个自然的选择。
那么,这个文件怎么切分到多个机器呢?假定有100台机器,可以写一个主程序,将这个100T大文件按照每个机器存储1T的原则,在100台机器上分布存储,再把原来单机上的程序拷贝100份(无需修改)至100台机器上运行得到结果,此时得到的结果只是一个中间结果,最后需要写一个汇总程序,将统计结果进行累加,则完成计算。
将大文件分解后,对单机1T文件计算的过程就相当于Map,而Map的结果就相当于"ERROR"这个单词在本机1T文件中出现的次数,而最后的汇总程序就相当于Reduce,Reduce的输入来源于100台机器。在这个简单的例子中,有100个Map任务,1个Reduce任务。
100台机器计算后的中间结果需要传递到Reduce任务所在机器上,这个过程就是Shuffle,Shuffle这个单词的含义是”洗牌“,也就是将中间结果从Map所在机器传输到Reduce所在机器,在这个过程中,存在网络传输。
此时,我们利用上面的例子已经理解了Map-Shuffle-Reduce的基本含义,在上面的例子中,如果还需要对”WARNING“这个单词进行统计,那么怎么办呢?此时,每个Map任务就不仅需要统计本机1T文件中ERROR的个数,还需要统计WARNING的次数,然后在Reduce程序中分别进行统计。如果需要对所有单词进行统计呢?一个道理,每个Map任务对1T文件中所有单词进行统计计数,然后 Reduce对所有结果进行汇总,得到所有单词在100T大文件中出现的次数。此时,问题可能出现了,因为单词数量可能很多,Reduce用单机处理也可能存在瓶颈了,于是我们需要考虑用多台机器并行计算Reduce,假如用2台机器,因为Reduce只是对单词进行计数累加,所有可以按照这样简单的规则进行:大写字母A-Z开头的单词由Reduce 1累加;小写字母a-z开头的单词由Reduce 2累加。
在这种情况下,100个Map任务执行后的结果,都需要分为两部分,一部分准备送到Reduce 1统计,一部分准备送到Reduce 2统计,这个功能称为Partitioner,即将Map后的结果(比如一个文本文件,记录了各个单词在本机文件出现的次数)分解为两部分(比如两个文本 文件),准备送到两个Reduce任务。
因此,Shuffle在这里就是从100个Map任务和2个Reduce任务之间传输中间结果的过程。
我们继续考虑几个问题:
1、 如果Map后的中间结果数据量较大,Shuffle过程对网络带宽要求较高,因此需要将Map后的结果尽可能减小,这个功能当然可以在Map内自己搞 定,不过MapReduce将这个功能单独拎出来,称为Combiner,即合并,这个Combiner,指的是Map任务后中间结果的合并,相比于 Reduce的最终合并,这里相当于先进行一下局部汇总,减小中间结果,进而减小网络传输量。所以,在上面的例子中,假如Map并不计数,只是记录单词出现这个信息,输出结果是<ERROR,1>,<WARNING,1>,<WARNING,1>.....这样一个Key-Value序列,Combiner可以进行局部汇总,将Key相同的Value进行累加,形成一个新的Key-Value序列:<ERROR,14>,<WARNING,27>,.....,这样就大大减小了Shuffle需要的网络带宽,要知道现在数据中心一般使用千兆以太网,好些的使用万兆以太网,TCP/IP传输的效率不太高。这里Combiner汇总函数实际上可以与Reduce的汇总函数一致,只是输入数据不同。
2、 来自100个Map任务后的结果分别送到两个Reduce任务处理。对于任何一个Reduce任务,输入是一堆<ERROR,14>这样的 Key-Value序列,因为100个Map任务都有可能统计到ERROR的次数,因此这里会先进行一个归并,即将相同单词的归并到一起,形 成<ERROR, <14,36,.....>>,<WARNING,<27,45,...>>这样一个仍然是Key-Value的 序列,14、36、。。。分别表示第1、2、。。。台机器中ERROR的统计次数,这个归并过程在MapReduce中称为Merge。如果merge后 再进行Reduce,那么就只需要统计即可;如果事先没有merge,那么Reduce自己完成这一功能也行,只是两种情况下Reduce的输入Key- Value形式不同。
3、如果要求最后的单词统计结果还要形成字典序怎么办呢?可以自己在 Reduce中进行全排序,也可以100个Map任务中分别进行局部排序,然后将结果发到Reduce任务时,再进行归并排序。这个过程 MapReduce也内建支持,因此不需要用户自己去写排序程序,这个过程在MapReduce中称为Sort。
到这里,我们理解了MapReduce中的几个典型步骤:Map、Sort、Partitioner、 Combiner、Shuffle、Merge、Reduce。MapReduce程序之所以称为MapReduce,就说明Map、Reduce这两个 步骤对于一个并行计算来说几乎是必须的,你总得先分开算吧,所以必须有Map;你总得汇总吧,所以有Reduce。当然,理论上也可以不需要 Reduce,如果Map后就得到你要的结果的话。
Sort对于不需要顺序的程序里没意义(但MapReduce默认做了排序);
Partitioner对于Reduce只有一个的时候没意义,如果有多个Reduce,则需要,至于怎么分,用户可以继承Partitioner标准类,自己实现分解函数。控制中间结果如何传输。MapReduce提供的标准的Partitioner是 一个接口,用户可以自己实现getPartition()函数,MapReduce也提供了几个基本的实现,最典型的HashPartitioner是根 据用户设定的Reduce任务数量(注意,MapReduce中,Map任务的个数最终取决于数据分布,Reduce则是用户直接指定),按照哈希进行计算的:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {} /** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
这里,numReduceTasks就是用户设定的Reduce任务数量;
K2 key, V2 value 就是Map计算后的中间结果。
Combiner可以选择性放弃,但考虑到网络带宽,可以自己写相应的函数实现局部合并功能。很多情况下,直接利用Reduce那个程序即可,WordCount这个标准程序里就是这么用的。
Shuffle自然是必须的,不用写,根据Partitioner逻辑,框架自己去执行结果传输。
Merge也不是必须的,可以揉到Reduce里面实现等等也可以。因为这些操作的数据结构都是Key-Value,Reduce的输入只要是一个Key-Value即可,相当灵活。
我们再来看WordCount,这个MapReduce程序中定义了一个类:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
而Mapper是Hadoop中的一个接口,其定义为:
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable { /** * Maps a single input key/value pair into an intermediate key/value pair. * * <p>Output pairs need not be of the same types as input pairs. A given * input pair may map to zero or many output pairs. Output pairs are * collected with calls to * {@link OutputCollector#collect(Object,Object)}.</p> * * <p>Applications can use the {@link Reporter} provided to report progress * or just indicate that they are alive. In scenarios where the application * takes an insignificant amount of time to process individual key/value * pairs, this is crucial since the framework might assume that the task has * timed-out and kill that task. The other way of avoiding this is to set * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout"> * mapred.task.timeout</a> to a high-enough value (or even zero for no * time-outs).</p> * * @param key the input key. * @param value the input value. * @param output collects mapped keys and values. * @param reporter facility to report progress. */
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException; }
因此,Mapper里面并没有规定输入输出的类型是什么,只要是KeyValue的即可,K1、V1、K2、V2是什么由用户指定,反正只是实现K1、V1到K2、V2的映射即可。
在WordCount中实现了继承于Mapper<Object, Text, Text, IntWritable>的一个TokenizerMapper类,实现了map函数:map(Object key, Text value, Context context ) ;
TokenizerMapper中,输入的Key-Value是<Object, Text>,输出是<Text, IntWritable>,在WordCount程序里,K1代表一行文本的起始位置,V1代表这一行文本;
K2代表单词,V2代表"1",用于后面的累和。
同样,在MapReduce中,Reducer也是一个接口,其声明为:
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable { /** * <i>Reduces</i> values for a given key. * * <p>The framework calls this method for each * <code><key, (list of values)></code> pair in the grouped inputs. * Output values must be of the same type as input values. Input keys must * not be altered. The framework will <b>reuse</b> the key and value objects * that are passed into the reduce, therefore the application should clone * the objects they want to keep a copy of. In many cases, all values are * combined into zero or one value. * </p> * * <p>Output pairs are collected with calls to * {@link OutputCollector#collect(Object,Object)}.</p> * * <p>Applications can use the {@link Reporter} provided to report progress * or just indicate that they are alive. In scenarios where the application * takes an insignificant amount of time to process individual key/value * pairs, this is crucial since the framework might assume that the task has * timed-out and kill that task. The other way of avoiding this is to set * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout"> * mapred.task.timeout</a> to a high-enough value (or even zero for no * time-outs).</p> * * @param key the key. * @param values the list of values to reduce. * @param output to collect keys and combined values. * @param reporter facility to report progress. */
void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException; }
Reducer的输入为K2, V2(这个对应于Mapper输出的经过Shuffle到达Reducer端的K2,V2,), 输出为K3, V3。
在WordCount中,K2为单词,V2为1这个固定值(或者为局部出现次数,取决于是否有Combiner);K3还是单词,V3就是累和值。
而WordCount里存在继承于Reducer<Text, IntWritable, Text, IntWritable>的IntSumReducer类,完成单词计数累加功能。
对于Combiner,实际上MapReduce没有Combiner这个基类(WordCount自然也没有实现),从任务的提交函数来看:
public void setCombinerClass(Class<? extends Reducer> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class); }
可以看出,Combiner使用的类实际上符合Reducer。两者是一样的。
再来看作业提交代码:
在之前先说一下Job和Task的区别,一个MapReduce运行流程称为一个Job,中文称“作业”。
在传统的分布式计算领域,一个Job分为多个Task运行。Task中文一般称为任务,在Hadoop中,这种任务有两种:Map和Reduce
所以下面说到Map和Reduce时,指的是任务;说到整个流程时,指的是作业。不过由于疏忽,可能会将作业称为任务的情况。
根据上下文容易区分出来。
1 Job job = new Job(conf, "word count"); 2 job.setJarByClass(WordCount.class); 3 job.setMapperClass(TokenizerMapper.class); 4 job.setCombinerClass(IntSumReducer.class); 5 job.setReducerClass(IntSumReducer.class); 6 job.setOutputKeyClass(Text.class); 7 job.setOutputValueClass(IntWritable.class); 8 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 9 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 10 System.exit(job.waitForCompletion(true) ? 0 : 1);
第1行创建一个Job对象,Job是MapReduce中提供的一个作业类,其声明为:
public class Job extends JobContext { public static enum JobState {DEFINE, RUNNING}; private JobState state = JobState.DEFINE; private JobClient jobClient; private RunningJob info; .......
之后,设置该作业的运行类,也就是WordCount这个类;
然后设置Map、Combiner、Reduce三个实现类;
之后,设置输出Key和Value的类,这两个类表明了MapReduce作业完毕后的结果。
Key即单词,为一个Text对象,Text是Hadoop提供的一个可以序列化的文本类;
Value为计数,为一个IntWritable对象,IntWritable是Hadoop提供的一个可以序列化的整数类。
之所以不用普通的String和int,是因为输出Key、 Value需要写入HDFS,因此Key和Value都要可写,这种可写能力在Hadoop中使用一个接口Writable表示,其实就相当于序列化,换句话说,Key、Value必须得有可序列化的能力。Writable的声明为:
public interface Writable { /** * Serialize the fields of this object to <code>out</code>. * * @param out <code>DataOuput</code> to serialize this object into. * @throws IOException */
void write(DataOutput out) throws IOException; /** * Deserialize the fields of this object from <code>in</code>. * * <p>For efficiency, implementations should attempt to re-use storage in the * existing object where possible.</p> * * @param in <code>DataInput</code> to deseriablize this object from. * @throws IOException */
void readFields(DataInput in) throws IOException; }
在第8、9行,还设置了要计算的文件在HDFS中的路径,设定好这些配置和参数后,执行作业提交:job.waitForCompletion(true)
waitForCompletion是Job类中实现的一个方法:
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { jobClient.monitorAndPrintJob(conf, info); } else { info.waitForCompletion(); } return isSuccessful(); }
即执行submit函数:
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); // Connect to the JobTracker and submit the job
connect(); info = jobClient.submitJobInternal(conf); super.setJobID(info.getID()); state = JobState.RUNNING; }
其中,调用jobClient对象的submitJobInternal方法进行作业提交。jobClient是 JobClient对象,在执行connect()的时候即创建出来:
private void connect() throws IOException, InterruptedException { ugi.doAs(new PrivilegedExceptionAction<Object>() { public Object run() throws IOException { jobClient = new JobClient((JobConf) getConfiguration()); return null; } }); }
创建JobClient的参数是这个作业的配置信息,JobClient是MapReduce作业的客户端部分,主要用于提交作业等等。而具体的作业提交在submitJobInternal方法中实现,关于submitJobInternal的具体实现,包括MapReduce的作业执行流程,较为复杂,留作下一节描述。
关于MapReduce的这一流程,我们也可以看出一些特点:
1、 Map任务之间是不通信的,这与传统的MPI(Message Passing Interface)存在本质区别,这就要求划分后的任务具有独立性。这个要求一方面限制了MapReduce的应用场合,但另一方面对于任务执行出错后的处理十分方便,比如执行某个Map任务的机器挂掉了,可以不管其他Map任务,重新在另一台机器上执行一遍即可。因为底层的数据在HDFS里面,有3 份备份,所以数据冗余搭配上Map的重执行这一能力,可以将集群计算的容错性相比MPI而言大大增强。后续博文会对MPI进行剖析,也会对 MapReduce与传统高性能计算中的并行计算框架进行比较。
2、Map任务的分配与数据的分布关系十分密切,对于上面的例子,这个100T的大文件分布在多台机器上,MapReduce框架会根据文件的实际存储位置分配Map任务,这一过程需要对HDFS有好的理解,在后续博文中会对HDFS中进行剖析。到时候,能更好滴理解MapReduce框架。因为两者是搭配起来使用的。
3、 MapReduce的输入数据来自于HDFS,输出结果也写到HDFS。如果一个事情很复杂,需要分成很多个MapReduce作业反复运行,那么就需要来来回回地从磁盘中搬移数据的过程,速度很慢,后续博文会对Spark这一内存计算框架进行剖析,到时候,能更好滴理解MapReduce性能。
4、MapReduce的输入数据和输出结果也可以来自于HBase,HBase本身搭建于HDFS之上(理论上也可以搭建于其他文件系统),这种应用场合大多需要MapReduce处理一些海量结构化数据。后续博文会对HBase进行剖析。