本文共 5463 字,大约阅读时间需要 18 分钟。
之前写了一篇分析MapReduce实现矩阵乘法算法的文章:
为了让大家更直观的了解程序执行,今天编写了实现代码供大家参考。
编程环境:
java version "1.7.0_40"
Eclipse Kepler
Windows7 x64
Ubuntu 12.04 LTS
Hadoop2.2.0
Vmware 9.0.0 build-812388
输入数据:
A矩阵存放地址:hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa
A矩阵内容:
3 4 6
4 0 8
matrixa文件已处理为(x,y,value)格式:
0 0 3
0 1 4
0 2 6
1 0 4
1 1 0
1 2 8
B矩阵存放地址:hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb
B矩阵内容:
2 3
3 0
4 1
matrixb文件已处理为(x,y,value)格式:
0 0 2
0 1 3
1 0 3
1 1 0
2 0 4
2 1 1
实现代码:
一共三个类:
大家可根据个人习惯合并成一个类使用。
package dataguru.matrixmultiply;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MMDriver { public static void main(String[] args) throws Exception { // set configuration Configuration conf = new Configuration(); // create job Job job = new Job(conf,"MatrixMultiply"); job.setJarByClass(dataguru.matrixmultiply.MMDriver.class); // specify Mapper & Reducer job.setMapperClass(dataguru.matrixmultiply.MMMapper.class); job.setReducerClass(dataguru.matrixmultiply.MMReducer.class); // specify output types of mapper and reducer job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // specify input and output DIRECTORIES Path inPathA = new Path("hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA"); Path inPathB = new Path("hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB"); Path outPath = new Path("hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC"); FileInputFormat.addInputPath(job, inPathA); FileInputFormat.addInputPath(job, inPathB); FileOutputFormat.setOutputPath(job,outPath); // delete output directory try{ FileSystem hdfs = outPath.getFileSystem(conf); if(hdfs.exists(outPath)) hdfs.delete(outPath); hdfs.close(); } catch (Exception e){ e.printStackTrace(); return ; } // run the job System.exit(job.waitForCompletion(true) ? 0 : 1); }}
package dataguru.matrixmultiply;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class MMMapper extends Mapper { private String tag; //current matrix private int crow = 2;// A矩阵的行数 private int ccol = 2;// B矩阵的列数 @Override protected void setup(Context context) throws IOException, InterruptedException { // TODO get inputpath of input data, set to tag FileSplit fs = (FileSplit)context.getInputSplit(); tag = fs.getPath().getParent().getName(); } /** * input data include two matrix files */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer str = new StringTokenizer(value.toString()); if ("matrixA".equals(tag)) { //left matrix,output key:x,y while (str.hasMoreTokens()) { String currentx = str.nextToken(); //x,y,value of current item String currenty = str.nextToken(); String currentValue = str.nextToken(); for (int i = 0; i < ccol; i++) { Text outkey = new Text(currentx+","+i); Text outvalue = new Text("a,"+currenty+","+currentValue); context.write(outkey, outvalue); System.out.println(outkey+" | "+outvalue); } } }else if ("matrixB".equals(tag)) { while (str.hasMoreTokens()) { String currentx = str.nextToken(); //x,y,value of current item String currenty = str.nextToken(); String currentValue = str.nextToken(); for (int i = 0; i < crow; i++) { Text outkey = new Text(i+","+currenty); Text outvalue = new Text("b,"+currentx+","+currentValue); context.write(outkey, outvalue); System.out.println(outkey+" | "+outvalue); } } } }}
package dataguru.matrixmultiply;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.Reducer.Context;public class MMReducer extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { Map matrixa = new HashMap (); Map matrixb = new HashMap (); for (Text val : values) { //values example : b,0,2 or a,0,4 StringTokenizer str = new StringTokenizer(val.toString(),","); String sourceMatrix = str.nextToken(); if ("a".equals(sourceMatrix)) { matrixa.put(str.nextToken(), str.nextToken()); //(0,4) } if ("b".equals(sourceMatrix)) { matrixb.put(str.nextToken(), str.nextToken()); //(0,2) } } int result = 0; Iterator iter = matrixa.keySet().iterator(); while (iter.hasNext()) { String mapkey = iter.next(); result += Integer.parseInt(matrixa.get(mapkey)) * Integer.parseInt(matrixb.get(mapkey)); } context.write(key, new Text(String.valueOf(result))); } }
最终输出结果:
0,0 42
0,1 15
1,0 40
1,1 20
转载地址:http://vbcgi.baihongyu.com/