本文主要描述了hadooprpc服务端的初始化和调用过程,相比客户端的初始化,rpc服务端感觉会简单点,但是调用过程却比客户端复杂一些。本文还是以namenode为例,namenode会在执行main方法的时候,创建一个namenode实例,及完成一系列的初始化过程,其中就包括了rpc的初始化过程。rpc服务端的初始化上面已经提到我们这里主要借用了namenode的远程服务,先来看看相关代码:?12345678910111213141516171819publicclassNameNodeimplementsNameNodeStatusMXBean{publicstaticvoidmain(Stringargv[])throwsException{NameNodenamenode=createNameNode(argv,null);}protectedNameNode(Configurationconf,NamenodeRolerole)throwsIOException{initialize(conf);}protectedvoidinitialize(Configurationconf)throwsIOException{rpcServer=createRpcServer(conf);startCommonServices(conf);//相当重要}protectedNameNodeRpcServercreateRpcServer(Configurationconf)throwsIOException{returnnewNameNodeRpcServer(conf,this);}}我们的linux的终端执行hadoop的启动命令的时候,最终的命令是调用NameNode的main方法,所以我们追踪代码的切入点是NameNode的main方法,方法比较简单,就是调用NameNode的构造函数创建一个NameNode,然后执行初始化方法initialize,这个方法相对来说,是我们关注的重点,包括rpc服务在内的初始化操作都放在这个方法里面。特定于rpc,他执行了两个相关的方法createRpcServer和startCommonServices,第一个方法见名思意,不多说,先简单介绍下后面的方法,该方法的作用就是启动namenode的rpc服务,稍后我给出代码。好的,从上面的代码可以看到,我们的rpcServer功能都放在了类NameNodeRpcServer里面,现在让我们来看看这个类里面相关的代码:?12345678910111213141516171819202122232425262728293031323334classNameNodeRpcServerimplementsNamenodeProtocols{publicNameNodeRpcServer(Configurationconf,NameNodenn)throwsIOException{RPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);ClientNamenodeProtocolServerSspan/spanideTranslatorPBclientProtocolServerTranslator=newClientNamenodeProtocolServerSideTranslatorPB(this);BlockingServiceclientNNPbService=ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);InetSocketAddressrpspan/spancAddr=nn.getRpcServerAddress(conf);//fs.defaultFSStringbindHost=nn.getRpcServerBindHost(conf);if(bindHost==null){bindHost=rpcAddr.getHostName();}LOG.info(RPCserverisbindingto+bindHost+:+rpcAddr.getPort());this.clientRpcServer=newRPC.Builder(conf).setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class).setInstance(clientNNPbService).setBindAddress(bindHost).setPort(rpcAddr.getPort()).setNumHandlers(handlerCount).setVerbose(false).setSecretManager(namesystem.getDelegationTokenSecretManager()).build();//AddalltheRPCprotocolsthatthenamenodeimplementsDFSUtil.addPBProtocol(conf,HAServiceProtocolPB.class,haPbService,clientRpcServer);DFSUtil.addPBProtocol(conf,NamenodeProtocolPB.class,NNPbService,clientRpcServer);DFSUtil.addPBProtocol(conf,DatanodeProtocolPB.class,dnProtoPbService,3536clientRpcServer);}}在NameNodeRpcServer的构造函数里面最重要的一件事情是实例化clientRpcServer,这里面我最想说明的是,NameNode宣称自己实现了三个协议:ClientProtocol、DatanodeProtocol和NamenodeProtocol,在服务端的实现基本上就靠ClientNamenodeProtocolServerSideTranslatorPB之类的类型了,特别在实例化ClientNamenodeProtocolServerSideTranslatorPB的时候有传入一个形参,这个形参就是NameNodeRpcServer实例,看代码:?12345678910111213141516171819202122publicClientNamenodeProtocolServerSideTranslatorPB(ClientProtocolserver)throwsIOException{this.server=server;}@OverridepublicGetBlockLocationsResponseProtogetBlockLocations(RpcControllercontroller,GetBlockLocationsRequestProtoreq)throwsServiceException{try{LocatedBlocksb=server.getBlockLocations(req.getSrc(),req.getOffset(),req.getLength());Builderbuilder=GetBlockLocationsResponseProto.newBuilder();if(b!=null){builder.setLocations(PBHelper.convert(b)).build();}returnbuilder.build();}catch(IOExceptione){thrownewServiceException(e);}}上面代码中的getBlockLocations也一定程度上说明了刚才的观点。现在让我们回过头看看NameNode中initialize方法中执行的startCommonServices方法,这个方法用来启动clientRpcServer下面的线程,包括listener,handler、response,具体看代码:?123456789101112131415161718192021222324252627publicclassNameNodeimplementsNameNodeStatusMXBean{privatevoidstartCommonServices(Configurationconf)throwsIOException{rpcServer.start();}}classNameNodeRpcServerimplementsNamenodeProtocols{voidstart(){clientRpcServer.start();if(serviceRpcServer!=null){serviceRpcServer.start();}}}publicabstractclassServer{publicsynchronizedvoidstart(){responder.start();listener.start();handlers=newHandler[handlerCount];for(inti=0;ihandlerCount;i++){handlers[i]=newHandler(i);handlers[i].start();}}}代码看到这里,启动过程中rpc相关的代码就结束了。rpc服务端的调用过程现在让我们来看看rpc被调用的过程,先来认识下Server的关键结构:?1publicabstractclassServer{234567891011121314151617privateListenerlistener=null;privateResponderresponder=null;privateHandler[]handlers=null;privateclassResponderextendsThread{}privateclassListenerextendsThread{}privateclassHandlerextendsThread{}}在初始化的时候,就启动listener、responder和handlers下面的所有线程。其中listener线程里面启动了一个socker服务,专门用来接受客户端的请求,handler下面的线程用来处理具体的请求,responder写请求结果,具体过程可以看下下面的代码:?123456789101112131415161718publicabstractclassServer{privateListenerlistener=null;privateResponderresponder=null;privateHandler[]handlers=null;privateclassListenerextendsThread{publicListener()throwsIOException{address=newInetSocketAddress(bindAddress,port);//CreateanewserversocketandsettononblockingmodeacceptChannel=ServerSocketChannel.open();acceptChannel.configureBlocking(false);//Bindtheserversockettothelocalhostandportbind(acceptChannel.socket(),address,backlogLength,conf,portRangeConfig);port=acceptChannel.socket().getLocalPort(