1Kafka安装配置及使用说明(铁树2018-08-08)(Windows平台,5个分布式节点,修改消息大小,调用程序范例)1安装配置采用5台服务器作为集群节点,IP地址为:XX.XX.0.12-XX.XX.0.16.每台机器依次安装配置JDK、zookeeper、kafka,先安装完一台机器,然后拷贝到其他机器,再修改配置文件。1.1JDK安装配置JDK版本:jdk1.7.0_51_x64解压版(jdk1.7.0_51_x64.rar)解压到C盘kafka目录下,如图所示。2设置环境变量:JAVA_HOME:C:\kafka\jdk1.7.0_51_x64PATH:C:\kafka\jdk1.7.0_51_x64\bin1.2zookeeper安装配置1.2.1解压安装zookeeper版本:3.4.12(zookeeper-3.4.12.tar.gz)3解压到C盘kafka目录下,如图所示。1.2.2创建zookeeper数据目录和日志目录zkdata#存放快照C:\kafka\zookeeper-3.4.12\zkdatazkdatalog#存放日志C:\kafka\zookeeper-3.4.12\zkdatalog41.2.3修改配置文件进入到“C:\kafka\zookeeper-3.4.12”目录下的conf目录中,复制zoo_sample.cfg(官方提供的zookeeper的样板文件),重命名为zoo.cfg(官方指定的文件命名规则)。默认内容:5修改后配置文件为:#Thenumberofmillisecondsofeachtick6tickTime=2000#Thenumberofticksthattheinitial#synchronizationphasecantakeinitLimit=10#Thenumberofticksthatcanpassbetween#sendingarequestandgettinganacknowledgementsyncLimit=5#thedirectorywherethesnapshotisstored.#donotuse/tmpforstorage,/tmphereisjust#examplesakes.dataDir=C:/kafka/zookeeper-3.4.12/zkdatadataLogDir=C:/kafka/zookeeper-3.4.12/zkdatalog#theportatwhichtheclientswillconnectclientPort=12181server.1=XX.XX.0.12:12888:13888server.2=XX.XX.0.13:12888:13888server.3=XX.XX.0.14:12888:13888server.4=XX.XX.0.15:12888:13888server.5=XX.XX.0.16:12888:13888#themaximumnumberofclientconnections.#increasethisifyouneedtohandlemoreclients7#maxClientCnxns=60##Besuretoreadthemaintenancesectionofthe#administratorguidebeforeturningonautopurge.##=100#Purgetaskintervalinhours#Setto0todisableautopurgefeatureautopurge.purgeInterval=24配置文件解释:#tickTime:这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳。#initLimit:这个配置项是用来配置Zookeeper接受客户端(这里所说的客户端8不是用户连接Zookeeper服务器的客户端,而是Zookeeper服务器集群中连接到Leader的Follower服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过10个心跳的时间(也就是tickTime)长度后Zookeeper服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是10*2000=20秒#syncLimit:这个配置项标识Leader与Follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒#dataDir:快照日志的存储路径#dataLogDir:事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多#clientPort:这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口,接受客户端的访问请求。修改他的端口改大点通过配置autopurge.snapRetainCount和autopurge.purgeInterval这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:autopurge.purgeInterval这个参数指定了清理频率,单位是小时,需9要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。autopurge.snapRetainCount这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。1.2.4创建myid文件在“C:\kafka\zookeeper-3.4.12\zkdata”目录下,创建myid文件(无后缀名),内容为对应IP地址的主机号。如server.1则内容为1。1.3Kafka安装配置1.3.1解压安装kafka版本:kafka1.1.1(kafka_2.11-1.1.1.tgz)解压到C盘kafka目录下,如图所示。101.3.2创建消息目录kafkalogs:C:\kafka\kafka_2.11-1.1.1\kafkalogs1.3.3修改配置文件打开C:\kafka\kafka_2.11-1.1.1\config\server.properties实际的修改项为:11broker.id=1listeners=PLAINTEXT://:19092log.dirs=C:/kafka/kafka_2.11-1.1.1/kafkalogs#在log.retention.hours=168下面新增下面三项(消息大小最大1GB)message.max.byte=1073741824replica.fetch.max.bytes=1073741824log.segment.bytes=1073741824default.replication.factor=2#设置zookeeper的连接端口zookeeper.connect=XX.XX.0.12:12181,XX.XX.0.13:12181,XX.XX.0.14:12181,XX.XX.0.15:12181,XX.XX.0.16:12181配置说明:broker.id=0#当前机器在集群中的唯一标识,和zookeeper的myid性质一样port=19092#当前kafka对外提供服务的端口默认是9092host.name=192.168.7.100#这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。num.network.threads=3#这个是borker进行网络处理的线程数12num.io.threads=8#这个是borker进行I/O处理的线程数log.dirs=/opt/kafka/kafkalogs/#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个socket.send.buffer.bytes=102400#发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能socket.receive.buffer.bytes=102400#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘socket.request.max.bytes=104857600#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小num.partitions=1#默认的分区数,一个topic默认1个分区数log.retention.hours=168#默认消息的最大持久化时间,168小时,7天message.max.byte=5242880#消息保存的最大值5Mdefault.replication.factor=2#kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务replica.fetch.max.bytes=5242880#取消息的最大字节数log.segment.bytes=1073741824#这个参数是:因为kafka的消13息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件log.retention.check.interval.ms=300000#每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168),到目录查看是否有过期的消息如果有,删除log.cleaner.enable=false#是否启用log压缩,一般不用启用,启用的话可以提高性能zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218#设置zookeeper的连接端口1.4其他节点配置将安装以上配置好的目录c:\kafka拷贝到其他节点的c盘目录,并修改如下配置。1、JAVA环境变量:JAVA_HOME:C:\kafka\jdk1.7.0_51_x64PATH:C:\kafka\jdk1.7.0_51_x64\bin2、zookeeper的myidC:\kafka\zookeeper-3.4.12\zkdata\myid,修改为对应的数值XX.XX.0.12:1XX.XX.0.13:2XX.XX.0.14:314XX.XX.0.15:4XX.XX.0.16:53、kafka配置C:\kafka\kafka_2.11-1.1.1\config\server.properties的broker.id,修改为对应的数值XX.XX.0.12:1XX.XX.0.13:2XX.XX.0.14:3XX.XX.0.15:4XX.XX.0.16:51.5服务启动1、启动zookeeperC:\kafka\zookeeper-3.4.12\bin\zkServer.cmdXX.XX.0.12-16,依次双击启动。152、启动kafka运行cmd,cdC:\kafka\kafka_2.11-1.1.1目录,再执行命令:【cdC:\kafka\kafka_2.11-1.1.1】C:\kafka\kafka_2.11-1.1.1.\bin\windows\kafka-server-start.bat.\config\server.properties161.6服务状态测试1.6.1创建Topics打开cmd进入C:\kafka\kafka_2.11-1.1.1\bin\windo