【大数据算法】Top N 算法的Hadoop实现

正文之前

感觉自己有一个世纪没写过文章了似的。不管了,今天看数据算法,那就拿这个开刀

《【大数据算法】Top N 算法的Hadoop实现》

另外,小生不才体真的超棒啊 !!

小生不才,
冒昧地喜欢了姑娘这么久,
甚是打扰,望多包涵,
今日愿以山上一草一木为誓,
今日你我二人就此别过,
若有重逢,我必待你眉眼如初,岁月如故,
若姑娘与有缘人终成眷属,
小生独羡其幸,
愿他宠你入骨,惜你如命,
好圆小生半事幽梦,
事已至此,缘尽,人散去了罢……

正文

算法梗概:

import scale.Tuple2;
import java.util.List;
import java.util.TreeMap;
import java.util.SortedMap;
import <your-package>.T; //此处是指你用到的自己定义的数据结构类型

static SortedMap<Integer, T> topN(List<Tuple<T,Integer>> L, int N){
    if((L == NULL) ||(L.isEmpty())){
        return null;
    }
    SortedMap<Integer, T> topN = new TreeMap<Integer, T>();
    for(Tuple2<T,Integer> element : L){
        // element._1 's type is T;
        // element._2 's type is Integer;
        topN.put(element._2,element._1);
        // only leave top N;
        if(topN.size() > N){
            topN.remove(topN.firstKey());
        }
    }
    return topN;
}

这就是TopN算法的精华所在,对一个排序Map进行依次输入,当该Map的size等于我们需要的N时,再进来一个元素,那么就必须把整合该元素后的Map最小值删除掉。也就是上面的firstKey。

我昨天肝了一天的程序,结果最后卡在一个小BUG身上,真是要死要活。。。去他么的!!!!书里面也没说这个地方,要不是我瞎鸡儿机灵,还真的没法解决这个BUG。。他么原理压根看不懂好嚒!!

《【大数据算法】Top N 算法的Hadoop实现》 这是我整个的文件架构

// hadoopClear.java
//import java.io.IOException;
//
//import org.apache.hadoop.conf.Configuration;
//import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
//import org.apache.hadoop.io.Text;
//import org.apache.hadoop.mapreduce.Job;
//import org.apache.hadoop.mapreduce.Mapper;
//import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;
//
//public class hadoopClear {
//    //map将输入中的value复制到输出数据的key上,并直接输出
//    public static class Map extends Mapper<Object,Text,Text,Text>{
//        //每行数据
//        private static Text line=new Text();
//        //实现map函数
//        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
//            String line = value.toString();
//            String[] values = line.split(" ");
//            line = "";
//            for(int i=0; i<values.length -1 ;++i)
//            {
//                line += values[i];
//            }
//            context.write(new Text(line), new Text(""));
//        }
//    }
//    //reduce将输入中的key复制到输出数据的key上,并直接输出
//    public static class Reduce extends Reducer<Text,Text,Text,Text>{
//        //实现reduce函数
//        public void reduce(Text key,Iterable<Text> values,Context context)
//                throws IOException,InterruptedException{
//            context.write(key, new Text(""));
//        }
//    }
//
//    public static void main(String[] args) throws Exception{
//        Configuration conf = new Configuration();
//        //这句话很关键
//        conf.set("mapred.job.tracker", "node61:9001");
//        String[] ioArgs=new String[]{"dedup_in","dedup_out"};
//        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
//        if (otherArgs.length != 2) {
//            System.err.println("Usage: Data Deduplication <in> <out>");
//            System.exit(2);
//        }
//        Job job = new Job(conf, "Data Deduplication");
//        job.setJarByClass(hadoopClear.class);
//        //设置Map、Combine和Reduce处理类
//        job.setMapperClass(Map.class);
//        job.setCombinerClass(Reduce.class);
//        job.setReducerClass(Reduce.class);
//        //设置输出类型
//        job.setOutputKeyClass(Text.class);
//        job.setOutputValueClass(Text.class);
//        //设置输入和输出目录
//        FileInputFormat.addInputPath(job, new Path(args[0]));
//        FileOutputFormat.setOutputPath(job, new Path(args[1]));
//        System.exit(job.waitForCompletion(true) ? 0 : 1);
//    }
//}
//
//

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Created by  ZZB on 2018/6/10.
 */
public class hadoopClear {
    public static void main(String[] args)throws Exception{
        //创建配置对象
        Configuration conf = new Configuration();
        //创建job对象
        Job job = Job.getInstance(conf,"hadoopClear");
        //设置运行job的类
        job.setJarByClass(hadoopClear.class);
        //设置mapper 类
        job.setMapperClass(ZZB_Mapper.class);
        //设置reduce 类
        job.setReducerClass(ZZB_Reducer.class);
        //设置map输出的key value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        //设置reduce 输出的 key value
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        //设置输入输出的路径
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        //提交job
        boolean b = job.waitForCompletion(true);
        if(!b){
            System.out.println("wordcount task fail!");
        }
    }
}

第二个Java代码:

//ZZB_Mapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.TreeMap;
import java.util.SortedMap;

import java.io.IOException;

/**
 * Created by  ZZB on 2018/6/10.
 */
