1. 实例描述
单表关联这个实例要求从给出的数据中寻找出所关心的数据,它是对原始数据所包含信息的挖掘。
实例中给出child-parent 表, 求出grandchild-grandparent表。
输入数据 file01:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Marry
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
希望输出为:
grandchild grandparent
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Marry
Tom Ben
Jone Marry
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
2. 设计思路
1. 在map阶段,将原数据进行分割,将parent作为map输出的key值,child作为map输出的value值,这样形成左表。
2. 同时在map阶段过程中,将child作为map输出的key值,parent作为map输出的value值,这样形成右表。
3. 连接左表的paren列和右表的child列。
3. 具体实现
package tablerelation; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * * @author Amei 单表链接,求grandchild grandparent表 */ public class SingleTableRelation { public static int time = 0; /** * * @author Amei 左表的paren 和 右表的 child 做链接 */ public static class Map extends Mapper<LongWritable, Text, Text, Text> { protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
// 左右表的标识 int relation; StringTokenizer tokenizer = new StringTokenizer(value.toString()); String child = tokenizer.nextToken(); String parent = tokenizer.nextToken(); if (child.compareTo("child") != 0) { // 左表 relation = 1; context.write(new Text(parent), new Text(relation + "+" + child)); // 右表 relation = 2; context.write(new Text(child), new Text(relation + "+" + parent)); } }; } public static class Reduce extends Reducer<Text, Text, Text, Text> { protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context output) throws java.io.IOException, InterruptedException { int grandchildnum = 0; int grandparentnum = 0; List<String> grandchilds = new ArrayList<>(); List<String> grandparents = new ArrayList<>(); /** 输出表头 */ if (time == 0) { output.write(new Text("grandchild"), new Text("grandparent")); time++; } for (Text val : values) { String record = val.toString(); char relation = record.charAt(0); // 取出此时key所对应的child if (relation == '1') { String child = record.substring(2); grandchilds.add(child); grandchildnum++; } // 取出此时key所对应的parent else { String parent = record.substring(2); grandparents.add(parent); grandparentnum++; } } if (grandchildnum != 0 && grandparentnum != 0) { for (int i = 0; i < grandchildnum; i++) for (int j = 0; j < grandparentnum; j++) output.write(new Text(grandchilds.get(i)), new Text( grandparents.get(j))); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf,"single tale relation"); job.setJarByClass(SingleTableRelation.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path("/user/hadoop_admin/singletalein")); FileOutputFormat.setOutputPath(job, new Path("/user/hadoop_admin/singletableout")); System.exit((job.waitForCompletion(true) ? 0 : 1)); } }