SPARK SHUFFLE INTRODUCTION
Tianhuo (天火) @ Mogujie (蘑菇街)

About Me
Spark / Hadoop / HBase / Phoenix contributor
For Spark, mainly contributes in:
• Yarn
• Shuffle
• BlockManager
• Scala 2.10 update
• Standalone HA
• Various other fixes
tianhuo@mogujie.com
Weibo @冷冻蚂蚁
blog.csdn.net/colorant

Why Spark is fast(er)
• Whom do we compare to?
• What do we mean by fast?
  • fast to write
  • fast to run

Why Spark is fast(er) cont.
• But the figure on the previous page is somewhat misleading.
• The key is the flexible programming model.
• Which leads to a more reasonable data flow.
• Which leads to fewer IO operations.
• Especially for iteration-heavy workloads like ML.
• Which potentially cuts out a lot of the shuffle operations otherwise needed.
• But you won't always be lucky.
• Many applications' logic does need to exchange a lot of data.
• In the end, you will still need to deal with shuffle.
• And shuffle usually impacts performance a lot.

What is shuffle

Shuffle overview
[Figure: shuffle overview diagram. Labels: Aggregator.]

How does shuffle come into the picture
• Spark runs a job stage by stage.
• Stages are built up by DAGScheduler according to the RDD's ShuffleDependency.
  • e.g. ShuffledRDD / CoGroupedRDD will have a ShuffleDependency.
  • Many operators create a ShuffledRDD / CoGroupedRDD under the hood.
    • repartition / combineByKey / groupBy / reduceByKey / cogroup
    • Many other operators further call into the operators above.
      • e.g. the various join operators call cogroup.
• Each ShuffleDependency maps to one stage in the Spark job and then leads to a shuffle.

So everyone should have seen this before
[Figure: DAG of RDDs A to G divided into Stage 1, Stage 2 and Stage 3, with groupBy, map, union and join operations.]

Why shuffle is expensive
• When doing a shuffle, data no longer stays in memory only.
• For Spark, the shuffle process might involve:
  • data partition: which might involve very expensive data sorting work etc.
  • data ser/deser: to enable data to be transferred through the network or across processes.
  • data compression: to reduce IO bandwidth etc.
  • disk IO: probably multiple times on one single data block.
    • e.g. shuffle spill, merge combine

History
• Spark 0.6-0.7: same code path as RDD's persist method; can choose MEMORY_ONLY or DISK_ONLY (default).
• Spark 0.8-0.9:
  • Separate the shuffle code path from the BlockManager (BM) and create ShuffleBlockManager and BlockObjectWriter only for shuffle; shuffle data can now only be written to disk.
  • Shuffle optimization: consolidate shuffle write.
• Spark 1.0: pluggable shuffle framework.
• Spark 1.1: sort-based shuffle implementation.
• Spark 1.2: Netty transfer service reimplementation; sort-based shuffle by default.
• Spark 1.2+, in progress: external shuffle service etc.

LOOK INSIDE

Pluggable Shuffle Framework
• ShuffleManager
  • Manages shuffle-related components. Registered in SparkEnv and configured through SparkConf; the default is sort (hash before 1.2).
• ShuffleWriter
  • Handles the shuffle data output logic. Returns a MapStatus to be tracked by MapOutputTracker.
• ShuffleReader
  • Fetches shuffle data to be used by e.g. ShuffledRDD.
• ShuffleBlockManager
  • Manages the mapping between abstract buckets and materialized data blocks.

High level data flow
[Figure: high-level data flow. Components: BlockTransferService, BlockManager, HashShuffleManager, SortShuffleManager, FileShuffleBlockManager, IndexShuffleBlockManager, DiskBlockManager, local file system. Annotations: "GetBlockData"; "Direct mapping or mapping by file groups"; "Map to one data file and one index file per mapId"; "Just do one-to-one file mapping".]

Hash Based Shuffle - Shuffle Writer
• Basic shuffle writer
[Figure: basic shuffle writer. Labels: Map Task, Aggregator, File. Each bucket maps to a single file.]

Hash Based Shuffle - Shuffle Writer
• Consolidate shuffle writer
[Figure: consolidate shuffle writer. Labels: Aggregator. Each bucket maps to a segment of a file.]

Hash Based Shuffle - Shuffle Writer
• Basic shuffle writer (M map tasks, R reduce partitions, C concurrently running map tasks)
  • M * R shuffle spill files.
  • C * R concurrently opened shuffle files.
  • If shuffle spill is enabled, it could generate more temporary spill files, say N.
• Consolidate shuffle writer
  • Reduces the total number of spilled files to C * R if M > C.
  • The number of concurrently opened files is the same as for the basic shuffle writer.
• Memory consumption
  • Thus C * R + N concurrent file handlers.
  • Each file handler could take 32~100 KB+ of memory for the various buffers across the writer stream chain.

Sort Based Shuffle - Shuffle Writer
• SortShuffleWriter
[Figure: SortShuffleWriter. Labels: Map Task, ExternalSorter, File, FileSegment.]

Sort Based Shuffle - Shuffle Writer
• Each map task generates 1 shuffle data file + 1 index file.
• Utilizes ExternalSorter to do the sort work.
• If map-side combine is required, data is sorted by key and partition for aggregation. Otherwise data is sorted by partition only.
• If the number of reducers <= 200 and there is no need for aggregation or ordering, data is not sorted at all.
  • It goes the hash way and spills to a separate file for each reduce partition, then merges them into one file per map for the final output.

Hash Based Shuffle - Shuffle Reader
• Actually, at present, sort-based shuffle also goes with HashShuffleReader.
[Figure: shuffle read. Labels: Bucket, Reduce Task, Aggregator.]

BLOCK TRANSFER SERVICE

Related concepts
• BlockTransferService
  • Provides a general interface for ShuffleFetcher, working with BlockDataManager to get local data.
• ShuffleClient
  • Wrap up the fetching da