下面我们通过一个实例看一下rabbit的使用。
1.实现一个消息监听器ReceiveMessageListener.java(需要实现Spring AMQP提供的MessageListener接口,接口定义如下)
[java] view plain copy print ?
package org.springframework.amqp.core;

/**
 * Listener interface to receive asynchronous delivery of AMQP messages.
 *
 * <p>NOTE(review): the preceding heading announces {@code ReceiveMessageListener},
 * and Consumer.xml registers a bean of that class as the queue listener, but only
 * this interface is shown here. Presumably {@code ReceiveMessageListener} implements
 * this interface; its source is not part of this article, so confirm.
 *
 * @author Mark Pollack
 */
public interface MessageListener {

    /**
     * Receives one delivered message.
     *
     * @param message the delivered AMQP message
     */
    void onMessage(Message message);

}
2.消费者配置Consumer.xml
[html] view plain copy print ?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings for the RabbitMQ broker (host, credentials, port, virtual host, channel cache size) -->
    <rabbit:connection-factory id="connectionFactory" host="192.168.36.102" username="admin"
                               password="admin" port="5672" virtual-host="/" channel-cache-size="5" />

    <!-- NOTE(review): presumably this admin declares the queue, exchange and binding below on the broker; confirm -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declaration: durable, not auto-deleted, not exclusive -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>

    <!-- Direct exchange; binds the queue to it under the routing key spring.queue.tag.key -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- The listener object that will be attached to the queue -->
    <bean id="receiveMessageListener" class="cn.slimsmart.rabbitmq.demo.spring.tag.ReceiveMessageListener" />

    <!-- Queue listener (observer pattern): when a message arrives, the listener registered on that queue is notified -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />
    </rabbit:listener-container>
</beans>
3.生产者配置Producer.xml
[html] view plain copy print ?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings for the RabbitMQ broker (host, credentials, port, virtual host, channel cache size) -->
    <rabbit:connection-factory id="connectionFactory" host="192.168.36.102" username="admin"
                               password="admin" port="5672" virtual-host="/" channel-cache-size="5" />

    <!-- NOTE(review): presumably this admin declares the queue, exchange and binding below on the broker; confirm -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queue declaration: durable, not auto-deleted, not exclusive -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag" />

    <!-- Direct exchange; binds the queue to it under the routing key spring.queue.tag.key -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Message converter: a Gson-based JSON converter used here in place of Spring AMQP's Jackson-based one,
         so that objects sent by the producer are stored in the queue as JSON.
         The author's stated reason is that Gson is faster than Jackson (not verified here). -->
    <bean id="jsonMessageConverter" class="cn.slimsmart.rabbitmq.demo.spring.tag.Gson2JsonMessageConverter" />

    <!-- Template declaration: sends to spring.queue.exchange with routing key spring.queue.tag.key
         and converts payloads with jsonMessageConverter -->
    <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange" routing-key="spring.queue.tag.key"
                     connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
</beans>
4.消费者启动类ConsumerMain.java
[java] view plain copy print ?
package cn.slimsmart.rabbitmq.demo.spring.tag;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point of the consumer side of the demo.
 */
public class ConsumerMain {

    /** Classpath location of the consumer's Spring configuration file. */
    private static final String CONFIG_LOCATION = "Consumer.xml";

    /**
     * Creates the Spring application context defined by {@code Consumer.xml}.
     *
     * <p>The context reference is not kept and the context is never closed here.
     * NOTE(review): presumably this is intentional so that the listener container
     * keeps consuming after {@code main} returns; confirm.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext(CONFIG_LOCATION);
    }
}
5.生产者启动类ProducerMain.java
[java] view plain copy print ?
package cn.slimsmart.rabbitmq.demo.spring.tag;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point of the producer side of the demo: sends a single {@code User}
 * message through the template configured in {@code Producer.xml}.
 */
public class ProducerMain {

