usr:friend,friend,friend…
—————
仅代表个人意见,希望对你有帮助
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
最终结果:
A,B C,E
A,C D,F
A,D F,E
A,F B,C,D,E,O
B,E C
C,F A,D
D,E L
D,F A,E
D,L E,F
E,L D
F,M E
H,O A
I,O A
================================================
提示:
1.对于这样一个usr(A), 他的好友数list为(A:B,C,D,F,E,O) 分组如下:
(A B)->(A B C D F E O)
(A C)->(A B C D F E O)
(A D)->(A B C D F E O)
(A F)->(A B C D F E O)
(A E)->(A B C D F E O)
(A O)->(A B C D F E O)
2.假设有另一个usr(C), 她的好友数list为(C:A,B,E), 将其按照上面规则分组(注意需要compare每一个键和usr,小的放前边):
(A C)->(A B E)
(B C)->(A B E)
(C E)->(A B E)
3.倒排索引后,shuffle后的结果:
(A C)->(A B C D F E O)(A B E)
4.因为(A C)的values组的个数>1 ,则我们可以很确定,一定有一组的user是A,另一组的user是C
接下来找出(A B C D F E O)(A B E)的交集(A B E)。
排除掉两个user(A C)后集合的个数>=1,因此我们可以确定(A C)有共同好友B和E。
按照此逻辑,请写出mapreduce代码:
package com.bw.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Stage 1 of the common-friends computation.
 *
 * <p>Reads records of the form {@code person:friend1,friend2,...} from
 * {@code /input/10.txt}, inverts them so that each friend becomes a key, and
 * writes one line per friend ({@code friend<TAB>person1,person2,...}) to
 * {@code /out14}. When the job succeeds, {@link GX2#main(String[])} is invoked
 * to run the second stage.
 */
public class GX1 {

    /**
     * Configures and runs the stage-1 job, then chains into {@code GX2} on success.
     *
     * @param args command-line arguments; passed through unchanged to {@code GX2.main}
     * @throws IOException            if the file system or job submission fails
     * @throws InterruptedException   if waiting for job completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
        config.set("yarn.resourcemanager.hostname", "192.168.0.100");
        FileSystem fs = FileSystem.get(config);

        Job job = Job.getInstance(config);
        job.setJarByClass(GX1.class);

        // Mapper class and its output types.
        job.setMapperClass(myMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer class and the final output types.
        job.setReducerClass(myRedu.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path("/input/10.txt"));
        Path path = new Path("/out14");
        //job.setPartitionerClass(myPartitioner.class);
        //job.setNumReduceTasks(2);

        // Remove a pre-existing output directory so the job can write to it.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        // Output directory for the result files.
        FileOutputFormat.setOutputPath(job, path);

        // Run the job and block until it finishes.
        boolean completion = job.waitForCompletion(true);
        if (completion) {
            System.out.println("Job SUCCESS!");
            GX2.main(args);
        }
    }

    /**
     * Emits one {@code (friend, person)} pair for every friend listed on an
     * input line of the form {@code person:friend1,friend2,...}.
     */
    public static class myMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reused across calls; context.write serializes the current contents.
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(":");
            // Blank lines and records without a friend list have no second
            // field; skip them instead of failing on split[1].
            if (split.length < 2) {
                return;
            }
            String person = split[0];
            String[] splits = split[1].split(",");
            for (String spl : splits) {
                k.set(spl);
                v.set(person);
                context.write(k, v);
            }
        }
    }

    /**
     * Joins all persons received for one friend into a single comma-separated
     * value and writes {@code (friend, "person1,person2,...")}.
     */
    public static class myRedu extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder stb = new StringBuilder();
            for (Text value : values) {
                if (stb.length() != 0) {
                    stb.append(",");
                }
                stb.append(value);
            }
            context.write(key, new Text(stb.toString()));
        }
    }
}
package com.bw.hadoop;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Stage 2 of the common-friends computation.
 *
 * <p>Reads records of the form {@code friend<TAB>person1,person2,...} from
 * {@code /out14/part-r-00000}, emits the friend once for every pair of persons
 * on the line, and writes one line per pair
 * ({@code personA-personB<TAB>friend1 friend2 ...}) to {@code /out15}.
 */
public class GX2 {

    /**
     * Configures and runs the stage-2 job.
     *
     * @param args command-line arguments; not read by this method
     * @throws IOException            if the file system or job submission fails
     * @throws InterruptedException   if waiting for job completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
        config.set("yarn.resourcemanager.hostname", "192.168.0.100");
        FileSystem fs = FileSystem.get(config);

        Job job = Job.getInstance(config);
        job.setJarByClass(GX2.class);

        // Mapper class and its output types.
        job.setMapperClass(myMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer class and the final output types.
        job.setReducerClass(myRedu.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path("/out14/part-r-00000"));
        Path path = new Path("/out15");
        //job.setPartitionerClass(myPartitioner.class);
        //job.setNumReduceTasks(2);

        // Remove a pre-existing output directory so the job can write to it.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        // Output directory for the result files.
        FileOutputFormat.setOutputPath(job, path);

        // Run the job and block until it finishes.
        boolean completion = job.waitForCompletion(true);
        if (completion) {
            System.out.println("Job SUCCESS!");
        }
    }

    /**
     * For an input line {@code friend<TAB>person1,person2,...}, sorts the
     * persons and emits {@code ("personA-personB", friend)} for every
     * unordered pair, with the lexicographically smaller name first.
     */
    public static class myMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split("\t");
            // A record without a tab-separated person list cannot form pairs.
            if (split.length < 2) {
                return;
            }
            Text friend = new Text(split[0]);
            String[] persons = split[1].split(",");
            // Sorting makes the pair key canonical: (A, B) and (B, A) both
            // become "A-B" and therefore meet in the same reduce call.
            Arrays.sort(persons);
            // i stops one short of the end and j runs to the end, so the last
            // person takes part in pairs too. The earlier bounds (length-2 and
            // length-1) never paired the final element, which dropped every
            // pair for two-person lists.
            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    context.write(new Text(persons[i] + "-" + persons[j]), friend);
                }
            }
        }
    }

    /**
     * Concatenates all friends received for one person pair, each followed by
     * a single space, and writes {@code (pair, "friend1 friend2 ... ")}.
     */
    public static class myRedu extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder stb = new StringBuilder();
            for (Text value : values) {
                stb.append(value).append(" ");
            }
            context.write(key, new Text(stb.toString()));
        }
    }
}