7.rabbitmq之GSON

    xiaoxiao2023-03-25  5

    下面我们通过一个实例看一下rabbit的使用。

    1.实现一个消息监听器ReceiveMessageListener.Java

    [java]  view plain  copy  print ? package org.springframework.amqp.core;      /**   * Listener interface to receive asynchronous delivery of Amqp Messages.   *   * @author Mark Pollack   */   public interface MessageListener {          void onMessage(Message message);      }   2.消费者配置Consumer.xml

    [html]  view plain  copy  print ? <?xml version="1.0" encoding="UTF-8"?>   <beans xmlns="http://www.springframework.org/schema/beans"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xmlns:context="http://www.springframework.org/schema/context"          xmlns:rabbit="http://www.springframework.org/schema/rabbit"          xsi:schemaLocation="               http://www.springframework.org/schema/beans                   http://www.springframework.org/schema/beans/spring-beans.xsd               http://www.springframework.org/schema/context                   http://www.springframework.org/schema/context/spring-context.xsd               http://www.springframework.org/schema/rabbit                   http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">          <!-- 连接服务配置  -->       <rabbit:connection-factory id="connectionFactory" host="192.168.36.102" username="admin"           password="admin" port="5672" virtual-host="/"  channel-cache-size="5" />                  <rabbit:admin connection-factory="connectionFactory"/>            <!-- queue 队列声明-->      <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>                    <!-- exchange queue binging key 绑定 -->       <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">           <rabbit:bindings>               <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>           </rabbit:bindings>       </rabbit:direct-exchange>              <bean id="receiveMessageListener"           class="cn.slimsmart.rabbitmq.demo.spring.tag.ReceiveMessageListener" />               <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->       <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >           <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />       </rabbit:listener-container>   </beans>   3.生产者配置Producer.xml

    [html]  view plain  copy  print ? <?xml version="1.0" encoding="UTF-8"?>   <beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"       xmlns:rabbit="http://www.springframework.org/schema/rabbit"       xsi:schemaLocation="               http://www.springframework.org/schema/beans                   http://www.springframework.org/schema/beans/spring-beans.xsd               http://www.springframework.org/schema/context                   http://www.springframework.org/schema/context/spring-context.xsd               http://www.springframework.org/schema/rabbit                   http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">          <!-- 连接服务配置 -->       <rabbit:connection-factory id="connectionFactory"           host="192.168.36.102" username="admin" password="admin" port="5672"           virtual-host="/" channel-cache-size="5" />          <rabbit:admin connection-factory="connectionFactory" />          <!-- queue 队列声明 -->       <rabbit:queue  durable="true"           auto-delete="false" exclusive="false" name="spring.queue.tag" />             <!-- exchange queue binging key 绑定 -->       <rabbit:direct-exchange name="spring.queue.exchange"           durable="true" auto-delete="false">           <rabbit:bindings>               <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />           </rabbit:bindings>       </rabbit:direct-exchange>          <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 -->       <bean id="jsonMessageConverter"           class="cn.slimsmart.rabbitmq.demo.spring.tag.Gson2JsonMessageConverter" />          <!-- spring template声明 -->       <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange"  routing-key="spring.queue.tag.key"           connection-factory="connectionFactory" message-converter="jsonMessageConverter" />   </beans>   4.消费者启动类ConsumerMain.java

    [java]  view plain  copy  print ? package cn.slimsmart.rabbitmq.demo.spring.tag;      import org.springframework.context.support.ClassPathXmlApplicationContext;      public class ConsumerMain {          public static void main(String[] args) {           new ClassPathXmlApplicationContext("Consumer.xml");         }   }   5.生产者启动类ProducerMain.java

    [java]  view plain  copy  print ? package cn.slimsmart.rabbitmq.demo.spring.tag;      import org.springframework.amqp.core.AmqpTemplate;   import org.springframework.amqp.rabbit.core.RabbitTemplate;   import org.springframework.context.ApplicationContext;   import org.springframework.context.support.ClassPathXmlApplicationContext;      public class ProducerMain {              public static void main(String[] args) {           ApplicationContext context = new ClassPathXmlApplicationContext("Producer.xml");             AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);              User user = new User();           user.setName("niuniu");           amqpTemplate.convertAndSend(user);       }   }   先启动消费者,监听接收消息,再启动生产者发送消息。

    输出: data :{"name":"niuniu"}

    如下4中转发器类型标签

    rabbit:fanout-exchange

    rabbit:direct-exchange

    rabbit:topic-exchange

    rabbit:headers-exchange

    RabbitMQ已经实现了Jackson的消息转换(Jackson2JsonMessageConverter),由于考虑到效率,如下使用Gson实现消息转换。

    如下消息的转换类的接口MessageConverter,Jackson2JsonMessageConverter的父类AbstractJsonMessageConverter针对json转换的基类。

    我们实现Gson2JsonMessageConverter转换类也继承AbstractJsonMessageConverter。

    引入Gson的pom

    [html]  view plain  copy  print ? <dependency>               <groupId>com.google.code.gson</groupId>               <artifactId>gson</artifactId>               <version>2.3</version>           </dependency>   转换类实现如下:

    [java]  view plain  copy  print ? package cn.slimsmart.rabbitmq.demo.spring.tag;      import java.io.IOException;   import java.io.UnsupportedEncodingException;      import org.apache.commons.logging.Log;   import org.apache.commons.logging.LogFactory;   import org.springframework.amqp.core.Message;   import org.springframework.amqp.core.MessageProperties;   import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;   import org.springframework.amqp.support.converter.ClassMapper;   import org.springframework.amqp.support.converter.DefaultClassMapper;   import org.springframework.amqp.support.converter.MessageConversionException;      import com.google.gson.Gson;      public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {              private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);              private static  ClassMapper classMapper =  new DefaultClassMapper();          private static Gson gson = new Gson();          public Gson2JsonMessageConverter() {           super();       }          @Override       protected Message createMessage(Object object,               MessageProperties messageProperties) {           byte[] bytes = null;           try {               String jsonString = gson.toJson(object);               bytes = jsonString.getBytes(getDefaultCharset());           }           catch (IOException e) {               throw new MessageConversionException(                       "Failed to convert Message content", e);           }           messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);           messageProperties.setContentEncoding(getDefaultCharset());           if (bytes != null) {               messageProperties.setContentLength(bytes.length);           }           classMapper.fromClass(object.getClass(),messageProperties);           return new Message(bytes, messageProperties);       }          @Override       public Object fromMessage(Message message)               throws MessageConversionException {           Object content = null;           MessageProperties properties = message.getMessageProperties();           if (properties != null) {               String contentType = properties.getContentType();               if (contentType != null && contentType.contains("json")) {                   String encoding = properties.getContentEncoding();                   if (encoding == null) {                       encoding = getDefaultCharset();                   }                   try {                           Class<?> targetClass = getClassMapper().toClass(                                   message.getMessageProperties());                           content = convertBytesToObject(message.getBody(),                                   encoding, targetClass);                   }                   catch (IOException e) {                       throw new MessageConversionException(                               "Failed to convert Message content", e);                   }               }               else {                   log.warn("Could not convert incoming message with content-type ["                           + contentType + "]");               }           }           if (content == null) {               content = message.getBody();           }           return content;       }          private Object convertBytesToObject(byte[] body, String encoding,               Class<?> clazz) throws UnsupportedEncodingException {           String contentAsString = new String(body, encoding);           return gson.fromJson(contentAsString, clazz);       }   }  
    转载请注明原文地址: https://ju.6miu.com/read-1203498.html
    最新回复(0)