    /**
     * Loads {@code Producer.xml}, sends one {@code User} named "niuniu" with the
     * template's configured exchange and routing key, then closes the context.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("Producer.xml");
        try {
            AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);
            User user = new User();
            user.setName("niuniu");
            amqpTemplate.convertAndSend(user);
        } finally {
            // Close the context so the beans it created (including the connection
            // factory) are destroyed once the one-shot send is done, even on failure.
            context.close();
        }
    }
}
先启动消费者,监听接收消息,再启动生产者发送消息。
输出: data :{"name":"niuniu"}
rabbit命名空间提供如下4种转发器(exchange)类型标签:
rabbit:fanout-exchange
rabbit:direct-exchange
rabbit:topic-exchange
rabbit:headers-exchange
Spring AMQP已经提供了基于Jackson的消息转换器(Jackson2JsonMessageConverter),出于效率的考虑,下面改用Gson实现消息转换。
消息转换类的接口是MessageConverter;Jackson2JsonMessageConverter的父类AbstractJsonMessageConverter是针对JSON转换的基类。
我们实现的Gson2JsonMessageConverter转换类同样继承AbstractJsonMessageConverter。
在pom.xml中引入Gson依赖:
[html] view plain copy print ?
<!-- Gson library; the converter class that follows imports com.google.gson.Gson -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.3</version>
</dependency>
转换类实现如下:
[java] view plain copy print ?
package cn.slimsmart.rabbitmq.demo.spring.tag;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.google.gson.Gson;

/**
 * JSON message converter backed by Gson.
 *
 * <p>Outgoing objects are serialized to JSON bytes and the target class is recorded
 * in the message properties through a {@link ClassMapper}; incoming messages whose
 * content type contains "json" are deserialized back into the class resolved by the
 * same mapper. Anything else is returned as the raw message body.
 */
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {

    private static final Log LOG = LogFactory.getLog(Gson2JsonMessageConverter.class);

    /** Mapper used when no class mapper has been configured on this converter. */
    private static final ClassMapper DEFAULT_CLASS_MAPPER = new DefaultClassMapper();

    private static final Gson GSON = new Gson();

    public Gson2JsonMessageConverter() {
        super();
    }

    /**
     * Serializes {@code object} to JSON and wraps it in a {@link Message}.
     *
     * <p>Sets the JSON content type, the content encoding (the converter's default
     * charset) and the content length on {@code messageProperties}, and lets the
     * class mapper record the object's class there.
     *
     * @param object the payload to serialize
     * @param messageProperties the properties to populate and attach to the message
     * @return the message carrying the JSON bytes
     * @throws MessageConversionException if the JSON text cannot be encoded
     */
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes;
        try {
            bytes = GSON.toJson(object).getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        messageProperties.setContentLength(bytes.length);
        // Use the same mapper as fromMessage so the class information written here
        // is read back by a matching mapper.
        resolveClassMapper().fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Converts an incoming message back into an object.
     *
     * <p>When the content type contains "json", the body is decoded with the
     * message's content encoding (or the default charset when none is set) and
     * parsed into the class resolved by the class mapper. Otherwise a warning is
     * logged. Whenever no object was produced, the raw body is returned.
     *
     * @param message the incoming message
     * @return the deserialized object, or the raw body when no conversion took place
     * @throws MessageConversionException if the body cannot be decoded
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = resolveClassMapper().toClass(properties);
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException(
                            "Failed to convert Message content", e);
                }
            } else {
                LOG.warn("Could not convert incoming message with content-type ["
                        + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    /**
     * Returns the class mapper configured on this converter, falling back to a
     * {@link DefaultClassMapper} when none has been set, so that both conversion
     * directions always share one non-null mapper.
     */
    private ClassMapper resolveClassMapper() {
        ClassMapper configured = getClassMapper();
        return configured != null ? configured : DEFAULT_CLASS_MAPPER;
    }

    /** Decodes {@code body} with {@code encoding} and parses the JSON text into {@code clazz}. */
    private Object convertBytesToObject(byte[] body, String encoding,
            Class<?> clazz) throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return GSON.fromJson(contentAsString, clazz);
    }
}
