jms与ActiveMQ实践与应用

2024-09-07 17:32
文章标签 实践 应用 activemq jms

本文主要是介绍jms与ActiveMQ实践与应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

这是我自己从不知道JMS为何物到学习如何使用第三方工具实现跨服务器的知识总结,在整个过程中可能考虑不全。另外,如果想尽快使用JMS,建议直接看实例那一节就可以了。有问题多交流。

词语解释

(有些词可能用的不是很正确,在这里我把自己能意识到的词拿出来解释一下):

1、  跨服务器:专业术语好像叫“跨实例”。意思是,可以在多个服务器(可以是不同的服务器,如resin与tomcat)之间相互通信。与之对应的是单服务器版。

2、  消息生产者:就是专门制造消息的类。

3、  消息消费者:也叫消息接收者,它主要是实现了消息监听的一个接口,当然,也可以难过Spring提供的一个转换器接口指定任意一个类中的任意方法。

我们都知道,任何一个系统从整体上来看,其实质就是由无数个小的服务或事件(我们可以称之为事务单元)有机地组合起来的。对于系统中任何一个比较复杂的功能,都是通过调用各个独立的事务单元以实现统一的协调运作而实现的。

现在我们的问题是,如果有两个完全独立的服务(比如说两个不同系统间的服务)需要相互交换数据,我们该如何实现?

好吧,我承认,我很傻很天真,我想到的第一个方法就是在需要的系统中将代码再写一遍,但我也知道,这绝对不现实!好吧,那我就应该好好学习学习达人们是如何去解决这样的问题。

第一种方法,估计也是用的最多的,就是rpc模式。这种方法就是在自己的代码中远程调用其它程序中的代码以达到交换数据的目的。但是这种方法很显然地存在了一个问题:就是一定要等到获取了数据之后才能继续下面的操作。当然,如果一些逻辑是需要这些数据才能操作,那这就是我们需要的。

第二种方法就是Hessian,我个人觉得Hessian的实现在本质上与rpc模式的一样,只是它采用了配置,简化了代码。

上面这两个方法,基本上能解决所有的远程调用的问题了。但是美中不足的是,如果我在A系统中有一个操作是需要让B系统做一个响应的,但我又不需要等它响应完才做下面的操作,这该怎么办?于是新的解决方案就需要被提出来,而SUN公司的设计师们也考虑到了,在JAVA中这就被体现为JMS(java message service)。

一、认识JMS

JMS模块的功能只提供了接口,并没有给予实现,实现JMS接口的消息中间件叫JMS Provider,这样的消息中间件可以从Java里通过JMS接口进行调用。

JMS消息由两部分构成:header和body。header包含消息的识别信息和路由信息,body包含消息的实际数据。

JMS的通用接口集合以异步方式发送或接收消息。另外, JMS采用一种宽松结合方式整合企业系统的方法,其主要的目的就是创建能够使用跨平台数据信息的、可移植的企业级应用程序,而把开发人力解放出来。

Java消息服务支持两种消息模型:Point-to-Point消息(即P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub,也就是广播模式)。

根据数据格式,JMS消息可分为以下五种:

BytesMessage   消息是字节流。

MapMessage   消息是一系列的命名和值的对应组合。

ObjectMessage   消息是一个流化(即继承Serializable)的Java对象。

StreamMessage   消息是Java中的输入输出流。

TextMessage   消息是一个字符串,这种类型将会广泛用于XML格式的数据。

二、使用JMS

在使用JMS时,其步骤很像使用JDBC一样,需要的步骤为:

1、建立消息连接(也就是建立连接工厂);

2、设定消息目的地(其实与步骤1中用的类是一样的,只是它是用来指定目的地,而步骤1中是用来指定消息服务器地址的);

3、创建jmsTemplate实例(为下一步构建消息sessin作准备);

4、创建消息生产者(其中就用到了2、3两步的产物),它就是一个普通的类,一般是通过send方法发送消息,也可以通过MessageListenerAdapter指定发送信息的方法;

