实验3MapReduce编程初级实践实验3MapReduce编程初级实践1.实验目的1.通过实验掌握基本的MapReduce编程方法;2.掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。2.实验平台已经配置完成的Hadoop伪分布式环境。3.实验内容和要求1.编程实现文件合并和去重操作对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。下面是输入文件和输出文件的一个样例供参考。实验最终结果(合并的文件):代码如下:packagecom.Merge;importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassMerge{publicstaticclassMapextendsMapperObject,Text,Text,Text{privatestaticTexttext=newText();实验3MapReduce编程初级实践publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{text=value;context.write(text,newText());}}publicstaticclassReduceextendsReducerText,Text,Text,Text{publicvoidreduce(Textkey,IterableTextvalues,Contextcontext)throwsIOException,InterruptedException{context.write(key,newText());}}publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();conf.set(fs.defaultFS,hdfs://localhost:9000);String[]otherArgs=newString[]{input,output};if(otherArgs.length!=2){System.err.println(Usage:Mergeandduplicateremovalinout);System.exit(2);}Jobjob=Job.getInstance(conf,Mergeandduplicateremoval);job.setJarByClass(Merge.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job,newPath(otherArgs[0]));FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));System.exit(job.waitForCompletion(true)?0:1);}}2.编写程序实现对输入文件的排序现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。实验结果截图:实验3MapReduce编程初级实践代码如下:packagecom.MergeSort;importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassMergeSort{publicstaticclassMapextendsMapperObject,Text,IntWritable,IntWritable{privatestaticIntWritabledata=newIntWritable();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{Stringline=value.toString();data.set(Integer.parseInt(line));context.write(data,newIntWritable(1));}}publicstaticclassReduceextendsReducerIntWritable,IntWritable,IntWritable,IntWritable{privatestaticIntWritablelinenum=newIntWritable(1);publicvoidreduce(IntWritablekey,IterableIntWritablevalues,Contextcontext)throwsIOException,InterruptedException{for(IntWritableval:values){context.write(linenum,key);linenum=newIntWritable(linenum.get()+1);}}}实验3MapReduce编程初级实践publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();conf.set(fs.defaultFS,hdfs://localhost:9000);String[]otherArgs=newString[]{input2,output2};/*直接设置输入参数*/if(otherArgs.length!=2){System.err.println(Usage:mergesortinout);System.exit(2);}Jobjob=Job.getInstance(conf,mergesort);job.setJarByClass(MergeSort.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job,newPath(otherArgs[0]));FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));System.exit(job.waitForCompletion(true)?0:1);}}3.对给定的表格进行信息挖掘下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。实验最后结果截图如下:代码如下:packagecom.join;importjava.io.IOException;importjava.util.*;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.Text;实验3MapReduce编程初级实践importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassSTjoin{publicstaticinttime=0;publicstaticclassMapextendsMapperObject,Text,Text,Text{publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{Stringchild_name=newString();Stringparent_name=newString();Stringrelation_type=newString();Stringline=value.toString();inti=0;while(line.charAt(i)!=''){i++;}String[]values={line.substring(0,i),line.substring(i+1)};if(values[0].compareTo(child)!=0){child_name=values[0];parent_name=values[1];relation_type=1;context.write(newText(values[1]),newText(relation_type+++child_name+++parent_name));relation_type=2;context.write(newText(values[0]),newText(relation_type+++child_name+++parent_name));}}}publicstaticclassReduceextendsReducerText,Text,Text,Text{publicvoidreduce(Textkey,IterableTe