《Hadoop技术内幕》中指出,Top K算法分两步:一是统计词频,二是找出词频最高的前K个词。我在网上找了很多MapReduce的Top K案例,但这些案例都只实现了排序功能,所以自己写了一个完整的案例。
这个案例分两个步骤:第一步就是WordCount案例(统计词频),第二步是排序并取出词频最高的前K个词。
一,统计词频
package TopK;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Step one of the Top K pipeline: counts how often each word occurs in the input.
 *
 * @author zx
 * zhangxian1991@qq.com
 */
public class WordCount {

    /**
     * Splits each input line on whitespace and emits {@code (word, 1)} for every token.
     *
     * @author zx
     */
    public static class Map extends Mapper<Object, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);

        // Reused for every record; context.write serializes the key immediately.
        private final Text word = new Text();

        /**
         * Tokenizes one line and writes {@code (word, 1)} per token.
         *
         * @param key     input key supplied by the input format; not used
         * @param value   one line of input text
         * @param context sink for the {@code (word, 1)} pairs
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                // Strip double quotes, single quotes and dots from the token.
                String cleaned = tokens.nextToken()
                        .replace("\"", "")
                        .replace("'", "")
                        .replace(".", "");
                // A token made only of stripped characters (e.g. "..." or "'") is not a word.
                if (cleaned.isEmpty()) {
                    continue;
                }
                word.set(cleaned);
                context.write(word, one);
            }
        }

    }

    /**
     * Adds up the partial counts of a word and emits {@code (word, total)}.
     *
     * @author zx
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable total = new IntWritable();

        /**
         * Sums the values received for {@code key}.
         *
         * <p>The values are summed rather than merely counted, so the result stays
         * correct when the input has already been pre-aggregated by a combiner.
         *
         * @param key     the word
         * @param values  partial counts for the word
         * @param context sink for the {@code (word, total)} pair
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable partial : values) {
                sum += partial.get();
            }
            total.set(sum);
            context.write(key, total);
        }

    }

    /**
     * Configures and runs the word-count job, blocking until it finishes.
     *
     * @param in  input path (file or directory) containing the text to analyse
     * @param out output directory for the {@code word<TAB>count} lines
     * @return {@code true} if the job completed successfully
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static boolean run(String in, String out)
            throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        // Reduce sums its inputs, so it can also pre-aggregate on the map side.
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output locations
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        return job.waitForCompletion(true);
    }

}
二,排序,并求出频率最高的前K个词
package TopK;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Step two of the Top K pipeline: orders words by frequency and additionally
 * writes the K most frequent words to a separate output.
 *
 * @author zx
 * zhangxian1991@qq.com
 */
public class Sort {