5、创建MDP(也就是消息接收者,它是一个必须实现MessageListener接口的类);

6、为每个MDP建立一个监听容器,当有相应的消息传来,则它会自动调用对应的MDP消费消息。

整个过程就像编写JDBC一样,代码维护量很大。为此,让Spring对其进行管理是个不错的选择。

三、Spring整合JMS

Spring框架提供了一个模板机制来隐藏Java APIs的细节。开发人员可以使用JDBCTemplate和JNDITemplate类来分别访问后台数据库和JEE资源(数据源,连接池)。JMS也不例外,Spring提供JMSTemplate类,因此开发人员不用为一个JMS实现去编写样本代码。接下来是在开发JMS应用程序时Spring所具有一些的优势。

1. 提供JMS抽象API,简化了访问目标(队列或主题)和向指定目标发布消息时JMS的使用。

2. 开发人员不需要关心JMS不同版本(例如JMS 1.0.2与JMS 1.1)之间的差异。

3. 开发人员不必专门处理JMS异常,因为Spring为所有JMS异常提供了一个未经检查的异常,并在JMS代码中重新抛出

具体的详细步骤与方法参考 spring-reference2.5.pdf 中的第十九章。

下面,我就将我在整个学习过程中实践过的例子一一列举出来,并将在其中遇到的问题和心得给出一定的说明,希望对大家能有所帮助。

四、实例

(一)、配置单服务器版消息机制

1、首先,我们需要配置resin下的resin.conf文件,在其中(<server></server>之间)加上:

<!-- The ConnectionFactory resource defines the JMS factory for creating JMS connections -->

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/queue"

    type="com.caucho.jms.memory. MemoryQueue">

<init>

    <queue-name>OssQueue</queue-name>

    </init>

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/topic"

    type="com.caucho.jms.memory. MemoryTopic">

<init>

    <queue-name>ossTopic</queue-name>

    </init>

</resource>

注:i、我现在只知道JNDI方式配置消息的连接工厂,我并不知道有没有其它的方式,但我看了许多资料上也没提到其它配置方式。

ii、网上很少有关于在resin中配置JMS消息工厂的资料,只有在resin的官网上才能见到。

iii、上面JNDI配置的地方需要注意的是,大家如果在网上看资料的话,可能会发现网上会比我给出的总是会多一些,也就是总是多一些<data-source>的初始化配置,如:

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

  <init>

    <data-source>jdbc/database</data-source>

  </init>

</resource>

就这样的配置,单独启动resin是没有问题的,但是如果将其按照下面的Spring配置加到系统中,就会出异常(具体的异常名称我忘了,中文的大概意思是:数据库对象不能转换成JMS连接对象,还有一种情况是启动系统时会内存溢出)。我认为这种配置可能是数据库消息模式的配置(因为JMS有内存和数据库两种管理方式,我目前只学习了内存管理的方式,至于数据库管理方式大家要是有兴趣可以参考:

http://www.oracle.com/technology/books/pdfs/2352_Ch06_FINAL.pdf

2、在web.xml文件中配置一个spring用的上下文:

<context-param>

    <param-name>contextConfigLocation</param-name>

    <param-value>/WEB-INF/jmsconfig.xml</param-value>

</context-param>

<!-- 配置Spring容器 -->

<listener>

    <listener-class>

    org.springframework.web.context.ContextLoaderListener

</listener-class>

</listener>

注:我是将jmsconfig.xml加载到service.xml中随系统启动的。

3、创建jmsconfig.xml用来装配jms,内容如下:

<?xml version="1.0" encoding="UTF-8"?>  

<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"

    "http://www.springframework.org/dtd/spring-beans.dtd">  

 

<beans>

    <bean id="jmsConnectionFactory"

 class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value>java:comp/env/jms/factory </value>

 </property>

    </bean>

   

    <bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value> java:comp/env/jms/queue</value>

 </property>

    </bean>

   

    <!--  Spring JmsTemplate config -->

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

 <property name="connectionFactory">

     <bean

  class="org.springframework.jms.connection.SingleConnectionFactory">

  <property name="targetConnectionFactory"

      ref="jmsConnectionFactory"/>

     </bean>

 </property>

    </bean>

   

    <!-- POJO which send Message uses Spring JmsTemplate --> <!--配置消息生产者-->

    <bean id="messageProducer" class="com.focustech.jms.MessageProducer">

 <property name="template" ref="jmsTemplate"/>

 <property name="destination" ref="destination"/>

    </bean>

   

    <!--  Message Driven POJO (MDP) -->

    <bean id="messageListener" class=" com.focustech.jms.MessageConsumer"/>

   

    <!--  listener container,MDP无需实现接口 -->

    <bean id="listenerContainer"

 class="org.springframework.jms.listener.DefaultMessageListenerContainer">

 <property name="connectionFactory" ref="jmsConnectionFactory"/>

 <property name="destination" ref="destination"/>

 <property name="messageListener" ref="messageListener"/>

    </bean>

</beans>

其中:

1)   jmsConnectionFactory和destination都是使用自定义的,而且你会发现,这两个对象的加载类其实是一样的,都是JndiObjectFactoryBean,这是从JNDI读取连接的意思。

