RabbitMQ官网教程5——topic

    xiaoxiao2021-03-25  75

            前面的章节我们改进了日志系统,利用direct交换机,可以选择性的接收日志。但是,这仍然有局限,不能基于多个规则进行路由。在日志系统里,我们可能不只根据安全级别订阅日志,还可能想根据日志源来订阅。就像Unix 工具syslog,根据安全级别及设备来路由日志。这样就比较灵活了。为了实现这种效果,我们需要学习topic交换机。

     

    topic交换机

            发送到topic交换机的消息,其routing_key不能是任意值,必须是由” . “限定的一组词。routing_key中可以有很多词,但长度不能超过255字节。

            binding key也是相同的形式。topic交换的逻辑与direct交换机类似,特定routing_key的消息被投递到所有用匹配的bindingkey进行绑定的队列。对于binding key有两个重要的特别之处:

            * 表示一个词

            # 表示0个或多个词

    例子:是否投递到队列

    消息的routing key

    Q1 binding key = *.orange.*

    Q2 binding key = *.*.rabbit

                  lazy.#

    quick.orange.rabbit

    lazy.orange.elephant

    quick.orange.fox

    lazy.brown.fox

    lazy.pink.rabbit

    是(匹配两个绑定,也只投递一次)

    quick.brown.fox

    orange

    quick.orange.male.rabbit

    lazy.orange.male.rabbit

            topic交换机很强大,可以像其他交换机一样工作。如果一个队列用”#”绑定,它将接收所有消息,而不管routing key,就像fanout交换机。如果绑定中没有用*和#,topic交换机就跟direct交换机一样。

    import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

    转载请注明原文地址: https://ju.6miu.com/read-17107.html

    最新回复(0)