一、客户端Map-Reduce的过程首先是由客户端提交一个任务开始的。提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:publicstaticRunningJobrunJob(JobConfjob)throwsIOException{//首先生成一个JobClient对象JobClientjc=newJobClient(job);……//调用submitJob来提交一个任务running=jc.submitJob(job);JobIDjobId=running.getID();……while(true){//while循环中不断得到此任务的状态,并打印到客户端console中}returnrunning;}其中JobClient的submitJob函数实现如下:publicRunningJobsubmitJob(JobConfjob)throwsFileNotFoundException,InvalidJobConfException,IOException{//从JobTracker得到当前任务的idJobIDjobId=jobSubmitClient.getNewJobId();//准备将任务运行所需要的要素写入HDFS://任务运行程序所在的jar封装成job.jar//任务所要处理的inputsplit信息写入job.split//任务运行的配置项汇总写入job.xmlPathsubmitJobDir=newPath(getSystemDir(),jobId.toString());PathsubmitJarFile=newPath(submitJobDir,job.jar);PathsubmitSplitFile=newPath(submitJobDir,job.split);//此处将-libjars命令行指定的jar上传至HDFSconfigureCommandLineOptions(job,submitJobDir,submitJarFile);PathsubmitJobFile=newPath(submitJobDir,job.xml);……//通过inputformat的格式获得相应的inputsplit,默认类型为FileSplitInputSplit[]splits=job.getInputFormat().getSplits(job,job.getNumMapTasks());//生成一个写入流,将inputsplit得信息写入job.split文件FSDataOutputStreamout=FileSystem.create(fs,submitSplitFile,newFsPermission(JOB_FILE_PERMISSION));try{//写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个inputsplit的信息。//对于每一个inputsplit写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split在文件中的起始位置),split的location信息(即在那个DataNode上)。writeSplitsFile(splits,out);}finally{out.close();}job.set(mapred.job.split.file,submitSplitFile.toString());//根据split的个数设定maptask的个数job.setNumMapTasks(splits.length);//写入job的配置信息入job.xml文件out=FileSystem.create(fs,submitJobFile,newFsPermission(JOB_FILE_PERMISSION));try{job.writeXml(out);}finally{out.close();}//真正的调用JobTracker来提交任务JobStatusstatus=jobSubmitClient.submitJob(jobId);……}二、JobTrackerJobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:调用静态函数startTracker(newJobConf())创建一个JobTracker对象调用JobTracker.offerService()函数提供服务在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:JobQueueJobInProgressListenerjobQueueJobInProgressListener用于监控job的运行状态EagerTaskInitializationListenereagerTaskInitializationListener用于对Job进行初始化EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:synchronized(jobs){synchronized(taskScheduler){jobs.put(job.getProfile().getJobID(),job);//对JobTracker的每一个listener都调用jobAdded函数for(JobInProgressListenerlistener:jobInProgressListeners){listener.jobAdded(job);}}}EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:publicsynchronizedvoidinitTasks()throwsIOException{……//从HDFS中读取job.split文件从而生成inputsplitsStringjobFile=profile.getJobFile();PathsysDir=newPath(this.jobtracker.getSystemDir());FileSystemfs=sysDir.getFileSystem(conf);DataInputStreamsplitFile=fs.open(newPath(conf.get(mapred.job.split.file)));JobClient.RawSplit[]splits;try{splits=JobClient.readSplitFile(splitFile);}finally{splitFile.close();}//maptask的个数就是inputsplit的个数numMapTasks=splits.length;//为每个maptasks生成一个TaskInProgress来处理一个inputsplitmaps=newTaskInProgress[numMapTasks];for(inti=0;inumMapTasks;++i){inputLength+=splits[i].getDataLength();maps[i]=newTaskInProgress(jobId,jobFile,splits[i],jobtracker,conf,this,i);}//对于maptask,将其放入nonRunningMapCache,是一个MapNode,ListTaskInProgress,也即对于maptask来讲,其将会被分配到其inputsplit所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配maptask的时候使用。if(numMapTasks0){nonRunningMapCache=createCache(splits,maxLevel);}//创建reducetaskthis.reduces=newTaskInProgress[numReduceTasks];for(inti=0;inumReduceTasks;i++){reduces[i]=newTaskInProgress(jobId,jobFile,numMapTasks,i,jobtracker,conf,this);//reducetask放入nonRunningReduces,其将在JobTracker向TaskTracker分配reducetask的时候使用。nonRunningReduces.add(reduces[i]);}//创建两个cleanuptask,一个用来清理map,一个用来清理reduce.cleanup=newTaskInProgress[2];cleanup[0]=newTaskInProgress(jobId,jobFile,splits[0],jobtracker,conf,this,numMapTasks);cleanup[0].setJobCleanupTask();cleanup[1]=newTaskInProgress(jobId,jobFile,numMapTasks,numReduceTasks,jobtracker,conf,this);cleanup[1].setJobCleanupTask();//创建两个初始化task,一个初始化map,一个初始化reduce.setup=newTaskInProgress[2];setup[0]=newTaskInProgress(jobId,jobFile,splits[0],jobtracker,conf,this,numMapTasks+1);setup[0].setJobSetupTask();setup[1]=newTaskInProgress(jobId,jobFile,numMapTasks,numReduceTasks+1,jobtracker,conf,this);setup[1].setJobSetupTask();tasksInited.set(true);//初始化完毕……}三、TaskTrackerTaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了newTaskTracker(conf).run(),其中run函数主要调用了:StateofferService()throwsException{longlastHeartbeat=0;//TaskTracker进行是一直存在的while(running&&!shuttingDown){……longnow=System.currentTimeMillis();//每隔一段时间就向JobTracker发送heartbeatlongwaitTime=heartbeatInterval-(now-lastHeartbeat);if(waitTime0){synchronized(finishedCount){if(finishedCount[0]==0)