云软件组陈修恒SPARK大数据处理引擎Spark一、ApacheSpark项目三、Spark技术架构四、Spark核心技术五、部署方式六、运行流程七、配置要求Hadoop生态系统Ambari(安装、部署、配置和管理工具)zookeeper分布式协作服务HBase(实时分布式数据库)Hive(数据仓库)Pig(数据流处理)Mahout(数据挖掘库)MapReduce(分布式计算框架)HDFS(分布式文件系统)Flume(日志收集工具)Sqoop(数据库ETL工具)ApacheHadoop项目CommonHDFS一个部署在廉价的机器上、具有高度容错性的文件系统YARN资源调度引擎MapReduce基于YARN调度引擎的大数据并行处理系统AYARN-basedsystemforparallelprocessingoflargedatasets.ApachSpark项目Spark是一个快速通用的大规模数据处理框架。具有Hadoop的批处理能力,而且性能更佳。可以用于流处理、Sql统计、机器学习和图计算。ApachSpark项目ApacheSparkTMisafastandgeneralengineforlarge-scaledataprocessingApacheSparkTM是一个快速、通用的大数据处理引擎ApacheSparkTM是HadoopMapReduce的改进版SparkVSHadoopMapReduceSparkHadoopMapRecuce架构Spark+RDDRDD:由Spark内部维护的、基于内存的分布式数据集MapReduce+HDFSHDFS:分布式文件系统工作量面向函数编程需要提供Map/Reduce函数。面向对象编程需要提供Map/Reduce类。数据处理RDD保存Map操作的结果,支持多次Map迭代。Map计算懒加载,用到时才发生计算Map、Reduce成对出现。Reduce结果落地后才能被下次Map使用故障处理多主多备集成HDFS不会有数据丢失,其他情况会有丢失情况;standalone启动模式Driver节点不能自动恢复,任务需要重新提交;依赖HDFS能快速恢复计算节点Spark技术架构Kafka/HDFS/TCP/Flume/ZeroMQ/MQTT/TwiterSparkRDDMapReduce函数式编程接口AmazonEC2/Mesos/YARN由Scala编写,支持函数式编程。支持多种数据源接入。RDD-弹性分布式数据集,Spark将数据分布到多台机器的内存中进行并行计算。Spark不具备集群管理能力,需要别的软件进行管理。支持流式运算,可以从kafka等数据源不断的获取数据,并按时间切片处理。Spark核心技术MapReduce编程模型SparkRDDSpark运行流程SparkTransformation&ActionSparkShuffleSparkStreamingSparkSQLSparkMllibSparkGraphXMapReduce编程模型任何运算都可以分解成Map(映射)和Reduce(归约)两类操作MapReduce编程模型词频统计tobeornottobeto:2be:2or:1not:1统计算法tobeornottobeMapReduce编程模型示例:词频统计to,be,or,not,to,be数据切割to,1,be,1,or,1,not,1,to,1,be,1构造运算单元to,2,be,2,or,1,not,1发生计算ReduceMapMapReduce代码预览to,be,or,not,to,beto,1,be,1,or,1,not,1,to,1,be,1to,2,be,2,or,1,not,1tobeornottobe输出结果MapReduce编程模型海量数据结算结果数据划分中间结果mapmapmapmap……(k1,val)(k2,val)(k2,val)(k1,val)(k2,val)(k3,val)(k1,val)(k2,val)(k3,val)aggregation&shufflereducereducereduce(k1,values)(k2,values)(k3,values)(K1,val)(K3,val)(K2,val)MapReduce编程模型任何运算都可以分解成Map(映射)和Reduce(归约)两类操作MapReduce编程模型任何运算都可以分解成Map(映射)和Reduce(归约)两类操作MapReduce系统数据划分和计算任务调度出错检测和恢复数据/代码互定位系统优化MapReduce的实现GoogleMapReduceHadoopMapReduceSparkSparkRDDRDD(ResilientDistributedDataset,弹性分布式数据集),他具高度的容错性,允许开发人员在大型集群上执行基于内存的计算。RDD是一个只读的分区存储集合。只能基于稳定物理存储中的数据集或在已有的RDD上执行转换命令(Transformation)来创建。RDD不需要物化。在创建RDD时Spark会维护转换算法。需要使用时,可以从物理存储的数据计算出最终的RDD。Spark操纵数据的一个高度抽象,是数据抽取和处理的基础。workerworkerworker——————Spark运行流程RDD(分布式数据集)第20/40页master注册任务注册任务clientsubmitdriverexecutorexecutor执行main作业解析生成Stage调度Task作业执行者接收Driver的LaunchTask命令可执行一个或多个Task作业执行者接收Driver的LaunchTask命令可执行多个Tasklaunchdriverlaunchexecutorlaunchexecutorworkerworkerworker——————Spark运行流程RDD(分布式数据集)第21/40页master注册任务注册任务clientsubmitdriverexecutorexecutor执行main作业解析生成Stage调度Task作业执行者接收Driver的LaunchTask命令可执行一个或多个Task作业执行者接收Driver的LaunchTask命令可执行多个TasklaunchdriverlaunchexecutorlaunchexecutorSparkTransformation&ActionTransformationsActions将一个已经存在的RDD中转换成一个新的RDD,所有的转换操作都是lazy执行的。一般用于对RDD中的元素进行实际的计算,然后返回相应的值。SparkShuffle1、每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。2、Mapper产生的结果会根据设置的partition算法填充到每个bucket中去。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中去。3、当Reducer启动时,它会根据自己task的id和所依赖的Mapper的id从远端或是本地的blockmanager中取得相应的bucket作为Reducer的输入进行处理。SparkStreaming持续的从输入源读取数据根据数据推送的时间,按时间段切片把切片包装成RDD,执行Map+Reduce计算通过RDD.collect()函数收集计算结果SparkStreaming设置批量处理频率:1s一次打开kafka输入SparkStreamingJavaStreamingContextjssc;jssc=newJavaStreamingContext(conf,Durations.milliseconds(1000));SparkStreamingJavaStreamingContextjssc;jssc=newJavaStreamingContext(conf,Durations.milliseconds(300));SparkStreamingJavaStreamingContextjssc;jssc=newJavaStreamingContext(conf,Durations.milliseconds(3));SparkStreaming不适合即时计算SparkSQL处理结构化数据把结构数据抽象成DataFrame工作方式:分布式SQL查询引擎SparkMllib机器学习库目标:简化机器学习过程,提供可扩展性提供基本的机器学习算法和功能,包括:•分类、•回归、•聚类、•协同过滤、•降维。提供底层优化提供管道化APISparkGraphX并行的图计算三、部署方式Standalone模式独立模式,自己负责资源调度。单点故障借助zookeeper实现。SparkOnMesosSpark运行在Mesos上,支持CPU非独占,资源由Mesos负责管理。SparkOnYarn资源由Yarn负责管理,最有前景的部署模式,支持动态添加资源。但是限于YARN自身发展,目前仅支持粗粒度模式。SparkOncloud如AWS的EC2,使用这种模式,访问Amazon的S3很方便。standalone模式(Stondalone模式)启动多个Master并注册到Zookeeper集群中,并保存状态。其中一个会被选为Leader,其余的保持Standby模式,当Leader故障,则选择另一个Master为Leader,并从Zookeeper中读取状态恢复。Master节点可动态添加或删除四、配置要求项目要求磁盘[官方推荐]4-8块普通磁盘,不需要RAID。内存[官方推荐]8GB即可。Spark建议需要提供至少75%的内存空间分配给Spark,至于其余的内存空间,则分配给操作系统与buffercache。网络建议使用10G及以上的网络带宽CPUSpark可以支持一台机器扩展至数十个CPUcore,它实现的是线程之间最小共享。若内存足够大,则制约运算性能的就是网络带宽与CPU数。五、关键术语SparkContextStage&JobDriver&ExecutorRDDShuffleStorageLevelBroadcastAccumulatorOptionalQ&A提问答疑