ActiveMQ的几种基本通信方式总结简介在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个总体比较合理的选择。这里,我们先针对具体的一个消息队列Activemq的基本通信方式进行探讨。activemq是JMS消息通信规范的一个实现。总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式一一讨论一下。基础流程在讨论具体方式的时候,我们先看看使用activemq需要启动服务的主要过程。按照JMS的规范,我们首先需要获得一个JMSconnectionfactory.,通过这个connectionfactory来创建connection.在这个基础之上我们再创建session,destination,producer和consumer。因此主要的几个步骤如下:1.获得JMSconnectionfactory.通过我们提供特定环境的连接信息来构造factory。2.利用factory构造JMSconnection3.启动connection4.通过connection创建JMSsession.5.指定JMSdestination.6.创建JMSproducer或者创建JMSmessage并提供destination.7.创建JMSconsumer或注册JMSmessagelistener.8.发送和接收JMSmessage.9.关闭所有JMS资源,包括connection,session,producer,consumer等。publish-subscribe发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个。在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、潇湘晨报等。那么这些个我们订阅的报纸,就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能和我订阅了相同的报纸。那么,在这里,相当于我们在同一个topic里注册了。对于一份报纸发行方来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:现在,假定我们用前面讨论的场景来写一个简单的示例。我们首先需要定义的是publisher.publisherpublisher是属于发布信息的一方,它通过定义一个或者多个topic,然后给这些topic发送消息。publisher的构造函数如下:Java代码1.publicPublisher()throwsJMSException{2.factory=newActiveMQConnectionFactory(brokerURL);3.connection=factory.createConnection();4.try{5.connection.start();6.}catch(JMSExceptionjmse){7.connection.close();8.throwjmse;9.}10.session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);11.producer=session.createProducer(null);12.}我们按照前面说的流程定义了基本的connectionFactory,connection,session,producer。这里代码就是主要实现初始化的效果。接着,我们需要定义一系列的topic让所有的consumer来订阅,设置topic的代码如下:Java代码1.protectedvoidsetTopics(String[]stocks)throwsJMSException{2.destinations=newDestination[stocks.length];3.for(inti=0;istocks.length;i++){4.destinations[i]=session.createTopic(STOCKS.+stocks[i]);5.}6.}这里destinations是一个内部定义的成员变量Destination[]。这里我们总共定义了的topic数取决于给定的参数stocks。在定义好topic之后我们要给这些指定的topic发消息,具体实现的代码如下:Java代码1.protectedvoidsendMessage(String[]stocks)throwsJMSException{2.for(inti=0;istocks.length;i++){3.Messagemessage=createStockMessage(stocks[i],session);4.System.out.println(Sending:+((ActiveMQMapMessage)message).getContentMap()+ondestination:+destinations[i]);5.producer.send(destinations[i],message);6.}7.}8.9.protectedMessagecreateStockMessage(Stringstock,Sessionsession)throwsJMSException{10.MapMessagemessage=session.createMapMessage();11.message.setString(stock,stock);12.message.setDouble(price,1.00);13.message.setDouble(offer,0.01);14.message.setBoolean(up,true);15.16.returnmessage;17.}前面的代码很简单,在sendMessage方法里我们遍历每个topic,然后给每个topic发送定义的Message消息。在定义好前面发送消息的基础之后,我们调用他们的代码就很简单了:Java代码1.publicstaticvoidmain(String[]args)throwsJMSException{2.if(args.length1)3.thrownewIllegalArgumentException();4.5.//Createpublisher6.Publisherpublisher=newPublisher();7.8.//Settopics9.publisher.setTopics(args);10.11.for(inti=0;i10;i++){12.publisher.sendMessage(args);13.System.out.println(Publisher'+i+pricemessages);14.try{15.Thread.sleep(1000);16.}catch(InterruptedExceptione){17.e.printStackTrace();18.}19.}20.//Closeallresources21.publisher.close();22.}调用他们的代码就是我们遍历所有topic,然后通过sendMessage发送消息。在发送一个消息之后先sleep1秒钟。要注意的一个地方就是我们使用完资源之后必须要使用close方法将这些资源关闭释放。close方法关闭资源的具体实现如下:Java代码1.publicvoidclose()throwsJMSException{2.if(connection!=null){3.connection.close();4.}5.}consumerConsumer的代码也很类似,具体的步骤无非就是1.初始化资源。2.接收消息。3.必要的时候关闭资源。初始化资源可以放到构造函数里面:Java代码1.publicConsumer()throwsJMSException{2.factory=newActiveMQConnectionFactory(brokerURL);3.connection=factory.createConnection();4.connection.start();5.session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);6.}接收和处理消息的方法有两种,分为同步和异步的,一般同步的方式我们是通过MessageConsumer.receive()方法来处理接收到的消息。而异步的方法则是通过注册一个MessageListener的方法,使用MessageConsumer.setMessageListener()。这里我们采用异步的方式实现:Java代码1.publicstaticvoidmain(String[]args)throwsJMSException{2.Consumerconsumer=newConsumer();3.for(Stringstock:args){4.Destinationdestination=consumer.getSession().createTopic(STOCKS.+stock);5.MessageConsumermessageConsumer=consumer.getSession().createConsumer(destination);6.messageConsumer.setMessageListener(newListener());7.}8.}9.10.publicSessiongetSession(){11.returnsession;12.}在前面的代码里我们先找到同样的topic,然后遍历所有的topic去获得消息。对于消息的处理我们专门通过Listener对象来负责。Listener对象的职责很简单,主要就是处理接收到的消息:Java代码1.publicclassListenerimplementsMessageListener{2.3.publicvoidonMessage(Messagemessage){4.try{5.MapMessagemap=(MapMessage)message;6.Stringstock=map.getString(stock);7.doubleprice=map.getDouble(price);8.doubleoffer=map.getDouble(offer);9.booleanup=map.getBoolean(up);10.DecimalFormatdf=newDecimalFormat(#,###,###,##0.00);11.System.out.println(stock+\t+df.format(price)+\t+df.format(offer)+\t+(up?up:down));12.}catch(Exceptione){13.e.printStackTrace();14.}15.}16.17.}它实现了MessageListener接口,里面的onMessage方法就是在接收到消息之后会被调用的方法。现在,通过实现前面的publisher和consumer我们已经实现了pub-sub模式的一个实例。仔细回想它的步骤的话,主要就是要两者设定一个共同的topic,有了这个topic之后他们可以实现一方发消息另外一方接收。另外,为了连接到具体的messageserver,这里是使用了连接tcp://localhost:16161作为定义ActiveMQConnectionFactory的路径。在publisher端通过session创建producer,根据指定的参数创建destination,然后将消息和destination作为producer.send()方法的参数发消息。在co