1Metamorphosis介绍通用产品-消息中间件无花(wuhua@taobao.com)2011-11-11主要内容Meta是什么,特征和适用场景在公司的应用状况原理和内部实现Meta的使用性能3Metamorphosis是什么?Adistributedpublish-subscribemessagingsystem开源MQ-kafka的Java版本Linkedin开源的MQ《Themetamorphosis》——卡夫卡的代表作设计原则消息都是持久的,保存在磁盘吞吐量第一消费状态保存在客户端分布式,生产者、服务器和消费者都可分布。4跟kafka有什么不同?•用java替换scala•实现完全重写•存储结构上采用自定义结构,更简洁紧凑•ConsumerAPI没有采用kafka的stream方式,而是同时实现同步获取和异步订阅两种方式,更接近JMS和Notify5跟kafka有什么不同?•添加了实时统计功能和协议•客户端的连接复用•实现HA复制•实现发送的软负载•实现事务机制•支持http协议•实现消息数据的无痛迁移和水平扩展6Meta有什么特性?•生产者、服务器和消费者都可分布•消息存储顺序写•性能极高,吞吐量大•支持消息顺序•支持本地和XA事务•客户端pull,随机读,利用sendfile系统调用,zero-copy,批量拉数据7Meta有什么特性?•支持消费端事务•支持消息广播模式•支持异步发送消息•支持http协议•支持消息重试和recover•数据迁移、扩容对用户透明•消费状态保存在客户端8Meta能做什么收发消息作为普通的消息发布订阅模型使用收集数据收集和传输日志Tail4j—日志传输Agent•目录扫描•断点续传•文件正则匹配•自动编码探测同步缓存使用广播模式,同步各台机器的本地缓存顺序消息需要消息有序的场景,例如数据库同步其他……更多其他场景,例如acookie等大吞吐量场景Meta的应用ctuacookie商城的会员营销一淘GeneSNS卖家中心新业务充值平台湖畔聚划算交易安全系统TDDL数据同步支付宝淘宝每天消息3T左右,6000多万条更多……支付宝每天消息6-9T,大约250亿条TOPMetamorphosis优缺点服务端无状态消息存储顺序写,性能极高。客户端pull,随机读,利用zero-copysendfile提高发送效率。客户端有状态•保存pull的偏移量offset•异常情况下的消息暂存和recover利用zookeeper在同一个group的consumer之间做负载均衡实时性取决于pull间隔,消费者处理能力差异不影响服务端太多的无效pull请求可能浪费服务器资源•合理设定间隔•合理设计协议Meta的部署结构BrokerBrokerZK集群ProducerProducerProducerConsumerConsumerConsumerslaveSlaverSlaverSlaverConsumer异步复制异步复制同步复制异步复制MetaServerMeta的系统结构gecko/notify-remotingMessageStoreNetworkProcessorstatsHttpMetaSlaveMetaMasterHttpClientMetaClientMetaManagerToolsCppClientMetaOpsZookeeperProducerAppConsemerAppHDFSDBHbaseAgent主要概念的对应关系•Broker-1Topic-1partition0partition1partition2partition3Topic-2partition0partition1partition2partition3•Broker-2Topic-1partition0partition1partition2Topic-3partition0partition1partition2partition3messagesProducer和Broker之间的负载均衡partition1partition2partition3producer同一Group的Consumer和Broker之间的负载均衡partition1partition2partition3consumer1consumer2consumer3consumer4•每个分区针对每个消费者group只挂一个消费者;•同一个group的多余的消费者不参与消费同一Group的Consumer和Broker之间的负载均衡partition1partition2partition2consumer1consumer2consumer3consumer4•当分区数目(n)大于单个group的消费者数目(m)的时候,则有n%m个消费者需要额外承担1/n的消费任务。•n足够大的时候,仍然可以认为负载平均分配partition3partition4异步发送消息发送消息调用所消耗的时间少(0.01-0.02ms)使用者不用关心发送成功或失败降低可靠性,降低对业务逻辑的影响吞吐量更大不关心发送结果希望发消息对业务流程不产生影响(耗时上、逻辑上)收集日志特点使用场景异步发送消息流量控制•Why?异步发送消息流量控制11秒钟发送条数4k消息,2w2发送1条移动的距离3自定义窗口大小Legth=N(消息大小)/4k服务端FAQ如何保证高可用?–Master/Slave方案(同步和异步复制两种);集群持久化数据保留多久的?–看业务要求,可以为每个Topic配置不同的保留时间消息发送成功后,已经写入服务器磁盘?–可以说是,也可以说不是,因为存在os和磁盘缓存–每条消息在返回应答前都先write–每1000条消息force一次,每10秒force一次,可为全局或某个Topic配置参数–可以配置Groupcommit方式消息是怎么保存的?–每条消息保存在一个分区,分区内是一系列文件,顺序写,固定大小切换文件发送端FAQ为什么发送消息前需要先publishtopic?–为了根据topic从zk获取有效的broker列表发送消息怎么保证有序?–只保证单线程发送的消息有序–只保证发送同一个分区的消息有序–实现自定义分区选择器消息可以带属性吗?–仅允许带一个字符串属性,消费者可依此过滤消息ID怎么产生?–Long类型,在发送成功后由服务器端返回–默认-1–42位时间+10位brokerId+12位递增数字消息体怎么产生?–消息体仅要求是一个byte[]数组–序列化方式完全由用户决定消费者的FAQ实时性问题如何解决?–服务端提高刷盘频率,客户端减少pull时间间隔–ConsumerConfig.setMaxDelayFetchTimeInMills(longmaxDelayFetchTimeInMills)消费者是单线程还是多线程拉消息?–多线程(默认为CPU的个数),也可以配置只有一个线程拉–ConsumerConfig.setFetchRunnerCount(intfetchRunnerCount)处理消息的回调方法是运行在单线程还是多线程中?–多线程拉,不同分区消息的回调是运行在多线程环境中的–多线程拉,相同分区消息的回调可以认为是运行在单线程环境中的–单线程拉,运行在单线程中消费者的FAQ为什么在调用subscribe还要调用一次completeSubscribe?–因为subscribe可以调用多次,为了减少跟zk交互次数,subscribe会将订阅信息保存在内存,completeSubscribe的时候一次性处理Pull的偏移量保存在哪里?–默认保存在zk–我们还提供文件、数据库的存储实现。–OffsetStorage接口,可自主实现。偏移量多长时间保存一次?–默认5秒,可设置–ConsumerConfig.setCommitOffsetPeriodInMills(longcommitOffsetPeriodInMills)新加入的消费者不想接收到以前发的消息怎么办?–新增的group和广播新增的机器有这个问题–1.3-SNAPSHOT及其之后的版本支持可设置–ConsumerConfig.setConsumeFromMaxOffset()消费者的FAQ如果我暂时无法处理某个消息,又想继续往下走,怎么办?消息处理失败如何重试?–可选择跳过,设置最大重试次数,超过即跳过,默认5次–ConsumerConfig.setMaxFetchRetries(intmaxFetchRetries)–跳过的消息将保存在消费者本地磁盘或者notify,并自动Recover重试。–如果不想往下走就把这个参数设为int的最大值消息能保证不重复接收吗?–因为每个分区物理隔离消息,理论上每个消费者接收的消息不会重复–在consumer重新负载均衡的时候,可能由于offset保存延迟,导致重复接收极小部分消息。可以设置pull请求的时间间隔吗?–可以,你可以设置允许的最大延迟时间,当响应为空的时候,每次递增最大延迟时间的1/10做延迟,不会超过设定的最大延迟时间。默认5秒。消费者的FAQ广播消息和非广播消息的区别?Topic1Message1Message2Group1consumer1consumer2Group2consumer3consumer4group1非广播接收group2广播接收Meta的使用发送消息//Newsessionfactory,强烈建议使用单例MessageSessionFactorysessionFactory=newMetaMessageSessionFactory(newMetaClientConfig());//createproducer,强烈建议使用单例MessageProducerproducer=sessionFactory.createProducer();//publishtopicfinalStringtopic=“meta-test”;//调一次就够了producer.publish(topic);byte[]data=...//sendmessageSendResultsendResult=producer.sendMessage(newMessage(topic,data));Meta的使用异步发送//Newsessionfactory,强烈建议使用单例AsyncMessageSessionFactorysessionFactory=newAsyncMetaMessageSessionFactory(newMetaClientConfig());//createproducer,强烈建议使用单例MessageProducerproducer=sessionFactory.createAsyncProducer();//publishtopicfinalStringtopic=“meta-test”;//调一次就够了producer.publish(topic);byte[]data=...//sendmessageSendResultsendResult=producer.sendMessage(newMessage(topic,data));Meta的使用同步消费//subscribedtopicfinalStringtopic=“meta-test”;//consumergroupfinalStringgroup=“meta-example”;//createconsumerMessageConsumerconsumer=sessionFactory.createConsumer(newConsumerConfig(group));//startoffsetlongoffset=0;MessageIteratorit=null;//fetchmessageswhile((it=consumer.get(topic,newPartition(0-0),offset,1024*1024)