报告题目:MapReduce实现大矩阵乘法1、题目要求题目介绍:请使用MapReduce编程模型实现矩阵相乘。要求如下:1.按照MapReduce框架,对该问题进行剖析,并给出解决思路,要求图文并茂。2.展示运行代码并要求注释清晰。3.展示算法运行性能(不需要打印最终的矩阵输出结果图),要求分别给出在10^3,10^4,10^5维度的矩阵乘法运行时间,mapworker和reduceworker的实时运行数量等。2、系统环境操作系统环境:LinuxCentos6.5Hadoop环境:hadoop2.6.5完全分布式系统(1个名称节点,1个数据节点)3、解决方案3.1算法分析假设有两个矩阵A和矩阵B,如下图所示:矩阵乘法的数学公式如下:下图是两个矩阵A和B:A,B在文件中的存储可以是每一行元素占文件的一行,如下图:也可以是列表的形式,如下图:考虑到大矩阵的稀疏且数据量大,HDFS中的文件大小有限制,无法保证一个矩阵只保存在一个文件中,因此采用列表的方式来存储矩阵,其格式为i,j,Aij。在Map阶段,Mapper按行读取矩阵A,B所在文件的数据,并将其转化为某种格式的键值对保存到上下文。在Shffle阶段,Hadoop自动将相同key的value合并成key,Iterable(values)的格式,再将该格式的数据传递给reduce。在Reduce阶段,Reducer接收Shuffle传递过来的key,Iterable(values),处理数据得到最终的结果,并将结果输出到目标矩阵文件中。这里的问题关键在于,如何设计Mapper—Shffle—Reducer之间传递的键值对数据。考虑从目标矩阵出发,将Reducer处理的键值对的key,Iterable(values)中的key设置为目标矩阵的行坐标和列坐标,即”i,j”;那么Iterable(values)迭代器中的数据就应该是得到Cij需要的数据,即得到这些数据还需要标注其来自那个矩阵,另外根据公式:相乘的数据需要对应,因此,还需要标记其位置,因此有如下设计:矩阵A:key,values={key:“目标矩阵的行序,目标矩阵的列序”,value:“源矩阵a,目标矩阵的行序,源矩阵a的值”}如:{key:”1,1”,value:”a,1,1”}矩阵B:key,values={key:“目标矩阵的行序,目标矩阵的列序”,value:“源矩阵b,标矩阵的列序,源矩阵b的值”}如:{key:”1,1”,value:”b,1,10”}。因此,该格式也是Mapper传递给Shuffle的键值对格式,Shuffle再将相同key的元素合并,得到计算Ci,j的所有数据及计算顺序,Mapper读取键值对,从迭代器中读取出每一项value,从而计算Ci,j。Mapper读矩阵所在的文件,得到的键值对格式为:矩阵A:key,value={key:“行号”,value:“i,j,Aij”}矩阵B:key,value={key:“行号”,value:“i,j,Bij”}如果要得到Mapper的输出格式:key,values={key:“目标矩阵的行序,目标矩阵的列序”,value:“源矩阵a,目标矩阵的行序,源矩阵a的值”}只需要得到目标矩阵的列序即可输出,而列序可以通过循环得到,因为该元素需要参与目标矩阵对应行的所有目标列的计算。3.2算法图示矩阵在文件中的存储格式如下:第一行为矩阵的形状在mapper之前,先判断两个矩阵是否能够进行乘法运算。3.3代码一、矩阵乘法代码如下:packagemain.matrix;importjava.io.BufferedReader;importjava.io.File;importjava.io.FileInputStream;importjava.io.FileNotFoundException;importjava.io.IOException;importjava.io.InputStreamReader;importjava.util.HashMap;importjava.util.Iterator;importjava.util.Map;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.conf.Configured;importorg.apache.hadoop.fs.FSDataInputStream;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.input.FileSplit;importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;importorg.apache.hadoop.util.Tool;importorg.apache.hadoop.util.ToolRunner;publicclassMatrix_MultiextendsConfiguredimplementsTool{publicstaticclassMatrixMapperextendsMapperLongWritable,Text,Text,Text{privateStringflag=null;staticintm=0;//矩阵A的行数privateintn=0;//矩阵A的列数staticintp=0;//矩阵B的列数privateintq=0;//矩阵B的行数privateinta=0,b=0;//矩阵A,B文件的行数记录器//计数器enumCounter{LINESKIP,//出错的行}@Overrideprotectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{FileSplitsplit=(FileSplit)context.getInputSplit();flag=split.getPath().getName();}/***map阶段:读取矩阵a和b,将矩阵中的元素按指定格式存入上下文*得到的键值对格式为:key:value_1,value_2...value_n*/@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{try{String[]val=value.toString().split(,);System.out.println(map:\t+flag+\t+value.toString());if(test_ma.txt.equals(flag)){a++;if(a==1){//第一行为矩阵形状return;}//矩阵B的列数循环,表示当前元素要参与运算的次数以及运算的目标位置//写入上下文的键值对格式为{key:“目标矩阵的行序,目标矩阵的列序”,value:源矩阵a,目标矩阵的行序,源矩阵a的值}for(inti=1;i=p;i++){context.write(newText(val[0]+,+i),newText(a,+val[1]+,+val[2]));//写入到上下文中的形式{key:行号}System.out.println(key:+newText(val[0]+,+i).toString()+\tvalue:+newText(a,+val[1]+,+val[2]).toString());}}elseif(test_mb.txt.equals(flag)){b++;if(b==1){//第一行为矩阵形状return;}//矩阵A的行数循环,表示当前元素要参与运算的次数以及运算的目标位置//写入上下文的键值对格式为{key:“目标矩阵的行序,目标矩阵的列序”,value:源矩阵b,标矩阵的列序,源矩阵b的值}for(inti=1;i=m;i++){context.write(newText(i+,+val[1]),newText(b,+val[0]+,+val[2]));System.out.println(key:+newText(i+,+val[1]).toString()+\tvalue:+newText(b,+val[0]+,+val[2]).toString());}}}catch(java.lang.ArrayIndexOutOfBoundsExceptione){//记录不符合输入规则的行context.getCounter(Counter.LINESKIP).increment(1);return;}}}publicstaticclassMatrixReducerextendsReducerText,Text,Text,IntWritable{/***reduce阶段:从上下文读取键值对,key:value_1,value_2...value_n*key为目标矩阵的元素位置*将key对应的values进行整理,将矩阵A的元素放入mapA,矩阵B的元素放入mapB*矩阵mapA的键值对格式为{key:“目标矩阵的行序”,value:“源矩阵A的值”}*矩阵mapB的键值对格式为{key:“目标矩阵的列序”,value:源矩阵B的值}*mapA和mapB中key相同的value相乘,在相加,得到目标矩阵在该位置的值*/@Overrideprotectedvoidreduce(Textkey,IterableTextvalues,Contextcontext)throwsIOException,InterruptedException{MapString,StringmapA=newHashMapString,String();MapString,StringmapB=newHashMapString,String();for(Textvalue:values){System.out.println(reduce:\t+value.toString());String[]val=value.toString().split(,);if(a.equals(val[0])){mapA.put(val[1],val[2]);}elseif(b.equals(val[0])){mapB.put(val[1],val[2]);}}intresult=0;IteratorStringmKeys=mapA.keySet().iterator();while(mKeys.hasNext()){Stringmkey=mKeys.next();if(mapB.get(mkey)==nul