Spark知识分享Spark简介SparkSQL简介SparkStreaming简介Spark应用场景Spark知识分享3•Hadoop在2003年从Nutch发展到Lucene,在Yahoo成长,进入Apache孵化,2008年获得大量使用。但一直存在MR算法少、每次Reduce都需要磁盘读写、MR需要成对出现、Master节点调度慢、单节点等等问题。•Spark2007年在Yahoo起步,用于改善MR算法。2009年独立为一个项目,2010年开源,2013年进入Apache孵化。被称为以下一代计算平台。•Berkeley大学成为大数据技术中心,BerkeleyDataAnalysisStack(BDAS)逐步形成大数据平台。大数据架构的演进4Spark堆栈5从实例开始,漫游Spark内核lines=spark.textFile(“hdfs://...”)errors=lines.filter(_.startsWith(“ERROR”))messages=errors.map(_.split(‘\t’)(2))cachedMsgs=messages.cache()Block1Block2Block3WorkerWorkerWorkerDrivercachedMsgs.filter(_.contains(“foo”)).countcachedMsgs.filter(_.contains(“bar”)).count...tasksresultsCache1Cache2Cache3BaseRDDTransformedRDDActionResult:full-textsearchofWikipediain1sec(vs20secforon-diskdata)Result:scaledto1TBdatain5-7sec(vs170secforon-diskdata)•基于Hadoop的HDFS,Spark采用Driver、Worker的主从结构,由Driver节点调度,负责任务分配、资源安排、结果汇总、容错等处理。Worker节点主要是存放数据和进行计算。•第一次从外设读取数据,之后主要在内存计算。•案例中涉及到RDD、Tranformation、Action等操作,从中可以发现,处理方式是MR+SQL语法,包括map、reduce、count、groupby、join、union等6Spark和MapReduce比较iter.1iter.2...InputHDFSreadHDFSwriteHDFSreadHDFSwriteInputquery1query2query3result1result2result3...HDFSread•MapReduce每次读写,都需要序列化到磁盘。一个复杂任务,需要多次处理,几十次磁盘读写。•Spark只需要一次磁盘读写,大部分处理在内存中进行。SparkMapRI/Oandserializationcantake90%ofthetimeCacheCacheCacheTransfT..2Action7RDD操作–ResilientDistributedDataset所有的操作都是针对RDD,类似于MPPDB的技术实现:分布、并行、内存计算和压缩。优于MPP点在于毫秒级的调度,适用于复杂计算;逊于MPP点在于数据处理没有SQL方便和强大。•RDD是一个数据集,不可改变,分布在集群上;通过DAG来实现自动数据恢复;支持内存物化(Cache)和硬盘物化(Checkpoint),来保存中间结果;8Spark如何组成分布式网络YourapplicationSparkContextClustermanagerWorkerSparkexecutorHDFSorotherstorageWorkerSparkexecutorsc=newSparkContextf=sc.textFile(“…”)f.filter(…).count()...YourprogramSparkclient(appmaster)SparkworkerHDFS,HBase,…BlockmanagerTaskthreadsRDDgraphSchedulerBlocktrackerShuffletrackerClustermanager•RDDgraph记录各个RDD的来源;Scheduler进行快速调度;Blocktracker跟踪HDFS块位置;Shuffle记录RDD之间的数据分发。Cluster采用Yarn等产品。Task在线程上执行。磁盘数据本地化,内存数据本地化,计算本地化。9调度过程--DAGDirectAcyclicGraph有向无环图rdd1.join(rdd2).groupBy(…).filter(…)RDDObjectsbuildoperatorDAGagnostictooperators!doesn’tknowaboutstagesDAGSchedulersplitgraphintostagesoftaskssubmiteachstageasreadyDAGTaskSchedulerTaskSetlaunchtasksviaclustermanagerretryfailedorstragglingtasksClustermanagerWorkerexecutetasksstoreandserveblocksBlockmanagerThreadsTaskstagefailed•容错:Driver中记录每个RDD生成的图,在RDD失效的时候,能够根据这个链条,重新生成出RDD,确保所有的RDD都是可以再生。RDD在内存中做Partition,作为备份。•延迟调度:RDD包括Tranform、Action两类操作,只有在Action的时候才处理数据,成为延迟调度,是一种聪敏的方法。•包含FIFO和FairSchedule两种调度算法。Spark简介SparkSQL简介SparkStreaming简介Spark应用场景Spark知识分享11Shark/SparkSQL和Hive关系MetastoreHDFSClientDriverSQLParserQueryOptimizerPhysicalPlanExecutionCLIJDBCMapReducePhysicalPlanExecutionSparkCacheMgr.•Shark在Hive的架构基础上,改写了“内存管理”、“执行计划”和“执行模块”三个模块,使HQL从MapReduce转到Spark上。SparkSQL沿袭了Shark的架构,在原有架构上,重写了优化部分,并增加了RDD-Awareoptimizer和多语言接口。catalyst12SparkSQL的数据处理过程•支持Scala、Java和Python三种语言。•支持SQL-92规范和HQL增加了SchemaRDD,读取JSON、Nosql、RDBMS和HDFS数据。继续兼容Hive和Shark。13SparkSQL14SparkSQL的程序实例•Scala语言的案例,定义一个Person类型的schema,读取一个文本文件,将读取的值赋值给Person最终赋给people对象,通过resisterAsTable转换成表,供SQL使用•后台是将文本文件变成一个RDD,通过Spark来计算得出SQL的查询结果Shark只能通过HQL来操作数据,而sparksql即可以使用RDD,并提供了schema功能。Spark简介SparkSQL简介SparkStreaming简介Spark应用场景Spark知识分享16离散的流处理•1、不同于一般流处理软件,SparkStreaming采用一系列毫秒级的批量处理,实现快速计算。•2、将一个需要处理的任务,转化为多个RDD计算,运行在Spark上。Batchsizesaslowas½second,latencyofabout1secondSparkSparkStreamingbatchesofXsecondslivedatastreamprocessedresultsChopupthelivestreamintobatchesofXsecondsSparktreatseachbatchofdataasRDDsandprocessesthemusingRDDoperationsFinally,theprocessedresultsoftheRDDoperationsarereturnedinbatches17Streaming编程•1、不同于一般流处理软件,SparkStreaming采用一系列毫秒级的批量处理,实现快速计算。•2、将一个需要处理的任务,转化为多个RDD计算,运行在Spark上。valtweets=ssc.twitterStream()DStream:asequenceofRDDsrepresentingastreamofdatabatch@t+1batch@tbatch@t+2tweetsDStreamstoredinmemoryasanRDD(immutable,distributed)TwitterStreamingAPI18离散的流处理•1、不同于一般流处理软件,SparkStreaming采用一系列毫秒级的批量处理,实现快速计算。•2、将一个需要处理的任务,转化为多个RDD计算,运行在Spark上。valtweets=ssc.twitterStream()valhashTags=tweets.flatMap(status=getTags(status))flatMapflatMapflatMap…transformation:modifydatainoneDStreamtocreateanotherDStreamnewDStreamnewRDDscreatedforeverybatchbatch@t+1batch@tbatch@t+2tweetsDStreamhashTagsDstream[#cat,#dog,…]19离散的流处理•1、不同于一般流处理软件,SparkStreaming采用一系列毫秒级的批量处理,实现快速计算。•2、将一个需要处理的任务,转化为多个RDD计算,运行在Spark上。valtweets=ssc.twitterStream()valhashTags=tweets.flatMap(status=getTags(status))hashTags.saveAsHadoopFiles(hdfs://...)outputoperation:topushdatatoexternalstorageflatMapflatMapflatMapsavesavesavebatch@t+1batch@tbatch@t+2tweetsDStreamhashTagsDStreameverybatchsavedtoHDFS20离散的流处理•1、不同于一般流处理软件,SparkStreaming采用一系列毫秒级的批量处理,实现快速计算。•2、将一个需要处理的任务,转化为多个RDD计算,运行在Spark上。valtweets=ssc.twitterStream()valhashTags=tweets.flatMap(status=getTags(status))hashTags.foreach(hashTagRDD={...})foreach:dowhateveryouwantwiththeprocesseddataflatMapflatMapflatMapforeachforeachforeachbatch@t+1batch@tbatch@t+2tweetsDStreamhashTagsDStreamWritetodatabase,updateanalyticsUI,dowhateveryouwantSpark简介SparkSQL简介SparkStreaming简介Spark应用场景Spark知识分享