测试环境
windows+Eclipse 下载 http://activemq.apache.org/download-archives.html
下载后 打开c:/tools/apache-activemq-5.9.0/bin/activemq.bat启动服务
添加依赖
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.1</version>
</dependency>
Persistence持久化存储
AMQ Message Store ActiveMQ 5.0 的缺省持久化存储方式。
Kaha Persistence 这是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。
JDBC Persistence 数据库方式:Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。
Disable Persistence 不应用持久化存储。
方法解析
生产者创建过程
流程:创建生产者 → 生产消息 → 发送给消息队列
消费者消费消息
流程:创建消费者 → 监听消息队列 → 获得消息
代码
App.java
package com.whrsmart.ActiveMQ;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
/**
 * Demo entry point: sends one text message through a {@link Producer} and
 * reads it back with a {@link Consumer}, printing the received text.
 */
public class App
{
    /**
     * Runs the send/receive round trip against the default broker URL.
     *
     * @param args command-line arguments (unused)
     */
    public static void main( String[] args )
    {
        String username = "admin1";
        String password = "admin1";
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;

        // Declared outside the try block so that the finally block can release
        // whichever of the two was successfully opened.
        Producer producer = null;
        Consumer consumer = null;
        try {
            producer = new Producer(url, username, password);
            producer.sendMessage("Hello World");

            consumer = new Consumer(url, username, password);
            String msg = consumer.receive();
            System.out.println(msg);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Close in the same order as before (producer first), but always,
            // so a failure half-way through does not leave connections open.
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Consumer.java
package com.whrsmart.ActiveMQ;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Queue consumer: opens a broker connection on construction and pulls text
 * messages from the {@code HELLO} queue on demand.
 */
public class Consumer {
    /** Name of the queue this consumer reads from. */
    private static final String QUEUE_NAME = "HELLO";

    /** How long {@link #receive()} waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 5000;

    /** User name presented to the broker. */
    private final String username;

    /** Password presented to the broker. */
    private final String password;

    /**
     * Broker URL handed to {@link ActiveMQConnectionFactory}, for example
     * {@code tcp://localhost:61616}.
     */
    private final String url;

    private ConnectionFactory connectionFactory;
    private Session session;
    private Destination destination;
    private MessageConsumer messageConsumer;
    private Connection connection = null;

    /**
     * Creates the consumer and immediately connects to the broker.
     *
     * @param url      broker URL
     * @param username user name for the connection
     * @param password password for the connection
     * @throws JMSException if the connection, session or consumer cannot be created
     */
    public Consumer(String url, String username, String password) throws JMSException {
        this.url = url;
        this.username = username;
        this.password = password;
        this.init();
    }

    /**
     * Opens the connection, session and message consumer.
     *
     * @throws JMSException if any step fails; the connection is closed again
     *                      before the exception propagates
     */
    private void init() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(username, password, url);
        connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Non-transacted: a transacted session would need an explicit
            // commit(), otherwise the received message is rolled back when the
            // session closes and is delivered again.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(Consumer.QUEUE_NAME);
            messageConsumer = session.createConsumer(destination);
        } catch (JMSException e) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // The setup failure is the error worth reporting.
            }
            throw e;
        }
    }

    /**
     * Waits up to five seconds for the next message on the queue.
     *
     * @return the message text, or {@code null} if no message arrived in time
     * @throws JMSException if receiving fails or the message is not a {@link TextMessage}
     */
    public String receive() throws JMSException {
        // Held as Object so an unexpected message type can be reported clearly
        // instead of failing with a ClassCastException.
        Object received = messageConsumer.receive(RECEIVE_TIMEOUT_MS);
        if (received == null) {
            return null;
        }
        if (!(received instanceof TextMessage)) {
            throw new JMSException("Expected a TextMessage on queue " + QUEUE_NAME
                    + " but received " + received.getClass().getName());
        }
        return ((TextMessage) received).getText();
    }

    /**
     * Closes the session and then the connection.
     *
     * @throws JMSException if closing either resource fails
     */
    public void close() throws JMSException {
        try {
            session.close();
        } finally {
            // Runs even when closing the session throws, so the connection is
            // never left open.
            connection.close();
        }
    }
}
Producer.java
package com.whrsmart.ActiveMQ;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Queue producer: opens a broker connection on construction and sends text
 * messages to the {@code HELLO} queue.
 *
 * NOTE(review): the class declares {@code Cloneable} but defines no
 * {@code clone()} method; the declaration is kept only so the type signature
 * stays unchanged for existing callers.
 */
public class Producer implements Cloneable{
    /** Name of the queue this producer writes to. */
    private static final String QUEUE_NAME = "HELLO";

    /** User name presented to the broker. */
    private final String username;

    /** Password presented to the broker. */
    private final String password;

    /**
     * Broker URL handed to {@link ActiveMQConnectionFactory}, for example
     * {@code tcp://localhost:61616}.
     */
    private final String url;

    private ConnectionFactory connectionFactory;
    private Session session;
    private Destination destination;
    private MessageProducer messageProducer;
    private javax.jms.Connection connection = null;

    /**
     * Creates the producer and immediately connects to the broker.
     *
     * @param url      broker URL
     * @param username user name for the connection
     * @param password password for the connection
     * @throws JMSException if the connection, session or producer cannot be created
     */
    public Producer(String url, String username, String password) throws JMSException {
        this.url = url;
        this.username = username;
        this.password = password;
        this.init();
    }

    /**
     * Opens the connection, session and message producer.
     *
     * @throws JMSException if any step fails; the connection is closed again
     *                      before the exception propagates
     */
    private void init() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(username, password, url);
        connection = connectionFactory.createConnection();
        try {
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(Producer.QUEUE_NAME);
            messageProducer = session.createProducer(destination);
        } catch (JMSException e) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // The setup failure is the error worth reporting.
            }
            throw e;
        }
    }

    /**
     * Sends one text message to the queue.
     *
     * @param data message body
     * @throws JMSException if the message cannot be created or sent
     */
    public void sendMessage(String data) throws JMSException {
        TextMessage msg = session.createTextMessage(data);
        this.messageProducer.send(msg);
    }

    /**
     * Closes the session and then the connection.
     *
     * @throws JMSException if closing either resource fails
     */
    public void close() throws JMSException {
        try {
            session.close();
        } finally {
            // Runs even when closing the session throws, so the connection is
            // never left open.
            connection.close();
        }
    }
}
本文参考: http://www.tuicool.com/articles/EjyqA3R
转载请注明原文地址: https://ju.6miu.com/read-17615.html