SS-JMS第1页基于JMS的数据清分1JMS概述JMS是一个标准的应用编程接口(API),用来建立厂商中立的异步通信机制。从这个意义上说,JMS类似于JDBC和JNDI,例如就JDBC来说,它要求有一个底层的数据库提供者,JMS则要求有一个支持JMS标准的底层异步通信中间件提供者,一般称为面向消息的中间件(Message-OrientedMiddleware,MOM)。MOM是一种支持应用程序通过交换消息实现异步通信的技术。在某种程度上,异步通信有点象是人们通过email进行通信;类似地,同步通信的程序就象是人们通过电话进行通信。在异步通信过程中,程序的结合方式是宽松的,换句话说,异步通信的程序并不直接相互联系,而是通过称为队列(Queue)或主题(Topic)的虚拟通道联系。JMS有两种通信方式:第一种是端对端通信(Point-to-Point,P2P)方式,第二种是出版/订阅(Publish/Subscribe,Pub/Sub)方式。1.1P2P方式P2P消息传递又可以按照推(Push)和拉(Pull)两种方式运作。在P2P拉方式中,程序通过称为队列的虚拟通道通信:在通信会话的发送方,发送程序把一个消息放入队列,在接收方,接收程序定期扫描队列,寻找它希望接收和处理的消息。和推方式相比,拉方式的消息传递效率较低,因为它需要周而复始地检查消息是否到达,这个过程会消耗一定的资源。另外必须注意的一点是,当接收方发现一个需要处理的消息时,它就会提取消息,从效果上看等于从队列删除了消息。因此,即使有多个接收程序在处理同一个队列,对于某一特定的消息来说,总是只有一个接收者。JMS程序可以使用多个队列,每一个队列可以由多个程序处理,但是只有一个程序才会收到某个特定的消息。在P2P推方式的消息传递中,发送程序的工作原理也一样,它把消息发送到队列,但接收程序的工作原理有所不同。接收程序实现了一个Listener接口,包括实现了该接口中的onMessage回调方法,在J2EE环境中监听队列接收消息的任务交给了容器,每当一个新的消息达到队列,容器就调用onMessage方法,将消息作为参数传递给onMessage方法。P2P通信最重要的特点(不管是推还是拉)是:每一个消息总是只由一个程序接收。一般而言,SS-JMS第2页P2P程序在通信过程中参与的活动较多,例如,发送程序可以向接收程序指出应答消息应当放入哪一个队列,它还可以要求返回一个确认或报告消息。1.2Pub/Sub方式在Pub/Sub通信方式中,程序之间通过一个主题(Topic)实现通信。在消息发送方,生产消息的程序向主题发送消息;在接收方,消息的消费程序向感兴趣的主题订阅消息。当一个消息到达主题,所有向该主题订阅的消费程序都会通过onMessage方法的参数收到消息。这是一种推式的消息传递机制。可以设想,会有一个以上的消费程序收到同一消息的副本。相比之下,程序在Pub/Sub通信过程中参与的活动较少,当生产者程序向某个特定的队列发送消息,它不知道到底会有多少程序接收到该消息(可能有多个,可能没有)。2数据清分2.1清分模式清分程序从总公司把数据按各分公司分类,分别发送给各个分公司,分公司接收处理数据后反馈给总公司,如下图:在数据清分中,对于某一笔数据来说,只发送给某个地方,也就是说一个消息只有一个消费者;从这个意义上来说P2P方式更适合做数据清分。如果数据是单向的传输,即只从总公司发送给分公司,那么只需要每个公司一个队列就可以实现,总公司配置“反馈队列”,分公司配置“消息队列”,如下图:总公司分公司A分公司B分公司NSS-JMS第3页如果数据是双向的,即分公司也要发送数据给总公司,那么每个公司都需要配置两个队列,如下图:我们的实现的程序支持双向方式,我们称之为ssjms,后面文档中的部署例子是双向方式的。2.2ssjms介绍ssjms实现了双向消息发送机制,程序包中提供了发送消息的API,消息的接受采用MDB(消息EJB)来自动处理,下图是文件的结构。src是源代码目录,resource是ejb的配置文件,test是发送消息的测试例子。总公司分公司反馈队列消息队列消息队列反馈队列总公司分公司反馈队列消息队列SS-JMS第4页2.2.1发送消息com.sinosoft.jms.core.Producer类提供了发送消息方法sendMessage。以下是发送消息的例子,在test目录中可以找到。packagecom.sinosoft.jms.core.examples;importjava.util.Properties;importjavax.jms.JMSException;importjavax.naming.NamingException;importcom.sinosoft.jms.core.Log;importcom.sinosoft.jms.core.MessageBody;importcom.sinosoft.jms.core.Producer;publicclassProducerTest{/***@paramargs*@throwsJMSException*@throwsNamingException*/publicstaticvoidmain(String[]args)throwsNamingException,JMSException{Producerproducer=newProducer(t3://localhost:5050,t3://localhost:6060);MessageBodybody=newMessageBody();body.setData(helloworld!);Propertiesproperties=newProperties();properties.setProperty(BizType,Policy);properties.setProperty(PK,PDAA200800000000000001);body.setProperties(properties);Stringid=producer.sendMessage(body);Log.info(JMSMessageID=+id);}}下面讲下发送消息的步骤1~3:1、创建Producer的对象SS-JMS第5页Producerproducer=newProducer(t3://localhost:5050,t3://localhost:6060);构造函数的入参含义如下:/***消息生产者*@paramfromServerURL发送端服务器URL*@paramtoServerURL接收端服务器URL*/publicProducer(StringfromServerURL,StringtoServerURL)2、创建消息体MessageBodyMessageBodybody=newMessageBody();消息体可以通过setData和setProperties方法来设置消息数据,3、把消息发送出去Stringid=producer.sendMessage(body);调用Producer.sendMessage方法,把消息发送出去。2.2.2接收消息我们采用了MDB自动接收消息,分成两种MDB:接收消息EJB和接收反馈EJB;由于我们是双向的方式,因此需要实现两对EJB(4个EJB),分别是:分公司接收消息EJB~总公司接收反馈EJB,总公司接收消息EJB~分公司接收反馈EJB。2.2.2.1接收消息EJB新编写类,继承RequestConsumerBean类,覆盖方法receiveMessage:publicMessageBodyreceiveMessage(MessageBodymessageBody);入参是接收到的消息,返回需要反馈的消息。META-INF/ejb-jar.xml:!DOCTYPEejb-jarPUBLIC-//SunMicrosystems,Inc.//DTDEnterpriseJavaBeans2.0//EN第6页ejb-jarenterprise-beansmessage-drivenejb-nameRequestConsumerBean/ejb-nameejb-classcom.sinosoft.jms.mdb.RequestConsumerBean/ejb-classtransaction-typeContainer/transaction-typemessage-driven-destinationdestination-typejavax.jms.Queue/destination-type/message-driven-destination/message-driven/enterprise-beans/ejb-jar其中ejb-class内容需要根据继承的类名修改。2.2.2.2接收反馈EJB新编写类,继承ResponseConsumerBean类,覆盖方法receiveMessage:publicMessageBodyreceiveMessage(MessageBodymessageBody);入参是接收到的反馈消息,返回null。META-INF/ejb-jar.xml:!DOCTYPEejb-jarPUBLIC-//SunMicrosystems,Inc.//DTDEnterpriseJavaBeans2.0//EN其中ejb-class内容需要根据继承的类名修改。2.2.3部署到weblogic上在部署之前,我们需要准备好两对EJB,总公司的2个EJB打成一个jar包,分公司的2个EJBSS-JMS第7页也打成一个jar包。总公司和分公司的部署步骤实际一样,只是用的EJB-jar包不同而已。我们以分公司为例列出部署步骤。1、创建JMSConnectionFactoryServices-JMS-ConnectionFactories-ConfigureanewJMSConnectionFactory...其中JNDIName的框填“queueConnectionFactory”,如下图:创建后,千万别忘了指定Targets。2、创建StoreServices-JMS-Stores-ConfigureanewJMSFileStore...其中Directory的框填个目录路径,持久化消息用,如下图:3、创建JMSServerServices-JMS-Servers-ConfigureanewJMSServer...其中Store选择刚才2步创建的,如下图:SS-JMS第8页创建后,千万别忘了指定Targets。4、创建QueueServices-JMS-Servers-MyJMSServer-Destinations-ConfigureanewJMSQueue.