3)  MessageProducer是消息发送方。

4)  MessageConsumer实现了一个MessageListener,监听是否收到消息。

4、发送和接收消息的class如下(主要代码):

MessageProducer.java

public class MessageProducer {

    private JmsTemplate template;

    private Destination destination;

 

    public void send(final String message) {

 template.send(destination, new MessageCreator() {

     public Message createMessage(Session session) throws JMSException {

  Message m = session.createTextMessage(message);

  return m;

     }

 });

    }

 

    public void setDestination(Destination destination) {

 this.destination = destination;

    }

 

    public void setTemplate(JmsTemplate template) {

 this.template = template;

    }

 

}

 

MessageConsumer.java

public class MessageConsumer implements MessageListener {

 

    public void onMessage(Message message) {

        try

       {

           System.out.println(((TextMessage) message).getText());

       }

       catch (JMSException e)

       {

       }

    }

}

注:在上面的实例类中,由于在发送方发送的是文本消息(TextMessage),所以在上面的接收者代码中我直接将其转换成TextMessage就行了。如果是在真正的环境下,应该首先判断一下对方发送的是什么类型,然后才转换成对应的消息。

5、测试消息

为了测试的方便,可以在webroot下新建一个test.jsp,然后将下面的代码放到JSP的代码中,然后在网页地址栏中输入链接(如:http://oss.vemic.com/test.jsp 注:oss.vemic.com是本地服务器链接)就可以看到发送的消息了。

<%

try {

     ServletContext servletContext = this.getServletContext();

     WebApplicationContext wac = WebApplicationContextUtils

      .getRequiredWebApplicationContext(servletContext);

     MessageProducer mp = (MessageProducer) wac.getBean("messageProducer");

           mp.send("JMS TEST!!");

      } catch (JmsException e) {

  }

%>

(二)、配置跨服务器(即两个或多个resin之间)版消息机制

上面介绍的是单服务器的消息模式配置,使用消息模式,是因为我们需要在两个或多个服务器之间进行消息的传递,而不是单个服务器内容的消息传递。看过很多资料才发现,几乎所有的服务器都不支持跨服务器的消息模式,就算有(如JBoss),那也是因为它们本身集成了第三方的工具而实现的。而在第三方软件里面,我最终选择了apache activeMQ。

apache activeMQ的简介可以去其官网查看:http://activemq.apache.org/

或参考:http://www.blogjava.net/cctvx1/archive/2007/02/07/98457.html

I、ActiveMQ的安装与配置

在其官网上下载最新的对应系统的版本。一般来说,下载完解压之后就可能通过运行:apache-activemq-5.2.0\bin\ activemq.bat就可以成功启动。具体详细的信息参考:

http://andyao.javaeye.com/blog/153171,或者也可参考其官网。

II、整合Spring的JMS消息发送

在真正操作之前,为了不至于糊涂,我们应该忘掉前面所说的所有配置(当然,JMS的基础知识我们还是应该记住的,因为所有的JMS操作都是基于此的。还有那两个生产消息与消费消息的类与测试页面我们也要保留,因为我们下面还需要它们),好吧,现在将所有的配置回归到开始的状态吧(如:resin.conf, JMS在Spring中的配置等等都回到原始状态吧)。

先说一下我运行时所需的环境吧:JDK1.5.0_12和JDK1.6.0_05都可以;resin-3.0.25(其它版本没有试过);配置ActiveMQ所需的JAR包有:

activemq-core-5.2.0.jar、activemq-web-5.2.0.jar、geronimo-j2ee-management_1.0_spec-1.0.jar、geronimo-jms_1.1_spec-1.1.1.jar、geronimo-jta_1.0.1B_spec-1.0.1.jar、xbean-spring-3.4.jar。

好了,一切准备就绪了。那就让ActiveMQ先在系统中运行吧(也就是先单服务器运行,先易后难嘛)。为了让它能够运行起来,我们需要做以下的准备工作:

1、 使用ActiveMQ的JMS在Spring中的配置

其实这里的许多配置和上面说的单服务器的配置是差不多的,只是这里不再需要配置resin,web.xml的配置与上面的一模一样(当然,我还是按照我的方式配置在了service.xml中),好了,现在不同的配置是jmsconfig.xml:

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

       <!-- 配置ActiveMQ服务 -->

       <amq:broker useJmx="false" persistent="false">

              <amq:transportConnectors>

<!-- ActiveMQ目前支持的transport有:VM TransportTCP TransportSSL TransportPeer TransportUDP TransportMulticast TransportHTTP and HTTPS TransportFailover TransportFanout TransportDiscovery TransportZeroConf Transport-->

                     <amq:transportConnector uri="tcp://test.vemic.com:61616" />

              </amq:transportConnectors>

       </amq:broker>

       <!-- 配置JMS连接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,因为这里指定连接的对象) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息发送的目的地(注:”amq:queue”是用于指定是发送topic还是queue) -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 创建JMS的Session生成类,也就是jmsTemplate -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

       class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

       </bean>

       <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->

       <bean id="messageProducer"

              class="com.focustech.jms.MessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收类(这个类需要继承javax.jms.MessageListener,当然也可以通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->

       <bean id="messageListener"

              class=" com.focustech.jms.MessageConsumer">

       </bean>

       <!-- 消息监听容器,其各属性的意义为:

              connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ;

              destination:监听的消息模式;

              messageListener:接收者

              ) -->

       <bean id="listenerContainer"

       class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="messageListener" />

       </bean>

