Spark实战,第1部分:使用Scala语言开发Spark应用程序本文旨在通过具有实际意义的案例向读者介绍如何使用Scala语言开发Spark应用程序并在Spark集群上运行。本文涉及的所有源数据都将从HDFS(HadoopDistributedFileSystem)读取,部分案例的输出结果也会写入到HDFS,所以通过阅读本文,读者也会学习到Spark和HDFS交互的一些知识。查看本系列更多内容|3评论:王龙,软件开发工程师,IBM2015年7月21日内容在IBMBluemix云平台上开发并部署您的下一个应用。开始您的试用引言在当前这个信息时代里,大数据所蕴含的价值已经被绝大多数的企业所认知。在IT的世界里,往往都是需求驱动技术的发展和革新。Hadoop在这个大背景下应运而生,它给我们提供了一个存储和处理大数据的良好的解决方案,短短的几年时间里,它已无处不在,事实上它已经成了大数据技术的代名词。然而在人们越来越多的使用Hadoop提供的MapReduce框架处理大数据的时候,却发现它存在许多天生的缺陷,如效率低,编程模型不够灵活,只适合做离线计算等。Spark的出现无疑让诸多大数据计算的从业者和爱好者眼前一亮,它基于内存,并且提供了更加丰富的算子使得我们可以更高效和灵活的处理大数据。本文将从实例出发,向读者介绍如何使用Scala语言(Spark框架的开发语言)开发Spark应用程序并且将其运行在Spark集群环境里。本文假设读者已经对Spark基本原理和编程模型有了基本的了解,并且已经掌握了Scala语言开发的基础知识,那么通过阅读本文,相信您一定会对Spark应用程序的开发有更深入的认识。接下来,就让我们开始Spark应用程序的开发之旅吧。回页首关于SparkSpark由加州大学伯克利分校AMP实验室(Algorithms,Machines,andPeopleLab)开发,可用来构建大型的、低延迟的大数据处理的应用程序。并且提供了用于机器学习(MLlib),流计算(Streaming),图计算(GraphX)等子模块,最新的1.4.0版本更是提供了与R语言的集成,这使得Spark几乎成为了多领域通吃的全能技术。Spark对数据的存储,转换,以及计算都是基于一个叫RDD(ResilientDistributedDataset)分布式内存的抽象,应用程序对需要计算的数据的操作都是通过对RDD的一系列转化(Transformation)和动作(Action)算子完成的,其中转化算子可以把一个RDD转成另一个RDD,如filter算子可以通过添加过滤条件生成一个只包含符合条件的数据的新的RDD。动作算子负责完成最终的计算,如count算子可以计算出整个RDD表示的数据集中元素的个数。关于Spark所支持的算子以及使用方法请参考Spark官方网站。本文所使用的Spark的发行版是1.3.1,读者可根据需要下载相应的版本。回页首关于ScalaScala语言是一门类Java的多范式语言,其设计初衷就是为了继承函数式编程的面向对象编程的各种特性,正如Scala语言官网描述的那样:Object-OrientedMeetsFunctional,就是给出了一个关于Scala语言特性的最简单明了的概括。Spark框架使用Scala语言开发,那么使用Scala语言开发Spark应用程序就变成一件很自然的事情,虽然Spark提供了面向Python,Java等语言的编程接口,但是从各个方面来看使用Scala编程都是最简单最容易理解的,特别是当程序出现异常或者是需要通过学习源码来定位问题时,您会发现学习Scala语言来编写Spark应用程序是多么有意义的事情。关于Scala语言,如果您还没有基础,请参考Scala语言官网Scala中文网Twitter提供的Scala课堂面向Java开发人员的Scala指南系列由于Spark1.3.1版本使用的是Scala2.10.x版本,所以本文将使用Scala2.10.5版本。回页首搭建开发环境1.安装ScalaIDE搭建Scala语言开发环境很容易,ScalaIDE官网下载合适的版本并解压就可以完成安装,本文使用的版本是4.1.0。2.安装Scala语言包如果下载的ScalaIDE自带的Scala语言包与Spark1.3.1使用的Scala版本(2.10.x)不一致,那么就需要下载和本文所使用的Spark所匹配的版本,以确保实现的Scala程序不会因为版本问题而运行失败。请下载并安装Scala2.10.5版本3.安装JDK如果您的机器上没有安装JDK,请下载并安装1.6版本以上的JDK。4.创建并配置Spark工程打开ScalaIDE,创建一个名称为spark-exercise的Scala工程。图1.创建scala工程在工程目录下创建一个lib文件夹,并且把您的Spark安装包下的spark-assemblyjar包拷贝到lib目录下。图2.Spark开发jar包并且添加该jar包到工程的classpath并配置工程使用刚刚安装的Scala2.10.5版本.,工程目录结构如下。图3.添加jar包到classpath回页首运行环境介绍为了避免读者对本文案例运行环境产生困惑,本节会对本文用到的集群环境的基本情况做个简单介绍。本文所有实例数据存储的环境是一个8个机器的Hadoop集群,文件系统总容量是1.12T,NameNode叫hadoop036166,服务端口是9000。读者可以不关心具体的节点分布,因为这个不会影响到您阅读后面的文章。本文运行实例程序使用的Spark集群是一个包含四个节点的Standalone模式的集群,其中包含一个Master节点(监听端口7077)和三个Worker节点,具体分布如下:ServerNameRolehadoop036166Masterhadoop036187Workerhadoop036188Workerhadoop036227WorkerSpark提供一个WebUI去查看集群信息并且监控执行结果,默认地址是:,对于该实例提交后我们也可以到web页面上去查看执行结果,当然也可以通过查看日志去找到执行结果。图4.Spark的webconsole回页首案例分析与编程实现案例一a.案例描述提起WordCount(词频数统计),相信大家都不陌生,就是统计一个或者多个文件中单词出现的次数。本文将此作为一个入门级案例,由浅入深的开启使用Scala编写Spark大数据处理程序的大门。b.案例分析对于词频数统计,用Spark提供的算子来实现,我们首先需要将文本文件中的每一行转化成一个个的单词,其次是对每一个出现的单词进行记一次数,最后就是把所有相同单词的计数相加得到最终的结果。对于第一步我们自然的想到使用flatMap算子把一行文本split成多个单词,然后对于第二步我们需要使用map算子把单个的单词转化成一个有计数的Key-Value对,即word-(word,1).对于最后一步统计相同单词的出现次数,我们需要使用reduceByKey算子把相同单词的计数相加得到最终结果。c.编程实现清单1.SparkWordCount类源码importorg.apache.spark.SparkConfimportorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._objectSparkWordCount{defFILE_NAME:String=word_count_results_;defmain(args:Array[String]){if(args.length1){println(Usage:SparkWordCountFileName);System.exit(1);}valconf=newSparkConf().setAppName(SparkExercise:SparkVersionWordCountProgram);valsc=newSparkContext(conf);valtextFile=sc.textFile(args(0));valwordCounts=textFile.flatMap(line=line.split()).map(word=(word,1)).reduceByKey((a,b)=a+b)//printtheresults,fordebuguse.//println(WordCountprogramrunningresults:);//wordCounts.collect().foreach(e={//val(k,v)=e//println(k+=+v)//});wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());println(WordCountprogramrunningresultsaresuccessfullysaved.);}}d.提交到集群执行本实例中,我们将统计HDFS文件系统中/user/fams目录下所有txt文件中词频数。其中spark-exercise.jar是Spark工程打包后的jar包,这个jar包执行时会被上传到目标服务器的/home/fams目录下。运行此实例的具体命令如下:清单2.SparkWordCount类执行命令./spark-submit\--classcom.ibm.spark.exercise.basic.SparkWordCount\--masterspark://hadoop036166:7077\--num-executors3\--driver-memory6g--executor-memory2g\--executor-cores2\/home/fams/sparkexercise.jar\hdfs://hadoop036166:9000/user/fams/*.txte.监控执行状态该实例把最终的结果存储在了HDFS上,那么如果程序运行正常我们可以在HDFS上找到生成的文件信息图5.案例一输出结果打开Spark集群的WebUI,可以看到刚才提交的job的执行结果。图6.案例一完成状态如果程序还没运行完成,那么我们可以在RunningApplications列表里找到它。案例二a.案例描述该案例中,我们将假设我们需要统计一个1000万人口的所有人的平均年龄,当然如果您想测试Spark对于大数据的处理能力,您可以把人口数放的更大,比如1亿人口,当然这个取决于测试所用集群的存储容量。假设这些年龄信息都存储在一个文件里,并且该文件的格式如下,第一列是ID,第二列是年龄。图7.案例二测试数据格式预览现在我们需要用Scala写一个生成1000万人口年龄数据的文件,源程序如下:清单3.年龄信息文件生成类源码importjava.io.FileWriterimportjava.io.Fileimportscala.util.RandomobjectSampleDataFileGenerator{defmain(args:Array[String]){valwriter=newFileWriter(newFile(C:\\sample_age_data.txt),false)valrand=newRandom()for(i-1to10000000){writer.write(i++rand.nextInt(100))writer.write(System.getProperty(line.separator))}writer.flush()writer.close()}}b.案例分析要计算平均年龄,那么首先需要对源文件对应的RDD进行处理,也就是将它转化成一个只包含年龄信息的RDD,其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄=总年龄/人数。对于第一步