MapReduce:原理之Word Count 以及Java实现

MapReduce原理:

Hadoop的分布式计算框架(MapReduce)– 适合离线计算

核心思想: 移动计算而不移动数据。

MR是计算来自HDFS上的数据,可以看到,HDFS是大数据的存储,MR是大数据的计算。

《MapReduce:原理之Word Count 以及Java实现》

MapReduce流程:input->Splitting->Mapping->Shuffling->Reducing-> result

MapReduce程序读取的数据,都是存储在HDFS的数据,最后的结果,也是要保存在HDFS中,因此,MapReduce要解决的第一个问题就是数据的切分问题。

      MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是”任务的分解与结果的汇总”。

  在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。

  在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。

需要注意的是,用MapReduce来处理的数据集(或任务)必须具备这样的特点:

待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job又可以分为两种阶段:map阶段和reduce阶段。这两个阶段分别用两个函数表示,即map函数和reduce函数。map函数接收一个形式的输入,然后同样产生一个形式的中间输出,reduce函数接收一个如此形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是形式的。

 

 

 

MapReduce编程模型

下面以Word Count 例子说明:

单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版”Hello World”,该程序的完整代码可以在Hadoop安装包的”src/examples”目录下找到。单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数,如下图所示。

下面我们通过示意图来分析 MapReduce 过程

  《MapReduce:原理之Word Count 以及Java实现》

  上图的流程大概分为以下几步:

  1、假设一个文件有三行英文单词作为 MapReduce 的Input(输入),这里经过 Splitting过程把文件分割为3块。分割后的3块数据就可以并行处理,每一块交给一个 map 线程处理。

  2、每个map线程中,以每个单词为key,以1作为词频数value,然后输出。

  3、每个map的输出要经过shuffling(混洗),将相同的单词key放在一个桶里面,然后交给reduce处理。

  4、reduce接受到shuffle后的数据,会将相同的单词进行合并,得到每个单词的词频数,最后将统计好的每个单词的词频数作为输出结果。

  上述就是 MapReduce 的大致流程,前两步可以看做map阶段,后两步可以看做reduce 阶段。

       参考:https://www.cnblogs.com/codeOfLife/p/5410423.html#MapReduce实例

 

 

 

Hadoop计算框架Shuffle

处于map和reduce阶段之间。

其中,最重要的是Shuffle阶段,这个阶段也是最难理解的阶段,以下是官方对Shuffle过程的描述。

《MapReduce:原理之Word Count 以及Java实现》

 

针对官方文档,更加细致的map段的描述如下:

《MapReduce:原理之Word Count 以及Java实现》

 

(1)input

        map task执行的时候,数据的来源是HDFS上的block,在MapReduce概念中,读取的是split,split和block是对应的。

    (2)map阶段

        经过自定义的map函数的处理后,结果将以key/value的形式输出,但是这些结果要送到以后的哪一个reduce去执行,需要partition来决定。

    (3)partition

        根据key或者value的值,以及reduce的数量,来决定当前的这对输出数据要交到那个reduce task去处理

        默认方法: 对key  hash以后再以reduce的数量取模。

        也可以由程序员自定义Partition函数

        partition只是对数据以后要被送到那个reduce去处理做了一个标注,而不是立马把数据进行分区

        数据倾斜和负载均衡:默认的partition是可能产生数据倾斜和负载均衡,如果产生数据倾斜,就需要重新定义partition的分区规则,就可以避免数据倾斜问题。

 

 

