转自本文介绍了WebSphereMQ中的消息组支持,以及如何利用该支持来提供逻辑消息排序和支持相关消息分组。并演示了如何使用WebSphereMQJava类进行分组操作,和如何使用JMSAPI实现与此相同的功能。随后,本文还给出了一个建议解决方案,并说明了如何在WebSphereApplicationServer或其他J2EE应用服务器中异步接收消息组时如何应用此解决方案。消息组介绍IBM®WebSphere®MQ并不能始终保证发送和接收应用程序间的消息的正确顺序。如果三条消息按照顺序ABC发送,可能不会按照相同的顺序到达(例如,如果中间网络将消息分布到集群中,然后再重新组合时)。但如果消息顺序对应用程序的正常工作非常重要又该如何呢?假设有这样的场景,消息B告知应用程序忽略前一个消息。如果消息以顺序CBA送达,则序列的意义将完全不同了。WebSphereMQ通过消息分组来解决此问题。发送消息的应用程序可以指定其将消息A、B和C作为组的一部分发送。组中的每个消息都分配了一个序列号(从1开始)。然后,接收应用程序可以指定希望按照此逻辑顺序接收消息(与消息到达目的地的实际顺序相对)。现在,即使消息B或C首先到达,也不会将其立即传递给应用程序,因为它们的序列号不为1。消息组还可用于另一个目的。有时候消息顺序可能并不重要,但可能要求将一个消息集合一起处理(在空间上和时间上)。例如,假定有一个应用程序在每次向在线购物车添加了物品后都会发送一条消息。购物车中的物品可能需要一起处理,或许要将其聚合到单个订单消息中。可以通过将消息放入到消息组中对此聚合进行管理。消息的接收者可以指定,在所有消息到达目的地之前,不希望接收组中的任何消息。在此场景中,在同一个位置接收所有消息也很重要。如果出于可伸缩性方面的原因,目的地有多个使用者,则务必将表示相同订单中的物品的所有消息发送到相同的使用者,而消息组就可以确保满足这一要求。消息组的概念与消息段不同,后者表示大型消息发送时被拆分为较小的消息,应在接收时将其重新组装为原始消息。消息组中的每个实体都是一个完整的消息。可以使用消息段对消息组内的消息进行拆分,但在本文中将不会考虑此选项。使用WebSphereMQJavaAPI现在我们将讨论使用WebSphereMQJava™API发送和接收消息组的实际操作。发送消息组下面的清单1给出了使用WebSphereMQJavaAPI将包含五条消息的组发送到队列管理器QM_host上的队列default所需的代码:清单1.使用WebSphereMQJavaAPI发送消息组MQQueueManagerqueueManager=newMQQueueManager(QM_host);MQQueuequeue=queueManager.accessQueue(default,MQC.MQOO_OUTPUT);MQPutMessageOptionspmo=newMQPutMessageOptions();pmo.options=MQC.MQPMO_LOGICAL_ORDER;for(inti=1;i=5;i++){MQMessagemessage=newMQMessage();message.format=MQSTR;message.writeString(Message+i);if(i5){message.messageFlags=MQC.MQMF_MSG_IN_GROUP;}else{message.messageFlags=MQC.MQMF_LAST_MSG_IN_GROUP;}queue.put(message,pmo);}queue.close();queueManager.disconnect();该示例首先连接到队列管理器,并打开用于进行输出的队列句柄。从消息组的角度而言,要注意的第一个重要方面是向put消息选项添加了约束MQPMO_LOGICAL_ORDER。此值告知队列管理器,应用程序将把组中的每个消息按照序列顺序放入队列中,客户机在处理任何后续消息前会将一个组中的所有消息放置到其上。此代码随后循环五次,每次放置一个新消息。(消息格式设置为MQSTR,因此我们可能稍后接收到JMS文本消息类型的消息。)对于前四条消息,设置了消息标志MQMF_MSG_IN_GROUP,以指示相应的消息应属于当前组。第五条消息设置了消息标志MQMF_LAST_MSG_IN_GROUP,以指示该消息是组中的最后一条消息。下一次放置具有MQMF_MSG_IN_GROUP标志的消息时,将自动开始一个新的组。示例最后关闭队列句柄并从队列管理器断开,从而结束。运行此代码时,通过使用WebSphereMQExplorer浏览组消息,可得到图1所示的结果:图1.在WebSphereMQExplorer中浏览消息组为每条消息分配了相同的24位组标识符,且分别具有从1到5的逻辑序列号。使用MQPMO_LOGICAL_ORDERput消息选项纯粹为了方便起见。应用程序可能不会使用此标志,而采用显式设置组标识符和序列号的方式。如果消息不按顺序发出,或和其他消息组穿插着发送,则有必要采取后一种方式。仍然应设置消息标志来指示消息属于某个组以及是否为组中的最后一条消息。此机制的另一个可用场景为组中的消息分散在很长的时间内进行传递。可能出现这样的情况,应用程序使用逻辑消息排序发送组中的开头的若干消息,然后系统出现故障。当应用程序重新启动时,可以继续处理该消息组,能在不进行逻辑排序的情况下发送后面的消息,只要显式地将组标识符设置为前面的消息所使用的组标识符并使用后续序列标识符即可。此时,可以仍然对后续消息使用逻辑消息排序。队列管理器将随后继续使用相同的组标识符,并在每次递增序列号。可以将消息组与事务结合使用。如果第一个消息放置在事务下,则必须将所有使用相同队列句柄的所有其他消息都放置于事务下。不过,每个消息并不一定要在相同的事务中。接收消息组我们已经以组的形式发送了消息,接下来我们希望采用相同的顺序接收这些消息。下面的清单2给出了如何使用WebSphereMQJavaAPI完成此任务的示例:清单2.使用WebSphereMQJavaAPI接收消息组MQQueueManagerqueueManager=newMQQueueManager(QM_host);MQQueuequeue=queueManager.accessQueue(default,MQC.MQOO_INPUT_AS_Q_DEF);MQGetMessageOptionsgmo=newMQGetMessageOptions();gmo.options=MQC.MQGMO_LOGICAL_ORDER|MQC.MQGMO_ALL_MSGS_AVAILABLE;gmo.matchOptions=MQC.MQMO_NONE;MQMessagemessage=newMQMessage();do{queue.get(message,gmo);intdataLength=retrievedMessage.getDataLength();System.out.println(message.readStringOfCharLength(dataLength));gmo.matchOptions=MQC.MQMO_MATCH_GROUP_ID;}while(gmo.groupStatus!=MQC.MQGS_LAST_MSG_IN_GROUP);queue.close();queueManager.disconnect();和前面一样,该代码首先连接到队列管理器,并打开队列句柄,但这次是为了使用缺省队列定义接收消息。我们指定两个get消息选择:MQGMO_LOGICAL_ORDER指示我们希望按照逻辑顺序接收消息,即,应该首先接收序列号为1的消息,然后是序列号为2的消息,依此类推。第二个选项MQGMO_ALL_MSGS_AVAILABLE指示在组中的所有消息可用前,我们不希望接收其中的任何消息。此选项可防止在开始处理组中的消息时却发现后续消息尚未发送或尚未到达的情况。对于第一个get,我们指定不需要任何匹配选项——准备接收任何组中的第一条消息。对于后续迭代,我们均指定MQMO_MATCH_GROUP_ID选项,以指示我们只希望接收具有匹配组标识符的消息。我们将为每个迭代使用相同的消息对象,因此,对于第二个get,将包含所接收到的第一条消息的组标识符。每个get操作都将更新get消息选项的组状态字段。当设置为MQGS_LAST_MSG_IN_GROUP时,我们就知道已经接收到了组中的所有消息。和前面的清单中一样,请确保在完成时进行清理工作,即关闭队列句柄并从队列管理器断开。使用WebSphereMQJMSAPI此时,您可能会问,为什么这些示例都使用WebSphereMQJavaAPI在这个标准盛行的时代,我们是不是应该使用JavaMessageService(JMS)API不过,对于大部分标准规范,JMS代表了消息传递系统所支持的功能的最低要求。因此,并非WebSphereMQ所支持的所有行为都可通过此API进行表示,而消息组正是其中之一。JMS规范确实定义了两个分别名为JMSXGroupID和JMSXSeqNum的属性,并指定这两个属性分别表示消息所属的组的标识符和在该组中的序列号。不过,JMS规范未提供任何使用这些属性的支持。不过,这并非十分绝对——通过采用一些补救方法,仍然可以通过使用这些属性来复现现有行为。发送消息组首先,让我们看看发送应用程序。正如上面提到的,put消息选项MQPMO_LOGICAL_ORDER仅是一个队列管理器指令,用于自动分配消息组标识符和序列号。下面的清单3演示了如何在JMSAPI缺少此选项的情况下显式地设置这些属性。清单3.使用WebSphereMQJMSAPI发送消息组MQConnectionFactoryfactory=newMQConnectionFactory();factory.setQueueManager(QM_host)MQQueuedestination=newMQQueue(default);destination.setTargetClient(JMSC.MQJMS_CLIENT_NONJMS_MQ);Connectionconnection=factory.createConnection();Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);MessageProducerproducer=session.createProducer(destination);规范版本本文中的JMS示例使用来自JMS1.1的统一域接口。不过,可以对其进行重新编写,以使用早期WebSphereMQJMS版本中提供的点到点或发布/订阅接口。类似地,可以使用EJB2.0部署描述符和JMS1.0.2b接口来在J2EE1.3应用服务器中使用下载部分提供的MDB示例。StringgroupId=ID:+newBigInteger(24*8,newRandom()).toString(16);for(inti=1;i=5;i++){TextMessagemessage=session.createTextMessage();message.setStringProperty(JMSXGroupID,groupId);message.setIntProperty(JMSXGroupSeq,i);if(i==5){message.setBooleanProperty(JMS_IBM_Last_Msg_In_Group,true);}message.setText(Message+i);producer.send(message);}connectio