Before the Main Text
It feels like I haven't written a post in a century. Whatever; I've been reading Data Algorithms today, so that's what goes under the knife.
By the way, the "this unworthy student" style of writing is genuinely wonderful!!
This unworthy student,
has presumed to love you, my lady, for so long,
and has troubled you greatly; please forgive me.
Today I swear by every blade of grass and every tree on the mountain:
today you and I part ways here.
Should we meet again, I will look on you as I did at first, as though the years had never passed.
Should you and the one fate intends for you become husband and wife,
I will only envy his good fortune.
May he dote on you to the bone and cherish you like his own life,
and so round out the quiet dream of half my life.
It has come to this; the bond is spent; let us each go our way……
Main Text
Algorithm outline:
import scala.Tuple2;
import java.util.List;
import java.util.TreeMap;
import java.util.SortedMap;
import <your-package>.T; // T stands for whatever data type you have defined yourself

static SortedMap<Integer, T> topN(List<Tuple2<T, Integer>> L, int N) {
    if ((L == null) || (L.isEmpty())) {
        return null;
    }
    SortedMap<Integer, T> topN = new TreeMap<Integer, T>();
    for (Tuple2<T, Integer> element : L) {
        // element._1 's type is T
        // element._2 's type is Integer
        topN.put(element._2, element._1);
        // keep only the top N
        if (topN.size() > N) {
            topN.remove(topN.firstKey());
        }
    }
    return topN;
}
That is the whole essence of the TopN algorithm: feed the elements one by one into a sorted map, and once the map already holds the N entries we want, every additional element forces us to delete the smallest entry of the enlarged map, which is the firstKey call above.
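To make the trick concrete, here is a tiny standalone demo I put together (plain Java, no Hadoop or Scala needed; the names are mine) that keeps the 3 largest numbers from a small list:

// Minimal standalone demo of the TreeMap-based top-N trick described above.
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class TopNDemo {
    public static void main(String[] args) {
        List<Integer> weights = Arrays.asList(42, 7, 99, 15, 63, 8, 77);
        int n = 3;

        SortedMap<Integer, Integer> topN = new TreeMap<>();
        for (Integer w : weights) {
            topN.put(w, w);                    // key = weight, so the map stays sorted by weight
            if (topN.size() > n) {
                topN.remove(topN.firstKey());  // evict the current smallest once we exceed N
            }
        }
        System.out.println(topN.values());     // prints [63, 77, 99]
    }
}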
I ground away at this program all day yesterday and then got stuck on one tiny bug right at the end; it nearly drove me crazy. The book says nothing about this spot, and if I hadn't had a lucky flash of insight I really would not have solved it, because I honestly could not see the underlying reason at all!
Here is my complete file layout:
// hadoopClear.java
//import java.io.IOException;
//
//import org.apache.hadoop.conf.Configuration;
//import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
//import org.apache.hadoop.io.Text;
//import org.apache.hadoop.mapreduce.Job;
//import org.apache.hadoop.mapreduce.Mapper;
//import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;
//
//public class hadoopClear {
// // the map copies the input value onto the output key and emits it directly
// public static class Map extends Mapper<Object,Text,Text,Text>{
// // one line of data
// private static Text line=new Text();
// // implement the map function
// public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
// String line = value.toString();
// String[] values = line.split(" ");
// line = "";
// for(int i=0; i<values.length -1 ;++i)
// {
// line += values[i];
// }
// context.write(new Text(line), new Text(""));
// }
// }
// // the reduce copies the input key onto the output key and emits it directly
// public static class Reduce extends Reducer<Text,Text,Text,Text>{
// // implement the reduce function
// public void reduce(Text key,Iterable<Text> values,Context context)
// throws IOException,InterruptedException{
// context.write(key, new Text(""));
// }
// }
//
// public static void main(String[] args) throws Exception{
// Configuration conf = new Configuration();
// // this line is crucial
// conf.set("mapred.job.tracker", "node61:9001");
// String[] ioArgs=new String[]{"dedup_in","dedup_out"};
// String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
// if (otherArgs.length != 2) {
// System.err.println("Usage: Data Deduplication <in> <out>");
// System.exit(2);
// }
// Job job = new Job(conf, "Data Deduplication");
// job.setJarByClass(hadoopClear.class);
// // set the Map, Combine and Reduce classes
// job.setMapperClass(Map.class);
// job.setCombinerClass(Reduce.class);
// job.setReducerClass(Reduce.class);
// // set the output types
// job.setOutputKeyClass(Text.class);
// job.setOutputValueClass(Text.class);
// // set the input and output directories
// FileInputFormat.addInputPath(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
// System.exit(job.waitForCompletion(true) ? 0 : 1);
// }
//}
//
//
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Created by ZZB on 2018/6/10.
*/
public class hadoopClear {
    public static void main(String[] args) throws Exception {
        // create the configuration object
        Configuration conf = new Configuration();
        // create the job object
        Job job = Job.getInstance(conf, "hadoopClear");
        // set the class that runs the job
        job.setJarByClass(hadoopClear.class);
        // set the mapper class
        job.setMapperClass(ZZB_Mapper.class);
        // set the reducer class
        job.setReducerClass(ZZB_Reducer.class);
        // set the map output key/value types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        // set the reduce output key/value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // set the input and output paths
        // (the jar is invoked as "hadoop jar hadoopClear.jar hadoopClear <in> <out>" in
        //  test.sh below, which is presumably why the paths are read from args[1] and args[2])
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        // submit the job
        boolean b = job.waitForCompletion(true);
        if (!b) {
            System.out.println("hadoopClear task failed!");
        }
    }
}
The second Java file:
//ZZB_Mapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.TreeMap;
import java.util.SortedMap;
import java.io.IOException;
/**
* Created by ZZB on 2018/6/10.
*/
public class ZZB_Mapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    // local top-N kept in a TreeMap sorted by weight (ascending)
    private SortedMap<Double, Text> top10cats = new TreeMap<>();
    private int N = 10;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // get the current input line
        String line = value.toString();
        // split on commas; the first field is the weight
        String[] values = line.split(",");
        Double weight = Double.parseDouble(values[0]);
        // copy the line into a fresh Text object, see the bug discussion below
        Text x = new Text(line);
        top10cats.put(weight, x);
        // keep only the N largest weights
        if (top10cats.size() > N) {
            top10cats.remove(top10cats.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit the local top N in ascending order of weight;
        // the counter starts at 4, so the emitted keys are numbered from 5 upward
        int s = 4;
        for (Text catAttr : top10cats.values()) {
            s++;
            context.write(new IntWritable(s), catAttr);
        }
    }
}
The third Java file:
//ZZB_Reducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;
/**
* Created by ZZB on 2018/6/10.
*/
public class ZZB_Reducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private int N = 10;
    private int s = 0;
    private SortedMap<Double, Text> finaltop10 = new TreeMap<>();

    // Note: the framework's reduce() takes an Iterable<Text>, so this method (which
    // takes a single Text) does not actually override it and is never called by Hadoop.
    // What runs instead is the default identity reduce, which writes every (key, value)
    // pair straight through. That still gives the right answer here, because the single
    // mapper has already emitted only its local top N, in ascending order of weight.
    protected void reduce(IntWritable key, Text catRecord, Context context) throws IOException, InterruptedException {
        String cat = catRecord.toString();
        String[] tokens = cat.split(",");   // tokens[0] would be the weight (currently unused)
        finaltop10.put(1.0, catRecord);     // constant key, so the map only ever holds the last record
        if (finaltop10.size() > N) {
            finaltop10.remove(finaltop10.firstKey());
        }
        for (Text text : finaltop10.values()) {
            s++;
            context.write(new IntWritable(s), text);
        }
    }
}
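For completeness: if there were several mappers, each one would emit its own local top N, and a single reducer would then have to merge those lists. Below is a rough sketch of such a merging reducer. It is my own illustration rather than the code I actually ran, and it assumes every mapper writes its local top N under one shared constant key (unlike the sequential keys 5 to 14 used above), so that a single reduce() call sees all the candidate records.

import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: merge the local top-N lists coming from all mappers into one final top N.
public class TopNMergeReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private static final int N = 10;

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        SortedMap<Double, Text> finalTopN = new TreeMap<>();
        for (Text record : values) {
            // each record looks like "weight,cat...", so the weight is the first field
            double weight = Double.parseDouble(record.toString().split(",")[0]);
            finalTopN.put(weight, new Text(record));    // copy, because Hadoop reuses the Text object
            if (finalTopN.size() > N) {
                finalTopN.remove(finalTopN.firstKey()); // drop the current smallest
            }
        }
        int rank = 0;
        for (Text record : finalTopN.values()) {
            rank++;
            context.write(new IntWritable(rank), record);
        }
    }
}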
One of the bugs was this: in the mapper, if I do not create the fresh Text object x as a new carrier for the line, and instead put the original value straight into the map, the stored entries come out as empty strings and the job dies with an "empty String" error.
I was honestly desperate!!! Why?! Does split() somehow wipe out the contents of the original object??? Unbelievable!!
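My best guess at the explanation (the book does not spell it out) is that Hadoop hands map() the very same Text instance on every call and only swaps its contents, so putting value straight into the TreeMap stores N references to one shared, constantly changing object instead of N independent records, which is most likely why the entries ended up looking blank or wrong; copying with new Text(line) sidesteps the reuse. You can reproduce the aliasing outside Hadoop with a tiny standalone demo (my own illustration; it only needs hadoop-common on the classpath):

// Standalone illustration of why the copy matters: simulate the framework handing
// map() the same Text instance every time and merely changing its contents.
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;

public class TextReuseDemo {
    public static void main(String[] args) {
        String[] lines = {"919,cat17,cat17", "926,cat21,cat21", "947,cat61,cat61"};

        Text reused = new Text();                  // one object, reused just like Hadoop does
        SortedMap<Double, Text> shared = new TreeMap<>();
        SortedMap<Double, Text> copied = new TreeMap<>();

        for (String line : lines) {
            reused.set(line);
            double weight = Double.parseDouble(line.split(",")[0]);
            shared.put(weight, reused);            // stores a reference to the shared object
            copied.put(weight, new Text(reused));  // stores an independent copy
        }

        // every value in "shared" now shows the last line; "copied" keeps the right ones
        System.out.println("shared: " + shared);
        System.out.println("copied: " + copied);
    }
}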
But anyway, in the end I did get it working!! Very gratifying!!
Here is the run:
[zbzhang@node61 ~]$ ./test.sh
18/08/17 16:31:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted /output/result/_SUCCESS
Deleted /output/result/part-r-00000
18/08/17 16:31:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/17 16:31:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/17 16:31:56 INFO client.RMProxy: Connecting to ResourceManager at node61/11.11.0.61:8032
18/08/17 16:31:56 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/08/17 16:31:56 INFO input.FileInputFormat: Total input files to process : 1
18/08/17 16:31:57 INFO mapreduce.JobSubmitter: number of splits:1
18/08/17 16:31:57 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/08/17 16:31:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1534317717839_0032
18/08/17 16:31:57 INFO impl.YarnClientImpl: Submitted application application_1534317717839_0032
18/08/17 16:31:57 INFO mapreduce.Job: The url to track the job: http://node61:8088/proxy/application_1534317717839_0032/
18/08/17 16:31:57 INFO mapreduce.Job: Running job: job_1534317717839_0032
18/08/17 16:32:05 INFO mapreduce.Job: Job job_1534317717839_0032 running in uber mode : false
18/08/17 16:32:05 INFO mapreduce.Job: map 0% reduce 0%
18/08/17 16:32:10 INFO mapreduce.Job: map 100% reduce 0%
18/08/17 16:32:15 INFO mapreduce.Job: map 100% reduce 100%
18/08/17 16:32:15 INFO mapreduce.Job: Job job_1534317717839_0032 completed successfully
18/08/17 16:32:15 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=226
FILE: Number of bytes written=395195
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1669
HDFS: Number of bytes written=185
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2784
Total time spent by all reduces in occupied slots (ms)=3008
Total time spent by all map tasks (ms)=2784
Total time spent by all reduce tasks (ms)=3008
Total vcore-milliseconds taken by all map tasks=2784
Total vcore-milliseconds taken by all reduce tasks=3008
Total megabyte-milliseconds taken by all map tasks=2850816
Total megabyte-milliseconds taken by all reduce tasks=3080192
Map-Reduce Framework
Map input records=100
Map output records=10
Map output bytes=200
Map output materialized bytes=226
Input split bytes=97
Combine input records=0
Combine output records=0
Reduce input groups=10
Reduce shuffle bytes=226
Reduce input records=10
Reduce output records=10
Spilled Records=20
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=120
CPU time spent (ms)=1760
Physical memory (bytes) snapshot=505204736
Virtual memory (bytes) snapshot=5761363968
Total committed heap usage (bytes)=343932928
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1572
File Output Format Counters
Bytes Written=185
18/08/17 16:32:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
And here are the results:
5 919,cat17,cat17
6 926,cat21,cat21
7 947,cat61,cat61
8 950,cat10,cat10
9 952,cat13,cat13
10 958,cat52,cat52
11 976,cat92,cat92
12 977,cat83,cat83
13 987,cat23,cat23
14 993,cat39,cat39
So it is pretty solid after all... Below I am pasting my test script, plus the test data, for everybody to have a look at!!
[zbzhang@node61 ~]$ cat test.sh
hdfs dfs -rm /output/result/*
hdfs dfs -rmdir /output/result
hadoop jar hadoopClear.jar hadoopClear /input/cat.txt /output/result
hdfs dfs -cat /output/result/*
rm result.txt
hdfs dfs -get /output/result/part* result.txt
Below is the data for one hundred cats; the job basically sorts them by their random weights. With this little data you could of course just read the file directly, but what about hundreds of millions of records?? I suspect your machine would simply freeze. Hadoop won't though, right?!! Of course it won't!
All this talk makes me want to try it again myself!!
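The hundred-line cat file itself is not reproduced here, so as a stand-in here is a hypothetical little generator (my own sketch; it simply mimics the "weight,catNN,catNN" shape visible in the result rows above) that writes a cat.txt you could then upload with hdfs dfs -put cat.txt /input/ to match the path used in test.sh:

// Hypothetical generator for a cat.txt test file: 100 cats with random weights,
// one "weight,catNN,catNN" record per line, matching the shape of the results above.
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Random;

public class MakeCats {
    public static void main(String[] args) throws IOException {
        Random rnd = new Random();
        try (PrintWriter out = new PrintWriter("cat.txt", "UTF-8")) {
            for (int i = 0; i < 100; i++) {
                int weight = 100 + rnd.nextInt(900);   // random weight in [100, 999]
                String name = "cat" + i;
                out.println(weight + "," + name + "," + name);
            }
        }
    }
}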
After the Main Text
Today is Qixi, Chinese Valentine's Day... what a pity... other people spend Qixi showing off their love... and me... suffering from a breakup!! It really is sad! But spending a while working on yourself is a good thing!