ActiveMQ——消息队列基础篇

    xiaoxiao2025-07-27  55

    简介

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    特性

     多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP  完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA 支持通过JDBC和journal提供高速的消息持久化 从设计上保证了高性能的集群,客户端-服务器,点对点 支持Ajax  支持与Axis的整合 可以很容易得调用内嵌JMS provider,进行测试

    JMS简介

         JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

    JMS服务提供者

         JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。

    消息管理对象

         消息管理对象提供对消息进行操作的API。JMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

    消息的生产者消费者

         消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。

    JMS消息

         息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:消息头(header)、属性(property)和消息体(body)。

    消息标头

    消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其

    中一些字段的值,其它值则由JMS提供程序填写。

    JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表

    示消息、设置优先权和失效时间等等,并且为消息确定路由。

     

    JMSDestination: 由Send方法设置。指定消息的目的地,由JMS提供程序填写

    JMSDeliveryMode: 由Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。

    JMSMessageID: 由Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写

    JMSTimeStamp: 由Send 方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写

    JMSCorrelationID: 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID

    JMSReplyTo: 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段

    JMSRedelivered: 由JMS提供程序设置。指出该消息先前被发送过

    JMSType: 由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求

    JMSExpiration: Send 方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息

    JMSPriority: Send 方法设置。包含客户端在发送消息时所设置有限级值

    消息属性

    消息属性,用来添加删除消息头以外的附加信息。除了上面的属性,还可以自定义属

    性,以便进行消息的选择 。

    一般通过setXXXProperty方法来定义消息属性,XXX取值为:Boolean、Byte、

    Double、Float、Int、Long、Object、Short及String。

    每一属性均由字符串名字和相关的值组成 ,例如:

    TextMessage msg = tsession.createTextMessage();

    msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);

    String customer = msg.getStringProperty(“CUSTOMER_NAME”);

    其中的”CUSTOMER_NAME”和”MyCustomer”就是消息当中对应的key和value。

    消息主体

    消息主体包含了消息的核心数据。

    JMS 定义了5中消息类型: TextMessage、MapMessage、BytesMessage、

    StreamMessage和ObjectMessage

    选择最合适的消息类型可以使JMS最有效 的处理消息。

     

    TextMessage(文本消息)

    将数据作为简单字符串存放在主体中(XML就可以作为字符串发)

    TextMessage msg = session.createTextMessage();

    msg.setText(text);

    有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响

    移植性。

    只自己定义了两个方法setText(String s)、getText()

     

    MapMessage(映射消息)

    使用一张映射表来存放其主体内容(参照Jms API)

    MapMessage msg = session.createMapMessage();

    msg.setString(“CUSTOMER_NAME”,”John”);

    msg.setInt(“CUSTOMER_AGE”,12);

    String s = msg.getString(“CUSTOMER_NAME”);

    int age = msg.getInt(“CUSTOMER_AGE”);

     

    BytesMessage(字节消息)

    将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有

    消息格式保持一致等(参照Jms API)

    byte[] data;

    BytesMessage msg = session.createBytesMessage();

    msg.wirte(data);

    byte[] msgData = new byte[256];

    int bytesRead = msg.readBytes(msgData);

     

      StreamMessage(流消息)

    用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这种

    消息格式时,收发双方事先协商好字段的顺序,以保证写读顺序相同(参照Jms API)

    StringMessage msg = session.createStreamMessage();

    msg.writeString(“John”);

    msg.writeInt(12);

    String s = msg.readString();

    Int age = msg.readInt();

    (PS:个人认为有点像socket的信息收发)

     

      ObjectMessage(对象消息)

    用于往消息中写入可序列化的对象。

    消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个

    集合写入消息。

    客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写

    message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写

    自己只单独定义了两个方法:getObject()和setObject(Serializable s)

    ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessage的body无效 (从两个方法可以看出,这种消息已经强制要你实现java.io. Serializable接口)

    Message只读时被set抛出MessageNotWriteableException;

    set和get时,如果对象序列化失败抛出MessageFormatException

    消息的通信方式(点对点通信和发布/订阅方式)

    点对点方式(point-to-point)

    点对点的消息发送方式主要建立在 Message Queue、Sender、Receiver上,

    Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue ,而 Receiver Client从Queue中接收消息和"发送消息已接受"到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。

    发布/订阅方式(publish/subscriber Messaging)

    发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户

    端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

    编程模式

    消息产生者向JMS发送消息的步骤

    创建连接使用的工厂类JMS ConnectionFactory 使用管理对象JMS ConnectionFactory建立连接Connection 使用连接Connection建立会话Session 使用会话Session和管理对象Destination创建消息生产者MessageSender 使用消息生产者MessageSender发送消息

    消息消费者从JMS接受消息的步骤

    创建连接使用的工厂类JMS ConnectionFactory 使用管理对象JMS ConnectionFactory建立连接Connection 使用连接Connection 建立会话Session 使用会话Session和管理对象Destination创建消息消费者MessageReceiver 使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

    ActiveMQ运行

    ActiveMQ5.3版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于

    监控ActiveMQ的admin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。

    打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有

    消息,如下截图

      

    简单的例子

    Java代码   import javax.jms.Connection;  import javax.jms.DeliveryMode;  import javax.jms.Destination;  import javax.jms.JMSException;  import javax.jms.MapMessage;  import javax.jms.Message;  import javax.jms.MessageConsumer;  import javax.jms.MessageListener;  import javax.jms.MessageProducer;  import javax.jms.Session;    import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;    public class Test {      public static void main(String[] args) throws JMSException {          new Receiver().beginReceiveMsg();          new Sender().send();      }  }    class Sender {        static String subject = "TEST";      static String user = ActiveMQConnection.DEFAULT_USER;      static String password = ActiveMQConnection.DEFAULT_PASSWORD;      // failover://tcp://localhost:61616      static String url = ActiveMQConnection.DEFAULT_BROKER_URL;      boolean transacted = true;      boolean persistent = true;      int ackMode = Session.AUTO_ACKNOWLEDGE;        Session session;      Destination destination;      MessageProducer producer;        public Sender() throws JMSException {          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                  user, password, url);          Connection connection = connectionFactory.createConnection();          connection.start();          session = connection.createSession(transacted, ackMode);          destination = session.createQueue(subject);          producer = session.createProducer(destination);          producer.setDeliveryMode(DeliveryMode.PERSISTENT);      }        public void send() throws JMSException {          MapMessage message = session.createMapMessage();          message.setString("test", "test");          message.setStringProperty("RECEIVER_CHAR", " TEST ");          producer.send(message);          session.commit();      }  }    class Receiver implements MessageListener {        static String subject = "TEST";      static String user = ActiveMQConnection.DEFAULT_USER;      static String password = ActiveMQConnection.DEFAULT_PASSWORD;      // failover://tcp://localhost:61616      static String url = ActiveMQConnection.DEFAULT_BROKER_URL;      boolean transacted = true;      int ackMode = Session.AUTO_ACKNOWLEDGE;        Session session;      Destination destination;      MessageConsumer consumer;        public Receiver() throws JMSException {          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                  user, password, url);          Connection connection = connectionFactory.createConnection();          connection.start();          session = connection.createSession(transacted, ackMode);          destination = session.createQueue(subject);          consumer = session.createConsumer(destination, "RECEIVER_CHAR = TEST");      }        public void beginReceiveMsg() throws JMSException {          consumer.setMessageListener(this);      }        @Override      public void onMessage(Message message) {          if (message instanceof MapMessage) {              MapMessage msg = (MapMessage) message;              try {                  System.out.println(msg.getString("test"));              } catch (JMSException e) {                  try {                      session.rollback();                  } catch (JMSException e1) {                      e1.printStackTrace();                  }                  e.printStackTrace();              } finally {                  if (session != null) {                      try {                          session.close();                      } catch (JMSException e) {                          // TODO Auto-generated catch block                          e.printStackTrace();                      }                  }              }          }      }  }  

    其他产品

    1、开源JMS供应商

    jbossmq(jboss 4) jbossmessaging (jboss 5) joram-4.3.21 2006-09-22 openjms-0.7.7-alpha-3.zip December 26, 2005 mantamq ubermq SomnifugiJMS 2005-7-27

    2、商业JMS供应商

    IBM WebSphere MQ BEA WebLogic JMS Oracle AQ NonStop Server for Java Message Service(JMS) Sun Java System Message Queue Sonic jms TIBCO Enterprise For JMS iLinkMQ (国内) TongLink/Q(北京东方通科技)
    转载请注明原文地址: https://ju.6miu.com/read-1301130.html
    最新回复(0)