</beans>

注:test.vemic.com是我本机的URL,和localhost一样。

2、 消息测试

采用上面单机测试的消息就可以了。最终运行的结果为:

JMS TEST!!

注:

注意到了没有?上面的配置从jmsTemplate开始往下就和前面介绍过的单服务器的配置一样了。看到这里,我相信大家对JMS的工作过程应该很清楚了。我个人认为我们可以简单的这样理解其工作过程:

生产者

 

消费者

 

JMS connectionFactory

产生消息并发往指定的目的地

接收消息并给出已消费确认信息,JMS connectionFactory收到确认信息后就将对应的信息从自己的管理库中删除

从上面的图很清楚地看出,要想实现跨服务器的JMS消息机制,JMS connectionFactory是关键的地方,简单地说:connectionFactory就决定了JMS的作用范围。如果connectionFactory是受制于系统(也就是说,当系统停掉之后connectionFactory也就跟着销毁),那么它就不能实现跨服务器功能。要想实现跨服务功能,connectionFactory就必须独立于系统或服务器。由此可见,结合前面的知识,我们就可以知道,能够实现跨服务器的JMS消息机制其实有两种方式:JDBC方式和采用第三方工具。前面我也说过,我选择了后者(而且我也一直这么做了)。

3、  实现多服务器的JMS共享,即实现JMS跨服务器功能

