spark技术探秘技术部Spark是什么Spark是一个基于内存计算的开源的集群(分布式)计算系统Spark非常小巧玲珑,由加州伯克利大学AMP实验室的Matei为主的小团队所开发。使用的语言是Scala,项目的core部分的代码只有63个Scala文件,非常短小精悍。由于是基于内存计算,效率要高于拥有Hadoop,Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,节省了磁盘IO耗时,号称性能比Hadoop快100倍。Spark特性容错性:在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpointdata,一个是loggingtheupdates。用户可以控制采用哪种方式来实现容错。通用性:Spark通过提供丰富的Scala,Java,PythonAPI及交互式Shell来提高可用性。Spark架构生态体系Shark(SQL)shark与hive对比Bagel(Pregelonspark):Bagel是基于Spark的轻量级的Pregel(Pregel是Google鼎鼎有名的图计算框架)的实现。Shark(HiveonSpark)Shark是在Spark的框架基础上提供和Hive一样的HiveQL命令接口,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。Shark通过UDF实现特定的数据分析算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用Sparkstreaming流处理系统,实时计算系统构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。SparkStreaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。MLlib(machinelearninglibrary)机器学习库:提供高质量的算法,比MapReduce快100倍高性能:Spark擅长迭代计算,这可以使mllib运行的更快,另外,MLlib也包含高效的算法,利用spark的迭代优势,从而产生百倍效果MLlib(machinelearninglibrary)易部署:如果你有一个hadoop2集群,你可以在没有任何预装的情况下运行spark和MLlib。另外,spark也可以运行standalone或EC2或Mesos。可以读取hdfs、hbase或任何一个hadoop的数据源。RDDRDD(ResilientDistributedDataset,弹性分布式数据集)RDD是Spark操纵数据的一个高度抽象,即Spark所操作的数据集都是包装成RDD来进行操作的,例如Spark可以兼容处理Hadoop的HDFS数据文件,那么这个HDFS数据文件就是包装成Spark认识的RDD来完成数据抽取和处理的RDD是Spark的一个核心抽象,Spark的数据操作都是基于RDD来完成。MapReduceMR是Spark可以支撑的运算模式,比传统的HadoopMR的性能更好,并且操作集更加丰富。Spark的MR计算引擎的架构图FP函数式编程:functionalprogrammingGraphX(graph)图计算:alpha版本,已并入spark,暂时不做了解Spark运行模式Standalone模式即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。从一定程度上说,该模式是其他两种的基础。各个节点上的资源被抽象成粗粒度的slot,有多少slot就能同时运行多少task。不同的是,MapReduce将slot分为mapslot和reduceslot,它们分别只能供MapTask和ReduceTask使用,而不能共享,这是MapReduce资源利率低效的原因之一,而Spark则更优化一些,它不区分slot类型,只有一种slot,可以供各种类型的Task使用,这种方式可以提高资源利用率,但是不够灵活,不能为不同类型的Task定制slot资源。总之,这两种方式各有优缺点。SparkOnMesos模式SparkOnMesos模式这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。SparkOnYARN模式SparkOnYARN模式这是一种最有前景的部署模式。但限于YARN自身的发展,目前仅支持粗粒度模式(Coarse-grainedMode)。这是由于YARN上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化,不过这个已经在YARN计划(具体参考:)中了。Spark的适用场景Spark立足于内存计算,从而不再需要频繁的读写HDFS,这使得Spark能更好的适用于:迭代算法,包括大部分机器学习算法MachineLearning和比如PageRank的图形算法。交互式数据挖掘,用户大部分情况都会大量重复的使用导入RAM的数据(R、Excel、python)需要持续长时间维护状态聚合的流式计算。Spark数据的存储Spark支持多种数据底层存储,这点比Hadoop支持的数据文件格式广泛的多。Spark可以兼容HDFS,Hbase,AmazonS3等多种数据集,将这些数据集封装成RDD进行操作Spark核心概念RDDResilientDistributedDataset(RDD)弹性分布式数据集1、有一个分片列表。就是能被切分,和hadoop一样的,能够切分的数据才能并行计算。2、有一个函数计算每一个分片,afunctionforcomputingeachsplit每一个split调用一次map函数3、对其他的RDD的依赖列表,也就是rdd演变,进化。依赖分为宽依赖和窄依赖,并不是所有的RDD都有依赖。4、可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。5、可选:为每一个分片的分配最优先计算位置(preferredlocations),比如HDFS的block的所在位置应该是优先计算的位置。集群配置Spark-env.sh配置文件ExportJAVA_HOME=/usr/local/jdkExportSPARK_MASTER_IP=192.168.1.177ExportSPARK_WORKER_CORES=1ExportSPARK_WORKER_INSTANCES=1ExportWORK_MEMORY=8gExportSPARK_MASTER_PORT=7077ExportSPARK_JAVA_OPTS=“-verbose:gc–XX:-PrintGCDetails–XX:+PrintGCTimeStamp”集群配置SlavesXx.xx.xx.2Xx.xx.xx.3Xx.xx.xx.4Xx.xx.xx.5流程示意针对RDD的操作Ps:RDD可以从集合直接转换而来,也可以从现存的任何HadoopInputFormat而来,亦或者是Hbase等等。加载数据集Action触发执行缓存策略classStorageLevelprivate(privatevaruseDisk_:Boolean,privatevaruseMemory_:Boolean,privatevardeserialized_:Boolean,privatevarreplication_:Int=1)缓存策略valNONE=newStorageLevel(false,false,false)valDISK_ONLY=newStorageLevel(true,false,false)valDISK_ONLY_2=newStorageLevel(true,false,false,2)valMEMORY_ONLY=newStorageLevel(false,true,true)Cache默认transformation&actionFirstDemoLines=sc.textFile(“hdfs://…”)//加载进来成为RDDErrors=lines.filter(_.startsWith(“ERROR”))//transformationErrors.persist()//缓存RDDMysql_errors=errors.fileter(_.contains(“MySQL”)).count//actionhttp_errors=errors.filter(_.contains(“Http”)).countTestCase启动spark:spark-shellvalrdd=sc.parallelize(List(1,2,3,4,5,6).map(2*_)).filter(_5).collectsc:sparkContext,在spark启动时已被实例化加载数据加载hdfs上的数据:sc.textFile(“hdfs://192.168.1.177:9000/tmp/SogouLabDic.dic”)加载本地数据:valrdd=sc.textFile(“/usr/local/SogouLabDic.dic”)结束Thankyou!2014年6月25日更多大数据技术知识请访问:宋亚飞.中国