RabbitMQ 发布和订阅

    xiaoxiao2021-03-25  106

    发布和订阅:一个消息转发给多个消费者;

    生产者并不是将消息直接发送给消息队列,而是发送给交换机,由交换机根据规则发送给指定的消息队列或丢弃。

    临时队列:当没有消费者链接时候,队列的消息会被丢弃,队列会被自动删除。

    消息发布者:

    package pub.exch; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SendLog { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //定义交换机,第一个参数为交换机名称,第二个参数为交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg="hello word"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println(" [x] Sent '"+msg+"'"); channel.close(); connection.close(); } } 消息订阅者1

    package pub.exch; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class ReqvLog { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //获取自动创建的消息队列(临时队列)名称 String queueName = channel.queueDeclare().getQueue(); //绑定临时消息队列和交换机 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true,consumer); while(true){ QueueingConsumer.Delivery delivery=consumer.nextDelivery(); String message=new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } 消息订阅者2

    package pub.exch; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class ReqvLog1 { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //获取自动创建的消息队列(临时队列)名称 String queueName = channel.queueDeclare().getQueue(); //绑定临时消息队列和交换机 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true,consumer); while(true){ QueueingConsumer.Delivery delivery=consumer.nextDelivery(); String message=new String(delivery.getBody()); System.out.println(" [x1] Received '" + message + "'"); } } }

    转载请注明原文地址: https://ju.6miu.com/read-11038.html

    最新回复(0)