分布式服务框架学习笔记8 ActiveMQ入门使用

    xiaoxiao2021-03-25  101

    测试环境

    windows+Eclipse 下载 http://activemq.apache.org/download-archives.html

    下载后 打开c:/tools/apache-activemq-5.9.0/bin/activemq.bat启动服务

    添加依赖

    <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.1</version> </dependency>

    Persistence持久化存储

    AMQ Message Store ActiveMQ 5.0 的缺省持久化存储方式。

    Kaha Persistence 这是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。

    JDBC Persistence 数据库方式:Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。

    Disable Persistence 不应用持久化存储。

    方法解析

    生产者创建过程

    Created with Raphaël 2.1.0 创建生产者 生产消息 发送给消息队列

    消费者消费消息

    Created with Raphaël 2.1.0 创建消费者 监听消息队列 获得消息

    代码

    App.java

    package com.whrsmart.ActiveMQ; import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnection; /** * */ public class App { public static void main( String[] args ) { String username = "admin1"; String password = "admin1"; String url = ActiveMQConnection.DEFAULT_BROKER_URL; // 创建生产者 Producer producer; try { producer = new Producer(url, username, password); // 生产者产生一条消息 producer.sendMessage("Hello World"); // 创建消费者 Consumer consumer = new Consumer(url, username, password); // 消费者读取一条消息 Object msg = consumer.receive(); // 输出消息内容 System.out.println(msg); producer.close(); consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } }

    Consumer.java

    package com.whrsmart.ActiveMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消费者 * */ public class Consumer { /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * 连接地址,host:port,如localhost:8161 */ private String url; /** * 队列名 */ private static final String QUEUE_NAME = "HELLO"; //连接工厂 private ConnectionFactory connectionFactory; //会话 接受或者发送消息的线程 private Session session; //消息的目的地 private Destination destination; //消息消费者 private MessageConsumer messageConsumer; //与MQ的连接 private Connection connection = null; public Consumer(String url, String username, String password) throws JMSException { this.url = url; this.username = username; this.password = password; this.init(); } /** * 初始化方法 * @throws JMSException */ private void init() throws JMSException { // 实例化工厂 connectionFactory = new ActiveMQConnectionFactory( username, password, url); // 获取连接 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建指定队列名的队列 destination = session.createQueue(Consumer.QUEUE_NAME); // 创建消息消费者 messageConsumer = session.createConsumer(destination); } /** * 接收消息 * @throws JMSException */ public String receive() throws JMSException { TextMessage msg = (TextMessage) messageConsumer.receive(5000); if(msg == null) { return null; } return msg.getText(); } public void close() throws JMSException { session.close(); connection.close(); } }

    Producer.java

    package com.whrsmart.ActiveMQ; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 生产者 */ public class Producer implements Cloneable{ /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * 连接地址,host:port,如localhost:8161 */ private String url; /** * 队列名 */ private static final String QUEUE_NAME = "HELLO"; //连接工厂 private ConnectionFactory connectionFactory; //会话 接受或者发送消息的线程 private Session session; //消息的目的地 private Destination destination; //消息生产者 private MessageProducer messageProducer; //与MQ的连接 private javax.jms.Connection connection = null; public Producer(String url, String username, String password) throws JMSException { this.url = url; this.username = username; this.password = password; this.init(); } /** * 初始化方法 * @throws JMSException */ private void init() throws JMSException { // 实例化工厂 connectionFactory = new ActiveMQConnectionFactory(username, password, url); // 获取连接 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建指定队列名的队列 destination = session.createQueue(Producer.QUEUE_NAME); // 创建消息生产者 messageProducer = session.createProducer(destination); } /** * 发送消息 * @param data 数据 * @throws JMSException */ public void sendMessage(String data) throws JMSException { // 创建一个Message TextMessage msg = session.createTextMessage(data); // 通过消息发送者进行消息发送 this.messageProducer.send(msg); } public void close() throws JMSException { session.close(); connection.close(); } }

    本文参考: http://www.tuicool.com/articles/EjyqA3R

    转载请注明原文地址: https://ju.6miu.com/read-17615.html

    最新回复(0)