ActiveMQ学习一

    xiaoxiao2021-04-19  77

    转自:http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html

    1.下载ActiveMQ

    去官方网站下载:http://activemq.apache.org/

    2.运行ActiveMQ

    windows:解压缩apache-activemq-5.14.4-bin.zip,然后双击apache-activemq-5.14.4\bin\win64(32)\activemq.bat运行ActiveMQ程序。

    linux:解压缩apache-activemq-5.14.4-bin.tar.gz,然后进入apache-activemq-5.14.4\bin\linux-x86-64(32)目录下./activemq start,运行ActiveMQ程序。

    启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

    3.创建Eclipse项目并运行

    创建project:myActiveMQ,并导入activemq-all-5.14.4.jar Producer.java package com.test.activemq.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * @author * */ public class Producer{ static final String HOST = "tcp://localhost:61616"; //static final String HOST = ActiveMQConnection.DEFAULT_BROKER_URL; static final String USER = ActiveMQConnection.DEFAULT_USER; static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; static final String QUEUE = "FirstQueue";//必须在activeQM里面有配置 public static void main(String[] args) { //jms 连接工厂 ConnectionFactory connectionFactory = null; //connection 生产者和消费者的连接对象 Connection connection = null; //session 一个发送或接收消息的连接操作 Session session = null; //destination 消息目的地 Destination destination = null; //MessageProducer 消息发送者 MessageProducer messageProducer = null; // user password host connectionFactory = new ActiveMQConnectionFactory(USER,PASSWORD,HOST); try { //获得连接对象 connection = connectionFactory.createConnection(); //启动 connection.start(); //连接操作 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //创建目的地 destination = session.createQueue(QUEUE); //创建生产者 messageProducer = session.createProducer(destination); // 设置不持久化 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } //发送消息 public static void sendMessage(Session session, MessageProducer producer){ try { for (int i = 0; i < 10; i++) { // TextMessage message = session.createTextMessage("activemq发送的消息"+i); // //发送消息到目的地 // System.out.println("发送消息:activemq发送的消息"+i); // producer.send(message); MapMessage message = session.createMapMessage(); message.setStringProperty("extra"+i, "extra map"+i); message.setString("message--"+i,"message map"+i); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } } } Consumer.java package com.test.activemq.queue; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer{ static final String HOST = "tcp://localhost:61616"; //static final String HOST = ActiveMQConnection.DEFAULT_BROKER_URL; static final String USER = ActiveMQConnection.DEFAULT_USER; static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; static final String QUEUE = "FirstQueue";//必须在activeQM里面有配置 public static void main(String[] args) { //jms 连接工厂 ConnectionFactory connectionFactory = null; //connection 生产者和消费者的连接对象 Connection connection = null; //session 一个发送或接收消息的连接操作 Session session = null; //destination 消息目的地 Destination destination = null; //消息接收者 MessageConsumer messageConsumer = null; //获取工厂类 connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, HOST); try { //获取连接的对象 connection = connectionFactory.createConnection(); //启动连接 connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(QUEUE); messageConsumer = session.createConsumer(destination); Enumeration names = connection.getMetaData().getJMSXPropertyNames(); while (names.hasMoreElements()) { String name = (String) names.nextElement(); System.out.println("属性"+name); } int i = 0; while (i<10) { // //设置接收者接收消息的时间,为了便于测试,这里谁定为100s // TextMessage message = (TextMessage) messageConsumer.receive(100000); // if (null != message) { // System.out.println("收到消息" + message.getText()); // } else { // break; // } //设置接收者接收消息的时间,为了便于测试,这里谁定为100s MapMessage message = (MapMessage) messageConsumer.receive(); session.commit(); if (null != message) { System.out.println("收到消息" + message.getString("message--"+i)+";property"+message.getStringProperty("extra"+i)); } else { break; } i++; } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }

    4.注意事项

    1.linux要将解压ActiveMQ文件权限设为777

    5.测试过程

    1.先运行Producer.java 在运行Consumer.java
    转载请注明原文地址: https://ju.6miu.com/read-675742.html

    最新回复(0)