在配置完RabbitMQ后,我们可以在eclipes中写一个简单的demo测试下。 下面就是路由模式的demo 基本的配置信息写在ConnectionUtil中,这里就不举例了
发送消息
package com.rabbitmq.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtil; public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange,direct模式 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息内容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }消费者1
package com.rabbitmq.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.util.ConnectionUtil; public class Recv { private final static String QUEUE_NAME = "test_queue_direct_work"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }8.11.4消费者2
package com.rabbitmq.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.util.ConnectionUtil; public class Recv2 { private final static String QUEUE_NAME = "test_queue_direct_work2"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }注意:在绑定时,设置路由KEY。 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “key2”); 绑定结果 可以看到定义了一个路由KEY。
后台系统–消息的生产者
增加MQ依赖
<!-- RabbitMQ --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>配置文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定义RabbitMQ的连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.vhost}" /> <!-- 定义Rabbit模板,指定连接工厂以及定义exchange --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="itemDirectExchange" /> <!-- MQ的管理,包括队列、交换器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定义交换器,自动声明,持久化 --> <rabbit:direct-exchange name="itemDirectExchange" auto-declare="true" durable="true"> </rabbit:direct-exchange> </beans>属性文件配置信息rabbitmq.properties
rabbit.ip=127.0.0.1 rabbit.port=5672 rabbit.username=jtmqadmin rabbit.password=123456 rabbit.vhost=jtmq在spring的配置文件中加载rabbitmq.properties
<!-- 配置资源文件 --> <property name="locations"> <list> <value>classpath:jdbc.properties</value> <value>classpath:env.properties</value> <value>classpath:redis.properties</value> <value>classpath:httpclient.properties</value> <value>classpath:rabbitmq.properties</value> </list> </property>后台系统-商品更新发送消息
@Autowired private RabbitTemplate rabbitTemplate; …… //发送更新消息到RabbitMQ中("路由模式的key","要传输的内容") rabbitTemplate.convertAndSend("jt_item_update", item.getId());前台系统—接受消息 增加MQ依赖
<!-- RabbitMQ --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>前台系统-和spring框架整合
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定义RabbitMQ的连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.vhost}" /> <!-- MQ的管理,包括队列、交换器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定义消息队列 --> <rabbit:queue name="jt-web.itemQueue" auto-declare="true"/> <!-- 定义交换机,并且完成队列和交换机的绑定 --> <rabbit:direct-exchange name="itemDirectExchange" auto-declare="true"> <rabbit:bindings> <!-- 前台系统只接收商品更新的消息,这里的key要去后台传递过来的key相对应 --> <rabbit:binding queue="jt-web.itemQueue" key="jt_item_update"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 定义监听 ,等会需要创建一个RabbitItemService的Service类,里面有方法updateItem, 当有消息时,会根据配置调用RabbitItemService的updataItem方法。并将参数传入。注意:和后台 系统配置时的区别,后台中并未绑定消息队列到交换机上,这里根据key进行了绑定。并通过指定的service 的方法来侦听消息。 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="rabbitItemService" method="updateItem" queue-names="jt-web.itemQueue"/> </rabbit:listener-container> </beans>创建一个RabbitItemService
package com.jt.web.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitItemService { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitItemService.class); @Autowired private ItemService itemService; public void updateItem(Long itemId) { LOGGER.info("接受到MQ的消息,内容为:{}", itemId); this.itemService.updateRedis(itemId); } }