MapReduce实现矩阵乘法

说明

目标是计算矩阵乘积 matrix1 × matrix2:结果中第 i 行第 j 列的元素,等于 matrix1 第 i 行与 matrix2 第 j 列对应元素的乘积之和。实现思路是先将 matrix2 转置,使 matrix2 的每一列变成转置矩阵的一行,这样只需让 matrix1 的每一行与转置后 matrix2 的每一行按列号对应相乘并求和即可。

Step1:实现矩阵matrix2.txt转置

在hdfs根目录下创建matrix/step1_input文件夹,并上传matrix2.txt文件。每行的格式为“行号<Tab>列号_值,列号_值,...”,行号与后面的内容之间必须用制表符分隔(代码按"\t"切分)。文件内容如下:

matrix2.txt

1   1_0,2_3,3_-1,4_2,5_-3
2   1_1,2_3,3_5,4_-2,5_-1
3   1_0,2_1,3_4,4_-1,5_2
4   1_-2,2_2,3_-1,4_1,5_2

创建step1包,在此包内,首先创建Mapper1类
Mapper1.java

package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * Step-1 mapper: re-keys every matrix cell by its column number so that the reducer can
 * assemble the transposed matrix.
 *
 * <p>Each input line has the form {@code row<TAB>col_value,col_value,...}. For every cell the
 * mapper emits key {@code col} and value {@code row_value}.
 */
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text>{
    // Reused across calls to avoid allocating a new Text per emitted record.
    private Text outKey = new Text();
    private Text outValue =new Text();

    /**
     * Emits one {@code (column, row_value)} pair per cell of the input line.
     *
     * @param key     position key supplied by the input format (a byte offset under the default
     *                text input format); not used
     * @param value   one matrix row, e.g. {@code 1<TAB>1_0,2_3,3_-1,4_2,5_-3}
     * @param context sink for the emitted pairs
     */
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        // A blank line (e.g. a trailing empty line in a hand-edited file) carries no cells;
        // skipping it avoids an ArrayIndexOutOfBoundsException on rowAndCells[1].
        if (record.trim().isEmpty()) {
            return;
        }

        String[] rowAndCells = record.split("\t");
        String row = rowAndCells[0];
        String[] cells = rowAndCells[1].split(",");

        // cells: [1_0, 2_3, 3_-1, 4_2, 5_-3]
        for (String cell : cells) {
            // Split once and reuse both halves: [column, value].
            String[] columnAndValue = cell.split("_");
            outKey.set(columnAndValue[0]);
            outValue.set(row + "_" + columnAndValue[1]);
            context.write(outKey, outValue);
        }
    }

}

Reducer1.java

package step1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Step-1 reducer: joins all {@code row_value} cells that share a column number into one
 * comma-separated line, producing one row of the transposed matrix per key.
 */
public class Reducer1 extends Reducer<Text, Text, Text, Text>{
    // Reused across calls to avoid allocating a new Text per emitted record.
    private Text outKey = new Text();
    private Text outValue = new Text();

    /**
     * Writes {@code key -> value1,value2,...} for the given column.
     *
     * @param key     column number of the original matrix (row number of the transpose)
     * @param values  the {@code row_value} cells emitted for that column; each cell carries its
     *                own row label, so the order in which they are joined does not matter
     * @param context sink for the joined line
     */
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        // Put the separator before every element except the first, so there is no trailing
        // comma to strip and the result can never be null.
        boolean first = true;
        for (Text cell : values) {
            if (!first) {
                joined.append(',');
            }
            joined.append(cell.toString());
            first = false;
        }

        outKey.set(key);
        outValue.set(joined.toString());

        context.write(outKey, outValue);
    }


}

MR1.java

package step1;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for step 1: configures and runs the job that transposes matrix2
 * ({@link Mapper1} + {@link Reducer1}).
 */
public class MR1 {
    /** HDFS path of the matrix to transpose. */
    private static final String IN_PATH = "/matrix/step1_input/matrix2.txt";

    /** HDFS directory that receives the transposed matrix; removed before each run. */
    private static final String OUT_PATH = "/matrix/step1_output";

    /** Value used for {@code fs.defaultFS}. */
    private static final String HDFS = "hdfs://ha-namenode-b:9000";

    /**
     * Submits the step-1 job and waits for it to finish.
     *
     * @return {@code 1} if the job completed successfully, {@code -1} if the input is missing,
     *         the job failed, or an exception occurred
     */
    public int run() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", HDFS);
            Job job = Job.getInstance(conf, "step1");

            job.setJarByClass(MR1.class);
            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(IN_PATH);
            // Fail fast with a clear message instead of submitting a job without any input
            // path, and do so before the previous output directory is deleted.
            if (!fs.exists(inputPath)) {
                System.err.println("step1 input path does not exist: " + IN_PATH);
                return -1;
            }
            FileInputFormat.addInputPath(job, inputPath);

            // The output directory must not exist when the job starts; drop any earlier result.
            Path outputPath = new Path(OUT_PATH);
            fs.delete(outputPath, true);

            FileOutputFormat.setOutputPath(job, outputPath);

            return job.waitForCompletion(true) ? 1 : -1;

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return -1;
    }

    /** Runs step 1 and prints whether it succeeded. */
    public static void main(String[] args) {
        int result = new MR1().run();
        if (result == 1) {
            System.out.println("step1 success...");
        } else {
            // run() only ever returns 1 or -1.
            System.out.println("step1 failed...");
        }
    }
}

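Step2:实现matrix1与转置后的matrix2相乘

Step2依赖Step1的输出(/matrix/step1_output),需要先成功运行MR1。在hdfs上创建matrix/step2_input文件夹并上传matrix1.txt文件(行号与后面的内容之间同样必须用制表符分隔):

hadoop fs -mkdir /matrix/step2_input
hadoop fs -put ~/demo/matrix1.txt /matrix/step2_input

matrix1.txt
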
1   1_1,2_2,3_-2,4_0
2   1_3,2_3,3_4,4_-3
3   1_-2,2_0,3_2,4_3
4   1_5,2_3,3_-1,4_2
5   1_-4,2_2,3_0,4_2

Mapper2.java

package step2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Step-2 mapper: multiplies one row of matrix1 with every row of the transposed matrix2.
 *
 * <p>The transposed matrix2 is read once per task from the local file {@code matrix2} and kept
 * in memory. For an input row {@code i} and a cached row {@code j} the mapper emits key
 * {@code i} and value {@code j_sum}, where {@code sum} is the sum of the products of the cells
 * that carry the same column label in both rows.
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
    // Reused across calls to avoid allocating a new Text per emitted record.
    private Text outKey = new Text();
    private Text outValue =new Text();

    /** Rows of the transposed matrix2, parsed once in {@link #setup}. */
    private List<ParsedRow> cacheList = new ArrayList<ParsedRow>();

    /**
     * Loads and parses the local file {@code matrix2} into memory.
     *
     * @param context task context, passed on to the superclass
     * @throws IOException if the file cannot be opened or read
     */
    protected void setup(Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        BufferedReader br = new BufferedReader(new FileReader("matrix2"));
        try {
            String line = null;
            while((line=br.readLine())!=null) {
                // Ignore blank lines; they carry no cells.
                if (line.trim().isEmpty()) {
                    continue;
                }
                cacheList.add(ParsedRow.parse(line));
            }
        } finally {
            // Closing the BufferedReader also closes the wrapped FileReader, even when
            // reading or parsing fails.
            br.close();
        }
    }

    /**
     * Emits one {@code (row, cachedRow_sum)} pair per cached row.
     *
     * @param key     position key supplied by the input format; not used
     * @param value   one row of matrix1, {@code row<TAB>col_value,col_value,...}
     * @param context sink for the emitted pairs
     */
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        // Ignore blank lines; they carry no cells.
        if (record.trim().isEmpty()) {
            return;
        }

        // Parse the matrix1 row once instead of once per cached row.
        ParsedRow left = ParsedRow.parse(record);
        outKey.set(left.row);

        for (ParsedRow right : cacheList) {
            int result = 0;
            for (int i = 0; i < left.columns.length; i++) {
                for (int j = 0; j < right.columns.length; j++) {
                    // Cells are paired by column label, so neither row has to be sorted.
                    if (left.columns[i].equals(right.columns[j])) {
                        result += left.values[i] * right.values[j];
                    }
                }
            }
            outValue.set(right.row + "_" + result);
            context.write(outKey, outValue);
        }
    }

    /** One matrix row split into its label and parallel arrays of column labels and values. */
    private static final class ParsedRow {
        final String row;
        final String[] columns;
        final int[] values;

        ParsedRow(String row, String[] columns, int[] values) {
            this.row = row;
            this.columns = columns;
            this.values = values;
        }

        /**
         * Parses a line of the form {@code row<TAB>col_value,col_value,...}.
         *
         * @throws NumberFormatException if a cell value is not an integer
         */
        static ParsedRow parse(String line) {
            String[] rowAndCells = line.split("\t");
            String[] cells = rowAndCells[1].split(",");
            String[] columns = new String[cells.length];
            int[] values = new int[cells.length];
            for (int i = 0; i < cells.length; i++) {
                String[] columnAndValue = cells[i].split("_");
                columns[i] = columnAndValue[0];
                values[i] = Integer.parseInt(columnAndValue[1]);
            }
            return new ParsedRow(rowAndCells[0], columns, values);
        }
    }
}

Reducer2.java

package step2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Step-2 reducer: joins all {@code column_value} results that belong to one row of the product
 * matrix into a single comma-separated line.
 */
public class Reducer2 extends Reducer<Text, Text, Text, Text> {
    // Reused across calls to avoid allocating a new Text per emitted record.
    private Text outKey = new Text();
    private Text outValue = new Text();

    /**
     * Writes {@code key -> value1,value2,...} for the given row.
     *
     * @param key     row number of the product matrix
     * @param values  the {@code column_value} results emitted for that row; each one carries
     *                its own column label, so the order in which they are joined does not matter
     * @param context sink for the joined line
     */
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        // Put the separator before every element except the first, so there is no trailing
        // comma to strip and the result can never be null.
        boolean first = true;
        for (Text value : values) {
            if (!first) {
                joined.append(',');
            }
            joined.append(value.toString());
            first = false;
        }

        outKey.set(key);
        outValue.set(joined.toString());
        context.write(outKey, outValue);
    }
}

MR2.java

package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;




/**
 * Driver for step 2: configures and runs the job that multiplies matrix1 with the transposed
 * matrix2 produced by step 1 ({@link Mapper2} + {@link Reducer2}).
 */
public class MR2 {
    /** HDFS path of matrix1. */
    private static final String IN_PATH = "/matrix/step2_input/matrix1.txt";

    /** HDFS directory that receives the product matrix; removed before each run. */
    private static final String OUT_PATH = "/matrix/output";

    /**
     * Transposed matrix2 written by step 1. Mapper2 opens the cache entry as a plain file, so
     * this must name a file, not the output directory.
     * NOTE(review): assumes step 1 ran with a single reducer, so that its whole result is in
     * part-r-00000 - confirm if the reducer count is ever changed.
     */
    private static final String CACHE_FILE = "/matrix/step1_output/part-r-00000";

    /**
     * Value used for {@code fs.defaultFS}.
     * NOTE(review): MR1 uses hdfs://ha-namenode-b:9000; both steps have to address the same
     * file system because this job reads the step-1 output - confirm which host is correct.
     */
    private static final String HDFS = "hdfs://localhost:9000";

    /**
     * Submits the step-2 job and waits for it to finish.
     *
     * @return {@code 1} if the job completed successfully, {@code -1} if the input is missing,
     *         the job failed, or an exception occurred
     * @throws URISyntaxException declared for source compatibility with existing callers; a
     *         malformed cache URI is handled inside this method and reported as {@code -1}
     */
    public int run() throws URISyntaxException {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", HDFS);
            Job job = Job.getInstance(conf, "step2");

            // The "#matrix2" fragment is the local name under which Mapper2 opens the file.
            job.addCacheFile(new URI(CACHE_FILE + "#matrix2"));

            job.setJarByClass(MR2.class);
            job.setMapperClass(Mapper2.class);
            job.setReducerClass(Reducer2.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(IN_PATH);
            // Fail fast with a clear message instead of submitting a job without any input
            // path, and do so before the previous output directory is deleted.
            if (!fs.exists(inputPath)) {
                System.err.println("step2 input path does not exist: " + IN_PATH);
                return -1;
            }
            FileInputFormat.addInputPath(job, inputPath);

            // The output directory must not exist when the job starts; drop any earlier result.
            Path outputPath = new Path(OUT_PATH);
            fs.delete(outputPath, true);

            FileOutputFormat.setOutputPath(job, outputPath);
            return job.waitForCompletion(true) ? 1 : -1;

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return -1;
    }

    /** Runs step 2 and prints whether it succeeded. */
    public static void main(String[] args) {
        try {
            int result = new MR2().run();

            if (result == 1) {
                System.out.println("step2 success...");
            } else {
                // run() only ever returns 1 or -1.
                System.out.println("step2 failed...");
            }
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
    原文作者:火锅侠
    原文地址: https://www.jianshu.com/p/b5d2d53957de
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