第七章MapReduce提纲•7.1概述•7.2MapReduce体系结构•7.3MapReduce工作流程•7.4实例分析:WordCount•7.5MapReduce的具体应用•7.6MapReduce编程实践7.1概述7.1.1分布式并行编程7.1.2MapReduce模型简介7.1.3Map和Reduce函数7.1.1分布式并行编程•“摩尔定律”,CPU性能大约每隔18个月翻一番•从2005年开始摩尔定律逐渐失效,需要处理的数据量快速增加,人们开始借助于分布式并行编程来提高程序性能•分布式程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力•谷歌公司最先提出了分布式并行编程模型MapReduce,HadoopMapReduce是它的开源实现,后者比前者使用门槛低很多7.1.1分布式并行编程问题:在MapReduce出现之前,已经有像MPI这样非常成熟的并行计算框架了,那么为什么Google还需要MapReduce?MapReduce相较于传统的并行计算框架有什么优势?传统并行计算框架MapReduce集群架构/容错性共享式(共享内存/共享存储),容错性差非共享式,容错性好硬件/价格/扩展性刀片服务器、高速网、SAN,价格贵,扩展性差普通PC机,便宜,扩展性好编程/学习难度what-how,难what,简单适用场景实时、细粒度计算、计算密集型批处理、非实时、数据密集型7.1.2MapReduce模型简介•MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce•编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算•MapReduce采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理•MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销•MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。Master上运行JobTracker,Slave上运行TaskTracker•Hadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写7.1.3Map和Reduce函数函数输入输出说明Mapk1,v1如:行号,”abc”List(k2,v2)如:“a”,1“b”,1“c”,11.将小数据集进一步解析成一批key,value对,输入Map函数中进行处理2.每一个输入的k1,v1会输出一批k2,v2。k2,v2是计算的中间结果Reducek2,List(v2)如:“a”,1,1,1k3,v3“a”,3输入的中间结果k2,List(v2)中的List(v2)表示是一批属于同一个k2的value7.2MapReduce的体系结构ClientClientClientTaskSchedulerJobTrackerTaskTrackerMapTaskReduceTaskMapTaskTaskTrackerMapTaskReduceTaskMapTaskTaskTrackerMapTaskReduceTaskMapTaskMapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task7.2MapReduce的体系结构MapReduce主要有以下4个部分组成:1)Client•用户编写的MapReduce程序通过Client提交到JobTracker端•用户可通过Client提供的一些接口查看作业运行状态2)JobTracker•JobTracker负责资源监控和作业调度•JobTracker监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点•JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源7.2MapReduce的体系结构3)TaskTracker•TaskTracker会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)•TaskTracker使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Mapslot和Reduceslot两种,分别供MapTask和ReduceTask使用4)TaskTask分为MapTask和ReduceTask两种,均由TaskTracker启动7.3MapReduce工作流程7.3.1工作流程概述7.3.2MapReduce各个执行阶段7.3.3Shuffle过程详解7.3.1工作流程概述图7-1MapReduce工作流程分片0map()分片2map()分片1map()分片3map()分片4map()reduce()reduce()reduce()输出0输出1输出2输入Map任务Reduce任务输出Shuffle7.3.1工作流程概述•不同的Map任务之间不会进行通信•不同的Reduce任务之间也不会发生任何信息交换•用户不能显式地从一台机器向另一台机器发送消息•所有的数据交换都是通过MapReduce框架自身去实现的7.3.2MapReduce各个执行阶段InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat输入key,value中间结果key,value文件文件写入到分布式文件系统(如HDFS)InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat文件文件从分布式文件系统中加载文件从分布式文件系统中加载文件节点1节点2写入到分布式文件系统(如HDFS)最终结果key,value最终结果key,value中间结果key,value输入key,valuekey,value-listkey,value-list7.3.2MapReduce各个执行阶段HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。关于Split(分片)7.3.2MapReduce各个执行阶段Reduce任务的数量•最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目•通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)Map任务的数量•Hadoop为每个split创建一个Map任务,split的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块7.3.3Shuffle过程详解输入缓存溢写(分区、排序、合并)数据被Reduce任务取走磁盘文件归并多个分区归并归并输出Reduce任务Map任务其他Map任务其他Reduce任务MapReduce图7-3Shuffle过程1.Shuffle过程简介7.3.3Shuffle过程详解2.Map端的Shuffle过程Map任务缓存12输入数据和执行Map任务写入缓存3溢写(分区、排序、合并)4文件归并•每个Map任务分配一个缓存•MapReduce默认100MB缓存•设置溢写比例0.8•分区默认采用哈希函数•排序是默认的操作•排序后可以合并(Combine)•合并不能改变最终结果•在Map任务全部结束之前进行归并•归并得到一个大的文件,放在本地磁盘•文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要•JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据合并(Combine)和归并(Merge)的区别:两个键值对“a”,1和“a”,1,如果合并,会得到“a”,2,如果归并,会得到“a”,1,17.3.3Shuffle过程详解3.Reduce端的Shuffle过程•Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据•Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘•多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的•当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce7.3.3Shuffle过程详解3.Reduce端的Shuffle过程缓存Map任务Reduce任务磁盘磁盘文件归并其他Map任务“领取”数据归并数据把数据输入给Reduce任务123其他Reduce任务分区分区其他Reduce任务图7-5Reduce端的Shuffle过程7.3.4MapReduce应用程序执行过程输出文件0输出文件1Worker分片0分片1分片2……分片MWorkerWorkerWorkerWorkerMaster用户程序(1)程序部署(1)程序部署(1)程序部署(2)分配Map任务(2)分配Reduce任务(4)本地写数据(5)远程读数据(3)读数据(6)写数据输入文件Map阶段中间文件(位于本地磁盘)Reduce阶段输出文件7.4实例分析:WordCount7.4.1WordCount程序任务7.4.2WordCount设计思路7.4.3一个WordCount执行过程的实例7.4.1WordCount程序任务表7-2WordCount程序任务程序WordCount输入一个包含大量单词的文本文件输出文件中每个单词及其出现次数(频数),并按照单词字母顺序排序,每个单词和其频数占一行,单词和频数之间有间隔表7-3一个WordCount的输入和输出实例输入输出HelloWorldHelloHadoopHelloMapReduceHadoop1Hello3MapReduce1World17.4.2WordCount设计思路首先,需要检查WordCount程序任务是否可以采用MapReduce来实现其次,确定MapReduce程序的设计思路最后,确定MapReduce程序的执行过程7.4.3一个WordCount执行过程的实例Map输入1.“HelloWorldByeWorld”2.“HelloHadoopByeHadoop”3.“ByeHadoopHelloHadoop”Map输出Hello,1World,1Bye,1World,1Hello,1Hadoop,1Bye,1Hadoop,1Bye,1Hadoop,1Hello,1Hadoop,1MapMapMap图7-7Map过程示意图7.4.3一个WordCount执行过程的实例Map输出的结果Hello,1World,1Bye,1World,1Hello,1Hadoop,1Bye,1Hadoop,1Bye,1Hadoop,1Hello,1Hadoop,1Map端Shuffle后的结果发送给Reduce任务Reduce任务的输出Bye,3Hadoop,4Hello,3World,2ShuffleHello,1World,1,1Bye,1Hel