CHAR02 算法

整理文档很辛苦,赏杯茶钱您下走!

免费阅读已结束,点击下载阅读编辑剩下 ...

阅读已结束,您可以下载文档离线阅读编辑

资源描述

全国高校标准教材《云计算》姊妹篇,剖析大数据核心技术和实战应用大数据刘鹏主编张燕张重生张志立副主编BIGDATA刘鹏教授,清华大学博士。现任南京大数据研究院院长、中国信息协会大数据分会副会长、中国大数据技术与应用联盟副理事长。主持完成科研项目25项,发表论文80余篇,出版专业书籍15本。获部级科技进步二等奖4项、三等奖4项。主编的《云计算》被全国高校普遍采用,被引用量排名中国计算机图书第一名。创办了知名的中国云计算(chinacloud.cn)和中国大数据(thebigdata.cn)网站。曾率队夺得2002PennySort国际计算机排序比赛冠军,两次夺得全国高校科技比赛最高奖,并三次夺得清华大学科技比赛最高奖。荣获“全军十大学习成才标兵”(排名第一)、南京“十大杰出青年”、江苏省中青年科学技术带头人、清华大学“学术新秀”等称号。第二章数据采集与预处理2.12.2数据预处理原理2.3数据仓库与ETL工具习题全国高校标准教材《云计算》姊妹篇,剖析大数据核心技术和实战应用大数据采集架构of4232.1大数据采集架构第二章数据采集与预处理如今,社会中各个机构、部门、公司、团体等正在实时不断地产生大量的信息,这些信息需要以简单的方式进行处理,同时又要十分准确且能迅速满足各种类型的数据(信息)需求者。这给我们带来了许多挑战,第一个挑战就是在大量的数据中收集需要的数据,下面介绍常用的大数据采集工具。of4242.1.1概述2.1大数据采集架构第二章数据采集与预处理FlumeChukwaScribleKafka大数据采集工具of4252.1.2常用大数据采集工具数据采集最传统的方式是企业自己的生产系统产生的数据,除上述生产系统中的数据外,企业的信息系统还充斥着大量的用户行为数据、日志式的活动数据、事件信息等,越来越多的企业通过架设日志采集系统来保存这些数据,希望通过这些数据获取其商业或社会价值。2.1大数据采集架构第二章数据采集与预处理of426在Flume中,外部输入称为Source(源),系统输出称为Sink(接收端)。Channel(通道)把Source和Sink链接在一起。ApacheChukwa项目与Flume有些相类似,Chukwa继承了Hadoop的伸缩性和鲁棒性。也内置一个功能强大的工具箱,用于显示系统监控和分析结果。互联网时代,网络爬虫也是许多企业获取数据的一种方式。Nutch就是网络爬虫中的娇娇者,Nutch是Apache旗下的开源项目,存在已经超过10年,拥有大量的忠实用户。Flume体系架构2.1大数据采集架构第二章数据采集与预处理of4272.1.3ApacheKafka数据采集ApacheKafka被设计成能够高效地处理大量实时数据,其特点是快速的、可扩展的、分布式的,分区的和可复制的。Kafka是用Scala语言编写的,虽然置身于Java阵营,但其并不遵循JMS规范。Topics(话题):消息的分类名。Producers(消息发布者):能够发布消息到Topics的进程。Consumers(消息接收者):可以从Topics接收消息的进程。Broker(代理):组成Kafka集群的单个节点。基本Kafka集群的工作流程2.1大数据采集架构第二章数据采集与预处理of4281、TopicsTopics是消息的分类名(或Feed的名称)。Kafka集群或Broker为每一个Topic都会维护一个分区日志。每一个分区日志是有序的消息序列,消息是连续追加到分区日志上,并且这些消息是不可更改的。2、日志区分一个Topic可以有多个分区,这些分区可以作为并行处理的单元,从而使Kafka有能力高效地处理大量数据。Topics与日志分析2.1大数据采集架构第二章数据采集与预处理of4293、ProducersProducers是向它们选择的主题发布数据。生产者可以选择分配某个主题到哪个分区上。这可以通过使用循环的方式或通过任何其他的语义分函数来实现。4、ConsumersKafka提供一种单独的消费者抽象,此抽象具有两种模式的特征消费组:Queuing和Publish-Subscribe。5、ApacheKafka的安装及使用因为Kafka是处理网络上请求,所以,应该为其创建一个专用的用户,这将便于对Kafka相关服务的管理,减少对服务器上其他服务的影响。2.1大数据采集架构第二章数据采集与预处理of4210使用useradd命令来创建一个Kafka用户:$sudouseraddkafka–m使用passwd命令来设置其密码:$sudopasswdkafaka接下来把kafaka用户添加到sudo管理组,以便kafaka用户具有安装ApacheKafka依赖库的权限。这里使用adduser命令来进行添加:$sudoadduserkafkasudo这时就可以使用kafka账户了。切换用户可以使用su命令:$su-kafka在ApacheKafka安装所依赖的软件包前,最好更新一下apt管理程序的软件列表:$sudoapt-getupdateApacheKafka需要Java运行环境,这里使用apt-get命令安装default-jre包,然后安装Java运行环境:$sudoapt-getinstalldefault-jre通过下面的命令测试一下Java运行环境是否安装成功,并查看Java的版本信息:$java-version2.1大数据采集架构第二章数据采集与预处理of4211机器有如下显示:2.1大数据采集架构第二章数据采集与预处理of42122.1大数据采集架构第二章数据采集与预处理of42132.1大数据采集架构第二章数据采集与预处理of42142.1大数据采集架构第二章数据采集与预处理of42156、使用Java来编写Kafka的实例首先,编写KafkaProducer.properties文件:zk.connect=localhost:2181broker.list=localhost:9092serializer.class=kafka.serializer.StringEncoderrequest.required.acks=1下面的代码是使用Java编写了一个Kafka消息发布者:importkafka.javaapi.producer.Producer;importkafka.producer.KeyedMessage;importkafka.producer.ProducerConfig;publicclassMyKafkaProducer{privateProducerString,Stringproducer;privatefinalStringtopic;publicMyKafkaProducer(Stringtopic)throwsException{InputStreamin=Properties.class.getResourceAsStream(KafkaProducer.properties);Propertiesprops=newProperties();props.load(in);ProducerConfigconfig=newProducerConfig(props);producer=newProducerString,String(config);}publicvoidsendMessage(Stringmsg){KeyedMessageString,Stringdata=newKeyedMessageString,String(topic,msg);producer.send(data);producer.close();}publicstaticvoidmain(String[]args)throwsException{MyKafkaProducerproducer=newMyKafkaProducer(HelloTopic);Stringmsg=HelloKafka!;producer.sendMessage(msg);}}2.1大数据采集架构第二章数据采集与预处理of4216下面创建Comsumer,首先编写KafkaProperties文件:zk.connect=localhost:2181group.id=testgroupzookeeper.session.timeout.ms=500zookeeper.sync.time.ms=250auto.commit.interval.ms=1000上述参数配置,十分容易理解,具体的详细说明,可以参考Kafka的官方文档。下面的代码是使用Java编写了一个Kafka的Comsumer。importjava.io.InputStream;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Properties;importkafka.consumer.ConsumerConfig;importkafka.consumer.ConsumerIterator;importkafka.consumer.KafkaStream;importkafka.javaapi.consumer.ConsumerConnector;importkafka.consumer.Consumer;publicclassMyKafkaConsumer{privatefinalConsumerConnectorconsumer;privatefinalStringtopic;publicMyKafkaConsumer(Stringtopic)throwsException{InputStreamin=Properties.class.getResourceAsStream(KafkaProducer.properties);Propertiesprops=newProperties();props.load(in);ConsumerConfigconfig=newConsumerConfig(props);consumer=Consumer.createJavaConsumerConnector(config);this.topic=topic;}publicvoidconsumeMessage(){MapString,StringtopicMap=newHashMapString,String();topicMap.put(topic,newInteger(1));MapString,ListKafkaStreambyte[],byte[]consumerStreamsMap=consumer.createMessageStreams(topicMap);ListKafkaStreambyte[],byte[]streamList=consumerStreamsMap.get(topic);for(finalKafkaStreambyte[],byte[]stream:streamList){ConsumerIteratorbyte[],byte[]consumerIte=stream.iterator();while(consumerIte.hasNext())System.out.println(message::+newString(consumerIte.next().message()));}if(consumer!=null)consumer.shutdown();}publicstaticvoidmain(String[]args)throwsException{StringgroupId=testgroup;Stringtopic=HelloTopic;MyKafkaConsumerconsumer=newMyKafkaConsumer(topic);cons

1 / 11
下载文档,编辑使用

©2015-2020 m.777doc.com 三七文档.

备案号:鲁ICP备2024069028号-1 客服联系 QQ:2149211541

×
保存成功