前面的章节我们改进了日志系统,利用direct交换机,可以选择性的接收日志。但是,这仍然有局限,不能基于多个规则进行路由。在日志系统里,我们可能不只根据安全级别订阅日志,还可能想根据日志源来订阅。就像Unix 工具syslog,根据安全级别及设备来路由日志。这样就比较灵活了。为了实现这种效果,我们需要学习topic交换机。
topic交换机
发送到topic交换机的消息,其routing_key不能是任意值,必须是由” . “限定的一组词。routing_key中可以有很多词,但长度不能超过255字节。
binding key也是相同的形式。topic交换的逻辑与direct交换机类似,特定routing_key的消息被投递到所有用匹配的bindingkey进行绑定的队列。对于binding key有两个重要的特别之处:
* 表示一个词
# 表示0个或多个词
例子:是否投递到队列
消息的routing key
Q1 binding key = *.orange.*
Q2 binding key = *.*.rabbit
lazy.#
quick.orange.rabbit
是
是
lazy.orange.elephant
是
是
quick.orange.fox
是
否
lazy.brown.fox
否
是
lazy.pink.rabbit
否
是(匹配两个绑定,也只投递一次)
quick.brown.fox
否
否
orange
否
否
quick.orange.male.rabbit
否
否
lazy.orange.male.rabbit
否
是
topic交换机很强大,可以像其他交换机一样工作。如果一个队列用”#”绑定,它将接收所有消息,而不管routing key,就像fanout交换机。如果绑定中没有用*和#,topic交换机就跟direct交换机一样。
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()