提到hadoop上运行的MR程序, 基本都知道由Map和Reduce两部分代码组成。这两部分代码是如何构成MR程序的基本框架, 以及两者是如何协助实现基本功能的, 本文在此做个基本的探索。
一、MR程序基本思路
以经典的word count为例,新建一个words文本:
hi, my name is justin
hello, this is a cat
how old are your
i am fine
thank you
hi, you are five old.
MR程序统计每个词出现的次数。MR程序实现的基本思路是:
MR程序会每次读取words文本中一行的单词, 按如下形式按行依次传递给Map阶段代码:
0 hi, my name is justin
22 hello, this is a cat
43 how old are your
…………………………….
第一个字段为本行首字符在文本中的偏移量
第二个字段为文本内容Map阶段处理,文本内容每个词处理成<word1, 1>的Map形式, 即以word字符串为key, value是次数1, 以第一行为例:<“hi”, 1> < “my”, 1> <“name”, 1> <“is”, 1>
Map的结果传送给Reduce, 得到的输入是:
<“my”, 1> <“is”, [1, 1]>, <“you”, [1, 1]>
输出结果是
hi 1
my 1
you 2
cat 1
……..
是 每个词出现的频率。
这里有两个没有涉及具体实现的点, 后续文章进步研究。
1.系统怎么获取文本内容<textoffset, textcontent>, 依次传递给Mapmap输出到reduce输入的转换, 是怎么实现的。
二、代码实现
看下具体的代码示例
//重点关注下Mapper4个参数类型:是两组key-value参数的类型,前两个是输入数据的key和value值的类型,后两个为输出数据的key, value数据类型。
//默认情况下,框架传递给输入的key-value分别是要处理的文本中一行的起始偏移量,以及这一行的内容,如下输入key(起始偏移量)类型longWritable, value是Text类型。
<pre><code>
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
//mapreduce框架每读一行数据就调用一次该方法
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//具体业务逻辑就写在这个方法体中,处理输入的数据以map kv形式 k:单词 v : 1输出
for (一行所有单词) {
context.write(new Text(word), new LongWritable(1));
}
}
</pre></code>
<pre><code>
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
//框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法 <hello,{1,1,1,1,1,1…..}>
@Override
protected void reduce(Text key, Iterable<LongWritable> values,Context context)
throws IOException, InterruptedException {
long count = 0;
//遍历value的list,进行累加求和
for(LongWritable value:values){
count += value.get();
}
//输出这一个单词的统计结果
context.write(key, new LongWritable(count));
}
}
</pre></code>