public class ZZB_Mapper extends Mapper<LongWritable, Text, IntWritable,Text>{
    private SortedMap<Double, Text> top10cats = new TreeMap<>();
    private int N = 10;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        //得到输入的每一行数据  
        String line = value.toString();
        //通过空格分隔  
        String[] values = line.split(",");
        Double weight = Double.parseDouble(values[0]);
        Text x = new Text(line);
        top10cats.put(weight,x);
        if (top10cats.size()>N){
            top10cats.remove(top10cats.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        int s = 4;
        for (Text catAttr : top10cats.values()){
            s++;
            context.write(new IntWritable(s),catAttr);
        }
    }
}

第三个Java代码:

//ZZB_Reducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;

/**
 * Created by  ZZB on 2018/6/10.
 */
public class ZZB_Reducer extends Reducer<IntWritable, Text,IntWritable, Text> {
    private int N = 10;
    private int s = 0;
    private SortedMap<Double, Text> finaltop10 = new TreeMap<>();
    protected void reduce(IntWritable key, Text catRecord, Context context) throws IOException, InterruptedException {

            String cat = catRecord.toString();
            String[] tokens = cat.split(",");
            finaltop10.put(1.0, catRecord);
            if (finaltop10.size() > N) {
                finaltop10.remove(finaltop10.firstKey());
            }

        for (Text text : finaltop10.values()){
            s++;
            context.write(new IntWritable(s),text);
        }
    }
}

其中的一个Bug就是,如下图所示的这样,如果我不给一个X做新的序列化字符串载体,那么直接put原本的value进入的话,就会显示是空字符串,报错是empty String。。

我他么也很绝望啊!!!为毛啊!!难道splits方法会要干掉原本的对象中的这个内容吗???我日哦!!

《【大数据算法】Top N 算法的Hadoop实现》

不过不管如何,最后我反正是成功了!!很欣慰!!

下面是过程:

[zbzhang@node61 ~]$ ./test.sh
18/08/17 16:31:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted /output/result/_SUCCESS
Deleted /output/result/part-r-00000
18/08/17 16:31:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/17 16:31:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/17 16:31:56 INFO client.RMProxy: Connecting to ResourceManager at node61/11.11.0.61:8032
18/08/17 16:31:56 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/08/17 16:31:56 INFO input.FileInputFormat: Total input files to process : 1
18/08/17 16:31:57 INFO mapreduce.JobSubmitter: number of splits:1
18/08/17 16:31:57 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/08/17 16:31:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1534317717839_0032
18/08/17 16:31:57 INFO impl.YarnClientImpl: Submitted application application_1534317717839_0032
18/08/17 16:31:57 INFO mapreduce.Job: The url to track the job: http://node61:8088/proxy/application_1534317717839_0032/
18/08/17 16:31:57 INFO mapreduce.Job: Running job: job_1534317717839_0032
18/08/17 16:32:05 INFO mapreduce.Job: Job job_1534317717839_0032 running in uber mode : false
18/08/17 16:32:05 INFO mapreduce.Job:  map 0% reduce 0%
18/08/17 16:32:10 INFO mapreduce.Job:  map 100% reduce 0%
18/08/17 16:32:15 INFO mapreduce.Job:  map 100% reduce 100%
18/08/17 16:32:15 INFO mapreduce.Job: Job job_1534317717839_0032 completed successfully
18/08/17 16:32:15 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=226
                FILE: Number of bytes written=395195
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1669
                HDFS: Number of bytes written=185
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=2784
                Total time spent by all reduces in occupied slots (ms)=3008
                Total time spent by all map tasks (ms)=2784
                Total time spent by all reduce tasks (ms)=3008
                Total vcore-milliseconds taken by all map tasks=2784
                Total vcore-milliseconds taken by all reduce tasks=3008
                Total megabyte-milliseconds taken by all map tasks=2850816
                Total megabyte-milliseconds taken by all reduce tasks=3080192
        Map-Reduce Framework
                Map input records=100
                Map output records=10
                Map output bytes=200
                Map output materialized bytes=226
                Input split bytes=97
                Combine input records=0
                Combine output records=0
                Reduce input groups=10
                Reduce shuffle bytes=226
                Reduce input records=10
                Reduce output records=10
                Spilled Records=20
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=120
                CPU time spent (ms)=1760
                Physical memory (bytes) snapshot=505204736
                Virtual memory (bytes) snapshot=5761363968
                Total committed heap usage (bytes)=343932928
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=1572
        File Output Format Counters
                Bytes Written=185
18/08/17 16:32:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

下面是结果:

5       919,cat17,cat17
6       926,cat21,cat21
7       947,cat61,cat61
8       950,cat10,cat10
9       952,cat13,cat13
10      958,cat52,cat52
11      976,cat92,cat92
12      977,cat83,cat83
13      987,cat23,cat23
14      993,cat39,cat39

可见还是很靠谱的。。。我下面把我的测试脚本以及测试数据都贴上来给大家伙瞧一瞧哈!!

[zbzhang@node61 ~]$ cat test.sh
hdfs dfs -rm /output/result/*
hdfs dfs -rmdir /output/result
hadoop jar hadoopClear.jar hadoopClear /input/cat.txt /output/result
hdfs dfs -cat /output/result/*
rm result.txt
hdfs dfs -get /output/result/part* result.txt

下面是一百只猫的数据,大概就是根据这一百只猫的随机体重进行排序。在数据量小的情况下当然可以直接读取,但是如果是过亿条数据??我估计你的电脑直接会卡死。但是Hadoop就不会了对吧?!!肯定不会的撒!

说的我都想试试了!!

正文之后

今天七夕哎。。。可惜。。。别人七夕秀恩爱。。我呢。。。Sufriendo de romperse !! それは本当に悲しいです! しかし、しばらくの間、あなた自身を運動させるのは良いことです!

    原文作者:HustWolf
    原文地址: https://www.jianshu.com/p/4e46c6a45076
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