ActiveMQ的单服务器版我们已经成功搭建并能成功运行了。现在让我们实现JMS跨服务器功能吧。等等,我们先准备另一个服务器环境。为了明显的区别两个服务,我将上面所有的环境重新弄了一份(一个新的MyEclipse;一个新的web工程,当然web工程里面的环境与上面的一样;一个新的resin;一个新的resin端口8081,上面的resin端口是80),我称之为client。

接下来,我们来配置client中的JMS。在所有的系统配置中只有一个配置文件与上面的有区别,那就是jmsconfig.xml:

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

       <!-- 配置JMS连接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,因为这里指定连接的对象) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息发送的目的地(注:” amq:queue”是用于指定是发送topic还是queue,对应上面配置中的amq:destinations) -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 创建JMS的Session生成类 -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

       class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

       </bean>

       <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->

       <bean id="messageProducer"

              class="com.focustech.jms.MessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收类(这个类需要继承javax.jms.MessageListener,当然也可以通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->

       <bean id="messageListener"

              class="com.focustech.jms.MessageConsumer">

       </bean>

       <!-- 消息监听容器,其各属性的意义为:

              connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ;

              destination:监听的消息模式;

              messageListener:接收者

              ) -->

       <bean id="listenerContainer"

       class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="messageListener" />

       </bean>

</beans>

注意了!这个配置与上面的ActiveMQ的配置只有一个地方不一样,也就是:这个配置中没有配置ActiveMQ的相关信息。为什么?结合上面所述的JMS简单工作方式,我们应该不难得到答案:因为ActiveMQ要实现跨服务器就必须独立运行,所以我们只需要启动一个就够了。

注:

其实在这个简单的跨服务器的例子中,其中一方只需要配置消息生产者,而另一方只需要配置消息的消费者就可以测试通过了,而且测试效果会很明显:在消息生产者的那个服务器上运行测试程序,在消息接收的服务器上就会有相应的响应!在这里我之所以这样做,是让大家在测试时发现一个奇怪的现象:当在一端运行测试程序,第一个消息会被当前运行测试程序的服务消费掉,而接下来的消息又被另一个服务器消费掉,如此循环(我没有测试过三个及以上的服务器运行情况,我想可能是消息一个个的均摊下去的吧)。之所以要让大家看到这个现象,是为了让大家有个疑问,容易接受后面高级应用中关于JMS的两种消息机制:Point-to-Point消息(即P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub,也就是广播模式)的使用方法。

III、高级应用

从这里开始,我们将进入JMS消息的一些特殊用法,或者叫高级应用。在介绍这些应用的时候,我们会用到上面已经布署好的跨服务器的应用来作例子。为了区别两个服务器,我们把配置有ActiveMQ的叫server,另一个还是叫client。

1、使用指定的消息,即消息的P2P与Pub/Sub的应用

上节内容的最后给大家留下了一个有趣的现象。在这里,我们将针对这个现象进行详细的解析。