    /**
     * Reads {@code word<whitespace>count} lines and emits {@code (count, word)},
     * so that the shuffle groups and orders the records by frequency.
     *
     * @author zx
     */
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {

        // Output key: the frequency
        private final IntWritable outKey = new IntWritable();
        private final Text outValue = new Text();

        /**
         * Parses one line. The last whitespace-separated field is the count and
         * everything before it is the word, so a word that itself consists of
         * digits is still handled correctly. Lines that do not have both fields
         * are skipped and counted under the {@code Sort/MALFORMED_LINES} counter
         * instead of being emitted with values left over from a previous record.
         *
         * @param key     input key supplied by the input format; not used
         * @param value   one {@code word<whitespace>count} line
         * @param context sink for the {@code (count, word)} pair
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString().trim();

            // Locate the last whitespace character: it separates word and count.
            int sep = line.length() - 1;
            while (sep >= 0 && !Character.isWhitespace(line.charAt(sep))) {
                sep--;
            }

            String word = sep < 0 ? "" : line.substring(0, sep).trim();
            if (word.isEmpty()) {
                context.getCounter("Sort", "MALFORMED_LINES").increment(1);
                return;
            }

            int count;
            try {
                count = Integer.parseInt(line.substring(sep + 1));
            } catch (NumberFormatException e) {
                context.getCounter("Sort", "MALFORMED_LINES").increment(1);
                return;
            }

            outKey.set(count);
            outValue.set(word);
            context.write(outKey, outValue);
        }

    }

    /**
     * Writes every {@code (word, count)} pair to the regular job output and keeps
     * the K most frequent words, which are written to the "topKMOS" named output
     * when the task finishes.
     *
     * @author zx
     */
    public static class Reduce extends
            Reducer<IntWritable, Text, Text, IntWritable> {

        /** Configuration key for K; when absent, {@link #DEFAULT_K} is used. */
        public static final String TOP_K_SIZE_KEY = "topK.size";

        private static final int DEFAULT_K = 10;

        /** A word together with its frequency. */
        private static final class WordFreq implements Comparable<WordFreq> {
            private final int count;
            private final String word;

            WordFreq(int count, String word) {
                this.count = count;
                this.word = word;
            }

            /**
             * Natural order is "worst candidate first": lower frequency first and,
             * for equal frequencies, the lexicographically larger word first.
             */
            @Override
            public int compareTo(WordFreq other) {
                if (count != other.count) {
                    return count < other.count ? -1 : 1;
                }
                return other.word.compareTo(word);
            }
        }

        // Per-task state (instance fields, so nothing leaks between tasks that
        // happen to run in the same JVM).
        private int k;

        // Min-heap holding the best K candidates seen so far; its head is the
        // weakest of them. Unlike a map keyed by frequency, it keeps several
        // words that share the same frequency.
        private PriorityQueue<WordFreq> best;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            k = context.getConfiguration().getInt(TOP_K_SIZE_KEY, DEFAULT_K);
            best = new PriorityQueue<WordFreq>();
        }

        /**
         * Emits each word of this frequency and offers it to the top-K heap.
         *
         * @param key     the frequency
         * @param values  all words having that frequency
         * @param context sink for the {@code (word, count)} pairs
         */
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(text, key);

                best.add(new WordFreq(key.get(), text.toString()));
                // Evict the weakest candidate once more than K are held.
                if (best.size() > k) {
                    best.poll();
                }
            }
        }

        /**
         * Writes the collected top-K words, most frequent first, to the "topKMOS"
         * named output under the path stored in the "topKout" configuration entry.
         */
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            String path = context.getConfiguration().get("topKout");

            List<WordFreq> ranked = new ArrayList<WordFreq>(best);
            Collections.sort(ranked, Collections.<WordFreq>reverseOrder());

            MultipleOutputs<Text, IntWritable> mos =
                    new MultipleOutputs<Text, IntWritable>(context);
            try {
                Text word = new Text();
                IntWritable count = new IntWritable();
                for (WordFreq entry : ranked) {
                    word.set(entry.word);
                    count.set(entry.count);
                    mos.write("topKMOS", word, count, path);
                }
            } finally {
                // Always close, otherwise the named output may never be flushed.
                mos.close();
            }
        }

    }

    /**
     * Configures and runs the sort job, blocking until it finishes.
     *
     * @param in      directory holding the word-count result
     * @param out     output directory for the full list ordered by frequency
     * @param topKout base output path for the K most frequent words
     * @throws IOException            if the job cannot be set up, or if it fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void run(String in, String out, String topKout) throws IOException,
            ClassNotFoundException, InterruptedException {

        Path outPath = new Path(out);

        Configuration conf = new Configuration();

        // Where the top K words are to be written
        conf.set("topKout", topKout);

        Job job = Job.getInstance(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // A global ordering and a single global top-K list need all records to
        // pass through one reducer.
        job.setNumReduceTasks(1);

        // Map output types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Named output used for the top-K list; the declared types match what
        // the reducer actually writes.
        MultipleOutputs.addNamedOutput(job, "topKMOS", TextOutputFormat.class,
                Text.class, IntWritable.class);

        // Input and output locations
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, outPath);

        if (!job.waitForCompletion(true)) {
            throw new IOException("Sort job failed (input: " + in + ", output: " + out + ")");
        }

    }

}
自己封装的Int
1 package TopK; 2 3 public class MyInt implements Comparable<MyInt>{ 4 private Integer value; 5 6 public MyInt(Integer value){ 7 this.value = value; 8 } 9 10 public int getValue() { 11 return value; 12 } 13 14 public void setValue(int value) { 15 this.value = value; 16 } 17 18 @Override 19 public int compareTo(MyInt o) { 20 return value.compareTo(o.getValue()); 21 } 22 23 24 }
运行入口
package TopK;

import java.io.IOException;

/**
 * Entry point of the Top K example: runs the word-count job and, if it
 * succeeds, the sort / top-K job on its result.
 *
 * @author zx
 * zhangxian1991@qq.com
 */
public class TopK {

    // Text whose words are to be counted and ranked
    private static final String DEFAULT_IN = "hdfs://localhost:9000/input/MaDing.text";

    // Result of the word count
    private static final String DEFAULT_WORD_COUNT = "hdfs://localhost:9000/out/wordCout";

    // Word-count result ordered by frequency
    private static final String DEFAULT_SORT = "hdfs://localhost:9000/out/sort";

    // The top K entries
    private static final String DEFAULT_TOP_K = "hdfs://localhost:9000/out/topK";

    /**
     * Runs both jobs in sequence.
     *
     * <p>Optional arguments, in order: input path, word-count output directory,
     * sort output directory, top-K output path. Any argument that is not given
     * falls back to the built-in default location.
     *
     * <p>If the word-count job fails, a message is printed to standard error and
     * the JVM exits with status 1 without starting the sort job.
     *
     * @param args optional path overrides, see above
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws IOException            if a job cannot be set up or submitted
     * @throws InterruptedException   if waiting for a job is interrupted
     */
    public static void main(String[] args)
            throws ClassNotFoundException, IOException, InterruptedException {

        String in = argOrDefault(args, 0, DEFAULT_IN);
        String wordCount = argOrDefault(args, 1, DEFAULT_WORD_COUNT);
        String sort = argOrDefault(args, 2, DEFAULT_SORT);
        String topK = argOrDefault(args, 3, DEFAULT_TOP_K);

        // Sorting only makes sense once the word count has completed successfully.
        if (!WordCount.run(in, wordCount)) {
            System.err.println("WordCount job failed; sort job was not started");
            System.exit(1);
        }

        Sort.run(wordCount, sort, topK);
    }

    /** Returns {@code args[index]} when present, otherwise {@code fallback}. */
    private static String argOrDefault(String[] args, int index, String fallback) {
        return args != null && args.length > index ? args[index] : fallback;
    }

}