Overview
To compute matrix1 * matrix2, each element of the product is the dot product of a row of matrix1 with a column of matrix2. The approach here is to first transpose matrix2 (step 1), so that in step 2 every row of matrix1 can be combined, entry by entry, with every row of the transposed matrix2.
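To make the arithmetic concrete, here is a minimal in-memory sketch (plain Java with dense int[][] arrays, not part of the MapReduce code below): each entry of the product is the dot product of a matrix1 row with a matrix2 column, and after transposing matrix2 this becomes the dot product of two rows, which is exactly what step 2 computes line by line.
public class LocalMatrixMultiply {
    // c[i][j] = sum over x of a[i][x] * b[x][j]
    //         = dot product of row i of a and row j of bT (b transposed)
    public static int[][] multiply(int[][] a, int[][] b) {
        int n = a.length, k = b.length, m = b[0].length;
        int[][] bT = new int[m][k];
        for (int x = 0; x < k; x++) {
            for (int j = 0; j < m; j++) {
                bT[j][x] = b[x][j];               // step 1: transpose b
            }
        }
        int[][] c = new int[n][m];
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < m; j++) {
                for (int x = 0; x < k; x++) {
                    c[i][j] += a[i][x] * bT[j][x]; // step 2: row-by-row dot products
                }
            }
        }
        return c;
    }
}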
Step 1: Transpose matrix2.txt
Create a matrix/step1_input directory under the HDFS root with mkdir and upload matrix2.txt. Each line holds one matrix row: the row number, a tab, and then comma-separated column_value pairs. The file content is as follows:
matrix2.txt
1 1_0,2_3,3_-1,4_2,5_-3
2 1_1,2_3,3_5,4_-2,5_-1
3 1_0,2_1,3_4,4_-1,5_2
4 1_-2,2_2,3_-1,4_1,5_2
Create a step1 package; inside it, first create the Mapper1 class.
Mapper1.java
package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    /**
     * key:   byte offset of the line, e.g. 1
     * value: one row of matrix2, e.g. 1	1_0,2_3,3_-1,4_2,5_-3
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into the row number and the column_value list
        String[] rowAndLine = value.toString().split("\t");
        // matrix row number
        String row = rowAndLine[0];
        // e.g. [1_0, 2_3, 3_-1, 4_2, 5_-3]
        String[] lines = rowAndLine[1].split(",");
        for (int i = 0; i < lines.length; i++) {
            String column = lines[i].split("_")[0];
            String valueStr = lines[i].split("_")[1];
            // key: column number, value: rowNumber_value
            outKey.set(column);
            outValue.set(row + "_" + valueStr);
            context.write(outKey, outValue);
        }
    }
}
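For example, for the first line of matrix2.txt, 1	1_0,2_3,3_-1,4_2,5_-3, this mapper emits five key/value pairs: (1, 1_0), (2, 1_3), (3, 1_-1), (4, 1_2), (5, 1_-3). Each value of row 1 is re-keyed by its column number and tagged with its original row, which is what makes the transpose possible in the reducer.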
Reducer1.java
package step1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer1 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    /**
     * key:    column number of matrix2
     * values: rowNumber_value pairs collected for that column
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all rowNumber_value pairs for this column
        StringBuilder sb = new StringBuilder();
        for (Text text : values) {
            sb.append(text).append(",");
        }
        // Drop the trailing comma
        String line = sb.length() > 0 ? sb.substring(0, sb.length() - 1) : "";
        outKey.set(key);
        outValue.set(line);
        context.write(outKey, outValue);
    }
}
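With the sample matrix2.txt, the reducer for key 1 receives the values 1_0, 2_1, 3_0 and 4_-2 (one from each input row; the order is not guaranteed) and writes the single line 1	1_0,2_1,3_0,4_-2, which is row 1 of the transposed matrix.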
MR1.java
package step1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR1 {

    private static String inPath = "/matrix/step1_input/matrix2.txt";
    private static String outPath = "/matrix/step1_output";
    private static String hdfs = "hdfs://ha-namenode-b:9000";

    public int run() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfs);

            Job job = Job.getInstance(conf, "step1");
            job.setJarByClass(MR1.class);
            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(inPath);
            if (fs.exists(inputPath)) {
                FileInputFormat.addInputPath(job, inputPath);
            }
            // Delete any previous output so the job can be re-run
            Path outputPath = new Path(outPath);
            fs.delete(outputPath, true);
            FileOutputFormat.setOutputPath(job, outputPath);

            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        int result = new MR1().run();
        if (result == 1) {
            System.out.println("step1 success...");
        } else {
            System.out.println("step1 failed...");
        }
    }
}
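Package the step1 classes into a jar (the jar name below is arbitrary) and submit the job, for example:
hadoop jar matrix.jar step1.MR1
With the sample input above, /matrix/step1_output/part-r-00000 should contain the transposed matrix2 (the order of the comma-separated entries within each line may differ, since reduce values arrive in no guaranteed order):
1	1_0,2_1,3_0,4_-2
2	1_3,2_3,3_1,4_2
3	1_-1,2_5,3_4,4_-1
4	1_2,2_-2,3_-1,4_1
5	1_-3,2_-1,3_2,4_2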
Step 2: Multiply matrix1 by the transposed matrix2
Create the step2 input directory on HDFS and upload matrix1.txt, which uses the same row/column_value format (matrix1 is 5×4 and matrix2 is 4×5, so the product will be 5×5):
hadoop fs -mkdir /matrix/step2_input
hadoop fs -put ~/demo/matrix1.txt /matrix/step2_input
matrix1.txt
1 1_1,2_2,3_-2,4_0
2 1_3,2_3,3_4,4_-3
3 1_-2,2_0,3_2,4_3
4 1_5,2_3,3_-1,4_2
5 1_-4,2_2,3_0,4_2
Mapper2.java
package step2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();
    // Lines of the transposed matrix2 (step1 output), loaded from the distributed cache
    private List<String> cacheList = new ArrayList<String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // "matrix2" is the symlink name used when the step1 output was added to the cache
        BufferedReader br = new BufferedReader(new FileReader("matrix2"));
        String line = null;
        while ((line = br.readLine()) != null) {
            cacheList.add(line);
        }
        br.close();
    }

    /**
     * key:   byte offset of the line
     * value: one row of matrix1, e.g. 1	1_1,2_2,3_-2,4_0
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Row number and column_value pairs of the current matrix1 row
        String row_matrix1 = value.toString().split("\t")[0];
        String[] column_value_array_matrix1 = value.toString().split("\t")[1].split(",");

        // Multiply this matrix1 row against every row of the transposed matrix2
        for (String line : cacheList) {
            String row_matrix2 = line.split("\t")[0];
            String[] column_value_array_matrix2 = line.split("\t")[1].split(",");

            int result = 0;
            // Dot product: match entries by column number and accumulate the products
            for (String column_value_matrix1 : column_value_array_matrix1) {
                String column_matrix1 = column_value_matrix1.split("_")[0];
                String value_matrix1 = column_value_matrix1.split("_")[1];
                for (String column_value_matrix2 : column_value_array_matrix2) {
                    if (column_value_matrix2.startsWith(column_matrix1 + "_")) {
                        String value_matrix2 = column_value_matrix2.split("_")[1];
                        result += Integer.valueOf(value_matrix1) * Integer.valueOf(value_matrix2);
                    }
                }
            }
            // key: row of matrix1; value: column of the product (row of transposed matrix2) and its value
            outKey.set(row_matrix1);
            outValue.set(row_matrix2 + "_" + result);
            context.write(outKey, outValue);
        }
    }
}
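As a concrete trace: for matrix1 row 1 (1	1_1,2_2,3_-2,4_0) and the first cached line (1	1_0,2_1,3_0,4_-2, i.e. column 1 of the original matrix2), the accumulated result is 1*0 + 2*1 + (-2)*0 + 0*(-2) = 2, so the mapper emits (1, 1_2). Repeating this against all five cached lines produces the five entries of row 1 of the product.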
Reducer2.java
package step2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer2 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    /**
     * key:    row number of the product matrix
     * values: columnNumber_value pairs for that row
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all columnNumber_value pairs for this row
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value).append(",");
        }
        // Drop the trailing comma
        String result = sb.length() > 0 ? sb.substring(0, sb.length() - 1) : "";
        outKey.set(key);
        outValue.set(result);
        context.write(outKey, outValue);
    }
}
MR2.java
package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR2 {

    private static String inPath = "/matrix/step2_input/matrix1.txt";
    private static String outPath = "/matrix/output";
    // Reduce output file of step1 (the transposed matrix2); with a single reducer
    // the default output file name is part-r-00000
    private static String cache = "/matrix/step1_output/part-r-00000";
    // Must point to the same HDFS as step1 so the cached step1 output can be found
    private static String hdfs = "hdfs://localhost:9000";

    public int run() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfs);

            Job job = Job.getInstance(conf, "step2");
            // Ship the step1 output to every mapper; "#matrix2" is the symlink name
            // that Mapper2.setup() opens with FileReader("matrix2")
            job.addCacheFile(new URI(cache + "#matrix2"));
            job.setJarByClass(MR2.class);
            job.setMapperClass(Mapper2.class);
            job.setReducerClass(Reducer2.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(inPath);
            if (fs.exists(inputPath)) {
                FileInputFormat.addInputPath(job, inputPath);
            }
            // Delete any previous output so the job can be re-run
            Path outputPath = new Path(outPath);
            fs.delete(outputPath, true);
            FileOutputFormat.setOutputPath(job, outputPath);

            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        int result = new MR2().run();
        if (result == 1) {
            System.out.println("step2 success...");
        } else {
            System.out.println("step2 failed...");
        }
    }
}
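Run step2 the same way after step1 has finished (for example, hadoop jar matrix.jar step2.MR2, with the jar name again arbitrary). With the sample matrices, /matrix/output/part-r-00000 should contain the 5×5 product; as before, the order of the entries within each line may vary:
1	1_2,2_7,3_1,4_0,5_-9
2	1_9,2_16,3_31,4_-7,5_-10
3	1_-6,2_2,3_7,4_-3,5_16
4	1_-1,2_27,3_4,4_7,5_-16
5	1_-2,2_-2,3_12,4_-10,5_14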