RabbitMQ官网教程4——路由

    xiaoxiao2021-03-25  116

            前面的章节我们创建了一个简单的日志系统,可以把消息广播到许多接收者。本节我们将增加一个特性——只订阅一部分消息。比如只把错误日志输出到文件,同时把所有日志输出到屏幕。

    绑定

            前面的例子里我们已经创建过绑定。绑定就是交换机和队列间的一种关系,简单解读为队列关注该交换机的消息。创建绑定时可以增加一个参数routing_key,为了避免跟basic_publish的参数混淆,我们先把它叫做binding key。

            channel.queue_bind(exchange=exchange_name,queue=queue_name, routing_key='black')

    binding key的意义与交换机类型有关,fanout类型的交换机直接忽略该参数值。

     

    direct交换机

            之前的日志系统把所有的消息广播给所有的消费者。现在我们希望根据消息的安全级别进行过滤。比如只讲错误日志写入磁盘,而不为告警日志和信息日志浪费磁盘空间。fanout交换机不能提供这种灵活性,它只会广播。

            这里我们需要direct交换机,其路由规则也很简单——消息将发往满足条件的队列:队列的binding key严格匹配消息的routing key。

     

    multiple bindings

            用同样的binding key绑定多个队列是可以的。

     

    提交日志

            使用direct交换机,把日志的安全级别作为消息的routing key。接收端可以按级别接收。

     

    import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()

    订阅

            根据级别创建不同的绑定。

    import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

    转载请注明原文地址: https://ju.6miu.com/read-7853.html

    最新回复(0)