基于RabbitMQ 3.6.5, Erlang 19.0实现,客户端用spring4.2
什么是延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
我的应用场景是队列接收消息后,进行网络文件下载,如果下载失败,则将消息重新放入队列,延时后再消费。
AMQP和RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列: Time To Live(TTL)
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。B: 对消息进行单独设置,每条消息TTL可以不同。如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter
Dead Letter Exchanges(DLX)RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchangex-dead-letter-routing-key:指定routing-key发送队列出现dead letter的情况有:
消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。
<description>rabbitmq 连接服务配置</description> <!-- 连接配置 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}" /> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue name="test_queue" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="test_delay_queue" durable="true" auto-delete="false" exclusive="false"> <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">60000</value> </entry> <entry key="x-dead-letter-exchange" value="amqpExchange"/> </rabbit:queue-arguments> </rabbit:queue> <!-- 消息对象json转换类 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- rabbit:direct-exchange:定义exchange模式为direct, 意思就是消息与一个特定的路由键完全匹配,才会转发。 rabbit:binding:设置消息queue匹配的key --> <rabbit:fanout-exchange name="amqpDelayExchange" durable="true" auto-delete="false" id="amqpDelayExchange"> <rabbit:bindings> <rabbit:binding queue="test_delay_queue"/> </rabbit:bindings> </rabbit:fanout-exchange> <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange"> <rabbit:bindings> <rabbit:binding queue="test_queue" key="test" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- spring template声明 --> <rabbit:template exchange="amqpExchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 --> <!-- 因为需要在启动之前加载一些信息,所以这里设置了auto-startup为false。在名为Startup的servlet里启动 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" auto-startup="false"> <rabbit:listener queues="test_queue" ref="testConsumer" /> </rabbit:listener-container> <!-- 测试队列 --> <bean id="testConsumer" class="cn.jointwisdom.vuniversity.rabbitmq.consumer.TestConsumer"> </bean> 网上好多用topic实现的,但是我没有复现,同时我还在看到可以用x-dead-letter-routing-key参数转发到指定key,但是我尝试了一下,报如下错误 PRECONDITION_FAILED - invalid arg 'x-dead-letter-routing-key' for queue 'MyQueue' in vhost '/': routing_key_but_no_dlx_defined 以下是文档的说法,大概是说,不能设置为默认的队列,这样可能会形成一个环, It is possible to form a cycle of dead-letter queues. For instance, this can happen when a queue dead-letters messages to the default exchange without specifiying a dead-letter routing key. Messages in such cycles (i.e. messages that reach the same queue twice) will be dropped. 上面的错误也可能是我设置不当引起的,如果谁有解决方法,可以告诉我。 其它队列的生产者和消息者是正常的,只有test_delay_queue这个队列的生产者有点不一样(这个队列不需要消费者) public class TestDelayProducer { private static final Logger log = LogManager.getLogger(TestDelayProducer.class); @Resource private AmqpTemplate amqpTemplate; public void send(Object object) { String key = "test";//这个地方是在消息到期后,你要转发到的队列的key if (key != null) { try { amqpTemplate.convertAndSend("amqpDelayExchange", key, object);//amqpDelayExchange是绑定的死信消息队列的exchange的名字 log.debug("队列消息发送成功!Key=" + key + ", Msg=" + object); } catch (Exception e) { throw new RuntimeException(e); } } else { throw new NullPointerException("消息队列的KEY值不能为空"); } } } 注意:1 ttl设置之后,下次修改时间,会报错,这时候,需要先删除该队列,重启项目。否则会报错。 部分文字来源于网络