内存缓冲区的概念

        每个map任务都有一个内存缓冲区,用于存储任务的输出,默认缓冲区的大小是100M,这个值是可以通过io.sort.mb来调整。由于缓冲区的空间大小有限,所以,当map task的输出结果很多的时候,内存缓冲区就装不下这么多的数据,也就需要将数据写到磁盘去。因此需要一个阈值(io.sort.spill.percent,默认是80%),当内存达到阈值以后,就会有一个单独的后台线程,负责将内存中的数据写到磁盘,这个过程叫做溢写由于是由单独的线程来负责溢写,所以,溢写过程不会影响map结果的输出,但是,如果此期间缓冲区被写满,map就会阻塞知道写磁盘过程完成。

        这里,就有两个过程,一个写内存的过程,另外一个是写磁盘的过程。

    (4)sort

        这里sort是按照字典顺序进行排序,而不是按照数值大小进行排序,举个栗子,就是说,a一定排在d的后面。

    (5)combine

       将排好序的数据,按照键相同的合并在一起的规则,进行值的合并。比如两个<hello,1>合并以后,就变成了一个<hello,2>。

        map的输出结果在经过partition阶段的处理后,明确了要发给哪个reduce去做处理,当写入了内存后,需要将所有的键值对按照key进行sort,在sort的基础上,再对结果进行combine,最后,在写到磁盘文件上去。所以,在磁盘上的数据,是已经分好区的,并且已经排好序的。

溢写磁盘需要注意的地方:如果map的输出结果很大,有多次溢写发生的话,磁盘上就会存在多个溢写文件(每次溢写都会产生一个溢写文件),在map task真正的完成是时,会将所有的溢写文件都Merge到一个溢写文件中,这个过程就叫Merge。比如,从一个map读取过来的是<aaa, 5>,另外一个map读取的是<aaa,8>。相同的key,就会merge成一个group{aaa,[5, 8…]},这个数组中的不同的值,就是从不同的溢写文件中读取过来的,然后把这些值加起来。

 

为什么要设置内存缓冲区?   

批量收集map的结果,减少磁盘IO次数,提高效率。

 

磁盘文件要写到哪里?    

写磁盘将按照轮询方式写到mapred.local.dir属性指定的作业特定子目录的目录中。也就是存放在TaskTracker够得着的某个本地目录,每一个reduce task不断通过RPC从JobTracker中获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task完成,Shuffle的后半段就开始了。

 

 

所有的合并究竟是为了什么?

因为map节点和reduce节点之间的数据拷贝是通过网络进行拷贝的,数据量越小,拷贝的越快,相应的处理也就越快,那个,合并的目的就是减少map的输出数据量,是网络拷贝尽可能快。

需要特殊说明的是,以上的步骤,都是在本地机器上完成,并不需要通过网络进行数据的传输。  

《MapReduce:原理之Word Count 以及Java实现》

 

 

 

reduce段的Shuffle细节:

    (1)    copy阶段

           reduce进程启动一些数据的copy线程,这个线程叫做fetcher线程,通过http方式请求map task所在的TaskTracker,来获取map task的输出数据。

           reduce拷贝数据,不是进行随意的拷贝,之前的partition,已经将数据分好区,reduce只是拷贝各个map上分割给自己的那一部分数据,拷贝到本地后,从每一个map上拷贝过来的数据都是一个小文件,也是需要对这些小文件进行合并的。合并以后,输出到reduce进行处理。

Word Count 为例,结合代码分析:

Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。

通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。

然后StringTokenizer类将每一行拆分成为一个个的单词,并将作为map方法的结果输出,其余的工作都交有MapReduce框架处理。

Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。

Map过程输出中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。

在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。

此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。

任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。

Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。

 

package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
  public static class TokenizerMapper
      extends Mapper {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  public static class IntSumReducer
      extends Reducer {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable values,Context context)
           throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
           sum += val.get();
        }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println(“Usage: wordcount “);
      System.exit(2);
    }
    Job job = new Job(conf, “word count”);
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
 

 

 

    BooleanWritable:标准布尔型数值

    ByteWritable:单字节数值

    DoubleWritable:双字节数

    FloatWritable:浮点数

    IntWritable:整型数

    LongWritable:长整型数

    Text:使用UTF8格式存储的文本

    NullWritable:当中的key或value为空时使用

原文参考:https://blog.csdn.net/goodhuajun/article/details/39582605

                  https://blog.csdn.net/Jerome_s/article/details/26441151

 

    原文作者:花和尚也有春天
    原文地址: https://blog.csdn.net/weixin_38750084/article/details/82716601
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