转自:http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html
1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/
2.运行ActiveMQ
windows:解压缩apache-activemq-5.14.4-bin.zip,然后双击apache-activemq-5.14.4\bin\win64\activemq.bat(32位系统为bin\win32\activemq.bat)运行ActiveMQ程序。
linux:解压缩apache-activemq-5.14.4-bin.tar.gz,然后进入apache-activemq-5.14.4/bin/linux-x86-64(32位系统为linux-x86-32)目录,执行./activemq start运行ActiveMQ程序。
启动ActiveMQ以后,登录:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。
3.创建Eclipse项目并运行
创建project:myActiveMQ,并导入activemq-all-5.14.4.jar
Producer.java
package com.test.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Message producer demo.
 *
 * <p>Connects to the ActiveMQ broker at {@link #HOST}, opens a transacted session and sends a
 * small batch of {@link MapMessage}s to the queue named by {@link #QUEUE}. The batch is
 * committed as a whole; if any send fails the transaction is rolled back so that no partial
 * batch becomes visible to consumers.
 */
public class Producer {
    /** Broker URL the demo connects to. */
    static final String HOST = "tcp://localhost:61616";
    // Alternative: ActiveMQConnection.DEFAULT_BROKER_URL
    static final String USER = ActiveMQConnection.DEFAULT_USER;
    static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /** Target queue name; the tutorial creates this queue in the ActiveMQ admin console first. */
    static final String QUEUE = "FirstQueue";

    /** Number of messages sent per run. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Entry point: connects, sends the batch and commits the transaction.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // JMS connection factory, configured with user, password and broker URL.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, HOST);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: sends only take effect on commit(). Per the JMS API the
            // acknowledge-mode argument is ignored when the session is transacted.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE);
            MessageProducer messageProducer = session.createProducer(destination);
            // Messages are not persisted by the broker.
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, messageProducer);
            // If sendMessage failed it has already rolled back, so this commits nothing.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(connection);
        }
    }

    /**
     * Sends {@code MESSAGE_COUNT} map messages through {@code producer}.
     *
     * <p>Message {@code i} carries the string property {@code "extra" + i} and the map entry
     * {@code "message--" + i}. (A plain text message could be sent instead via
     * {@code session.createTextMessage(...)}.)
     *
     * <p>Failures are reported on stderr and not rethrown. When the session is transacted, the
     * current transaction is rolled back on failure so a later {@code commit()} by the caller
     * cannot publish a partial batch.
     *
     * @param session  session used to create the messages
     * @param producer producer the messages are sent with
     */
    public static void sendMessage(Session session, MessageProducer producer) {
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                MapMessage message = session.createMapMessage();
                message.setStringProperty("extra" + i, "extra map" + i);
                message.setString("message--" + i, "message map" + i);
                producer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
            rollbackIfTransacted(session);
        }
    }

    /** Rolls back the session's current transaction, if it has one; never throws. */
    private static void rollbackIfTransacted(Session session) {
        try {
            // rollback() is only legal on a transacted session.
            if (session.getTransacted()) {
                session.rollback();
            }
        } catch (Exception rollbackError) {
            rollbackError.printStackTrace();
        }
    }

    /** Closes the connection if it was opened; a close failure is reported but not rethrown. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception closeError) {
            System.err.println("Failed to close JMS connection: " + closeError);
        }
    }
}
Consumer.java
package com.test.activemq.queue;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Message consumer demo.
 *
 * <p>Connects to the ActiveMQ broker at {@link #HOST}, prints the JMSX property names reported
 * by the connection metadata, then receives up to {@code MESSAGE_COUNT} {@link MapMessage}s
 * from the queue named by {@link #QUEUE}. Each message is committed only after it has been
 * processed, so a failure while handling it does not lose it.
 */
public class Consumer {
    /** Broker URL the demo connects to. */
    static final String HOST = "tcp://localhost:61616";
    // Alternative: ActiveMQConnection.DEFAULT_BROKER_URL
    static final String USER = ActiveMQConnection.DEFAULT_USER;
    static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /** Source queue name; the tutorial creates this queue in the ActiveMQ admin console first. */
    static final String QUEUE = "FirstQueue";

    /** Maximum number of messages consumed per run. */
    private static final int MESSAGE_COUNT = 10;
    /** How long a single receive waits for a message: 100 s, to make manual testing easy. */
    private static final long RECEIVE_TIMEOUT_MS = 100000L;

    /**
     * Entry point: connects, receives the messages and prints their content.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // JMS connection factory, configured with user, password and broker URL.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, HOST);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: a received message is only consumed for good on commit().
            // Per the JMS API the acknowledge-mode argument is ignored when transacted.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE);
            MessageConsumer messageConsumer = session.createConsumer(destination);

            Enumeration<?> names = connection.getMetaData().getJMSXPropertyNames();
            while (names.hasMoreElements()) {
                String name = (String) names.nextElement();
                System.out.println("属性" + name);
            }

            // NOTE: keys are looked up by loop index, which assumes the queue holds exactly the
            // messages sent by Producer, delivered in sending order. A non-MapMessage on the
            // queue surfaces as a ClassCastException in the catch block below.
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Bounded wait; returns null when no message arrives within the timeout.
                MapMessage message = (MapMessage) messageConsumer.receive(RECEIVE_TIMEOUT_MS);
                if (message == null) {
                    break;
                }
                System.out.println("收到消息" + message.getString("message--" + i) + ";property" + message.getStringProperty("extra" + i));
                // Commit only after the message was handled successfully.
                session.commit();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(connection);
        }
    }

    /** Closes the connection if it was opened; a close failure is reported but not rethrown. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception closeError) {
            System.err.println("Failed to close JMS connection: " + closeError);
        }
    }
}
4.注意事项
1.Linux下要将解压后的ActiveMQ文件权限设为777(至少要保证bin目录下的启动脚本具有可执行权限)
5.测试过程
1.先运行Producer.java,再运行Consumer.java
转载请注明原文地址: https://ju.6miu.com/read-675742.html