1/29兄弟连Go语言+区块链技术培训以太坊源码分析(34)eth-downloader源码分析downloader主要负责区块链最开始的同步工作,当前的同步有两种模式,一种是传统的fullmode,这种模式通过下载区块头,和区块体来构建区块链,同步的过程就和普通的区块插入的过程一样,包括区块头的验证,交易的验证,交易执行,账户状态的改变等操作,这其实是一个比较消耗CPU和磁盘的一个过程。另一种模式就是快速同步的fastsync模式,这种模式有专门的文档来描述。请参考fastsync的文档。简单的说fastsync的模式会下载区块头,区块体和收据,插入的过程不会执行交易,然后在一个区块高度(最高的区块高度-1024)的时候同步所有的账户状态,后面的1024个区块会采用fullmode的方式来构建。这种模式会加区块的插入时间,同时不会产生大量的历史的账户信息。会相对节约磁盘,但是对于网络的消耗会更高。因为需要下载收据和状态。##downloader数据结构typeDownloaderstruct{modeSyncMode//Synchronisationmodedefiningthestrategyused(persynccycle)mux*event.TypeMux//Eventmultiplexertoannouncesyncoperationevents//queue对象用来调度区块头,交易,和收据的下载,以及下载完之后的组装queue*queue//Schedulerforselectingthehashestodownload//对端的集合peers*peerSet//SetofactivepeersfromwhichdownloadcanproceedstateDBethdb.Database//fastsync中的Pivotpoint区块的头fsPivotLock*types.Header//Pivotheaderoncriticalsectionentry(cannotchangebetweenretries)fsPivotFailsuint32//Numberofsubsequentfastsyncfailuresinthecriticalsection//下载的往返时延rttEstimateuint64//RoundtriptimetotargetfordownloadrequestsrttConfidenceuint64//ConfidenceintheestimatedRTT(unit:millionthstoallowatomicops)估计RTT的信心(单位:允许原子操作的百万分之一)//Statistics统计信息,2/29syncStatsChainOriginuint64//OriginblocknumberwheresyncingstartedatsyncStatsChainHeightuint64//HighestblocknumberknownwhensyncingstartedsyncStatsStatestateSyncStatssyncStatsLocksync.RWMutex//LockprotectingthesyncstatsfieldslightchainLightChainblockchainBlockChain//CallbacksdropPeerpeerDropFn//Dropsapeerformisbehaving//StatussynchroniseMockfunc(idstring,hashcommon.Hash)error//Replacementforsynchroniseduringtestingsynchronisingint32notifiedint32//ChannelsheaderChchandataPack//[eth/62]Channelreceivinginboundblockheadersheader的输入通道,从网络下载的header会被送到这个通道bodyChchandataPack//[eth/62]Channelreceivinginboundblockbodiesbodies的输入通道,从网络下载的bodies会被送到这个通道receiptChchandataPack//[eth/63]Channelreceivinginboundreceiptsreceipts的输入通道,从网络下载的receipts会被送到这个通道bodyWakeChchanbool//[eth/62]Channeltosignaltheblockbodyfetcherofnewtasks用来传输bodyfetcher新任务的通道receiptWakeChchanbool//[eth/63]Channeltosignalthereceiptfetcherofnewtasks用来传输receiptfetcher新任务的通道headerProcChchan[]*types.Header//[eth/62]Channeltofeedtheheaderprocessornewtasks通道为header处理者提供新的任务//forstateFetcherstateSyncStartchan*stateSync//用来启动新的statefetchertrackStateReqchan*stateReq//TODOstateChchandataPack//[eth/63]Channelreceivinginboundnodestatedatastate的输入通道,从网络下载的state会被送到这个通道//CancellationandterminationcancelPeerstring//Identifierofthepeercurrentlybeingusedasthemaster(cancelondrop)3/29cancelChchanstruct{}//Channeltocancelmid-flightsyncscancelLocksync.RWMutex//LocktoprotectthecancelchannelandpeerindeliversquitChchanstruct{}//QuitchanneltosignalterminationquitLocksync.RWMutex//Locktopreventdoublecloses//TestinghookssyncInitHookfunc(uint64,uint64)//MethodtocalluponinitiatinganewsyncrunbodyFetchHookfunc([]*types.Header)//MethodtocalluponstartingablockbodyfetchreceiptFetchHookfunc([]*types.Header)//MethodtocalluponstartingareceiptfetchchainInsertHookfunc([]*fetchResult)//Methodtocalluponinsertingachainofblocks(possiblyinmultipleinvocations)}构造方法//Newcreatesanewdownloadertofetchhashesandblocksfromremotepeers.funcNew(modeSyncMode,stateDbethdb.Database,mux*event.TypeMux,chainBlockChain,lightchainLightChain,dropPeerpeerDropFn)*Downloader{iflightchain==nil{lightchain=chain}dl:=&Downloader{mode:mode,stateDB:stateDb,mux:mux,queue:newQueue(),peers:newPeerSet(),rttEstimate:uint64(rttMaxEstimate),rttConfidence:uint64(1000000),blockchain:chain,lightchain:lightchain,dropPeer:dropPeer,headerCh:make(chandataPack,1),bodyCh:make(chandataPack,1),receiptCh:make(chandataPack,1),4/29bodyWakeCh:make(chanbool,1),receiptWakeCh:make(chanbool,1),headerProcCh:make(chan[]*types.Header,1),quitCh:make(chanstruct{}),stateCh:make(chandataPack),stateSyncStart:make(chan*stateSync),trackStateReq:make(chan*stateReq),}godl.qosTuner()//简单主要用来计算rttEstimate和rttConfidencegodl.stateFetcher()//启动stateFetcher的任务监听,但是这个时候还没有生成statefetcher的任务。returndl}##同步下载Synchronise试图和一个peer来同步,如果同步过程中遇到一些错误,那么会删除掉Peer。然后会被重试。//Synchronisetriestosyncupourlocalblockchainwitharemotepeer,both//addingvarioussanitychecksaswellaswrappingitwithvariouslogentries.func(d*Downloader)Synchronise(idstring,headcommon.Hash,td*big.Int,modeSyncMode)error{err:=d.synchronise(id,head,td,mode)switcherr{casenil:caseerrBusy:caseerrTimeout,errBadPeer,errStallingPeer,errEmptyHeaderSet,errPeersUnavailable,errTooOld,errInvalidAncestor,errInvalidChain:log.Warn(Synchronisationfailed,droppingpeer,peer,id,err,err)d.dropPeer(id)default:log.Warn(Synchronisationfailed,retrying,err,err)}returnerr}synchronise5/29//synchronisewillselectthepeeranduseitforsynchronising.Ifanemptystringisgiven//itwillusethebestpeerpossibleandsynchronizeifit'sTDishigherthanourown.Ifanyofthe//checksfailanerrorwillbereturned.Thismethodissynchronousfunc(d*Downloader)synchronise(idstring,hashcommon.Hash,td*big.Int,modeSyncMode)error{//Mockoutthesynchronisationiftestingifd.synchroniseMock!=nil{returnd.synchroniseMock(id,hash)}//Makesureonlyonegoroutineiseverallowedpastthispointa