前几篇博文简单介绍了rabbitmq的基础配置以及使用,今天就给大家介绍下转换器(exchange,下文也称转发器)的概念。其实,生产者发消息不是直接发给队列的,而是先发给转换器,再由转换器决定消息进入哪一个队列或者被丢弃。之前的代码没有指定转换器,其实是用了rabbitmq默认的转换器,也就是direct方式,直接把队列的名称作为路由key来找到对应的队列。 下面,笔者就一一介绍下fanout、direct以及topic这三种类型转换器的使用。
这种类型(fanout)是一种发布和订阅的模式,也就是生产者发送到exchange的消息会被转发给所有绑定的队列,由所有的消费者接收处理。 下面就举个例子,生产者负责发布消息,然后有两类消费者,一类负责打印消息到控制台,一类负责打印到日志文件。代码如下: 生产者FanoutProducer:
/**
 * Project Name: qyk_testJava
 * File Name: FanoutProducer.java
 * Package Name: com.qiyongkang.mq.rabbitMq.exchange.fanout
 * Date: 2017-03-06 18:10:22
 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
 *
 */
package com.qiyongkang.mq.rabbitMq.exchange.fanout;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Publishes one timestamped log line to the {@code qyk_ex_log} fanout exchange
 * on the broker at {@code localhost}, then closes the channel and connection.
 *
 * @author qiyongkang
 * @since JDK 1.6
 */
public class FanoutProducer {

    /** Name of the fanout exchange the message is published to. */
    private final static String EXCHANGE_NAME = "qyk_ex_log";

    /**
     * Connects to the broker, declares the exchange and publishes a single message.
     *
     * @param args unused
     * @throws IOException propagated from the RabbitMQ client calls
     */
    public static void main(String[] args) throws IOException {
        // Create the connection and the channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // Declare the exchange and its type.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String message = sdf.format(new Date()) + " : log something";

                // Publish to the exchange with an empty routing key. The charset is
                // explicit so the payload does not depend on the platform default.
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            } finally {
                // Release the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            connection.close();
        }
    }
}
消费者1(FanoutConsumer1):
/**
 * Project Name: qyk_testJava
 * File Name: FanoutConsumer1.java
 * Package Name: com.qiyongkang.mq.rabbitMq.exchange.fanout
 * Date: 2017-03-06 18:10:10
 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
 *
 */
package com.qiyongkang.mq.rabbitMq.exchange.fanout;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Consumes messages from the {@code qyk_ex_log} fanout exchange, echoes each one
 * to the console and appends it to a per-day log file.
 *
 * @author qiyongkang
 * @since JDK 1.6
 */
public class FanoutConsumer1 {

    /** Name of the fanout exchange the temporary queue is bound to. */
    private final static String EXCHANGE_NAME = "qyk_ex_log";

