厦门大学数据库实验室MapReduce连接报告人:李雨倩导师:林子雨2014.07.12连接简介MapReduce连接策略连接连接是关系运算,可以用于合并关系。在数据库中,一般是表连接操作;在MapReduce中,连接可以用于合并两个或多个数据集。例如,用户基本信息和用户活动详情。用户基本信息来自于OLTP数据库。用户活动详情来自于日志文件。连接的类型最常用的两个连接类型是内连接和外连接。内连接比较两个关系中所有的数组,然后生成一个满足连接条件的结果集。外连接外连接并不需要两个关系的数组都满足连接条件。在连接条件不满足的时候,外连接可以将一方的数据保留在结果集中。内连接左外连接右外连接全连接连接关系图连接实例select*fromtable1leftjointable2ontable1.id=table2.id-------------结果-------------idnameidscore------------------------------1lee1902zhang21004wangNULLNULL------------------------------select*fromtable1jointable2ontable1.id=table2.id-------------结果-------------idnameidscore------------------------------1lee1902zhang2100select*fromtable1fulljointable2ontable1.id=table2.id-------------结果-------------idnameidscore------------------------------1lee1902zhang21004wangNULLNULLNULLNULL370select*fromtable1rightjointable2ontable1.id=table2.id-------------结果-------------idnameidscore------------------------------1lee1902zhang2100NULLNULL370------------------------------Table1|table2|-------------------------------------------------idname|idscore|1lee190|2zhang2100|4wang370|连接简介MapReduce连接策略连接连接是关系运算,可以用于合并关系。在数据库中,一般是表连接操作;在MapReduce中,连接可以用于合并两个或多个数据集。例如,用户基本信息和用户活动详情。用户基本信息来自于OLTP数据库。用户活动详情来自于日志文件。MapReduce的连接welcometousethesePowerPointtemplates,NewContentdesign,10yearsexperienceMapReduce连接的应用场景用户的人口统计信息的聚合操作(例如:青少年和中年人的习惯差异)当用户超过一定时间没有使用网站后,发邮件提醒他们。分析用户的浏览习惯,让系统可以提示用户有哪些网站特性还没有使用到,形成一个反馈循环。MapReduce中的连接策略重分区连接复制连接半连接——reduce端连接。使用场景:连接两个或多个大型数据集。——map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。——map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。重分区连接重分区连接利用MapReduce的排序-合并机制来分组数据。它被实现为使用一个单独的MapReduce任务,并支持多路连接(这里的多路指的是多个数据集)。Map阶段负责从多个数据集中读取数据,决定每个数据的连接值,将连接值作为输出键。输出值则包含将在reduce阶段被合并的值。Reduce阶段,一个reducer接收map函数传来的一个输出键的所有输出值,并将数据分为多个分区。在此之后,reducer对所有的分区进行笛卡尔积连接运算,并生成全部的结果集。在如下示例中,用户数据中有用户姓名,年龄和所在州•$cattest-data/ch4/users.txtanne22NYjoe39COalison35NYmike69VAmarie27ORjim21ORbob71CAmary53NYdave36VAdude50CA用户活动日志中有用户姓名,进行的动作,来源IP。这个文件一般都要比用户数据要大得多。•$cattest-data/ch4/user-logs.txtjimlogout93.24.237.12mikenew_tweet87.124.79.252bobnew_tweet58.133.120.100mikelogout55.237.104.36jimnew_tweet93.24.237.12marieview_user122.158.130.90$hadoopfs-puttest-data/ch4/user-logs.txtuser-logs.txt$bin/run.shcom.manning.hip.ch4.joins.improved.SampleMainusers.txt,user-logs.txtoutput$hadoopfs-catoutput/part*bob71CAnew_tweet58.133.120.100jim21ORlogout93.24.237.12jim21ORnew_tweet93.24.237.12jim21ORlogin198.184.237.49marie27ORlogin58.133.120.100marie27ORview_user122.158.130.90mike69VAnew_tweet87.124.79.252mike69VAlogout55.237.104.36重分区连接过滤(filter)指的是将map极端的输入数据中不需要的部分丢弃。投影(project)是关系代数的概念。投影用于减少发送给reducer的字段。优化重分区连接传统重分区方法的实现空间效率低下。它需要将连接的所有的输出值都读取到内存中,然后进行多路连接。事实上,如果仅仅将小数据集读取到内存中,然后用小数据集来遍历大数据集,进行连接,这样将更加高效。下图是优化后的重分区连接的流程图。Map输出的组合键和组合值上图说明了map输出的组合键和组合值。二次排序将会根据连接键(joinkey)进行分区,但会用整个组合键来进行排序。组合键包括一个标识源数据集(较大或较小)的整形值,因此可以根据这个整形值来保证较小源数据集的值先于较大源数据的值被reducer接收。优化重分区连接上图是实现的类图。类图中包含两个部分,一个通用框架和一些类的实现样例。使用这个连接框架需要实现抽象类OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。OptimizedDataJoinMapperBase•protectedabstractTextgenerateInputTag(StringinputFile);•protectedabstractbooleanisInputSmaller(StringinputFile);•publicvoidconfigure(JobConfjob){this.inputFile=job.get(map.input.file);•this.inputTag=generateInputTag(this.inputFile);•if(isInputSmaller(this.inputFile)){•smaller=newBooleanWritable(true);•outputKey.setOrder(0);•}else{•smaller=newBooleanWritable(false);•outputKey.setOrder(1);•}•}这个类的作用是辨认出较小的数据集,并生成输出键和输出值。Configure方法在mapper创建期调用。Configure方法的作用之一是标识每一个数据集,让reducer可以区分数据的源数据集。另一个作用是辨认当前的输入数据是否是较小的数据集。OptimizedDataJoinMapperBase(续)•protectedabstractOptimizedTaggedMapOutputgenerateTaggedMapOutput(Objectvalue);•protectedabstractStringgenerateGroupKey(Objectkey,OptimizedTaggedMapOutputaRecord);•publicvoidmap(Objectkey,Objectvalue,OutputCollectoroutput,Reporterreporter)•throwsIOException{•OptimizedTaggedMapOutputaRecord=generateTaggedMapOutput(value);if(aRecord==null){•return;}•aRecord.setSmaller(smaller);•StringgroupKey=generateGroupKey(aRecord);•if(groupKey==null){•return;}•outputKey.setKey(groupKey);•output.collect(outputKey,aRecord);}Map方法首先调用自定义的方法(generateTaggedMapOutput)来生成OutputValue对象。这个对象包含了在连接中需要使用的值,和一个标识较大或较小数据集的布尔值。如果map方法可以调用自定义的方法(generateGroupKey)来得到可以在连接中使用的键,那么这个键就作为map的输出键。OptimizedDataJoinReducerBase•publicvoidreduce(Objectkey,Iteratorvalues,OutputCollectoroutput,Reporterreporter)•throwsIOException{•CompositeKeyk=(CompositeKey)key;•Listsmaller=newArrayList();•while(values.hasNext()){•Objectvalue=values.next();•OptimizedTaggedMapOutputcloned=((OptimizedTaggedMapOutput)value).clone(job);•if(cloned.isSmaller().get()){•smaller.add(cloned);•}else{•joinAndCollect(k,smaller,cloned,output,reporter);•}•}•}Map端处理后已经可以保证较小源数据集的值将会先于较大源数据集的值被接收。这里就可以将所有的较小源数据集的值放到缓存中。在开始接收较大源数据集的值的时候,就开始和缓存中的值做连接操作。OptimizedDataJoinRuducerBase(续)•protectedabstractOptimizedTaggedMapOutputcombine(Stringkey,•OptimizedTaggedMapOutputvalue1,OptimizedTaggedMapOutputvalue2);•privatevoidjoinAndCollect(CompositeKeykey,Listsmaller,OptimizedTaggedMapOutpu