Spark Shuffle Introduction


Tianhuo @ Mogujie

About Me
• Spark / Hadoop / HBase / Phoenix contributor
• For Spark, mainly contributes in:
  • YARN
  • Shuffle
  • BlockManager
  • Scala 2.10 update
  • Standalone HA
  • Various other fixes
• tianhuo@mogujie.com
• Weibo: @冷冻蚂蚁
• blog.csdn.net/colorant

Why Spark is fast(er)
• Whom do we compare to?
• What do we mean by fast?
  • Fast to write
  • Fast to run

Why Spark is fast(er), cont.
• But the figure on the previous page is somewhat misleading.
• The key is the flexible programming model,
• which leads to a more reasonable data flow,
• which leads to fewer IO operations,
  • especially for iteration-heavy workloads like ML,
• which potentially cuts out many of the shuffle operations needed.
• But you won't always be lucky.
  • Much application logic does need to exchange a lot of data.
• In the end, you will still need to deal with shuffle,
• and that usually impacts performance a lot.

What is shuffle

Shuffle overview
[Figure: shuffle overview, with data flowing between Aggregators]

How does shuffle come into the picture
• Spark runs a job stage by stage.
• Stages are built by the DAGScheduler according to each RDD's ShuffleDependency.
  • e.g. ShuffleRDD / CoGroupedRDD will have a ShuffleDependency.
  • Many operators create a ShuffleRDD / CoGroupedRDD under the hood:
    • Repartition / CombineByKey / GroupBy / ReduceByKey / cogroup
    • Many other operators in turn call into the operators above, e.g. the various join operators call cogroup.
• Each ShuffleDependency maps to one stage in the Spark job and then leads to a shuffle.

So everyone should have seen this before
[Figure: RDDs A to G connected by map, union, groupBy and join, split into Stage 1, Stage 2 and Stage 3]

Why shuffle is expensive
• When doing shuffle, data no longer stays in memory only.
• For Spark, the shuffle process might involve:
  • Data partitioning: which might involve very expensive data sorting work etc.
  • Data ser/deser: to enable data to be transferred through the network or across processes.
  • Data compression: to reduce IO bandwidth etc.
  • Disk IO: probably multiple times on one single data block.
    • E.g. shuffle spill, merge combine

History
• Spark 0.6-0.7: same code path as RDD's persist method; can choose MEMORY_ONLY and DISK_ONLY (default).
• Spark 0.8-0.9:
  • Separated the shuffle code path from BM (BlockManager) and created ShuffleBlockManager and BlockObjectWriter only for shuffle; shuffle data can now only be written to disk.
  • Shuffle optimization: consolidated shuffle write.
• Spark 1.0: pluggable shuffle framework.
• Spark 1.1: sort-based shuffle implementation.
• Spark 1.2: Netty transfer service reimplementation; sort-based shuffle by default.
• Spark 1.2+, in progress: external shuffle service etc.

LOOK INSIDE

Pluggable Shuffle Framework
• ShuffleManager
  • Manages shuffle-related components; registered in SparkEnv, configured through SparkConf; the default is sort (pre-1.2 it is hash).
• ShuffleWriter
  • Handles the shuffle data output logic. Returns a MapStatus to be tracked by MapOutputTracker.
• ShuffleReader
  • Fetches shuffle data to be used by e.g. ShuffleRDD.
• ShuffleBlockManager
  • Manages the mapping between abstract buckets and materialized data blocks.

High level data flow
[Figure: BlockManager, DiskBlockManager and Local File System at the base. HashShuffleManager uses FileShuffleBlockManager (direct mapping, or mapping by file groups). SortShuffleManager uses IndexShuffleBlockManager (maps to one data file and one index file per mapId). BlockTransferService calls GetBlockData. DiskBlockManager just does one-to-one file mapping.]

Hash Based Shuffle - Shuffle Writer
• Basic shuffle writer
[Figure: four Map Tasks, each writing one File per bucket, feeding four Aggregators. Each bucket maps to a single file.]

Hash Based Shuffle - Shuffle Writer
• Consolidate Shuffle Writer
[Figure: Map Tasks writing into shared files, feeding four Aggregators. Each bucket maps to a segment of a file.]

Hash Based Shuffle - Shuffle Writer
• Basic Shuffle Writer
  • M * R shuffle spill files.
  • Concurrent C * R opened shuffle files.
  • If shuffle spill is enabled, it could generate more tmp spill files, say N.
• Consolidate Shuffle Writer
  • Reduces the total spilled files to C * R if M > C.
  • Concurrent opened files are the same as for the basic shuffle writer.
• Memory consumption
  • Thus concurrent C * R + N file handlers.
  • Each file handler could take up to 32~100KB+ memory for various buffers across the writer stream chain.

Sort Based Shuffle - Shuffle Writer
• Sort Shuffle Writer
[Figure: four Map Tasks, each with an ExternalSorter, each writing one File made up of FileSegments]

Sort Based Shuffle - Shuffle Writer
• Each map task generates 1 shuffle data file + 1 index file.
• Uses ExternalSorter to do the sort work.
• If map-side combine is required, data will be sorted by key and partition for aggregation. Otherwise data will only be sorted by partition.
• If reducer number <= 200 and there is no need for aggregation or ordering, data will not be sorted at all.
  • It goes the hash way and spills to separate files for each reduce partition, then merges them into one file per map for the final output.

Hash Based Shuffle - Shuffle Reader
• Actually, at present, Sort Based Shuffle also goes with HashShuffleReader.
[Figure: Buckets feeding Reduce Tasks, each with an Aggregator]

BLOCK TRANSFER SERVICE

Related concepts
• BlockTransferService
  • Provides a general interface for ShuffleFetcher, working with BlockDataManager to get local data.
• ShuffleClient
  • Wrap up the fetching da
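
The file-count arithmetic for the hash-based shuffle writer (M * R, C * R, C * R + N) can be made concrete with a small calculation. This is only a sketch: the slides do not define the symbols, so it assumes M is the number of map tasks, R the number of reducers, C the number of map tasks running concurrently, and N the number of extra tmp spill files. The function names and the example numbers are hypothetical and not part of Spark.

```python
def hash_shuffle_file_counts(num_map_tasks, num_reducers, concurrent_tasks,
                             consolidate=False, extra_spill_files=0):
    """Estimate file counts for a hash-based shuffle writer.

    Assumed meaning of the slide's symbols (not confirmed by the source):
    M = num_map_tasks, R = num_reducers, C = concurrent_tasks,
    N = extra_spill_files.
    """
    m, r, c = num_map_tasks, num_reducers, concurrent_tasks
    running = min(m, c)  # at most C map tasks write at the same time

    # Basic writer: one file per (map task, reducer) pair, so M * R in total.
    # Consolidated writer: files are reused across map tasks, so C * R.
    total_files = running * r if consolidate else m * r

    # Either way, every running task keeps R files open at once.
    concurrent_open = running * r

    return {
        "total_files": total_files,
        "concurrent_open_files": concurrent_open,
        "file_handlers": concurrent_open + extra_spill_files,  # C * R + N
    }


def estimate_buffer_memory_kb(file_handlers, kb_per_handler=32):
    """Rough buffer memory in KB, using the slide's 32~100KB+ per handler."""
    return file_handlers * kb_per_handler


if __name__ == "__main__":
    for consolidate in (False, True):
        counts = hash_shuffle_file_counts(1000, 200, 16, consolidate=consolidate)
        memory_kb = estimate_buffer_memory_kb(counts["file_handlers"])
        print(consolidate, counts, memory_kb, "KB")
```

With these made-up inputs, consolidation shrinks the total number of files but leaves the number of concurrently open files, and therefore the buffer memory, unchanged. That is the point the memory consumption bullets make.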
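
The "1 shuffle data file + 1 index file" layout of the sort-based shuffle writer can be illustrated with a toy model. This is a simplified sketch, not Spark's actual on-disk format: it keeps the "files" in memory, uses tab-separated text records, and uses a CRC32-based partitioner. All function names here are hypothetical. It covers only the case without map-side combine, where records are ordered by partition id alone.

```python
import zlib


def partition_of(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (an assumption)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def write_map_output(records, num_partitions):
    """Return (data, offsets) for one map task.

    data    : all records, ordered by partition id, as one byte string
    offsets : num_partitions + 1 positions; partition p occupies
              data[offsets[p]:offsets[p + 1]]
    """
    ordered = sorted(records, key=lambda kv: partition_of(kv[0], num_partitions))
    data = bytearray()
    offsets = [0]
    current = 0
    for key, value in ordered:
        pid = partition_of(key, num_partitions)
        # Close every partition that ends before this record starts.
        while current < pid:
            offsets.append(len(data))
            current += 1
        data += f"{key}\t{value}\n".encode("utf-8")
    # Close the remaining partitions, including empty trailing ones.
    while len(offsets) < num_partitions + 1:
        offsets.append(len(data))
    return bytes(data), offsets


def read_partition(data, offsets, pid):
    """Read one reducer's segment using only the index."""
    segment = data[offsets[pid]:offsets[pid + 1]].decode("utf-8")
    return [tuple(line.split("\t")) for line in segment.splitlines()]


if __name__ == "__main__":
    recs = [("a", "1"), ("b", "2"), ("c", "3"), ("a", "4")]
    data, offsets = write_map_output(recs, 3)
    for pid in range(3):
        print(pid, read_partition(data, offsets, pid))
```

The design point is that a reducer needs only two offsets from the index to locate its segment, so each map task produces two files regardless of how many reducers there are.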