    /**
     * Binds a server-named queue to the exchange and processes deliveries forever.
     *
     * @param argv unused
     * @throws java.io.IOException propagated from the RabbitMQ client calls
     * @throws java.lang.InterruptedException if waiting for the next delivery is interrupted
     */
    public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
        // Create the connection and the channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Declare a non-durable, uniquely named, auto-delete queue.
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange.
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // The second argument (autoAck) is false, so every delivery has to be
        // acknowledged explicitly once it has been handled.
        channel.basicConsume(queueName, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Decode with an explicit charset instead of the platform default.
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("message:" + message);
            print2File(message);

            // Manual acknowledgement after the message has been processed.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    /**
     * Appends one line to {@code <yyyy-MM-dd>.txt} in the class-path root directory.
     * I/O failures are printed and otherwise ignored, so the caller still
     * acknowledges the message.
     *
     * @author qiyongkang
     * @param msg the text to append; a CRLF line terminator is added
     * @since JDK 1.6
     */
    private static void print2File(String msg) {
        java.net.URL root = FanoutConsumer1.class.getClassLoader().getResource("");
        // getResource("") may return null; fall back to the working directory
        // rather than failing with a NullPointerException inside the consume loop.
        String dir = (root != null) ? root.getPath() : System.getProperty("user.dir");
        System.out.println("当前目录:" + dir);

        String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        File file = new File(dir, logFileName + ".txt");

        FileOutputStream fos = null;
        try {
            fos = new FileOutputStream(file, true);
            fos.write((msg + "\r\n").getBytes("UTF-8"));
            fos.flush();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Always release the file handle, including when write() failed.
            if (fos != null) {
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
消费者2:
/** * Project Name:qyk_testJava * File Name:FanoutConsumer.java * Package Name:com.qiyongkang.mq.rabbitMq.exchange.fanout * Date:2017年3月6日下午6:10:10 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved. * */ package com.qiyongkang.mq.rabbitMq.exchange.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * ClassName:FanoutConsumer <br/> * Function: 打印日志到控制台. <br/> * Date: 2017年3月6日 下午6:10:10 <br/> * * @author qiyongkang * @version * @since JDK 1.6 * @see */ public class FanoutConsumer2 { private final static String EXCHANGE_NAME = "qyk_ex_log"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 创建一个非持久的、唯一的且自动删除的队列 String queueName = channel.queueDeclare().getQueue(); // 为转发器指定队列,设置binding channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定接收者,第二个参数为自动应答,无需手动应答 channel.basicConsume(queueName, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); //打印日志到控制台 System.out.println(" [x] Received '" + message + "'"); //另外需要在每次处理完成一个消息后,手动发送一次应答。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }测试我们可以发现,无论启动多少个消费者,所有的消费者都会收到生产者的消息。
direct类型比较清晰明了,就是在发送消息至转换器时,会指定一个路由key,消费者把队列绑定到转换器时也会指定一个路由key(binding key),这样你发送的消息指定的是什么routingKey,转发器就会把消息转给绑定了相同key的队列,由对应的消费者进行处理。 下面,笔者就直接贴出代码: 生产者DirectProducer:
/**
 * Project Name: qyk_testJava
 * File Name: DirectProducer.java
 * Package Name: com.qiyongkang.mq.rabbitMq.exchange.direct
 * Date: 2017-03-06 18:42:05
 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
 *
 */
package com.qiyongkang.mq.rabbitMq.exchange.direct;

import java.util.Random;
import java.util.UUID;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Publishes six log messages to the {@code qyk_ex_logs_direct} direct exchange,
 * each with a randomly chosen severity as its routing key.
 *
 * @author qiyongkang
 * @since JDK 1.6
 */
public class DirectProducer {

    /** Name of the direct exchange the messages are published to. */
    private static final String EXCHANGE_NAME = "qyk_ex_logs_direct";

    /** Severities used as routing keys. */
    private static final String[] SEVERITIES = { "info", "warning", "error" };

    /** Shared generator, so a new instance is not built for every message. */
    private static final Random RANDOM = new Random();

    /**
     * Connects to the broker, declares the exchange and publishes the messages.
     *
     * @param argv unused
     * @throws java.io.IOException propagated from the RabbitMQ client calls
     */
    public static void main(String[] argv) throws java.io.IOException {
        // Create the connection and the channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // Declare the exchange type.
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");

                // Send six messages.
                for (int i = 0; i < 6; i++) {
                    String severity = getSeverity();
                    String message = severity + "_log :" + UUID.randomUUID().toString();
                    // Publish to the exchange using the severity as routing key; the
                    // charset is explicit so the payload is platform independent.
                    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + message + "'");
                }
            } finally {
                // Release the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Picks one of the {@link #SEVERITIES} at random.
     *
     * @return a randomly selected severity name
     */
    private static String getSeverity() {
        return SEVERITIES[RANDOM.nextInt(SEVERITIES.length)];
    }
}
消费者DirectConsumer:
/** * Project Name:qyk_testJava * File Name:DirectConsumer.java * Package Name:com.qiyongkang.mq.rabbitMq.exchange.direct * Date:2017年3月6日下午6:42:17 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved. * */ package com.qiyongkang.mq.rabbitMq.exchange.direct; import java.util.Random; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * ClassName:DirectConsumer <br/> * Function: TODO ADD FUNCTION. <br/> * Reason: TODO ADD REASON. <br/> * Date: 2017年3月6日 下午6:42:17 <br/> * * @author qiyongkang * @version * @since JDK 1.6 * @see */ public class DirectConsumer { private static final String EXCHANGE_NAME = "qyk_ex_logs_direct"; private static final String[] SEVERITIES = { "info", "warning", "error" }; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明direct类型转发器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); String severity = getSeverity(); // 指定binding_key channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println(" [*] Waiting for " + severity + " logs. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } /** * 随机产生一种日志类型 * * @return */ private static String getSeverity() { Random random = new Random(); int ranVal = random.nextInt(3); return SEVERITIES[ranVal]; } }测试的时候,我们把消费者运行多次,那么就会有多个消费者,各种接受不同路由key的消息。如果指定的路由key不存在,那么消息就会被丢弃。
主题类型比direct类型更加灵活,提供*与#两种通配符的匹配模式,让路由的绑定key与消费者的选择key更加强大。其中,路由key由若干个以.号分隔的标识符组成,*号可以匹配恰好一个标识符,#号可以匹配0个或多个标识符。例如绑定key为kernel.*时,可以匹配kernel.critical,但不能匹配cron.warning。所以路由选择的时候就更加的方便灵活了,这里笔者就直接贴出代码,大家试试便知。 生产者TopicProducer:
/**
 * Project Name: qyk_testJava
 * File Name: TopicProducer.java
 * Package Name: com.qiyongkang.mq.rabbitMq.exchange.topic
 * Date: 2017-03-06 18:51:44
 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
 *
 */
package com.qiyongkang.mq.rabbitMq.exchange.topic;

import java.util.UUID;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Publishes one random-UUID message per routing key to the
 * {@code qyk_topic_logs} topic exchange.
 *
 * @author qiyongkang
 * @since JDK 1.6
 */
public class TopicProducer {

    /** Name of the topic exchange the messages are published to. */
    private static final String EXCHANGE_NAME = "qyk_topic_logs";

    /**
     * Connects to the broker, declares the exchange and publishes the messages.
     *
     * @param argv unused
     * @throws Exception propagated from the RabbitMQ client calls
     */
    public static void main(String[] argv) throws Exception {
        // Create the connection and the channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");

                // "kernel.info" was misspelled "kernal.info", which could never match
                // the "kernel.*" binding used by TopicConsumer.
                String[] routing_keys = new String[] { "kernel.info", "cron.warning",
                        "auth.info", "kernel.critical" };
                for (String routing_key : routing_keys) {
                    String msg = UUID.randomUUID().toString();
                    // Explicit charset keeps the payload platform independent.
                    channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg.getBytes("UTF-8"));
                    System.out.println(" [x] Sent routingKey = " + routing_key + " ,msg = " + msg + ".");
                }
            } finally {
                // Release the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            connection.close();
        }
    }
}
消费者1(TopicConsumer):
/** * Project Name:qyk_testJava * File Name:TopicConsumer.java * Package Name:com.qiyongkang.mq.rabbitMq.exchange.topic * Date:2017年3月6日下午6:51:23 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved. * */ package com.qiyongkang.mq.rabbitMq.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * ClassName:TopicConsumer <br/> * Function: TODO ADD FUNCTION. <br/> * Reason: TODO ADD REASON. <br/> * Date: 2017年3月6日 下午6:51:23 <br/> * * @author qiyongkang * @version * @since JDK 1.6 * @see */ public class TopicConsumer { private static final String EXCHANGE_NAME = "qyk_topic_logs"; public static void main(String[] argv) throws Exception { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明转发器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 随机生成一个队列 String queueName = channel.queueDeclare().getQueue(); // 接收所有与kernel相关的消息 channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*"); System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received routingKey = " + routingKey + ",msg = " + message + "."); } } }消费者2(TopicConsumer2):
/** * Project Name:qyk_testJava * File Name:TopicConsumer.java * Package Name:com.qiyongkang.mq.rabbitMq.exchange.topic * Date:2017年3月6日下午6:51:23 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved. * */ package com.qiyongkang.mq.rabbitMq.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * ClassName:TopicConsumer <br/> * Function: TODO ADD FUNCTION. <br/> * Reason: TODO ADD REASON. <br/> * Date: 2017年3月6日 下午6:51:23 <br/> * @author qiyongkang * @version * @since JDK 1.6 * @see */ public class TopicConsumer2 { private static final String EXCHANGE_NAME = "qyk_topic_logs"; public static void main(String[] argv) throws Exception { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明转发器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 随机生成一个队列 String queueName = channel.queueDeclare().getQueue(); // 接收所有与kernel相关的消息 channel.queueBind(queueName, EXCHANGE_NAME, "*.critical"); System.out .println(" [*] Waiting for critical messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received routingKey = " + routingKey + ",msg = " + message + "."); } } }好了,转发器的类型就介绍到这儿了。 关于rabbitmq的使用,由于笔者也没用得太深入,经验也不够,所以就简单的介绍下了,希望给想学习rabbitmq的童鞋提供点帮助,同时也欢迎大家纠正问题~