对于任意矩阵M和N,若矩阵M的列数等于矩阵N的行数,则记M和N的乘积为P=M*N,其中mik 记做矩阵M的第i行和第k列,nkj记做矩阵N的第k行和第j列,则矩阵P中,第i行第j列的元素可表示为公式(1-1):
pij=(M*N)ij=∑miknkj=mi1*n1j+mi2*n2j+……+mik*nkj (公式1-1)
由公式(1-1)可以看出,最后决定pij是(i,j),所以可以将其作为Reducer的输入key值。为了求出pij分别需要知道mik和nkj,对于mik,其所需要的属性有矩阵M,所在行数i、所在列数k,和其本身的数值大小mik;同样对于nkj,其所需要的属性有矩阵N,所在行数k、所在列数j,和其本身数值大小nkj,这些属性值可由Mapper处理得到
Map函数:对于矩阵M中的每个元素mik ,产生一系列的key-value对<(i,j),(M,k,mik)>,其中,k=1,2……直到矩阵N的总列数,对于矩阵N的每个元素nkj,产生一系列的key-value对,<(i,j),(N,k,nkj)>,其中i=1,2……直到矩阵M的总行数
Reduce函数:对于每个键(i,j)相关联的值(M,k,mik)及(N,k,nkj),根据相同的k值将mik和nkj分别放入不同的数组中,然后将两者的第k个元素抽取出来分别相乘,再累加,即可得到pij的值
有M和N两个文件分别存放两个矩阵,文件内容的每一行的形式是“行号,列号\t元素值”,本例中,使用shell脚本生成数据
代码1-2
root@lejian:/data# cat matrix #!/bin/bash for i in `seq 1 $1` do for j in `seq 1 $2` do s=$(($RANDOM % 100)) echo -e "$i,$j\t$s" >> M_$1_$2 done done for i in `seq 1 $2` do for j in `seq 1 $3` do s=$(($RANDOM%100)) echo -e "$i,$j\t$s" >> N_$2_$3 done done
代码1-3,执行matrix脚本,生成一个2行3列和3行3列的矩阵,并在HDFS下新建一个data文件夹,将生成的两个矩阵放入data文件夹下
代码1-3
root@lejian:/data# ./matrix 2 3 3 root@lejian:/data# cat M_2_3 1,1 6 1,2 84 1,3 40 2,1 51 2,2 37 2,3 97 root@lejian:/data# cat N_3_3 1,1 97 1,2 34 1,3 95 2,1 93 2,2 10 2,3 70 3,1 71 3,2 24 3,3 47 root@lejian:/data# hadoop fs -mkdir /data root@lejian:/data# hadoop fs -put /data/M_2_3 /data/ root@lejian:/data# hadoop fs -put /data/N_3_3 /data/ root@lejian:/data# hadoop fs -ls -R /data -rw-r--r-- 1 root supergroup 41 2017-01-07 11:57 /data/M_2_3 -rw-r--r-- 1 root supergroup 63 2017-01-07 11:57 /data/N_3_3
矩阵乘法Mapper类程序如代码1-4
代码1-4
package com.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> { private int columnN = 0; private int rowM = 0; private Text mapKey = new Text(); private Text mapValue = new Text(); protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); columnN = Integer.parseInt(conf.get("columnN")); rowM = Integer.parseInt(conf.get("rowM")); }; protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { FileSplit file = (FileSplit) context.getInputSplit(); String fileName = file.getPath().getName(); String line = value.toString(); String[] tuple = line.split(","); if (tuple.length != 2) { throw new RuntimeException("MatrixMapper tuple error"); } int row = Integer.parseInt(tuple[0]); String[] tuples = tuple[1].split("\t"); if (tuples.length != 2) { throw new RuntimeException("MatrixMapper tuples error"); } if (fileName.contains("M")) { matrixM(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context); } else { matrixN(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context); } }; private void matrixM(int row, int column, int value, Context context) throws IOException, InterruptedException { for (int i = 1; i < columnN + 1; i++) { mapKey.set(row + "," + i); mapValue.set("M," + column + "," + value); context.write(mapKey, mapValue); } } private void matrixN(int row, int column, int value, Context context) throws IOException, InterruptedException { for (int i = 1; i < rowM + 1; i++) { mapKey.set(i + "," + column); mapValue.set("N," + row + "," + value); context.write(mapKey, mapValue); } } }
矩阵乘法Reducer类程序如代码1-5
代码1-5
package com.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MatrixReducer extends Reducer<Text, Text, Text, Text> { private int columnM = 0; protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); columnM = Integer.parseInt(conf.get("columnM")); }; protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int sum = 0; int[] m = new int[columnM + 1]; int[] n = new int[columnM + 1]; for (Text val : values) { String[] tuple = val.toString().split(","); if (tuple.length != 3) { throw new RuntimeException("MatrixReducer tuple error"); } if ("M".equals(tuple[0])) { m[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]); } else { n[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]); } } for (int i = 1; i < columnM + 1; i++) { sum += m[i] * n[i]; } context.write(key, new Text(sum + "")); }; }
矩阵乘法主函数如代码1-5
package com.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Matrix { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if (args == null || args.length != 5) { throw new RuntimeException("请输入输入路径、输出路径、矩阵M的行数、矩阵M的列数、矩阵N的列数"); } Configuration conf = new Configuration(); conf.set("rowM", args[2]); conf.set("columnM", args[3]); conf.set("columnN", args[4]); Job job = Job.getInstance(conf); job.setJobName("Matrix"); job.setJarByClass(Matrix.class); job.setMapperClass(MatrixMapper.class); job.setReducerClass(MatrixReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
运行代码1-5,运行结果如代码1-6所示(注:代码1-6省略部分MapReduce执行内容)
代码1-6
root@lejian:/data# hadoop jar matrix.jar com.hadoop.mapreduce.Matrix /data/ /output/ 2 3 3 ………… root@lejian:/data# hadoop fs -ls -R /output -rw-r--r-- 1 root supergroup 0 2017-01-07 12:04 /output/_SUCCESS -rw-r--r-- 1 root supergroup 57 2017-01-07 12:04 /output/part-r-00000 root@lejian:/data# hadoop fs -cat /output/part-r-00000 1,1 11234 1,2 2004 1,3 8330 2,1 15275 2,2 4432 2,3 11994