ActiveMQ入门

    xiaoxiao2021-03-25  78

    ActiveMQ入门

    作者:一路向北

    摘要:本文主要讲述ActiveMQ的基本知识和使用方法,并简单结合spring使用ActiveMQ

    一、ActiveMQ特性和使用总览

    企业消息软件从80年代起就存在,它不只是一种应用间消息传递风格,也是一种集成风格。因此,消息传递可以满足应用间的通知和互相操作。但是开源的解决方案是到最近10年才出现的。Apache ActiveMQ就是其中一种。它使应用间能以异步,松耦合方式交流。本章将向您介绍ActiveMQ

    1ActiveMQ的特性

    ActiveMQApache软件基金下的一个开源软件,它遵循JMS1.1规范(Java Message Service),是消息驱动中间件软件(MOM)。它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。ActiveMQ使用Apache许可协议。因此,任何人都可以使用和修改它而不必反馈任何改变。这对于商业上将ActiveMQ用在重要用途的人尤为关键。MOM的工作是在分布式的各应用之间调度事件和消息,使之到达指定的接收者。所以高可用,高性能,高可扩展性尤为关键。

    ActiveMQ的目标是在尽可能多的平台和语言上提供一个标准的,消息驱动的应用集成。ActiveMQ实现JMS规范并在此之上提供大量额外的特性。

    下面是一个高层次的特性列表。

    ·遵循JMS规范 ----理解ActiveMQ的起始点是明白ActiveMQ的各种特性是JMS1.1规范的实现。本章后面将讨论JMS规范提供的好处和保证。它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等等。依附于JMS规范意味着,不论JMS消息提供者是谁,同样的基本特性都是有效的。

    ·连接----ActiveMQ提供各种连接选择,包括HTTPHTTPSIP多点传送,SSLSTOMPTCPUDPXMPP等。大量的连接协议支持使之具有更好的灵活性。很多现有的系统使用一种特定协议并且不能改变,所以一个支持多种协议的消息平台降低了使用的门槛。虽然连接很重要,但是和其他容器集成也同样重要。第四章将讲解ActiveMQ的传输连接器(transport connectors)和网络连接器(network connectors)。

    ·可插拔的持久性和安全----ActiveMQ提供多种持久性方案可供选择,也可以完全按自己需求定制验证和授权。例如,ActiveMQ通过KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence),但也支持标准的JDBC方案。ActiveMQ可以通过配置文件提供简单的验证和授权,也提供标准的JAAS登陆模块。

    ·用Java建立消息驱动应用----ActiveMQ最常用在Java应用中,用于发送和接收消息。这部分的内容涉及JMS规范API

    ·与应用服务器集成----ActiveMQjava应用服务器集成是很常见的。

    ·客户端APIs----ActiveMQ对多种语言提供客户端API,除了Java之外还有C/C++.NETPerlPHPPythonRuby等。这使得ActiveMQ能用在Java之外的其它语言中。很多其它语言都可以通过ActiveMQ提供的客户端API使用ActiveMQ的全部特性。当然,ActiveMQ代理器(broker)仍然是运行在java虚拟机上,但是客户端能够使用其它的被支持的语言。

    ·代理器集群(Broker clustering----为了利于扩展,多个ActiveMQ broker能够联合工作。这个方式就是network of brokers并且能支持多种拓扑结构。

    ·高级代理器特性和客户端选项----ActiveMQ为代理器和客户端连接提供很多高级的特性。ActiveMQ也可以通过代理器的XML配置文件支持Apache Camel

    ·简单的管理----ActiveMQ是为开发者设计的。它并不需要专门的管理工具,因为它提供各种易用且强大的管理特性。有很多方法去监控ActiveMQ的各个方面,可以通过JMX使用JConsoleActiveMQ web console;可以运行ActiveMQ消息报告;可以用命令行脚本;可以通过日志。

    2、了解JMS

    ActiveMQ 最好还是了解下 JMS

    JMS 公共

    点对点域

    发布/订阅域

    ConnectionFactory

    QueueConnectionFactory

    TopicConnectionFactory

    Connection

    QueueConnection

    TopicConnection

    Destination

    Queue

    Topic

    Session

    QueueSession

    TopicSession

    MessageProducer

    QueueSender

    TopicPublisher

    MessageConsumer

    QueueReceiver

    TopicSubscriber

    JMS 定义了两种方式:Queue(点对点);Topic(发布/订阅)。

    ·ConnectionFactory 是连接工厂,负责创建Connection。

    ·Connection 负责创建 Session。

    ·Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。

    ·Destination 是消息的目的地。

    详细的可以网上找些 JMS 规范(有中文版),本文对这两种方式都会有讲解。

    二、开始使用ActiveMQ

    开始使用ActiveMQ并不难。你只要启动代理器并确保它能接受连接和发送消息。ActiveMQ有一些自带的例子。

    在这部分,你将下载和安装Java SE,下载和安装ActiveMQ,检查ActiveMQ目录,然后第一次启动ActiveMQ。所需工具包括JDK1.5+AntActiveMQ

    1JDK下载和安装

    ActiveMQ 要求Sun Java SE 1.5或以上。在开始这部分前,必须先安装。JDK的下载和安装就不是本文介绍重点。

    2Ant下载和安装

    Ant可以用来构建和运行ActiveMQ自带例子。Ant可以从Apache Ant网址下载。URL: http://ant.apache.org/bindownload.cgi

    点击链接地址并选择正确的压缩包。(tar包是LinuxUnixzipWindows)。请按照下列地址安装AntURL: http://ant.apache.org/manual/install.html。确保你设置好$ANT_HOME环境变量,并将$ANT_HOME/bin放到$PATH环境变量里。安装完毕后你可以运行下面的命令查看Ant版本。

     

    你的可能跟我的不太一样,使用Ant的不同版本,不过这没关系。一旦Ant输出如上信息,你就可以确定Ant都安装正确。

    3ActiveMQ下载和安装

    ActiveMQ可以从Apache ActiveMQ网站下,URL: http://activemq.apache.org/download.html.

    点击地址到5.4.3版本,你可以看到tarzip格式包。(tar包是LinuxUnixzipWindows)。下载完后解压。我的解压到了E:\tools\apache-activemq-5.4.3

    从命令行进入apache-activemq-5.4.3目录,输入如下命令。

     

    ·LICENSE----Apache Software Foundation(ASF)要求的一个文件.包含ActiveMQ使用的所有库的许可证.

    ·NOTICE----ASF要求的另一个文件.包含ActiveMQ使用的所有库的版权信息.

    ·README.txt 一个包含一些URL的文档,使新手可以使用ActiveMQ.

    ·WebConsole-README.txt----包含使用ActiveMQ web console使用说明.

    ·activemq-all-5.4.3.jar---一个jar包包含ActiveMQ所有东西。放在这里是方便你使用它。

    ·bin----包含二进制或可运行文件。ActiveMQ启动脚本就放在里面。

    ·conf--ActiveMQ所有的配置信息。

    ·data--日志和持久化文件存储地方。

    ·docs--包含一个简单的index.html,该文件指向ActiveMQ网站。

    ·example----ActiveMQ例子。我们用这些例子来简单的测试ActiveMQ

    ·lib----所有ActiveMQ所需库。

    ·user-guide.html----一个简单指引启动ActiveMQ和运行例子。

    ·webapps----ActiveMQ web console和一些网络演示。

    下一部分将启动ActiveMQ并用这些例子验证它。

    启动ActiveMQ:在命令行中输入一下命令,或者直接运行bin目录下的activemq.bat文件

     

    刚才的命令启动了ActiveMQ代理器和一些连接器,使得客户端可以通过一些诸如TCPSSLSTOMPXMPP协议连接进来。请注意现在ActiveMQ已经启动,并且客户端可以通过TCP 61616端口连接进来。最好的方法是使用ActiveMQ自带的例子来发送和接收消息。下面我们来运行第一个例子。

    4、运行ActiveMQ第一个例子

    这个例子是模拟生产者消费者的例子,生产者产生2000条信息,然后关闭,消费者消费2000信息,然后关闭。下面我们演示一下。

    重新打开一个DOS窗口,切到MQ目录的example目录下,我的是E:\tools\apache-activemq-5.4.3\example

    然后输入一下命令

     

    最后一行显示程序在等待2000message。这是一个消费者。再重新打开DOS窗口并且到MQ目录下的example目录,输入一下命令ant producer,这个是生产者。输入命令之后你回看到刚才打开的两个DOS窗口打印出一堆东西

     

    看到BUILD SUCCESSFUL说明程序运行成功。

    消费者窗口变成了如下内容:

     

    示例程序演示完毕,观看别人写的看着没啥意思,我们自己写一个程序,测试一下。

    三、Queue(点对点)方式

    在点对点的传输方式中,消息数据被持久化,每条消息都能被消费,没有监听QUEUE地址也能被消费,数据不会丢失,一对一的发布接受策略,保证数据完整。我们自己编写一个跟上面类似的例子。

    1、建立web项目

    打开myeclipse,新建web项目,内容填下如下:

     

    点击finish。接着在项目根目录下新建一个lib目录,然后将我们上面提及的activemq-all-5.4.3.jar拷贝至lib目录下,并将其加入项目library中如下图:

     

    点击OK,项目目前的文件夹结构如下:

     

    2、编写代码

    src上右键选择新建class,具体如下:

     

    点击finish。然后输入如下代码

    package com.mq.queue;

     

    import java.util.Date;

     

    import javax.jms.Connection;

    import javax.jms.ConnectionFactory;

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.MessageProducer;

    import javax.jms.Session;

     

    import org.apache.activemq.ActiveMQConnection;

    import org.apache.activemq.ActiveMQConnectionFactory;

     

    /**

     * queue:消息消费者

     */

    public class Producer {

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    String user = ActiveMQConnection.DEFAULT_USER;

    String password = ActiveMQConnection.DEFAULT_PASSWORD;

    String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    String subject = "TOOL.DEFAULT";

    ConnectionFactory contectionFactory = new ActiveMQConnectionFactory( user, password, url);

    try {

    Connection connection = contectionFactory.createConnection();

    connection.start();

    Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createQueue(subject);

    MessageProducer producer = session.createProducer(destination);

    for (int i = 0; i <= 20; i++) {

    MapMessage message = session.createMapMessage();

    Date date = new Date();

    message.setLong("count", date.getTime());

    Thread.sleep(1000);

    producer.send(message);

    System.out.println("--发送消息:" + date);

    }

    session.commit();

    session.close();

    connection.close();

    } catch (JMSException e) {

    e.printStackTrace();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

     

    }

     

    然后再新建一个消费者类如下图:

     

    点击finish。然后输入如下代码

    package com.mq.queue;

     

    import java.util.Date;

     

    import javax.jms.Connection;

    import javax.jms.ConnectionFactory;

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.MessageConsumer;

    import javax.jms.MessageListener;

    import javax.jms.Session;

     

    import org.apache.activemq.ActiveMQConnection;

    import org.apache.activemq.ActiveMQConnectionFactory;

     

    /**

     * queue:消息消费者

     */

    public class Consumer {

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    String user = ActiveMQConnection.DEFAULT_USER;

    String password = ActiveMQConnection.DEFAULT_PASSWORD;

    String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    String subject = "TOOL.DEFAULT";

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url);

    Connection connection;

    try {

    connection = connectionFactory.createConnection();

    connection.start();

    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createQueue(subject);

    MessageConsumer message = session.createConsumer(destination);

    message.setMessageListener(new MessageListener() {

    public void onMessage(Message msg) {

    MapMessage message = (MapMessage) msg;

    try {

    System.out.println("--收到消息:" + new Date(message.getLong("count")));

    session.commit();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    });

    Thread.sleep(30000);

    session.close();

    connection.close();

    } catch (JMSException e) {

    e.printStackTrace();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

    3、运行程序

    右键运行程序Consumer类,控制台打印出如下信息:

     

    再右键运行Producer类,控制台上打印出来的信息同上。稍等一秒,Producer类的运行控制台会打印出来如下信息:

     

    有信息可知,生产者正在生成消息,切到消费者类的控制台方法如下:

     

    选择Cosumer,会看到如下信息:

     

    由信息可以看到,消费者收到了同等数量的信息,而且信息的内容就是生产者发送的信息。这个是个PTP模式的例子,下面我们写一个P/S方式的例子。

    四、Topic(发布/订阅)

    在发布订阅消息方式中,消息是无状态的,不保证每条消息被消费,只有监听该TOPIC地址才能收到消息并消费,否则该消息将会丢失。一对多的发布接受策略,可以同时消费多个消息。下面编写代码。

    1、编写代码

    新建一个发布者类,界面如下:

     

    将类的内容修改为如下内容:

    package com.mq.topic;

     

    import java.util.Date;

     

    import javax.jms.Connection;

    import javax.jms.DeliveryMode;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.MessageProducer;

    import javax.jms.Session;

    import javax.jms.Topic;

     

    import org.apache.activemq.ActiveMQConnection;

    import org.apache.activemq.ActiveMQConnectionFactory;

     

    /**

     * topic:消息发布者

     */

    public class Publisher {

    public static void main(String[] args) {

    String user = ActiveMQConnection.DEFAULT_USER;

    String password = ActiveMQConnection.DEFAULT_PASSWORD;

    String url = "tcp://localhost:61616";

    String subject = "TOOL.DEFAULT";

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, url);

    Connection connection;

    try {

    connection = factory.createConnection();

    connection.start();

    Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    Topic topic = session.createTopic(subject);

    MessageProducer producer = session.createProducer(topic);

    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    for (int i = 0; i <= 20; i++) {

    MapMessage message = session.createMapMessage();

    Date date = new Date();

    message.setLong("count", date.getTime());

    Thread.sleep(1000);

    producer.send(message);

    System.out.println("--发送消息:" + date);

    }

    session.commit();

    session.close();

    connection.close();

    } catch (JMSException e) {

    e.printStackTrace();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

     

    新建一个订阅者,界面如下,

    修改类的内容如下:

    package com.mq.topic;

     

    import java.util.Date;

     

    import javax.jms.Connection;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.MessageConsumer;

    import javax.jms.MessageListener;

    import javax.jms.Session;

    import javax.jms.Topic;

     

    import org.apache.activemq.ActiveMQConnection;

    import org.apache.activemq.ActiveMQConnectionFactory;

     

    /**

     * topic:消息订阅者一

     */

    public class SubscriberFirst {

    public static void main(String[] args) {

    String user = ActiveMQConnection.DEFAULT_USER;

    String password = ActiveMQConnection.DEFAULT_PASSWORD;

    String url = "tcp://localhost:61616";

    String subject = "TOOL.DEFAULT";

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, url);

    Connection connection;

    try {

    connection = factory.createConnection();

    connection.start();

    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    Topic topic = session.createTopic(subject);

    MessageConsumer consumer = session.createConsumer(topic);

    consumer.setMessageListener(new MessageListener() {

    public void onMessage(Message msg) {

    MapMessage message = (MapMessage) msg;

    try {

    System.out.println("--订阅者一收到消息:" + new Date(message.getLong("count")));

    session.commit();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    });

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    }

     

    再新建一个订阅者SubscriberSecond.java,内容如下:

    package com.mq.topic;

     

    import java.util.Date;

     

    import javax.jms.Connection;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.MessageConsumer;

    import javax.jms.MessageListener;

    import javax.jms.Session;

    import javax.jms.Topic;

     

    import org.apache.activemq.ActiveMQConnection;

    import org.apache.activemq.ActiveMQConnectionFactory;

     

    /**

     * topic:消息订阅者二

     */

    public class SubscriberSecond {

    public static void main(String[] args) {

    String user = ActiveMQConnection.DEFAULT_USER;

    String password = ActiveMQConnection.DEFAULT_PASSWORD;

    String url = "tcp://localhost:61616";

    String subject = "TOOL.DEFAULT";

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, url);

    Connection connection;

    try {

    connection = factory.createConnection();

    connection.start();

    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    Topic topic = session.createTopic(subject);

    MessageConsumer consumer = session.createConsumer(topic);

    consumer.setMessageListener(new MessageListener() {

    public void onMessage(Message msg) {

    MapMessage message = (MapMessage) msg;

    try {

    System.out.println("--订阅者二收到消息:" + new Date(message.getLong("count")));

    session.commit();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    });

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    }

    2、运行程序

    分别先运行两个订阅者类,你会发现控制台没有任何反映,先不要着急,我们再运行发布者类看看。查看发布者类控制台内容如下:

     

    现在我们再查看订阅者一的控制台,内容如下:

     

    在切到订阅者二的控制台看看,内容如下:

     

    两个订阅者收到的内容与发布者发布的内容一样。比较两者代码可以看出二者的区别。可能有的童鞋没有按照我说的顺序运行程序,在PTP方式中,可能发不了啥问题,但是在P/S方式中就会发现问题,如果先运行发布者,再运行订阅者,订阅者收到的消息不等于发布者发布的消息数量。出现这个原因,不是因为程序没写对,而是这种模式的原理所致。上面已经讲到,P/S模式总消息可能不被消费。这就好比听广播,如果我们要完整的听一个节目,肯定是在节目开始之前把收音机打开,如果我们在节目开始之后再打开收音机,肯定听不完全。在运行玩发布者之后,如果上趟厕所,回来再运行订阅者,你会发现一条消息都收不到。说到此,有个问题,如果我起两个消费者,会不会同时收到信息呢?答案是不会,不信可以试试。最简单的做法就是在写消费者测试类的时候调用两次接收方法。

    至此我们的第一个完整的程序成功完成。下面将MQspring结合使用。

    五、Queue(点对点)springactiveMQ整合

    在做上面的测试例子的时候我们可以看出发送一个简单的消息,要几行代码!其实我们的目的就是发送和接受消息。幸运的是Spring为我们提供了大量的模板,当然也有JMSTemplate模板。我们还从queue方式开始学习。

    1、引入spring相关文件和配置

    我使用的是spring版本是3.1.0.M2,其他版本的也可以,只是配置不同,去spring官网下载zip包,解开后将dist目录下的所有jar包(根据自己选择)拷贝到项目lib目录下并加入项目library中,具体可以参考上面步骤。

    src目录下新建applicationContext.xml文件并输入一下内容:

    <?xml version="1.0" encoding="GBK"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xmlns:aop="http://www.springframework.org/schema/aop"

    xmlns:tx="http://www.springframework.org/schema/tx"

    xmlns:context="http://www.springframework.org/schema/context"

    xsi:schemaLocation="

              http://www.springframework.org/schema/beans

              http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

              http://www.springframework.org/schema/tx

              http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

              http://www.springframework.org/schema/context

              http://www.springframework.org/schema/context/spring-context-3.0.xsd

              http://www.springframework.org/schema/aop

              http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"

    default-autowire="byName">

    </beans>

    打开web.xml并将其内容修改为以下内容:

    <?xml version="1.0" encoding="GBK"?>

    <web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee

    http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

    <context-param>

    <param-name>contextConfigLocation</param-name>

    <param-value>classpath*:applicationContext*.xml</param-value>

    </context-param>

    <servlet>

    <servlet-name>spring</servlet-name>

    <servlet-class>

    org.springframework.web.servlet.DispatcherServlet

    </servlet-class>

    <load-on-startup>1</load-on-startup>

    </servlet>

    <servlet-mapping>

    <servlet-name>spring</servlet-name>

    <url-pattern>/</url-pattern>

    </servlet-mapping>

    <welcome-file-list>

    <welcome-file>index.jsp</welcome-file>

    </welcome-file-list>

    </web-app>

    2、配置JMSTemplate模板

    类似于jdbcTemplate,首先要配置一个ConnectionFactory,之后要开始配置JmsTemplate模板了。最后是配置消息目标了。消息分为队列和主题两大类。在applicationContext.xml中加入如下内容:

    <!-- 配置JMS连接工厂 -->

    <bean id="connectionFactory"

    class="org.apache.activemq.ActiveMQConnectionFactory">

    <property name="brokerURL" value="tcp://localhost:61616" />

    </bean>

    <!-- 发送消息的目的地(队列) -->

    <bean id="queueDest"

    class="org.apache.activemq.command.ActiveMQQueue">

    <!-- 设置消息队列的名字 -->

    <constructor-arg index="0" value="myQueue" />

    </bean>

    <!-- 配置QueueJms模板  -->

    <bean id="jmsQueueTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="queueDest" />

    <property name="receiveTimeout" value="10000" />

    </bean>

    receiveTimeout表示接收消息时的超时时间,设置的为10秒,我建议在配置中一定要设置这个时间,因为如果不设置的话,加入接收消息时是阻塞着的,那么将一直阻塞下去。配置完成了,那么如何使用JmsTemplate发送消息呢?

    springbeanfactory得到一个jmsTemplate的实例和消息目标的实例,发送消息,够简单的吧。首先我们还从queue方式开始。下面我们就来编写具体代码。

    3、编写代码

    新建类ProducerService.java,界面如下:

    添加一个send方法。代码如下:

    package com.mq.queue.spring;

     

    /**

     * 发送者接口

     *

     */

    public interface ProducerService {

    /**

     * 发送方法

     */

    public void send();

    }

     

    新建一个生产者实现类ProducerServiceImpl.java,界面略,代码如下:

    package com.mq.queue.spring;

     

    import java.util.Date;

     

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.Session;

     

    import org.springframework.jms.core.JmsTemplate;

    import org.springframework.jms.core.MessageCreator;

     

    public class ProducerServiceImpl implements ProducerService {

     

    JmsTemplate jmsTemplate;

     

    Destination destination;

    public void send() {

    MessageCreator messageCreator = new MessageCreator(){

     

    public Message createMessage(Session session) throws JMSException {

    MapMessage message = session.createMapMessage();

    Date date = new Date();

    message.setLong("count", date.getTime());

    System.out.println("--发送消息:"+date);

    return message;

    }

    };

    jmsTemplate.send(this.destination,messageCreator);

    }

    }

     

    生产者编写完了,下面我们来编写消费者,上面说了,发送消息的时候,springbeanfactory得到一个jmsTemplate的实例和消息目标的实例,然后发送,那么接受的时候肯定也是得到一个jmsTemplate的实例和消息目标的实例,然后接受,下面我们来看具体代码。

    新建一个消费者的接口和实现类,具体代码如下:

    ConsumerService.java

    package com.mq.queue.spring;

     

    /**

     * @author yangdecai

     *消息者接口

     */

    public interface ConsumerService {

    /**

     * 接受信息

     */

    public void receive();

    }

    ConsumerServiceImpl.java

    package com.mq.queue.spring;

     

    import java.util.Date;

     

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

     

    import org.springframework.jms.core.JmsTemplate;

     

    public class ConsumerServiceImpl implements ConsumerService {

    JmsTemplate jmsTemplate;

     

    Destination destination;

     

    public void receive() {

    MapMessage message = (MapMessage) jmsTemplate.receive();

    try {

    System.out.println("--收到消息:" + new Date(message.getLong("count")));

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

     

    public void setJmsTemplate(JmsTemplate jmsTemplate) {

    this.jmsTemplate = jmsTemplate;

    }

     

    public void setDestination(Destination destination) {

    this.destination = destination;

    }

     

    }

     

    代码编写完毕,下面要进行bean的配置,在applicationContext.xml中加入如下代码:

    <bean id="producerService" class="com.mq.queue.spring.ProducerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueDest" />

    </bean>

     

    <bean id="consumerService" class="com.mq.queue.spring.ConsumerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueDest" />

    </bean>

    附上完整的applicationContext.xml的内容:

    <?xml version="1.0" encoding="GBK"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xmlns:aop="http://www.springframework.org/schema/aop"

    xmlns:tx="http://www.springframework.org/schema/tx"

    xmlns:context="http://www.springframework.org/schema/context"

    xsi:schemaLocation="

              http://www.springframework.org/schema/beans

              http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

              http://www.springframework.org/schema/tx

              http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

              http://www.springframework.org/schema/context

              http://www.springframework.org/schema/context/spring-context-3.0.xsd

              http://www.springframework.org/schema/aop

              http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"

    default-autowire="byName">

    <bean id="producerService" class="com.mq.queue.spring.ProducerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueDest" />

    </bean>

     

    <bean id="consumerService" class="com.mq.queue.spring.ConsumerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueDest" />

    </bean>

    <!-- 配置JMS连接工厂 -->

    <bean id="connectionFactory"

    class="org.apache.activemq.ActiveMQConnectionFactory">

    <property name="brokerURL" value="tcp://localhost:61616" />

    </bean>

    <!-- 发送消息的目的地(队列) -->

    <bean id="queueDest"

    class="org.apache.activemq.command.ActiveMQQueue">

    <!-- 设置消息队列的名字 -->

    <constructor-arg index="0" value="myQueue" />

    </bean>

    <!-- 配置Jms模板  -->

    <bean id="jmsQueueTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="queueDest" />

    <property name="receiveTimeout" value="10000" />

    </bean>

    <!-- 发送消息的目的地(主题) -->

    </beans>

    4、编写测试程序

    需要的业务代码都已编写完毕,下面编写测试代码。新建一个生产者的测试类ProducerTest.java。具体代码如下:

    package com.mq.queue.spring;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    public class ProducerTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

     

    private static void send() {

    ProducerService producerService = (ProducerService) appContext.getBean("producerService");

    producerService.send();

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    send();

    }

    }

    再建一个消费者的测试类,ConsumerTest.java,具体代码如下:

    package com.mq.queue.spring;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    public class ConsumerTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

     

    private static void receive() {

    ConsumerService consumerService = (ConsumerService) appContext.getBean("consumerService");

    consumerService.receive();

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    receive();

    }

    }

    5、运行程序

    所有代码都编写完了,我们来看一下我们的劳动成果。运行生产者测试类。控制台打印出如下内容,画线标注的就是我们发送的内容:

    运行消费者测试类。控制台打印出如下内容,画线标注就是我们接受的内容:

    两者是一样的。下面我进行Topic方式的学习。

    六、Topic(发布/订阅)springactiveMQ整合

    在上面的代码中我们可以看出,其实整合很简单就是做个配置,将原来我们写的一些初始化的工作简化了。接下来就不进行繁琐的步骤了,直接上代码。

    1、配置JMSTemplate模版

    在配置文件中增加如下配置:

    <!-- 发送消息的目的地(主题) -->

    <bean id="topicDest"

    class="org.apache.activemq.command.ActiveMQTopic">

    <!-- 设置消息队列的名字 -->

    <constructor-arg index="0" value="myTopic" />

    </bean>

    <!-- 配置TopicJms模板  -->

    <bean id="jmsTopicTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="topicDest" />

    <!-- 配置是否为发布订阅者模式,默认为false -->

    <property name="pubSubDomain" value="true"/>

    <property name="receiveTimeout" value="10000" />

    2、编写代码

    新建发布者接口和实现类,内容如下:

    PublisherService.java

    package com.mq.topic.spring;

     

    /**

     * 发布者接口

     *

     */

    public interface PublisherService {

    /**

     * 发送方法

     */

    public void send();

    }

    PublisherServiceImpl.java

    package com.mq.topic.spring;

     

    import java.util.Date;

     

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.Session;

     

    import org.springframework.jms.core.JmsTemplate;

    import org.springframework.jms.core.MessageCreator;

     

    public class PublisherServiceImpl implements PublisherService {

     

    JmsTemplate jmsTemplate;

     

    Destination destination;

    public void send() {

    MessageCreator messageCreator = new MessageCreator(){

    public Message createMessage(Session session) throws JMSException {

    MapMessage message = session.createMapMessage();

    Date date = new Date();

    message.setLong("count", date.getTime());

    System.out.println("--发送消息:"+date);

    return message;

    }

    };

    jmsTemplate.send(this.destination,messageCreator);

    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {

    this.jmsTemplate = jmsTemplate;

    }

    public void setDestination(Destination destination) {

    this.destination = destination;

    }

     

    }

    再新建一个订阅者接口并增加两个实现。

    SuscriberService.java

    package com.mq.topic.spring;

     

    /**

     *订阅者接口

     */

    public interface SubscriberService {

    /**

     * 接受信息

     */

    public void receive();

    }

    FirstSubscriberServiceImpl.java

    package com.mq.topic.spring;

     

    import java.util.Date;

     

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

     

    import org.springframework.jms.core.JmsTemplate;

     

    public class FirstSubscriberServiceImpl implements SubscriberService {

    JmsTemplate jmsTemplate;

     

    Destination destination;

    public void receive() {

    MapMessage message = (MapMessage) jmsTemplate.receive();

    try {

    System.out.println("--订阅者一收到消息:" + new Date(message.getLong("count")));

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

     

    public void setJmsTemplate(JmsTemplate jmsTemplate) {

    this.jmsTemplate = jmsTemplate;

    }

     

    public void setDestination(Destination destination) {

    this.destination = destination;

    }

     

    }

    SecondSubscriberServiceImpl.java

    package com.mq.topic.spring;

     

    import java.util.Date;

     

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

     

    import org.springframework.jms.core.JmsTemplate;

     

    public class SecondSubscriberServiceImpl implements SubscriberService {

    JmsTemplate jmsTemplate;

     

    Destination destination;

    public void receive() {

    MapMessage message = (MapMessage) jmsTemplate.receive();

    try {

    System.out.println("--订阅者二收到消息:" + new Date(message.getLong("count")));

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

     

    public void setJmsTemplate(JmsTemplate jmsTemplate) {

    this.jmsTemplate = jmsTemplate;

    }

     

    public void setDestination(Destination destination) {

    this.destination = destination;

    }

    }

    在配置文件中增加如下配置:

    <bean id="publisherService" class="com.mq.topic.spring.PublisherServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

     

    <bean id="firstSubscriberService" class="com.mq.topic.spring.FirstSubscriberServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

    <bean id="secondSubscriberService" class="com.mq.topic.spring.SecondSubscriberServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

    附上applicationContext.xml的全部内容:

    <?xml version="1.0" encoding="GBK"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xmlns:aop="http://www.springframework.org/schema/aop"

    xmlns:tx="http://www.springframework.org/schema/tx"

    xmlns:context="http://www.springframework.org/schema/context"

    xsi:schemaLocation="

              http://www.springframework.org/schema/beans

              http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

              http://www.springframework.org/schema/tx

              http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

              http://www.springframework.org/schema/context

              http://www.springframework.org/schema/context/spring-context-3.0.xsd

              http://www.springframework.org/schema/aop

              http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"

    default-autowire="byName">

    <bean id="producerService" class="com.mq.queue.spring.ProducerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueDest" />

    </bean>

     

    <bean id="consumerService" class="com.mq.queue.spring.ConsumerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueDest" />

    </bean>

    <bean id="publisherService" class="com.mq.topic.spring.PublisherServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

     

    <bean id="firstSubscriberService" class="com.mq.topic.spring.FirstSubscriberServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

    <bean id="secondSubscriberService" class="com.mq.topic.spring.SecondSubscriberServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

    <!-- 配置JMS连接工厂 -->

    <bean id="connectionFactory"

    class="org.apache.activemq.ActiveMQConnectionFactory">

    <property name="brokerURL" value="tcp://localhost:61616" />

    </bean>

    <!-- 发送消息的目的地(队列) -->

    <bean id="queueDest"

    class="org.apache.activemq.command.ActiveMQQueue">

    <!-- 设置消息队列的名字 -->

    <constructor-arg index="0" value="myQueue" />

    </bean>

    <!-- 配置QueueJms模板  -->

    <bean id="jmsQueueTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="queueDest" />

    <property name="receiveTimeout" value="10000" />

    </bean>

    <!-- 发送消息的目的地(主题) -->

    <bean id="topicDest"

    class="org.apache.activemq.command.ActiveMQTopic">

    <!-- 设置消息队列的名字 -->

    <constructor-arg index="0" value="myTopic" />

    </bean>

    <!-- 配置TopicJms模板  -->

    <bean id="jmsTopicTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="topicDest" />

    <!-- 配置是否为发布订阅者模式,默认为false -->

    <property name="pubSubDomain" value="true"/>

    <property name="receiveTimeout" value="10000" />

    </bean>

    </beans>

    3、编写测试程序

    按照国际惯例,下面我们来编写测试程序。

    发布者测试类,PublisherTest.java

    package com.mq.topic.spring;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    public class PublisherTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

     

    private static void send() {

    PublisherService publisherService = (PublisherService) appContext.getBean("publisherService");

    publisherService.send();

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    send();

    }

    }

    第一个订阅者测试类,FirstSubscriberTest.java

    package com.mq.topic.spring;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    public class FirstSubscriberTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

     

    private static void receive() {

    SubscriberService firstSubscriberService = (SubscriberService) appContext.getBean("firstSubscriberService");

    firstSubscriberService.receive();

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    receive();

    }

    }

    第二个订阅者测试类,SecondSubscriberTest.java

    package com.mq.topic.spring;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    public class SecondSubscriberTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

     

    private static void receive() {

    SubscriberService secondSubscriberService = (SubscriberService) appContext.getBean("secondSubscriberService");

    secondSubscriberService.receive();

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    receive();

    }

    }

    按照前面说的顺序运行测试类。会发现两个订阅者都收到了发布者发布的信息。控制台信息,我就不截图了。可能部分童鞋的应用报错了,错误信息如下:

    如果出现以上错误,是因为取信息的时候没有取到导致空指针。这个应该很容易理解吧,不可能消费一个没有的东西。 如何解决呢?运行订阅者和发布者之间的时间间隔小一些就可以,当然这个只是简单的解决方法。有人问了,怎么每次接受消息的时候都要手动运行接收者呢?有没有一个不用每次都运行的解决方案呢?当然有了,不用spring的东西就是在接受的部分写一个循环,循环去取。Spring也想到了这个问题,而且还提供了一个不错的解决方案,spring可是个大神呐,我们想到的和没想到的,人家都想到。下面我们来学习一下,后面就不分开江苏QueueTopic模式,经过前面的学习,相信大家对这两种方式已经了解的差不多了。废话不多说,下面开始吧。

    七、spingActiveMQ的完全整合

    对于让spring管理监听的实现方式有两种方法,一种是自己写监听器,然后交给spring的监听适配器管理,再由监听容器管理监听适配器,另一种是写一个实现MessageListener接口的类

    1、编写消费者相关监听器

    ·Queue(点对点)方式

    先建要进行监听的接口及其实现。

    ConsumerListenerService.java

    package com.mq.queue.listener;

     

    import java.util.HashMap;

     

    public interface ConsumerListenerService {

    public void receive(HashMap message);

    }

    实现类ConsumerListenerServiceImpl.java

    package com.mq.queue.listener;

     

    import java.util.Date;

    import java.util.HashMap;

     

    public class ConsumerListenerServiceImpl implements ConsumerListenerService {

     

    public void receive(HashMap message) {

    System.out.println("--Listener收到消息:" + new Date(new Long((Long) message.get("count"))));

    }

    }

    ·Topic(发布/订阅)方式

    新建订阅者接口及其两个实现者:

    SubscriberListenerService.java

    package com.mq.topic.listener;

     

    import java.util.HashMap;

     

    public interface SubscriberListenerService {

     

    public void receive(HashMap message);

    }

    第一个实现类FirstSubscriberListenerServiceImpl.java

    package com.mq.topic.listener;

     

    import java.util.Date;

    import java.util.HashMap;

     

    public class FirstSubscriberListenerServiceImpl implements

    SubscriberListenerService {

     

    public void receive(HashMap message) {

    System.out.println("--订阅者一Listener收到消息:" + new Date(new Long((Long) message.get("count"))));

    }

     

    }

    第二个实现类SecondSubscriberListenerServiceImpl.java

    package com.mq.topic.listener;

     

    import java.util.Date;

    import java.util.HashMap;

     

    public class SecondSubscriberListenerServiceImpl implements

    SubscriberListenerService {

     

    public void receive(HashMap message) {

    System.out.println("--订阅者二Listener收到消息:" + new Date(new Long((Long) message.get("count"))));

    }

     

    }

    2、配置监听

    applicationContext.xml中增加bean配置:

    <!-- 配置队列消费者监听 -->

    <bean id="consumerListenerService" class="com.mq.queue.listener.ConsumerListenerServiceImpl">

    </bean>

    <!-- 配置主题订阅者监听 -->

    <bean id="firstSubscriberListenerService" class="com.mq.topic.listener.FirstSubscriberListenerServiceImpl">

    </bean>

    <bean id="secondSubscriberListenerService" class="com.mq.topic.listener.SecondSubscriberListenerService">

    </bean>

    增加监听器适配器和监听容器:

    <!-- 配置队列消费者监听 -->

    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

            <constructor-arg  ref="consumerListenerService" />  

            <property name="defaultListenerMethod" value="receive" />  

        </bean>

        <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

            <property name="connectionFactory" ref="connectionFactory" /> 

            <property name="destination" ref="queueDest" /> 

            <property name="messageListener" ref="queueListener" />  

    </bean>   

     

    <!-- 配置主题订阅者监听 -->

    <bean id="firstSubscriberListenerService" class="com.mq.topic.listener.FirstSubscriberListenerServiceImpl">

    </bean>

    <bean id="secondSubscriberListenerService" class="com.mq.topic.listener.SecondSubscriberListenerService">

    </bean>

    <!-- 配置监听适配器 -->

    <!-- 配置主题订阅者监听适配器 -->

        <bean id="firstTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

            <constructor-arg ref="firstSubscriberListenerService" />  

            <property name="defaultListenerMethod" value="receive" />  

        </bean>  

        <bean id="secondTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

            <constructor-arg ref="secondSubscriberListenerService" />  

            <property name="defaultListenerMethod" value="receive" />  

        </bean>

        

    <!-- 配置监听适容器 -->

        <!-- 配置订阅者监听适容器 -->

        <bean id="firstTopicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

            <property name="connectionFactory" ref="connectionFactory" />  

            <property name="destination" ref="topicDest" />  

            <property name="messageListener" ref="topicListenerA" />  

        </bean>  

      

        <bean id="secondTopicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

            <property name="connectionFactory" ref="connectionFactory" />  

            <property name="destination" ref="topicDest" />  

            <property name="messageListener" ref="topicListenerB" />  

        </bean>

    以上的ListenerAdapterListenerContainer配置,从上面的介绍应该很容知道上述配置的目的。

    附上完整的配置文件:

    <?xml version="1.0" encoding="GBK"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xmlns:aop="http://www.springframework.org/schema/aop"

    xmlns:tx="http://www.springframework.org/schema/tx"

    xmlns:context="http://www.springframework.org/schema/context"

    xsi:schemaLocation="

              http://www.springframework.org/schema/beans

              http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

              http://www.springframework.org/schema/tx

              http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

              http://www.springframework.org/schema/context

              http://www.springframework.org/schema/context/spring-context-3.0.xsd

              http://www.springframework.org/schema/aop

              http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"

    default-autowire="byName">

    <!-- 配置JMS连接工厂 -->

    <bean id="connectionFactory"

    class="org.apache.activemq.ActiveMQConnectionFactory">

    <property name="brokerURL" value="tcp://localhost:61616" />

    </bean>

    <!-- Queue方式 -->

    <!-- 发送消息的目的地(队列) -->

    <bean id="queueDest"

    class="org.apache.activemq.command.ActiveMQQueue">

    <!-- 设置消息队列的名字 -->

    <constructor-arg index="0" value="myQueue" />

    </bean>

    <!-- 配置QueueJms模板  -->

    <bean id="jmsQueueTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="queueDest" />

    <property name="receiveTimeout" value="10000" />

    </bean>

    <bean id="producerService" class="com.mq.queue.spring.ProducerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

     <property name="destination" ref="queueDest" /> 

    </bean>

    <bean id="consumerService" class="com.mq.queue.spring.ConsumerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueDest" />

    </bean>

    <!-- 配置队列消费者监听 -->

    <bean id="consumerListenerService" class="com.mq.queue.listener.ConsumerListenerServiceImpl">

    </bean>

    <!-- 配置监听适配器 -->

    <!-- 配置队列消费者监听适配器 -->

    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

            <constructor-arg  ref="consumerListenerService" />  

            <property name="defaultListenerMethod" value="receive" />  

        </bean>

        

        <!-- 配置监听适容器 -->

        <!-- 配置队列消费者监听适容器 -->

        <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

            <property name="connectionFactory" ref="connectionFactory" /> 

            <property name="destination" ref="queueDest" /> 

            <property name="messageListener" ref="queueListener" />  

        </bean>

        

        

        

        <!-- Topic方式 -->

        <!-- 发送消息的目的地(主题) -->

    <bean id="topicDest"

    class="org.apache.activemq.command.ActiveMQTopic">

    <!-- 设置消息队列的名字 -->

    <constructor-arg index="0" value="myTopic" />

    </bean>

    <!-- 配置TopicJms模板  -->

    <bean id="jmsTopicTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="topicDest" />

    <!-- 配置是否为发布订阅者模式,默认为false -->

    <property name="pubSubDomain" value="true"/>

    <property name="receiveTimeout" value="10000" />

    </bean>

    <!-- 发布订阅配置 -->

    <bean id="publisherService" class="com.mq.topic.spring.PublisherServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

    <bean id="firstSubscriberService" class="com.mq.topic.spring.FirstSubscriberServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

    <bean id="secondSubscriberService" class="com.mq.topic.spring.SecondSubscriberServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicDest" />

    </bean>

    <!-- 配置主题订阅者监听 -->

    <bean id="firstSubscriberListenerService" class="com.mq.topic.listener.FirstSubscriberListenerServiceImpl">

    </bean>

    <bean id="secondSubscriberListenerService" class="com.mq.topic.listener.SecondSubscriberListenerService">

    </bean>

    <!-- 配置监听适配器 -->

    <!-- 配置主题订阅者监听适配器 -->

        <bean id="firstTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

            <constructor-arg ref="firstSubscriberListenerService" />  

            <property name="defaultListenerMethod" value="receive" />  

        </bean>  

        <bean id="secondTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

            <constructor-arg ref="secondSubscriberListenerService" />  

            <property name="defaultListenerMethod" value="receive" />  

        </bean>

        

    <!-- 配置监听适容器 -->

        <!-- 配置订阅者监听适容器 -->

        <bean id="firstTopicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

            <property name="connectionFactory" ref="connectionFactory" />  

            <property name="destination" ref="topicDest" />  

            <property name="messageListener" ref="firstTopicListener" />  

        </bean>  

      

        <bean id="secondTopicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

            <property name="connectionFactory" ref="connectionFactory" />  

            <property name="destination" ref="topicDest" />  

            <property name="messageListener" ref="secondTopicListener" />  

        </bean> 

    </beans>

    下面将项目发布到tomcat下,发布方式,怎么发布本文不再详细讨论,方式也很多。发布完成后运行tomcat,然后运行第四章中的ProducerTest类,控制台回打印出如下信息:

    再运行PublisherTest.java,控制台打印出如下信息:

     

    从后台信息我们看出,这样的配置是可以的,下面我们从另一种方式实现,即实现MessageListener接口的方式。

    3、编写借口实现类

    本章刚开始说了,要实现MessageListener接口,下面我们开始编写代码吧。

    ·Queue(点对点)方式

    ConsumerMessageListener类,一个实现了MessageListener接口的类。具体如下:

    package com.mq.queue.listener;

     

    import java.util.Date;

     

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.MessageListener;

     

    public class ConsumerMessageListener implements MessageListener {

     

    public void onMessage(Message msg) {

    if(msg instanceof MapMessage){

    MapMessage message = (MapMessage) msg;

    try {

    System.out.println("--MessageListener收到信息:"+new Date( message.getLong("count")));

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    }

     

    }

    ·Topic(发布/订阅)方式

    第一个订阅者实现类FirstMessageListener.java

    package com.mq.topic.listener;

     

    import java.util.Date;

     

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.MessageListener;

     

    public class FirstMessageListener implements MessageListener {

     

    public void onMessage(Message msg) {

    if (msg instanceof MapMessage) {

    MapMessage message = (MapMessage) msg;

    try {

    System.out.println("--订阅者一MessageListener收到信息:" + new Date(message.getLong("count")));

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    }

    }

    第二个订阅者实现类SecondMessageListener.java

    package com.mq.topic.listener;

     

    import java.util.Date;

     

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.MessageListener;

     

    public class SecondMessageListener implements MessageListener {

     

    public void onMessage(Message msg) {

    if(msg instanceof MapMessage){

    MapMessage message = (MapMessage) msg;

    try {

    System.out.println("--订阅者二MessageListener收到信息:"+new Date( message.getLong("count")));

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    }

    }

    4、编写接口的相关配置

    ·Queue方式的配置

     <!-- 实现接口的方式 -->

    <bean id="queueMessageListener" class="com.mq.queue.listener.ConsumerMessageListener">

    </bean>

    <!-- 新增一个队列地址名字不能跟上面的重复 -->

    <bean id="queueMessageListenerDest" class="org.apache.activemq.command.ActiveMQQueue">

    <constructor-arg index="0" value="myMessageListenerQueue" />

    </bean>

    <bean id="myMsgQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="destination" ref="queueMessageListenerDest" />

    <property name="messageListener" ref="queueMessageListener" />

    <property name="receiveTimeout" value="10000" />

    </bean>

    ·Topic方式的配置:

        <!-- 实现借口的方式 -->

        <bean id="firstMessageListener" class="com.mq.topic.listener.FirstMessageListener"></bean>

        <bean id="secondMessageListener" class="com.mq.topic.listener.SecondMessageListener"></bean>

        <bean id="topicMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">

        <constructor-arg index="0" value="myMessageListenerTopic"/>

        </bean>

        <bean id="myMsgFirstTopiclistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

        <property name="connectionFactory" ref="connectionFactory" />

    <property name="destination" ref="topicMessageListenerDest" />

    <property name="messageListener" ref="firstMessageListener" />

    <property name="pubSubDomain" value="true"/>

    <property name="receiveTimeout" value="10000" />

        </bean>

        <bean id="myMsgSecondTopiclistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

        <property name="connectionFactory" ref="connectionFactory" />

    <property name="destination" ref="topicMessageListenerDest" />

    <property name="messageListener" ref="secondMessageListener" />

    <property name="pubSubDomain" value="true"/>

    <property name="receiveTimeout" value="10000" />

        </bean>

    这里我们新增了一队列名称,为的是跟上面的区别。发送者跟上面使用的一样,但是有一点需要说明一下,生产者的发送地址,要改一下,配成我们刚建的那个队列名称。修改后的内容如下:

    ·Queue方式的配置

    <bean id="producerService" class="com.mq.queue.spring.ProducerServiceImpl">

    <property name="jmsTemplate" ref="jmsQueueTemplate" />

    <property name="destination" ref="queueMessageListenerDest" />

    <!-- <property name="destination" ref="queueDest" /> -->

    </bean>

    ·Topic(发布/订阅)方式

    <bean id="publisherService" class="com.mq.topic.spring.PublisherServiceImpl">

    <property name="jmsTemplate" ref="jmsTopicTemplate" />

    <property name="destination" ref="topicMessageListenerDest" />

    <!-- <property name="destination" ref="topicDest" /> -->

    </bean>

    重新发布程序,并启动应用。然后运行上面使用到的ProducerTest类,控制台打印出如下信息:

    接着运行PublisherTest类,控制台打印出如下信息:

     

    说明这种方式也是可以的。至于这两种方式的选择,是具体情况而定,不过第二种方式用的要多一些。在测试第二种方式的时候,可能会为了省事,队列就配成了以前用的队列,结构打印出的信息画线部分可能跟上面的不一样,这是为什么呢?如果有两个消费者监听同一个队列,就像两个人同去取同一个东西,结果就是谁先取到谁得到,同样,监听器谁先取到就能收到信息,后监听到的就没取不到内容了。

    八、消息转换

    上面的例子中直接发送MapMessage类型的数据,如果我们自己要发送的数据不是MapMessage类型的那该怎么办?难道每次接受后都要增加一个转换方法么?其实spring早就考虑到这种情况了。转化器在很多组件中都是必不缺少的东西。SpringMessageConverter接口提供了对消息转换的支持。

    我们看一下转换接口的方法。有源码的看源码,没有的我们也可以看,新建一个类MsgConverter.java,实现MessageConverter接口。生成的代码如下:

    package com.mq;

     

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.Session;

     

    import org.springframework.jms.support.converter.MessageConversionException;

    import org.springframework.jms.support.converter.MessageConverter;

     

    public class MsgConverter implements MessageConverter {

     

    public Object fromMessage(Message arg0) throws JMSException,

    MessageConversionException {

    // TODO Auto-generated method stub

    return null;

    }

     

    public Message toMessage(Object arg0, Session arg1) throws JMSException,

    MessageConversionException {

    // TODO Auto-generated method stub

    return null;

    }

     

    }

    MessageConverter接口的两个方法简单明了。在发送端toMessage会将java对象转化为消息,在接收端fromMessage会将消息转化为java对象。

    1、转换类的相关代码和配置

    新建一个类MyMsg,就是一个简单的Pojo类。具体代码如下:

    package com.mq;

     

    public class MyMsg {

     

    private String id;

    private String name;

    public String getId() {

    return id;

    }

    public void setId(String id) {

    this.id = id;

    }

    public String getName() {

    return name;

    }

    public void setName(String name) {

    this.name = name;

    }

    }

     

    这个类是我们要转换的对象,然后我们针对上面的转换类做些实现。具体代码如下:

    package com.mq;

     

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.Session;

     

    import org.springframework.jms.support.converter.MessageConversionException;

    import org.springframework.jms.support.converter.MessageConverter;

     

    public class MsgConverter implements MessageConverter {

    public Object fromMessage(Message message) throws JMSException,

    MessageConversionException {

    if (!(message instanceof MapMessage)) {

    throw new MessageConversionException("Messae is not MapMessage");

    }

    System.out.println("--转换接收的消息--");

    MapMessage mapMessage = (MapMessage) message;

    MyMsg myMsg = new MyMsg();

    myMsg.setId(mapMessage.getString("id"));

    myMsg.setName(mapMessage.getString("name"));

    return myMsg;

    }

     

    public Message toMessage(Object obj, Session session) throws JMSException,

    MessageConversionException {

    if (!(obj instanceof MyMsg)) {

    throw new MessageConversionException("obj is not MyMsg");

    }

    System.out.println("--转换发送的消息--");

    MyMsg myMsg = (MyMsg) obj;

    MapMessage mapMessage = session.createMapMessage();

    mapMessage.setString("id", myMsg.getId());

    mapMessage.setString("name", myMsg.getName());

    return mapMessage;

    }

    }

    代码很简单就是做些转换。此时,发送和接收消息要换成template.convertAndSend(message)template.receiveAndConvert()。接下来我做一些配置,让spring知道我们的转换类。修改applicationContext.xmljms模版配置的代码,修改后的代码如下:

    <bean id="msgConverter" class="com.mq.MsgConverter"></bean>

    <!-- 配置Jms模板  -->

    <bean id="jmsTemplate"

    class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="defaultDestination" ref="destination" />

    <property name="receiveTimeout" value="10000" />

    <property name="messageConverter" ref="msgConverter"></property>

    </bean>

    2、业务相关代码和配置

    ProducerService.java增加convertAndSend()方法并在其实现类中实现,实现类的代码如下:

    public void convertAndSend(){

    MyMsg myMsg = new MyMsg();

    myMsg.setId("1");

    myMsg.setName("first msg");

    System.out.println("--发送消息:myMsg.id"+myMsg.getId()+"myMsg.name"+myMsg.getName());

    jmsTemplate.convertAndSend(this.destination, myMsg);

    }

    同样在ConsumerService.java中增加receiveAndConvert()方法并在其实现类中实现,实现类的代码如下:

    public void receiveAndConvert() {

    MyMsg myMsg = (MyMsg)jmsTemplate.receiveAndConvert();

    System.out.println("--收到消息:myMsg.id"+myMsg.getId()+"myMsg.name"+myMsg.getName());

    }

    修改我们的两个测试类,增加对转换方法的调用,不再赘述,直接上代码:

    ProducerTest.java

    package com.mq.service;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    public class ProducerTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext(

    "applicationContext.xml");

     

    private static void send() {

    ProducerService producerService = (ProducerService) appContext

    .getBean("producerService");

    producerService.send();

    }

    private static void convertAndSend() {

    ProducerService producerService = (ProducerService) appContext

    .getBean("producerService");

    producerService.convertAndSend();

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    //send();

    convertAndSend();

    }

     

    }

    ConsumerTest.java

    package com.mq.service;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    public class ConsumerTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext(

    "applicationContext.xml");

     

    private static void receive() {

    ConsumerService consumerService = (ConsumerService) appContext

    .getBean("consumerService");

    consumerService.receive();

    }

    private static void receiveAndConvert() {

    ConsumerService consumerService = (ConsumerService) appContext

    .getBean("consumerService");

    consumerService.receiveAndConvert();

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    //receive();

    receiveAndConvert();

    }

     

    }

    代码编写完毕,我们看一下我们的劳动成果。首先运行生产者类控制台信息如下:

    咋滴没成功?先接着看吧,下面有解决方法。从日志可以看出,确实调用了我们写的转换类,至于转换结果,还不清楚,我们运行一下消费者类,看一下后台信息:

    收到的内容与发的内容相同,说明转换成功了。如果这一部分的程序使用的队列跟上面的一样,那你会发现发送的时候打印出的信息不值上面的一个,还包括一个接收的信息,这是为什么呢?了解spring原理的人应该知道,spring是把所有类都加载到内容中,当然也包括我们上门写的按个实现MessageListener的一个消费者类,他们也在运行,如果监听的地址跟你送的地址正好相同的话,他也有可能收到这个信息。所以在测试的时候要注意修改他们的地址。

    3、监听器上的使用方式

    我再来学习一下跟监听器联合使用的方式,只在发布订阅者模式上演示一下。我们先来修改发布者的实现方式,在发布者借口中增加convertAndSend方法并在其实现类中实现,修改后的代码如下:

    PublisherService.java

    package com.mq.topic.spring;

     

    /**

     * 发布者接口

     *

     */

    public interface PublisherService {

    /**

     * 发送方法

     */

    public void send();

    public void convertAndSend(Object obj);

    }

    PublisherServiceImpl.java

    package com.mq.topic.spring;

     

    import java.util.Date;

     

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.Session;

     

    import org.springframework.jms.core.JmsTemplate;

    import org.springframework.jms.core.MessageCreator;

     

    public class PublisherServiceImpl implements PublisherService {

     

    JmsTemplate jmsTemplate;

     

    Destination destination;

    public void send() {

    MessageCreator messageCreator = new MessageCreator(){

    public Message createMessage(Session session) throws JMSException {

    MapMessage message = session.createMapMessage();

    Date date = new Date();

    message.setLong("count", date.getTime());

    System.out.println("--发送消息:"+date);

    return message;

    }

    };

    jmsTemplate.send(this.destination,messageCreator);

    }

    public void convertAndSend(Object obj) {

    System.out.println("--发送JAVA对象...");

    jmsTemplate.convertAndSend(destination, obj);

    }

     

    public void setJmsTemplate(JmsTemplate jmsTemplate) {

    this.jmsTemplate = jmsTemplate;

    }

    public void setDestination(Destination destination) {

    this.destination = destination;

    }

    }

    我们来修改一下两个订阅者的实现,修改后的内容如下:

    FirstMessageListener.java

    package com.mq.topic.listener;

     

    import javax.jms.Message;

    import javax.jms.MessageListener;

    import javax.jms.ObjectMessage;

     

    import com.mq.MyMsg;

     

    public class FirstMessageListener implements MessageListener {

     

    public void onMessage(Message msg) {

    if (msg instanceof ObjectMessage) {

    MyMsg myMsg = (MyMsg)((ObjectMessage) msg);

    System.out.println("--订阅者一MessageListener收到信息:id" +myMsg.getId()+";name:"+myMsg.getName());

    }

    //if (msg instanceof MapMessage) {

    //MapMessage message = (MapMessage) msg;

    //try {

    //System.out.println("--订阅者一MessageListener收到信息:" + new Date(message.getLong("count")));

    //} catch (JMSException e) {

    //e.printStackTrace();

    //}

    //}

    }

    }

    SecondMessageListener.java

    package com.mq.topic.listener;

     

    import javax.jms.Message;

    import javax.jms.MessageListener;

    import javax.jms.ObjectMessage;

     

    import com.mq.MyMsg;

     

    public class SecondMessageListener implements MessageListener {

     

    public void onMessage(Message msg) {

    if (msg instanceof ObjectMessage) {

    MyMsg myMsg = (MyMsg)((ObjectMessage) msg);

    System.out.println("--订阅者二MessageListener收到信息:id" +myMsg.getId()+";name:"+myMsg.getName());

    }

    //if(msg instanceof MapMessage){

    //MapMessage message = (MapMessage) msg;

    //try {

    //System.out.println("--订阅者二MessageListener收到信息:"+new Date( message.getLong("count")));

    //} catch (JMSException e) {

    //e.printStackTrace();

    //}

    //}

    }

    }

    修改上面的发布测试类,修改增加对新增方法的调用,修改后的内容如下:

    package com.mq.topic.spring;

     

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

     

    import com.mq.MyMsg;

     

    public class PublisherTest {

     

    private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

     

    private static void send() {

    PublisherService publisherService = (PublisherService) appContext.getBean("publisherService");

    publisherService.send();

    }

    private static void convertAndSend() {

    PublisherService publisherService = (PublisherService) appContext.getBean("publisherService");

    MyMsg myMsg = new MyMsg();

    myMsg.setId("1");

    myMsg.setName("first");

    publisherService.convertAndSend(myMsg);

    }

     

    /**

     * @param args

     */

    public static void main(String[] args) {

    //send();

    convertAndSend();

    }

    }

    我们来运行一下发布者的测试程序吧。呀……报错了,是滴我的也报错了……错误如下:

     

    类转换异常,仔细看一下程序,是接收的时候做转换的时候报错,看了一下别人的代码,发现再获得ObjectMessage对象之后需要在得到Object才能得到我们传递的对象,汗……修改一下代码再测试一下吧。

    FirstMessageListener.java

    package com.mq.topic.listener;

     

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.MessageListener;

    import javax.jms.ObjectMessage;

     

    import com.mq.MyMsg;

     

    public class FirstMessageListener implements MessageListener {

     

    public void onMessage(Message msg) {

    if (msg instanceof ObjectMessage) {

    MyMsg myMsg;

    try {

    myMsg = (MyMsg)(((ObjectMessage) msg).getObject());

    System.out.println("--订阅者一MessageListener收到信息:id" +myMsg.getId()+";name:"+myMsg.getName());

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    //if (msg instanceof MapMessage) {

    //MapMessage message = (MapMessage) msg;

    //try {

    //System.out.println("--订阅者一MessageListener收到信息:" + new Date(message.getLong("count")));

    //} catch (JMSException e) {

    //e.printStackTrace();

    //}

    //}

    }

    }

    SecondMessageListener.java

    package com.mq.topic.listener;

     

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.MessageListener;

    import javax.jms.ObjectMessage;

     

    import com.mq.MyMsg;

     

    public class SecondMessageListener implements MessageListener {

     

    public void onMessage(Message msg) {

    if (msg instanceof ObjectMessage) {

    MyMsg myMsg;

    try {

    myMsg = (MyMsg)(((ObjectMessage) msg).getObject());

    System.out.println("--订阅者二MessageListener收到信息:id" +myMsg.getId()+";name:"+myMsg.getName());

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    //if(msg instanceof MapMessage){

    //MapMessage message = (MapMessage) msg;

    //try {

    //System.out.println("--订阅者二MessageListener收到信息:"+new Date( message.getLong("count")));

    //} catch (JMSException e) {

    //e.printStackTrace();

    //}

    //}

    }

    }

    运行结果如下:

     

    偶了,至此,所有测试做完了。有问题的可以通过空间或微博和我交流。由于本人水平优先,文章提及到的有问题的地方,欢迎牛人拍砖哈。

    九、附录参考内容

    1http://yunzhongxia.iteye.com/blog/566727

    2http://www.haogongju.net/art/1186144

    3ActiveMQ in action

    4apache官网

    题外话:在接下来的时间,我会写一些高级应用和WMQ方面的文章,敬请关注本人百度空间和新浪微博。也欢迎各位通过一下两种途径和我进行交流,共同进步。推荐一下本人的微博,作为程序员,生活是非常枯燥的(一般都是很枯燥的,那些以此为乐的不涉及),平时要自己丰富一下自己的业余生活,看些搞笑微博之类的。我每天都会发布一些糗事,欢迎大家关注。另外我在网上看到过有句话——程序员活该你单身身的,程序员招谁惹谁了啊,我们就应该单身。不过话又说回来了,程序员的生活确实单调,感情不丰富,不会说话,但是不能怨我们,其实我们也不想这样。推荐关注新浪微博一千零一夜话,微博地址:http://weibo.com/1968007227/,每天发布一条夜话,你懂得,希望各位程序猿每天能给自己的另一位一句温馨的话,不要成为苦逼的娃。

    转载请注明原文地址: https://ju.6miu.com/read-39358.html

    最新回复(0)