基于mapreduce的Hadoopjoin实现分析

整理文档很辛苦,赏杯茶钱您下走!

免费阅读已结束,点击下载阅读编辑剩下 ...

阅读已结束,您可以下载文档离线阅读编辑

资源描述

基于mapreduce的Hadoopjoin实现分析(一)对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现.我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:人员ID人员名称地址ID1张三12李四23王五14赵六35马七3另外一组为地址信息:地址ID地址名称1北京2上海3广州这里给出了一个很简单的例子,而且数据量很小,就这么用眼睛就能看过来的几行,当然,实际的情况可能是几十万上百万甚至上亿的数据量.要实现的功能很简单,就是将人员信息与地址信息进行join,将人员的地址ID完善成为地址名称.对于Hadoop文件系统的应用,目前看来,很多数据的存储都是基于文本的,而且都是将数据放在一个文件目录中进行处理.因此我们这里也采用这种模式来完成.对于mapreduce程序来说,最主要的就是将要做的工作转化为map以及reduce两个部分.我们可以将地址以及人员都采用同样的数据结构来存储,通过一个flag标志来指定该数据结构里面存储的是地址信息还是人员信息.经过map后,使用地址ID作为key,将所有的具有相同地址的地址信息和人员信息放入一个key-valuelist数据结构中传送到reduce中进行处理.在reduce过程中,由于key是地址的ID,所以valuelist中只有一个是地址信息,其他的都是人员信息,因此,找到该地址信息后,其他的人员信息的地址就是该地址所指定的地址名称.OK,我们的join算法基本搞定啦.剩下就是编程实现了,let’sgo.上面提到了存储人员和地址信息的数据结构,可以说这个数据结构是改程序的重要的数据载体之一.我们先来看看该数据结构:importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importorg.apache.hadoop.io.WritableComparable;publicclassRecordimplementsWritableComparable{inttype;//数据类型的定义,1为人员,2为地址StringempName=;StringempId=;StringlocId=;StringlocationName=;publicRecord(){super();}publicRecord(Recordrecord){this.type=record.type;this.empName=record.empName;this.empId=record.empId;this.locId=record.locId;this.locationName=record.locationName;}publicStringtoString(){if(type==1)returnempId+,+empName+,+locationName;elseif(type==2)returnlocId+,+locationName;returnuninitdata!;}publicvoidreadFields(DataInputin)throwsIOException{type=in.readInt();empName=in.readUTF();empId=in.readUTF();locId=in.readUTF();locationName=in.readUTF();}publicvoidwrite(DataOutputout)throwsIOException{out.writeInt(type);out.writeUTF(empName);out.writeUTF(empId);out.writeUTF(locId);out.writeUTF(locationName);}publicintcompareTo(Objectarg0){return0;}}上面的Record的实现了WritableComparable,对于Mapreduce的中间结果类来说,必须要实现Writable,从而在map完成输出中间结果时能够将中间结果写入到运行job的node文件系统中,至于Comparable接口的实现,对于作为Key的中间结果来说需要实现该接口,从而能够完成基于key的排序功能.接下来是Join的主程序,就是mapreduce的主程序.基本的主程序如下:importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.SequenceFile;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapred.FileInputFormat;importorg.apache.hadoop.mapred.FileOutputFormat;importorg.apache.hadoop.mapred.JobClient;importorg.apache.hadoop.mapred.JobConf;importorg.apache.hadoop.mapred.SequenceFileOutputFormat;publicclassJoin{publicstaticvoidmain(String[]args)throwsException{//TODOAuto-generatedmethodstubJobConfconf=newJobConf(Join.class);conf.setJobName(Join);FileSystemfstm=FileSystem.get(conf);PathoutDir=newPath(/Users/hadoop/outputtest);fstm.delete(outDir,true);conf.setOutputFormat(SequenceFileOutputFormat.class);conf.setMapOutputValueClass(Record.class);conf.setOutputKeyClass(LongWritable.class);conf.setOutputValueClass(Text.class);conf.setMapperClass(JoinMapper.class);conf.setReducerClass(JoinReducer.class);FileInputFormat.setInputPaths(conf,newPath(/user/hadoop/input/join));FileOutputFormat.setOutputPath(conf,outDir);JobClient.runJob(conf);PathoutPutFile=newPath(outDir,part-00000);SequenceFile.Readerreader=newSequenceFile.Reader(fstm,outPutFile,conf);org.apache.hadoop.io.TextnumInside=newText();LongWritablenumOutside=newLongWritable();while(reader.next(numOutside,numInside)){System.out.println(numInside.toString()++numOutside.toString());}reader.close();}}程序主体很简单,开始将输出目录删除,中间进行一系列的JobConf设定工作,将输出格式设为SequenceFile,最后读出程序结果到控制台.接下来我们看看Mapper的实现:importjava.io.IOException;importorg.apache.hadoop.mapred.MapReduceBase;importorg.apache.hadoop.mapred.Mapper;importorg.apache.hadoop.mapred.OutputCollector;importorg.apache.hadoop.mapred.Reporter;importorg.apache.hadoop.io.*;publicclassJoinMapperextendsMapReduceBaseimplementsMapperLongWritable,Text,LongWritable,Record{publicvoidmap(LongWritablekey,Textvalue,OutputCollectorLongWritable,Recordoutput,Reporterreporter)throwsIOException{Stringline=value.toString();String[]values=line.split(,);if(values.length==2){//这里使用记录的长度来区别地址信息与人员信息,当然可以通过其他方式(如文件名等)来实现Recordreco=newRecord();reco.locId=values[0];reco.type=2;reco.locationName=values[1];output.collect(newLongWritable(Long.parseLong(values[0])),reco);}else{Recordreco=newRecord();reco.empId=values[0];reco.empName=values[1];reco.locId=values[2];reco.type=1;output.collect(newLongWritable(Long.parseLong(values[2])),reco);}}}对于maper来说,就是从输入文件中读取相应数据构造key-value(地址id-地址或者人员对象)的数据对,并交给hadoop框架完成shuffle等工作.经过hadoop框架完成suffle之后便会将具有想同地址ID的人员信息以及地址信息交给reducer来进行处理.好啦,剩下就是最后一步了,其实也是最重要的一步就是reduce端的join工作了.还是来看看代码吧:importjava.io.IOException;importjava.util.Iterator;importjava.util.List;importjava.util.Vector;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapred.MapReduceBase;importorg.apache.hadoop.mapred.OutputCollector;importorg.apache.hadoop.mapred.Reducer;importorg.apache.hadoop.mapred.Reporter;publicclassJoinReducerextendsMapReduceBaseimplementsReducerLongWritable,Record,LongWritable,Text{publicvoidreduce(LongWritablekey,IteratorRecordvalues,OutputCollectorLongWritable,T

1 / 16
下载文档,编辑使用

©2015-2020 m.777doc.com 三七文档.

备案号:鲁ICP备2024069028号-1 客服联系 QQ:2149211541

×
保存成功