Understanding MapReduce by Example: Sorting

Goal of the example

The code sorts the input data pairs by the first column in ascending order; when two first columns are equal, it falls back to the second column, also ascending. That is:

Add the cp.txt file to the input folder:

$vim cp.txt
$hadoop fs -put cp.txt /input/

5,1
3,2
1,3
4,3
2,3
1,4
1,2
2,5
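
You can verify the upload by printing the file back from HDFS:

$hadoop fs -cat /input/cp.txt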

Expected output

1,2
1,3
1,4
2,3
2,5
3,2
4,3
5,1


Hands-on example

1. In a terminal, run start-all.sh
2. Add the cp.txt file to the input folder
3. Open Eclipse
4. Create a new MapReduce project, a new package (named mr), and a new class (named MySortClass); the class code is below
5. Right-click the class and choose Run As > Run on Hadoop
6. Right-click the Hadoop file tree and refresh it; on success, the sorted result file appears under output

package mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MySortClass {
    
    static class MySortMapper extends Mapper<LongWritable, Text, A, NullWritable> {

        @Override
        public void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Split the "first,second" line and wrap the two numbers in the
            // composite key A; the value is just an empty placeholder.
            String[] lines = v1.toString().split(",");
            A a1 = new A(Long.parseLong(lines[0]), Long.parseLong(lines[1]));
            context.write(a1, NullWritable.get());
            System.out.println("map......");
        }

    }
    
    static class MySortReduce extends Reducer<A, NullWritable, Text, Text> {

        @Override
        public void reduce(A k2, Iterable<NullWritable> v2, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted, so simply unpack the composite
            // key back into a <first, second> text pair.
            context.write(new Text(Long.toString(k2.firstNum)),
                    new Text(Long.toString(k2.secondNum)));
            System.out.println("reduce......");
        }

    }
    
    private static class A implements WritableComparable<A> {
        long firstNum;
        long secondNum;

        public A() {
        }

        public A(long first, long second) {
            firstNum = first;
            secondNum = second;
        }

        // Serialize both fields in a fixed order...
        public void write(DataOutput out) throws IOException {
            out.writeLong(firstNum);
            out.writeLong(secondNum);
        }

        // ...and read them back in the same order.
        public void readFields(DataInput in) throws IOException {
            firstNum = in.readLong();
            secondNum = in.readLong();
        }

        /*
         * compareTo is called when the keys are sorted: order by the first
         * column, falling back to the second column on ties. Long.compare
         * avoids the overflow that casting a long difference to int could cause.
         */
        @Override
        public int compareTo(A anotherKey) {
            int cmp = Long.compare(firstNum, anotherKey.firstNum);
            if (cmp != 0) {
                // The first columns differ, so they decide the order.
                return cmp;
            }
            return Long.compare(secondNum, anotherKey.secondNum);
        }
    }
    private static String INPUT_PATH="hdfs://master:9000/input/cp.txt";
    private static String OUTPUT_PATH="hdfs://master:9000/output/c/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);

        // Delete any previous output directory; the job fails if it already exists.
        if (fs.exists(new Path(OUTPUT_PATH)))
            fs.delete(new Path(OUTPUT_PATH), true);

        Job job = new Job(conf, "myjob");
        
        job.setJarByClass(MySortClass.class);
        job.setMapperClass(MySortMapper.class);
        job.setReducerClass(MySortReduce.class);
        //job.setCombinerClass(MySortReduce.class);
         
    
        job.setMapOutputKeyClass(A.class);
        job.setMapOutputValueClass(NullWritable.class); 
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

    /* If the map and reduce <key,value> types were the same, setting only
       job.setOutputKeyClass() and job.setOutputValueClass() would suffice. */

        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        
        job.waitForCompletion(true);

    }

}
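
If you prefer the terminal to Eclipse, the exported jar can also be run directly; a minimal sketch, assuming the project was exported as MySortClass.jar (the jar name is illustrative):

$hadoop jar MySortClass.jar mr.MySortClass
$hadoop fs -cat /output/c/part-r-00000

part-r-00000 is the default name of the single reducer's output file.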

Understanding parts of the code

Class A implements WritableComparable and defines two fields, firstNum and secondNum.
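
The two-level ordering rule that A.compareTo encodes can be tried on its own, outside Hadoop; a minimal plain-Java sketch (class and variable names chosen for illustration):

import java.util.Arrays;

public class PairSortDemo {
    public static void main(String[] args) {
        long[][] pairs = { {5,1}, {3,2}, {1,3}, {4,3}, {2,3}, {1,4}, {1,2}, {2,5} };
        // Same rule as A.compareTo: first column first, second column on ties.
        Arrays.sort(pairs, (p, q) -> p[0] != q[0]
                ? Long.compare(p[0], q[0])
                : Long.compare(p[1], q[1]));
        for (long[] p : pairs)
            System.out.println(p[0] + "," + p[1]);  // prints 1,2 ... 5,1
    }
}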

String[] lines = v1.toString().split(",");

This reads one line (such as 5,1), splits it on the comma, and stores the two elements (5) and (1) in the array lines.

A a1=new A(Long.parseLong(lines[0]),Long.parseLong(lines[1]));

Long.parseLong(lines[0]) converts the String "5" into a long, so a1.firstNum=5 and a1.secondNum=1.
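
For example, for the line 5,1:

String[] lines = "5,1".split(",");       // lines = {"5", "1"}
long first  = Long.parseLong(lines[0]);  // 5
long second = Long.parseLong(lines[1]);  // 1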

context.write(a1, NullWritable.get());

This writes to the context, making the map output <key, empty>. The empty value cannot be created with new NullWritable(); it can only be obtained via NullWritable.get().
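
NullWritable's constructor is private; get() hands out a shared singleton:

NullWritable a = NullWritable.get();
NullWritable b = NullWritable.get();
System.out.println(a == b);  // true: the same instance every time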

context.write(new Text(new Long(k2.firstNum).toString()), new Text(new Long(k2.secondNum).toString()));

The reduce step emits a new key-value pair, e.g. turning <(5,1),null> into <5,1>.

Understanding the example as a map->reduce cluster flow (assuming the file is huge)

1. First the input file is split (InputSplit). Assume each split is three lines, giving three splits:

5,1
3,2
1,3

4,3
2,3
1,4

1,2
2,5

2. The three splits are handled by three map tasks, which generate the key-value pairs <a1,null>. To reduce network load, each node sorts its output locally and partitions it (the partitioner tags each record with its partition; a sketch of one follows the lists below). The results:
(If needed, a combiner, i.e. a local reduce, can also run before partitioning; see the explanation of combiners at the bottom of the article 《了解MapReduce》. No combiner is needed here.)

<(1,3),null>
<(3,2),null>
<(5,1),null>

<(1,4),null>
<(2,3),null>
<(4,3),null>

<(1,2),null>
<(2,5),null>
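
With a single reducer (the default here) the built-in HashPartitioner is fine, but with several reducers a custom partitioner would be needed to keep the output globally ordered. A hypothetical sketch, assuming A is made public and firstNum is known to lie in 1..5 as in cp.txt:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class RangePartitioner extends Partitioner<A, NullWritable> {
    @Override
    public int getPartition(A key, NullWritable value, int numPartitions) {
        long maxFirst = 5;  // assumed upper bound of the first column
        // Contiguous key ranges per reducer, so reducer i's output sorts
        // entirely before reducer i+1's.
        return (int) ((key.firstNum - 1) * numPartitions / maxFirst);
    }
}

It would be registered with job.setPartitionerClass(RangePartitioner.class) together with job.setNumReduceTasks(n).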

3. Then comes the shuffle across all nodes: data belonging to the same partition on different nodes is brought to a single node and sorted as it arrives:

<(1,2),null>
<(1,3),null>
<(1,4),null>

<(2,3),null>
<(2,5),null>

<(3,2),null>

<(4,3),null>

<(5,1),null>

4. Finally, reduce generates the new key-value pairs and produces the final sorted result:

(1,2)
(1,3)
(1,4)
(2,3)
(2,5)
(3,2)
(4,3)
(5,1)

In short, the flow is: map (local) -> combiner (local) -> partitioner (local) -> shuffle (cluster) -> reduce (a new local node). Each stage has further detail: the combiner and partitioner belong to the map phase, and the shuffle belongs to the reduce phase.

Since I am not yet very familiar with the Hadoop source code, I cannot explain it in more depth; suggestions and guidance are welcome.
    Original author: VVictoriaLee
    Original article: https://www.jianshu.com/p/c552be6849b7
    This article is reposted from the web to share knowledge. If it infringes any rights, please contact the blogger to have it removed.