RabbitMQ学习之延时队列

    xiaoxiao2021-04-01  35

    在实际的业务中我们会遇见生产者产生的消息,不立即消费,而是延时一段时间在消费。RabbitMQ本身没有直接支持延迟队列功能,但是我们可以根据其特性Per-Queue Message TTL和 Dead Letter Exchanges实现延时队列。也可以通过改特性设置消息的优先级。

    1.Per-Queue Message TTL RabbitMQ可以针对消息和队列设置TTL(过期时间)。队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。 2.Dead Letter Exchanges 当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。消息变成Dead Letter一向有以下几种情况: 消息被拒绝(basic.reject or basic.nack)并且requeue=false 消息TTL过期 队列达到最大长度 实际上就是设置某个队列的属性,当这个队列中有Dead Letter时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

    一、在队列上设置TTL

    1.建立delay.exchange

    这里Internal设置为NO,否则将无法接受dead letter,YES表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。

    2.建立延时队列(delay queue)

    如上配置延时5min队列(x-message-ttl=300000)

    x-max-length:最大积压的消息个数,可以根据自己的实际情况设置,超过限制消息不会丢失,会立即转向delay.exchange进行投递

    x-dead-letter-exchange:设置为刚刚配置好的delay.exchange,消息过期后会通过delay.exchange进行投递

    这里不需要配置”dead letter routing key”否则会覆盖掉消息发送时携带的routingkey,导致后面无法路由为刚才配置的delay.exchange

    3.配置延时路由规则

    需要延时的消息到exchange后先路由到指定的延时队列

    1)创建delaysync.exchange通过Routing key将消息路由到延时队列

    2.配置delay.exchange 将消息投递到正常的消费队列

    配置完成。

    下面使用代码测试一下:

    生产者:

    [java] view plain copy print ? package cn.slimsmart.study.rabbitmq.delayqueue.queue;    import java.io.IOException;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;    public class Producer {        private static String queue_name = “test.queue”;        public static void main(String[] args) throws IOException {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost(”10.1.199.169”);          factory.setUsername(”admin”);          factory.setPassword(”123456”);          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          // 声明队列          channel.queueDeclare(queue_name, truefalsefalsenull);          String message = ”hello world!” + System.currentTimeMillis();          channel.basicPublish(”delaysync.exchange”“deal.message”null, message.getBytes());          System.out.println(”sent message: ” + message + “,date:” + System.currentTimeMillis());          // 关闭频道和连接          channel.close();          connection.close();      }  }   package cn.slimsmart.study.rabbitmq.delayqueue.queue; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static String queue_name = "test.queue"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(queue_name, true, false, false, null); String message = "hello world!" + System.currentTimeMillis(); channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes()); System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis()); // 关闭频道和连接 channel.close(); connection.close(); } } 消费者:

    [java] view plain copy print ? package cn.slimsmart.study.rabbitmq.delayqueue.queue;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class Consumer {        private static String queue_name = “test.queue”;        public static void main(String[] args) throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost(”10.1.199.169”);          factory.setUsername(”admin”);          factory.setPassword(”123456”);          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          // 声明队列          channel.queueDeclare(queue_name, truefalsefalsenull);          QueueingConsumer consumer = new QueueingConsumer(channel);          // 指定消费队列          channel.basicConsume(queue_name, true, consumer);          while (true) {              // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());              System.out.println(”received message:” + message + “,date:” + System.currentTimeMillis());          }      }    }   package cn.slimsmart.study.rabbitmq.delayqueue.queue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer { private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(queue_name, true, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定消费队列 channel.basicConsume(queue_name, true, consumer); while (true) { // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("received message:" + message + ",date:" + System.currentTimeMillis()); } } } 二、在消息上设置TTL

    实现代码:

    生产者:

    [java] view plain copy print ? package cn.slimsmart.study.rabbitmq.delayqueue.message;    import java.io.IOException;  import java.util.HashMap;    import com.rabbitmq.client.AMQP;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;    public class Producer {        private static String queue_name = “message_ttl_queue”;        public static void main(String[] args) throws IOException {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost(”10.1.199.169”);          factory.setUsername(”admin”);          factory.setPassword(”123456”);          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          HashMap<String, Object> arguments = new HashMap<String, Object>();          arguments.put(”x-dead-letter-exchange”“amq.direct”);          arguments.put(”x-dead-letter-routing-key”“message_ttl_routingKey”);          channel.queueDeclare(”delay_queue”truefalsefalse, arguments);            // 声明队列          channel.queueDeclare(queue_name, truefalsefalsenull);          // 绑定路由          channel.queueBind(queue_name, ”amq.direct”“message_ttl_routingKey”);            String message = ”hello world!” + System.currentTimeMillis();          // 设置延时属性          AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();          // 持久性 non-persistent (1) or persistent (2)          AMQP.BasicProperties properties = builder.expiration(”300000”).deliveryMode(2).build();          // routingKey =delay_queue 进行转发          channel.basicPublish(”““delay_queue”, properties, message.getBytes());          System.out.println(”sent message: ” + message + “,date:” + System.currentTimeMillis());          // 关闭频道和连接          channel.close();          connection.close();      }  }   package cn.slimsmart.study.rabbitmq.delayqueue.message; import java.io.IOException; import java.util.HashMap; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static String queue_name = "message_ttl_queue"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "amq.direct"); arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey"); channel.queueDeclare("delay_queue", true, false, false, arguments); // 声明队列 channel.queueDeclare(queue_name, true, false, false, null); // 绑定路由 channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey"); String message = "hello world!" + System.currentTimeMillis(); // 设置延时属性 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 持久性 non-persistent (1) or persistent (2) AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build(); // routingKey =delay_queue 进行转发 channel.basicPublish("", "delay_queue", properties, message.getBytes()); System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis()); // 关闭频道和连接 channel.close(); connection.close(); } } 消费者:

    [java] view plain copy print ? package cn.slimsmart.study.rabbitmq.delayqueue.message;    import java.util.HashMap;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class Consumer {        private static String queue_name = “message_ttl_queue”;        public static void main(String[] args) throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost(”10.1.199.169”);          factory.setUsername(”admin”);          factory.setPassword(”123456”);          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          HashMap<String, Object> arguments = new HashMap<String, Object>();          arguments.put(”x-dead-letter-exchange”“amq.direct”);          arguments.put(”x-dead-letter-routing-key”“message_ttl_routingKey”);          channel.queueDeclare(”delay_queue”truefalsefalse, arguments);            // 声明队列          channel.queueDeclare(queue_name, truefalsefalsenull);          // 绑定路由          channel.queueBind(queue_name, ”amq.direct”“message_ttl_routingKey”);            QueueingConsumer consumer = new QueueingConsumer(channel);          // 指定消费队列          channel.basicConsume(queue_name, true, consumer);          while (true) {              // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());              System.out.println(”received message:” + message + “,date:” + System.currentTimeMillis());          }      }    }   package cn.slimsmart.study.rabbitmq.delayqueue.message; import java.util.HashMap; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer { private static String queue_name = "message_ttl_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "amq.direct"); arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey"); channel.queueDeclare("delay_queue", true, false, false, arguments); // 声明队列 channel.queueDeclare(queue_name, true, false, false, null); // 绑定路由 channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey"); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定消费队列 channel.basicConsume(queue_name, true, consumer); while (true) { // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("received message:" + message + ",date:" + System.currentTimeMillis()); } } }

    查看资料:

    http://www.rabbitmq.com/ttl.html  http://www.rabbitmq.com/dlx.html  http://www.rabbitmq.com/maxlength.html https://www.cloudamqp.com/docs/delayed-messages.html 

    转载请注明原文地址: https://ju.6miu.com/read-665654.html

    最新回复(0)