ActiveMQ简单介绍+简单实例

    xiaoxiao2021-03-25  136

    1. JMS基本概念      JMS(Java Message Service) 即Java消息服务。它提供标准的产生、发送、接收消息的接口简化   企业   应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P 模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。       对于点到点模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,然后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取之后,它就在这个Queue中消失了,所以一个消息只能被一个接收者消费。  

        与点到点模型不同,发布/订阅模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic可以同时有多个接收者在监听,当一个消息到达这个Topic之后,所有消息接收者都会收到这个消息。

    2.编程的结构

    2.1消息产生者向JMS发送消息的步骤  (1)创建连接使用的工厂类JMS ConnectionFactory  (2)使用管理对象JMS ConnectionFactory建立连接Connection  (3)使用连接Connection 建立会话Session  (4)使用会话Session和管理对象Destination创建消息生产者MessageSender  (5)使用消息生产者MessageSender发送消息  2.2消息消费者从JMS接受消息的步骤  (1)创建连接使用的工厂类JMS ConnectionFactory  (2)使用管理对象JMS ConnectionFactory建立连接Connection  (3)使用连接Connection 建立会话Session  (4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver  (5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 

    消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

    1.下载ActiveMQ  去官方网站下载:http://activemq.apache.org/  我下载的时候是 ActiveMQ 5.8.0 Release版  2.运行ActiveMQ  解压缩apache-activemq-5.8.0-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。  启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。  3.创建Eclipse项目并运行  创建java project:ActiveMQ-5.8,新建lib文件夹  打开apache-activemq-5.8.0\lib目录  拷贝  activemq-broker-5.8.0.jar  activemq-client-5.8.0.jar  geronimo-j2ee-management_1.1_spec-1.0.1.jar  geronimo-jms_1.1_spec-1.1.1.jar  slf4j-api-1.6.6.jar  这5个jar文件到lib文件夹中,并Build Path->Add to Build Path  结构如图:    Sender.java 

    Java代码   package com.lm.activemq;      /**   * @Header: Sender.java   * 类描述:   * @author: lm   * @date 2013-7-17 上午10:52:42   * @Email    * @company 欢   * @addr 北京市朝阳区劲松   */   import javax.jms.Connection;   import javax.jms.ConnectionFactory;   import javax.jms.DeliveryMode;   import javax.jms.Destination;   import javax.jms.MessageProducer;   import javax.jms.Session;   import javax.jms.TextMessage;   import org.apache.activemq.ActiveMQConnection;   import org.apache.activemq.ActiveMQConnectionFactory;      public class Sender {       private static final int SEND_NUMBER = 5;          public static void main(String[] args) {           // ConnectionFactory :连接工厂,JMS 用它创建连接           ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS           // Provider 的连接           Connection connection = null// Session: 一个发送或接收消息的线程           Session session; // Destination :消息的目的地;消息发送给谁.           Destination destination; // MessageProducer:消息发送者           MessageProducer producer; // TextMessage message;           // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar           connectionFactory = new ActiveMQConnectionFactory(                   ActiveMQConnection.DEFAULT_USER,                   ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");           try { // 构造从工厂得到连接对象               connection = connectionFactory.createConnection();               // 启动               connection.start();               // 获取操作连接               session = connection.createSession(Boolean.TRUE,                       Session.AUTO_ACKNOWLEDGE);               // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置               destination = session.createQueue("FirstQueue");               // 得到消息生成者【发送者】               producer = session.createProducer(destination);               // 设置不持久化,此处学习,实际根据项目决定               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);               // 构造消息,此处写死,项目就是参数,或者方法获取               sendMessage(session, producer);               session.commit();           } catch (Exception e) {               e.printStackTrace();           } finally {               try {                   if (null != connection)                       connection.close();               } catch (Throwable ignore) {               }           }       }          public static void sendMessage(Session session, MessageProducer producer)               throws Exception {           for (int i = 1; i <= SEND_NUMBER; i++) {               TextMessage message = session.createTextMessage("ActiveMq 发送的消息"                       + i);               // 发送消息到目的地方                  System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);               producer.send(message);           }       }   }   Receiver.java  Java代码   package com.lm.activemq;      /**   * @Header: Receiver.java   * 类描述:   * @author: lm   * @date 2013-7-17 上午10:52:58   * @Email    * @company 欢   * @addr 北京市朝阳区劲松   */   import javax.jms.Connection;   import javax.jms.ConnectionFactory;   import javax.jms.Destination;   import javax.jms.MessageConsumer;   import javax.jms.Session;   import javax.jms.TextMessage;   import org.apache.activemq.ActiveMQConnection;   import org.apache.activemq.ActiveMQConnectionFactory;      public class Receiver {       public static void main(String[] args) {           // ConnectionFactory :连接工厂,JMS 用它创建连接           ConnectionFactory connectionFactory;           // Connection :JMS 客户端到JMS Provider 的连接           Connection connection = null;           // Session: 一个发送或接收消息的线程           Session session;           // Destination :消息的目的地;消息发送给谁.           Destination destination;           // 消费者,消息接收者           MessageConsumer consumer;           connectionFactory = new ActiveMQConnectionFactory(                   ActiveMQConnection.DEFAULT_USER,                   ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");           try {               // 构造从工厂得到连接对象               connection = connectionFactory.createConnection();               // 启动               connection.start();               // 获取操作连接               session = connection.createSession(Boolean.FALSE,                       Session.AUTO_ACKNOWLEDGE);               // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置               destination = session.createQueue("FirstQueue");               consumer = session.createConsumer(destination);               while (true) {                   // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s                   TextMessage message = (TextMessage) consumer.receive(100000);                   if (null != message) {                       System.out.println("收到消息" + message.getText());                   } else {                       break;                   }               }           } catch (Exception e) {               e.printStackTrace();           } finally {               try {                   if (null != connection)                       connection.close();               } catch (Throwable ignore) {               }           }       }   }   5.测试过程  先运行:Receiver.java  再运行:Sender.java  可以看到结果  Sender运行后:  发送消息:ActiveMq 发送的消息1  发送消息:ActiveMq 发送的消息2  发送消息:ActiveMq 发送的消息3  发送消息:ActiveMq 发送的消息4  发送消息:ActiveMq 发送的消息5  Receiver运行后:  收到消息ActiveMq 发送的消息1  收到消息ActiveMq 发送的消息2  收到消息ActiveMq 发送的消息3  收到消息ActiveMq 发送的消息4  收到消息ActiveMq 发送的消息5  要想看到不同的输出内容,通过点击如下图的按钮切换console    在Receiver.java中,可以设置一个时间,比如receive(500000),如下代码所示:  Java代码   TextMessage message = (TextMessage) consumer.receive(500000);   这个时候运行Receiver.java的话,会使得这个Receiver.java一直运行500秒,在eclipse中可以发现:  点击那个红色方块可以手动停止运行程序  黑色头发:http://heisetoufa.iteye.com/

    转载请注明原文地址: https://ju.6miu.com/read-13281.html

    最新回复(0)