RabbitMQ 路由

    xiaoxiao2021-03-25  146

    我们定义交换机的时候,若指定类型为fanout,会把消息发到全部与之绑定的队列中,当为direct时候,可以定义一个关键字(消息类型),使得不同类型的消息,到与该类型匹配的队列中。

    日志发布者:

    package ly; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SendLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //广播消息到符合条件的队列 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String msg="hello word!"; String msgType="info"; //发送error类型的消息,消费者绑定相同类型才可以收到该消息,消息类型随便定义 channel.basicPublish(EXCHANGE_NAME, msgType, null, msg.getBytes()); channel.basicPublish(EXCHANGE_NAME, "abc", null, "hello1".getBytes()); //发送abc类型消息 System.out.println(" [x] Sent '" + msgType + "':'" + msg + "'"); channel.close(); connection.close(); } } 日志订阅者1

    package ly; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class RecvLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); //第三个参数要与发送的消息类型一致,要想绑定多个可以继续queueBind需要的类型,该消费者只接受error info类型消息 channel.queueBind(queueName, EXCHANGE_NAME, "error"); channel.queueBind(queueName, EXCHANGE_NAME, "info"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } } 日志订阅者2

    package ly; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class RecvLog1 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); //第三个参数要与发送的消息类型一致,要想绑定多个可以继续queueBind需要的类型,该消费者只接受error info类型消息 channel.queueBind(queueName, EXCHANGE_NAME, "abc"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x1] Received '" + routingKey + "':'" + message + "'"); } } }

    转载请注明原文地址: https://ju.6miu.com/read-13645.html

    最新回复(0)