map-reduce入门

整理文档很辛苦,赏杯茶钱您下走!

免费阅读已结束,点击下载阅读编辑剩下 ...

阅读已结束,您可以下载文档离线阅读编辑

资源描述

Hadoop学习总结之三:Map-Reduce入门1、Map-Reduce的逻辑过程假设我们需要处理一批有关天气的数据,其格式如下:按照ASCII码存储,每行一条记录每一行字符从0开始计数,第15个到第18个字符为年第25个到第29个字符为温度,其中第25位是符号+/-0067011990999991950051507+0000+0043011990999991950051512+0022+0043011990999991950051518-0011+0043012650999991949032412+0111+0043012650999991949032418+0078+0067011990999991937051507+0001+0043011990999991937051512-0002+0043011990999991945051518+0001+0043012650999991945032412+0002+0043012650999991945032418+0078+现在需要统计出每年的最高温度。Map-Reduce主要包括两个步骤:Map和Reduce每一步都有key-value对作为输入和输出:map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应对于上面的例子,在map过程,输入的key-value对如下:(0,0067011990999991950051507+0000+)(33,0043011990999991950051512+0022+)(66,0043011990999991950051518-0011+)(99,0043012650999991949032412+0111+)(132,0043012650999991949032418+0078+)(165,0067011990999991937051507+0001+)(198,0043011990999991937051512-0002+)(231,0043011990999991945051518+0001+)(264,0043012650999991945032412+0002+)(297,0043012650999991945032418+0078+)在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:(1950,0)(1950,22)(1950,-11)(1949,111)(1949,78)(1937,1)(1937,-2)(1945,1)(1945,2)(1945,78)在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入(1950,[0,22,–11])(1949,[111,78])(1937,[1,-2])(1945,[1,2,78])在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:(1950,22)(1949,111)(1937,1)(1945,78)其逻辑过程可用如下图表示:2、编写Map-Reduce程序编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数和reducer中的reduce函数。一般遵循以下格式:map:(K1,V1)-list(K2,V2)publicinterfaceMapperK1,V1,K2,V2extendsJobConfigurable,Closeable{voidmap(K1key,V1value,OutputCollectorK2,V2output,Reporterreporter)throwsIOException;}reduce:(K2,list(V))-list(K3,V3)publicinterfaceReducerK2,V2,K3,V3extendsJobConfigurable,Closeable{voidreduce(K2key,IteratorV2values,OutputCollectorK3,V3output,Reporterreporter)throwsIOException;}对于上面的例子,则实现的mapper如下:publicclassMaxTemperatureMapperextendsMapReduceBaseimplementsMapperLongWritable,Text,Text,IntWritable{@Overridepublicvoidmap(LongWritablekey,Textvalue,OutputCollectorText,IntWritableoutput,Reporterreporter)throwsIOException{Stringline=value.toString();Stringyear=line.substring(15,19);intairTemperature;if(line.charAt(25)=='+'){airTemperature=Integer.parseInt(line.substring(26,30));}else{airTemperature=Integer.parseInt(line.substring(25,30));}output.collect(newText(year),newIntWritable(airTemperature));}}实现的reducer如下:publicclassMaxTemperatureReducerextendsMapReduceBaseimplementsReducerText,IntWritable,Text,IntWritable{publicvoidreduce(Textkey,IteratorIntWritablevalues,OutputCollectorText,IntWritableoutput,Reporterreporter)throwsIOException{intmaxValue=Integer.MIN_VALUE;while(values.hasNext()){maxValue=Math.max(maxValue,values.next().get());}output.collect(key,newIntWritable(maxValue));}}欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:输入的数据,也即需要处理的数据Map-Reduce程序,也即上面实现的Mapper和Reducer此任务的配置项JobConf欲配置JobConf,需要大致了解Hadoop运行job的基本原理:Hadoop将Job分成task进行处理,共两种task:maptask和reducetaskHadoop有两类的节点控制job的运行:JobTracker和TaskTrackerJobTracker协调整个job的运行,将task分配到不同的TaskTracker上TaskTracker负责运行task,并将结果返回给JobTrackerHadoop将输入数据分成固定大小的块,我们称之inputsplitHadoop为每一个inputsplit创建一个task,在此task中依次处理此split中的一个个记录(record)Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以inputsplit的大小也一般是HDFS的block的大小。Reducetask的输入一般为MapTask的输出,ReduceTask的输出为整个job的输出,保存在HDFS上。在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partitionpartition的规则为:(K2,V2)–Integer,也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。publicinterfacePartitionerK2,V2extendsJobConfigurable{intgetPartition(K2key,V2value,intnumPartitions);}下图大概描述了Map-Reduce的Job运行的基本原理:下面我们讨论JobConf,其有很多的项可以进行配置:setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable,value为TextsetNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的inputsplit的个数setMapperClass:设置Mapper,默认为IdentityMappersetMapRunnerClass:设置MapRunner,maptask是由MapRunner运行的,默认为MapRunnable,其功能为读取inputsplit的一个个record,依次调用Mapper的map函数setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reducetask处理,所以partition的个数等于reducetask的个数setReducerClass:设置Reducer,默认为IdentityReducersetOutputFormat:设置任务的输出格式,默认为TextOutputFormatFileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:publicclassMaxTemperature{publicstaticvoidmain(String[]args)throwsIOException{if(args.length!=2){System.err.println(Usage:MaxTemperatureinputpathoutputpath);System.exit(-1);}JobConfconf=newJobConf(MaxTemperature.class);conf.setJobName(Maxtemperature);FileInputFormat.addInputPath(conf,newPath(args[0]));FileOutputFormat.setOutputPath(conf,newPath(args[1]));conf.setMapperClass(

1 / 8
下载文档,编辑使用

©2015-2020 m.777doc.com 三七文档.

备案号:鲁ICP备2024069028号-1 客服联系 QQ:2149211541

×
保存成功