MapReduce Join

Map Side Join

In a map-side join, the small table is loaded into an in-memory HashMap during the mapper's setup() (shipped to each task through the distributed cache), and every record of the large table is looked up against that map in map(), so the join itself needs no shuffle or reduce phase.

package MapJoin;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 Sample input.

 Table1 (the small table): station id, station name
 011990-99999    SIHCCAJAVRI
 012650-99999    TYNSET-HANSMOEN

 Table2 (the large table): station id, timestamp, reading
 012650-99999    194903241200    111
 012650-99999    194903241800    78
 011990-99999    195005150700    0
 011990-99999    195005151200    22
 011990-99999    195005151800    -11
 */

public class MapJoin {
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Map<String, String> table1Map = new HashMap<String, String>();

        // Load the small table from the distributed cache into an in-memory HashMap
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            URI[] paths = context.getCacheFiles();

            FileSystem fs = FileSystem.get(context.getConfiguration());
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    fs.open(new Path(paths[0].toString()))));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] vals = line.split("\\t");
                    if (vals.length == 2) {
                        table1Map.put(vals[0], vals[1]);
                    }
                }
            } finally {
                br.close();
            }
        }

        // Scan the large table; each record is joined against the in-memory map
        @Override
        protected void map(LongWritable key, Text val, Context context)
                throws IOException, InterruptedException {
            String[] vals = val.toString().split("\\t");
            if (vals.length == 3) {
                // Probe the HashMap with the foreign key of every record
                String table1Vals = table1Map.get(vals[0]);
                table1Vals = (table1Vals == null) ? "" : table1Vals;
                context.write(new Text(vals[0]), new Text(table1Vals + "\t"
                        + vals[1] + "\t" + vals[2]));
            }
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err
                    .println("Wrong number of parameters; usage: <big table hdfs input> <small table hdfs input> <hdfs output>");
            System.exit(-1);
        }

        Job job = Job.getInstance(conf, "MapJoin");

        job.setJarByClass(MapJoin.class);
        job.setMapperClass(JoinMapper.class);
        // Map-only job: the join is completed in the mapper, so no reducers are needed
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.addCacheFile((new Path(args[1]).toUri()));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
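For reference, with the sample Table1 and Table2 shown in the comment above, this map-only job emits the station id as the key and the station name glued onto each reading as the value, so the output should look roughly like this (a sketch; the exact file layout and ordering depend on how the tasks run):

012650-99999    TYNSET-HANSMOEN    194903241200    111
012650-99999    TYNSET-HANSMOEN    194903241800    78
011990-99999    SIHCCAJAVRI    195005150700    0
011990-99999    SIHCCAJAVRI    195005151200    22
011990-99999    SIHCCAJAVRI    195005151800    -11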

Reduce Side Join

In a reduce-side join, every mapper emits its records keyed by the join key; the shuffle then groups the records from both tables that share a key into a single reduce() call, where they are concatenated into one joined line.

package ReduceJoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/* user.csv file:

"ID","NAME","SEX"
"1","user1","0"
"2","user2","0"
"3","user3","0"
"4","user4","1"
"5","user5","0"
"6","user6","0"
"7","user7","1"
"8","user8","0"
"9","user9","0"

order.csv file:

"USER_ID","NAME"
"1","order1"
"2","order2"
"3","order3"
"4","order4"
"7","order7"
"8","order8"
"9","order9"
*/

public class ReduceJoin {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // Declared outside map() so that objects are not recreated on every call
        private Text outKey = new Text();
        private Text outValue = new Text();
        private String[] keyValue = null;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // value holds one line of input; split the join key off from the rest
            keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                return;    // skip blank or malformed lines
            }
            outKey.set(keyValue[0]);    // use the foreign key as the MapReduce key
            outValue.set(keyValue[1]);
            context.write(outKey, outValue);
        }
    }
  
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder valueStr = new StringBuilder();

            // values holds the map outputs from the different input files that
            // share this key, i.e. the matching records from both tables
            for (Text val : values) {
                valueStr.append(val);
                valueStr.append(",");
            }

            // Drop the trailing comma and emit the joined record
            outValue.set(valueStr.deleteCharAt(valueStr.length() - 1).toString());
            context.write(key, outValue);
        }
    }
    
    public static void main(String[] args) throws IllegalArgumentException,
            IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MyJoin");
          
        job.setJarByClass(ReduceJoin.class);  
        job.setMapperClass(MapClass.class);  
        job.setReducerClass(Reduce.class);  
        //job.setCombinerClass(Reduce.class);  
          
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
          
        job.setInputFormatClass(TextInputFormat.class);  
        job.setOutputFormatClass(TextOutputFormat.class);  
          
        // args[0] should point at a directory containing both user.csv and order.csv
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
          
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }

}
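Given the sample user.csv and order.csv above, the reducer simply concatenates every value that shares a key, so each output line carries the user fields and the order name together. A rough sketch of the result (the order in which a key's values are concatenated is not guaranteed, and the header rows as well as users without orders, such as "5" and "6", also show up):

"1"    "user1","0","order1"
"2"    "user2","0","order2"
"3"    "user3","0","order3"
"4"    "user4","1","order4"
"5"    "user5","0"
"6"    "user6","0"
"7"    "user7","1","order7"
"8"    "user8","0","order8"
"9"    "user9","0","order9"
"ID"    "NAME","SEX"
"USER_ID"    "NAME"

Because the values are not tagged with the file they came from, reading the joined line correctly relies on knowing the field layout of each table.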

    Original author: 小爷Souljoy
    Original article: https://www.jianshu.com/p/b25b1158bb90
    This article is reproduced from the web solely to share knowledge. If there is any infringement, please contact the blogger to have it removed.