RabbitMQ是一款非常优秀的消息中间件,网上也有不少关于AMQP和RabbitMQ的介绍。今天,我就来讲一下怎样在Spring Boot中集成RabbitMQ。我不喜欢废话,喜欢直接上代码:
1. pom.xml
<!-- Spring messaging module, pinned to 4.3.7.RELEASE. -->
<!-- NOTE(review): none of the Java code shown in this article visibly references it; confirm it is needed. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>4.3.7.RELEASE</version>
</dependency>
<!-- Spring WebSocket module, pinned to the same 4.3.7.RELEASE version. -->
<!-- NOTE(review): also not visibly used by the Java code shown here; confirm it is needed. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>4.3.7.RELEASE</version>
</dependency>
<!-- Spring Boot AMQP starter, pinned to 1.5.2.RELEASE. -->
<!-- NOTE(review): presumably this is what provides the Spring AMQP / RabbitMQ client classes used by the configuration and controller; confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
这里,最重要的是spring-boot-starter-amqp这个依赖。
2. RabbitConfig.java
@Configuration
public class RabbitConfig {
public static final String EXCHANGE = "my-mq-exchange";
public static final String ROUTINGKEY1 = "queue_one_key1";
public static final String ROUTINGKEY2 = "queue_one_key2";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("127.0.0.1",5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
@Bean
public Queue queue() {
return new Queue("hello", true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY1); }
@Bean
public Queue queue1() {
return new Queue("queue_one1", true);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY2);
}
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("Receive MSG : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
@Bean
public SimpleMessageListenerContainer messageContainer2() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue1());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("queue1 Receive MSG : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
}
3. SendController.java
@RestController
public class SendController implements RabbitTemplate.ConfirmCallback{
private RabbitTemplate rabbitTemplate;
public SendController(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
@RequestMapping("send1")
public String send1(String msg){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY1, msg,
correlationId);
return null;
}
@RequestMapping("send2")
public String send2(String msg){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY2, msg,
correlationId);
return null;
}
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(" CallBack id:" + correlationData);
if (ack) {
System.out.println("MSG Consumed Successfully");
} else {
System.out.println("MSG Consumed Failed:" + cause+"\n Please send again");
}
}
}
4. 测试
打开postman,输入:http://localhost:8080/send2?msg=bbbcccddd
图1 RabbitMQ消息消费成功