springboot10:集成RabbitMQ

    xiaoxiao2021-04-19  81

    有渔YOUYU 2017-04-04 22:59

    RabbitMQ 是个非常牛掰的消息中间件,网上也有不少关于 AMQP 和 RabbitMQ 的介绍。今天,小编就来讲下怎么在 Spring Boot 中集成 RabbitMQ。我不喜欢废话,喜欢直接上代码:

    1. pom.xml

    <!-- spring-messaging: version pinned explicitly. Not referenced by the RabbitMQ
         code shown in this article; presumably needed elsewhere in the project (TODO confirm). -->
    <dependency>

    <groupId>org.springframework</groupId>

    <artifactId>spring-messaging</artifactId>

    <version>4.3.7.RELEASE</version>

    </dependency>

    <!-- spring-websocket: version pinned explicitly. Not referenced by the RabbitMQ
         code shown in this article; presumably needed elsewhere in the project (TODO confirm). -->
    <dependency>

    <groupId>org.springframework</groupId>

    <artifactId>spring-websocket</artifactId>

    <version>4.3.7.RELEASE</version>

    </dependency>

    <!-- spring-boot-starter-amqp: the dependency the article identifies as the
         essential one for the RabbitMQ integration. -->
    <dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-amqp</artifactId>

    <version>1.5.2.RELEASE</version>

    </dependency>

    这里,最重要的是 spring-boot-starter-amqp 这个依赖(starter)。

    2. RabbitConfig.java

    @Configuration

    public class RabbitConfig {

    public static final String EXCHANGE = "my-mq-exchange";

    public static final String ROUTINGKEY1 = "queue_one_key1";

    public static final String ROUTINGKEY2 = "queue_one_key2";

    @Bean

    public ConnectionFactory connectionFactory() {

    CachingConnectionFactory connectionFactory =

    new CachingConnectionFactory("127.0.0.1",5672);

    connectionFactory.setUsername("guest");

    connectionFactory.setPassword("guest");

    connectionFactory.setVirtualHost("/");

    connectionFactory.setPublisherConfirms(true);

    return connectionFactory;

    }

    @Bean

    public DirectExchange defaultExchange() {

    return new DirectExchange(EXCHANGE, true, false);

    }

    @Bean

    public Queue queue() {

    return new Queue("hello", true);

    }

    @Bean

    public Binding binding() {

    return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY1); }

    @Bean

    public Queue queue1() {

    return new Queue("queue_one1", true);

    }

    @Bean

    public Binding binding1() {

    return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY2);

    }

    @Bean

    public SimpleMessageListenerContainer messageContainer() {

    SimpleMessageListenerContainer container =

    new SimpleMessageListenerContainer(connectionFactory());

    container.setQueues(queue());

    container.setExposeListenerChannel(true);

    container.setMaxConcurrentConsumers(1);

    container.setConcurrentConsumers(1);

    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    container.setMessageListener(new ChannelAwareMessageListener() {

    public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {

    byte[] body = message.getBody();

    System.out.println("Receive MSG : " + new String(body));

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }

    });

    return container;

    }

    @Bean

    public SimpleMessageListenerContainer messageContainer2() {

    SimpleMessageListenerContainer container =

    new SimpleMessageListenerContainer(connectionFactory());

    container.setQueues(queue1());

    container.setExposeListenerChannel(true);

    container.setMaxConcurrentConsumers(1);

    container.setConcurrentConsumers(1);

    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    container.setMessageListener(new ChannelAwareMessageListener() {

    public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {

    byte[] body = message.getBody();

    System.out.println("queue1 Receive MSG : " + new String(body));

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }

    });

    return container;

    }

    }

    3. SendController.java

    @RestController

    public class SendController implements RabbitTemplate.ConfirmCallback{

    private RabbitTemplate rabbitTemplate;

    public SendController(RabbitTemplate rabbitTemplate){

    this.rabbitTemplate = rabbitTemplate;

    this.rabbitTemplate.setConfirmCallback(this);

    }

    @RequestMapping("send1")

    public String send1(String msg){

    String uuid = UUID.randomUUID().toString();

    CorrelationData correlationId = new CorrelationData(uuid);

    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY1, msg,

    correlationId);

    return null;

    }

    @RequestMapping("send2")

    public String send2(String msg){

    String uuid = UUID.randomUUID().toString();

    CorrelationData correlationId = new CorrelationData(uuid);

    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY2, msg,

    correlationId);

    return null;

    }

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

    System.out.println(" CallBack id:" + correlationData);

    if (ack) {

    System.out.println("MSG Consumed Successfully");

    } else {

    System.out.println("MSG Consumed Failed:" + cause+"\n Please send again");

    }

    }

    }

    4. 测试

    打开postman,输入:http://localhost:8080/send2?msg=bbbcccddd

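    图1 RabbitMQ 消息消费成功(控制台会打印 "queue1 Receive MSG : bbbcccddd",以及发布确认回调输出的 " CallBack id:…" 和 "MSG Consumed Successfully")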

    转载请注明原文地址: https://ju.6miu.com/read-675777.html

    最新回复(0)