我们前面也知道了,消息模式有两种,但怎么使用却一直没提过。但是如果仔细的看过官网的资料也许已经知道一些了。在这里,我将用实例的方式给大家展现一下它的具体使用。(参考:http://andyao.javaeye.com/blog/234101)

首先,我们来看Queue消息的使用实例。上面的跨服务器的实例其实就是queue实例的应用,但问题是:如何指定唯一的接收者呢?也就是不能出现上面提到的那个奇怪的循环现象呢?

其实这个现象也并不难回答,首先让我们来仔细看一下queue消息的目的地配置:

<amq:queue name="destination" physicalName="ossQueue" />

对于上面的配置,我们可以一一解读其中各参数的含义就知道奥妙所在了:

amq:queue:表示这是配置是queue消息;

name:指定的消息发送与接收的目的地的名称;

physicalName:指定消息队列的物理名称,在ActiveMQ中它就是一个消息集群的表示形式。

根据上面配置的含义,我们不难发现,其实奥妙就在physicalName这个属性中体现的。具体来说,对于queue消息而言,只要发送方与接收方都使用同一个physicalName,这就是点对点指定了。例如:将上面的例子中client中的:

<amq:queue name="destination" physicalName="ossQueue" />

改成:

<amq:queue name="destination" physicalName="ossQueue1" />

这样的话,我们上面说的“奇怪的现象”就不会存在了。因为server中消息是发送到ossQueue这个消息队列里的,而client中消息目的地是指向ossQueue1的,当然就收不到server里面ossQueue中的消息了。

我们再来看一下如何使用Topic消息。其实很简单,只要将第II节配置中的

<amq:queue name="destination" physicalName="ossQueue" />

改成:

<amq:topic name="destination" physicalName="ossQueue" />

就可以了。运行测试程序时,会发现在两个服务都会收到响应信息。

注:关于topic,有一个概念,叫“订阅”。关于这个词我也不是很了解,但我理解是:在系统中配置了amq:topic并且connectionFactory指定到对应的uri上的前提上,只要amq:topic中对应的physicalName与publish端相同,这就是订阅,这样的配置之后它就能收到发送的信息了。

总之一句话,点对点(或发布订阅模式)的消息发送关键在于收发双方是否共同指定同一个physicalName

2、自定义消息的收发类

正如前面例子中配置文件中的一行注释说的一样,消息的收发类用户可以自定义,它是通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发。具体的操作我们还是来看实例吧(为了简单起见,我们以单服务器的配置与运行作实例,跨服务配置是一样的):

jmsconfig.xml:

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd">

       <!-- 配置ActiveMQ服务 -->

       <amq:broker useJmx="false" persistent="false">

              <amq:transportConnectors>

                     <!-- 提供的连接方式有:VM Transport、TCP Transport、SSL Transport、

                            Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、

                            Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等 -->

                     <amq:transportConnector uri="tcp://test.vemic.com:61616" />

              </amq:transportConnectors>

       </amq:broker>

       <!-- 配置JMS连接工厂(注:brokerURL是关键,

它应该是上面的amq:transportConnectors里面的值之一) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息发送的目的地(注:amq:queue是用于指定是发送topic不是queue,对应上面配置中的amq:destinations) -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 创建JMS的Session生成类 -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

                            class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

              <!-- 指定发送信息时使用的消息转换类.

                     这个选项不填的话,默认的是:SimpleMessageConverter,它只支持4种类型的对象:String, byte[],Map,Serializable

              -->

              <!—如果加上下面这段配置就会出错错误原因是Book不是一个原始类但我已经将它继承Serializable,可还是不行,我想可能有其他什么原因吧但我现在不清楚 -->

              <!-- <property name="messageConverter"

                     ref="resourceMessageConverter" /> -->

       </bean>

       <!-- 发送消息的转换类

(这个类要继承org.springframework.jms.support.converter.MessageConverter -->

       <bean id="resourceMessageConverter"

              class=" com.focustech.jms.ResourceMessageConverter" />

       <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->

       <bean id="resourceMessageProducer"

              class=" com.focustech.jms.ResourceMessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收类(这个类需要继承,当然也可以通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->

       <bean id="resourceMessageListener"

              class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

              <constructor-arg>

                     <bean

                            class=" com.focustech.jms.ResourceMessageConsumer">

                     </bean>

              </constructor-arg>

              <property name="defaultListenerMethod" value="recieve" />

              <!—自定义接收类与接收的方法 -->

              <property name="messageConverter"

                     ref="resourceMessageConverter" />

       </bean>

       <!-- 消息监听容器,其各属性的意义为:

              connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ;

              destination:监听的消息模式;

              messageListener:接收者

       -->

       <bean id="listenerContainer"

              class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="resourceMessageListener" />

       </bean>

</beans>

在这里,我们需要发送自己定义的消息格式,这样,我们就需要不同的消息的生产者与消费者,当然,也需要一个自定义的将两者消息进行转换的一个自定义的类,如上面配置文件中指定的一样,这三个自定义的类的主要代码如下:

ResourceMessageProducer:

public class ResourceMessageProducer

{

       private JmsTemplate    template;

       private Destination      destination;

       public JmsTemplate getTemplate()

       {

              return template;

       }

       public void setTemplate(JmsTemplate template)

       {

              this.template = template;

       }

       public Destination getDestination()

       {

              return destination;

       }

       public void setDestination(Destination destination)

       {

              this.destination = destination;

       }

       public void send(Book book)

       {

              System.out.println("=======================================");

              System.out.println("do send ......");

              long l1 = System.currentTimeMillis();

              template.convertAndSend(this.destination, book);

              System.out.println("send time:" + (System.currentTimeMillis() - l1) / 1000 + "s");

              System.out.println("=======================================");

       }

}

ResourceMessageConverter:

public class ResourceMessageConverter implements MessageConverter

{

       @SuppressWarnings("unchecked")

       public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException

       {

              // check Type

              if (obj instanceof Book)

              {

                     // 采用ActiveMQ的方式传递消息

                     ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();

                     Map map = new HashMap();

                     map.put("Book", obj);

                     // objMsg.setObjectProperty里面放置的类型只能是:String, Map, Object, List

                     objMsg.setObjectProperty("book", map);

                     return objMsg;

              }

              else

              {

                     throw new JMSException("Object:[" + obj + "] is not Book");

              }

       }

       public Object fromMessage(Message msg) throws JMSException, MessageConversionException

       {

              if (msg instanceof ObjectMessage)

              {

                     Object obj = ((ObjectMessage) msg).getObject();

                     return obj;

              }

              else

              {

                     throw new JMSException("Msg:[" + msg + "] is not Map");

              }

       }

}

ResourceMessageConsumer:

public class ResourceMessageConsumer

{

       public void recieve(Object obj)

       {

              Book book = (Book) obj;

              System.out.println("=======================================");

              System.out.println("receiveing message ...");

              System.out.println(book.toString());

              System.out.println("here to invoke our business method...");

              System.out.println("=======================================");

       }

}

Book:

public class Book implements Serializable

{

       /**

        *

        */

       private static final long       serialVersionUID   = -6988445616774288928L;

       long                                    id;

       String                                 name;

       String                                 author;

       public String getAuthor()

       {

              return author;

       }

       public void setAuthor(String author)

       {

              this.author = author;

       }

       public long getId()

       {

              return id;

       }

       public void setId(long id)

       {

              this.id = id;

       }

       public String getName()

       {

              return name;

       }

       public void setName(String name)

       {

              this.name = name;

       }

}

消息测试:将测试JSP中的JAVA代码改成:

<%

try {

     ServletContext servletContext = this.getServletContext();

     WebApplicationContext wac = WebApplicationContextUtils

      .getRequiredWebApplicationContext(servletContext);

ResourceMessageProducer resourceMessageProducer = (ResourceMessageProducer) context.getBean("messageProducer");

Book book = new Book();

book.setId(123);

book.setName("jms test!");

book.setAuthor("taofucheng");

resourceMessageProducer.send(book);

      } catch (JmsException e) {

  }

%>

运行系统,打开测试页面,会发现消息已经成功接收!

注:(1)、通过这种方法,我们就可以发送我们想发送的任何对象了(有些限制:这些对象的类型必须是:String, Map, byte[],Serializable。上面的例子已经注释得很清楚)。

(2)、如果大家有兴趣的话,看一下MessageListenerAdapter的源码,你就会发现其实它就是MessageListener的实现类,在它实现的onMessage方法中使用了用户自定义的转换类而已。

3、集成事务

Spring提供的JMS的API中已经有了集成事务的功能,我们只要将上面监听容器的配置改成下面的就行了:

首先,将jmsTemplate设置成支持事务(它默认是不支持事务的):

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

                            class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

              <property name="sessionTransacted" value="true"/>

       </bean>

然后再在消息监听容器中设置指定的事务管理:

    <bean id="listenerContainer"

              class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="resourceMessageListener" />

              <!—jtaTransactionManager是系统中的事务管理类,在我们的系统中,是由Spring托管的 -->

              <property name="transactionManager" ref="jtaTransactionManager" />

       </bean>

这样配置之后,当事务发生回滚时,消息也会有回滚,即不发送出去。

4、其它高级应用

ActiveMQ还有许多其它高级的应用,如:自动重连机制,也就是保证当通信双方或多方的链接断裂后它会根据用户的设置自动连接,以保证建立可靠的传输;另外,ActiveMQ还有其它方式嵌入到Spring中,如它可以通过xbean, file等方式建立应用;它还可以通过JMX对消息的发送与接收进行实时查看;消息的确认方式等等,还有很多高级的应用,请参考:《ActiveMQ in Action》(网址:http://whitesock.javaeye.com/blog/164925))

这篇关于jms与ActiveMQ实践与应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1145689

相关文章

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

将Python应用部署到生产环境的小技巧分享

《将Python应用部署到生产环境的小技巧分享》文章主要讲述了在将Python应用程序部署到生产环境之前,需要进行的准备工作和最佳实践,包括心态调整、代码审查、测试覆盖率提升、配置文件优化、日志记录完... 目录部署前夜:从开发到生产的心理准备与检查清单环境搭建:打造稳固的应用运行平台自动化流水线:让部署像

Linux中Curl参数详解实践应用

《Linux中Curl参数详解实践应用》在现代网络开发和运维工作中,curl命令是一个不可或缺的工具,它是一个利用URL语法在命令行下工作的文件传输工具,支持多种协议,如HTTP、HTTPS、FTP等... 目录引言一、基础请求参数1. -X 或 --request2. -d 或 --data3. -H 或

在Ubuntu上部署SpringBoot应用的操作步骤

《在Ubuntu上部署SpringBoot应用的操作步骤》随着云计算和容器化技术的普及,Linux服务器已成为部署Web应用程序的主流平台之一,Java作为一种跨平台的编程语言,具有广泛的应用场景,本... 目录一、部署准备二、安装 Java 环境1. 安装 JDK2. 验证 Java 安装三、安装 mys

Python中构建终端应用界面利器Blessed模块的使用

《Python中构建终端应用界面利器Blessed模块的使用》Blessed库作为一个轻量级且功能强大的解决方案,开始在开发者中赢得口碑,今天,我们就一起来探索一下它是如何让终端UI开发变得轻松而高... 目录一、安装与配置:简单、快速、无障碍二、基本功能:从彩色文本到动态交互1. 显示基本内容2. 创建链

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke

java中VO PO DTO POJO BO DO对象的应用场景及使用方式

《java中VOPODTOPOJOBODO对象的应用场景及使用方式》文章介绍了Java开发中常用的几种对象类型及其应用场景,包括VO、PO、DTO、POJO、BO和DO等,并通过示例说明了它... 目录Java中VO PO DTO POJO BO DO对象的应用VO (View Object) - 视图对象

Go信号处理如何优雅地关闭你的应用

《Go信号处理如何优雅地关闭你的应用》Go中的优雅关闭机制使得在应用程序接收到终止信号时,能够进行平滑的资源清理,通过使用context来管理goroutine的生命周期,结合signal... 目录1. 什么是信号处理?2. 如何优雅地关闭 Go 应用?3. 代码实现3.1 基本的信号捕获和优雅关闭3.2

正则表达式高级应用与性能优化记录

《正则表达式高级应用与性能优化记录》本文介绍了正则表达式的高级应用和性能优化技巧,包括文本拆分、合并、XML/HTML解析、数据分析、以及性能优化方法,通过这些技巧,可以更高效地利用正则表达式进行复杂... 目录第6章:正则表达式的高级应用6.1 模式匹配与文本处理6.1.1 文本拆分6.1.2 文本合并6