目录:
MapReduce基础概述
MapReduce实例以及编程规范
MapReduce运行方式
MapReduce并行以及切片机制
1.MapReduce基础概述
Hadoop有四大组件分别是:
HDFS:分布式存储系统
MapReduce:分布式计算系统
YARN: hadoop 的资源调度系统
Common: 以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等
Mapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架
Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发运行在一个 hadoop 集群上
2.MapReduce实例以及编程规范
在MapReduce中,官方开发团队给我们提供了一些事例,统计单词个数WordCount就是一个比较经典的例子,jar包在${HADOOP_HOME}/share/hadoop/mapreduce 目录下,博主用的Hadoop版本是2.8.5所以用到的jar包为:hadoop-mapreduce-examples-2.8.5.jar ,启动的方法为:hadoop jar hadoop-mapreduce-examples-2.8.5.jar wordcount /input /output ,参数用空格分开,在wordcount后的参数可以很多个,但最后一个是MapReduce输出目录,其余的为输入目录、或者输入文件。为什么是hadoop-mapreduce-examples-2.8.5.jar后的jar包为wordcount呢,因为我们输入hadoop jar命令后,会在包 org.apache.hadoop.exapmles找到ExampleDriver类,有一个主方法,设置了根据关键词来启动相应的类,比如wordcount对应的是WordCount.class
1 public class ExampleDriver { 2 3 public static void main(String argv[]){ 4 int exitCode = -1; 5 ProgramDriver pgd = new ProgramDriver(); 6 try { 7 pgd.addClass("wordcount", WordCount.class, 8 "A map/reduce program that counts the words in the input files."); 9 pgd.addClass("wordmean", WordMean.class, 10 "A map/reduce program that counts the average length of the words in the input files."); 11 pgd.addClass("wordmedian", WordMedian.class, 12 "A map/reduce program that counts the median length of the words in the input files."); 13 14 exitCode = pgd.run(argv); 15 } 16 catch(Throwable e){ 17 e.printStackTrace(); 18 } 19 20 System.exit(exitCode); 21 } 22 }
在MapReduce编程中,用户需要继承Mapper和Reduce类,然后重写map和reduce方法,在Job对象中设置setMapperClass和setReduceClass,MapReduce框架通过加载Map和Reduce的class字节码,然后通过Class.newInstance来实例化一个对象。
让我们看看WordCount中Mapper和Reduce类的详细代码。
1 public static class MapClass implements Mapper<LongWritable, Text, Text, IntWritable> { 2 3 private final static IntWritable one = new IntWritable(1); 4 private Text word = new Text(); 5 6 public void map(LongWritable key, Text value, 7 OutputCollector<Text, IntWritable> output, 8 Reporter reporter) throws IOException { 9 String line = value.toString(); 10 StringTokenizer itr = new StringTokenizer(line); 11 while (itr.hasMoreTokens()) { 12 word.set(itr.nextToken()); 13 output.collect(word, one); 14 } 15 } 16 } 17 18 public static class Reduce implements Reducer<Text, IntWritable, Text, IntWritable> { 19 20 public void reduce(Text key, Iterator<IntWritable> values, 21 OutputCollector<Text, IntWritable> output, 22 Reporter reporter) throws IOException { 23 int sum = 0; 24 while (values.hasNext()) { 25 sum += values.next().get(); 26 } 27 output.collect(key, new IntWritable(sum)); 28 } 29 }
在Mapper<LongWritable, Text, Text, IntWritable>中,有4个参数,其中前面两个是Mapper的输入参数,后面两个是在Mapper类中context.write(word,one)的输出参数类型也就是Reduce类中的前面两个输入参数类型,在Mapper中默认的输入InputFormat是TextInputFormat,通过getRecordReader来实例化一个LineRecordReader,该类以换行符号\n为分割符读取数据,所以第一个LongWritable参数是第一行在文件中的位置,而第二个参数Text则是LineRecorder读取到一行的内容
在Reduce中也有4个参数,前面两个Text,IntWritable是Mapper传过来的参数类型,但是Mapper框架会对key进行排序和shuffle,后面是一个迭代器,就是Mapper中context.write(word,one)中one的一个集合(混洗,后面源码解析会讲到)。输出的就是单词以及数目了,在outputFormat中通过getRecordWriter来实例化一个LineRecordWriter对象,然后默认\t为分隔符写到缓存文件中。这就完成了一个MapReduce计算过程。
3.MapReduce运行方式
MapReduce有多种运行方式,大概可以分为两种,一种是本地运行模式,另外一种是集群运行模式。
本地的运行模式是开发环境中使用,好处是方便调试和运行,本地运行需要设置的变量是
1)在Configuration中设置mapreduce.framework.name为local
2)在本地安装Hadoop,并且配置环境变量HADOOP_HOME
而在集群中运行,把开发好的代码和依赖包打包,上次Hadoop服务器,并且通过hadoop jar进行提交
4.MapReduce并行以及切片机制
1)并行原理
MapReduce并行是通过创建不同的Mapper来处理数据,并且通过shuffle来合并不同的数据,Mapper读取的数据是需要通过切割分片来完成了,我们把它称作InputSpilt,在运行作业区MapReduce框架将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多 个 split),然后每一个 split 分配一个 mapTask 并行实例处理这段逻辑及形成的切片规划描述文件,是由 FileInputFormat实现类的 getSplits()方法完成的。该方法返回的是 List<InputSplit>, InputSplit 封装了每一个逻辑切片的信息,包括长度和位置 信息,而 getSplits()方法返回一组 InputSplit。然后每个Mapper通过对应的InputSplit来获取相应的文件信息来读取数据进行处理。
2)切片机制
在切割文件时候,有一个切片大小,splitSize,MapReduce通过splitSize来切割数据,它的计算是通过
1 long splitSize = computeSplitSize(goalSize, minSize, blockSize); 2 protected long computeSplitSize(long goalSize, long minSize, long blockSize) 3 { 4 return Math.max(minSize, Math.min(goalSize, blockSize)); 5 } 6
它的意思是minSize,blockSize,maxSize这3个变量中间的那个,其中
blockSize:块大小,默认128M,通过dfs.blocksize修改
minSize:默认1,通过mapreuce.input.fileinputformat.split.minszie修改
maxSize:默认Long.MaxValue,通过mapreduce.input.fileinputformat.split.maxsize修改