rabbitmq 3.5.6->3.6.1升级 ,使用说明

    xiaoxiao2026-05-26  3

    问题1:

    v3.5.6 erlang版本:R14B

    # rpm -qa | grep erlang

    erlang-typer-R14B-04.3.el6.x86_64

    ...

    操作1:

    停止删除rabbitmq3.5.6

    # rabbitmqctl stop

    # rabbitmq-server start #启动

    操作2:

    #删除旧版本 rpm -e --allmatches --nodeps  `rpm -qa |grep erlang`  

    #安装参考(yum安装) https://www.erlang-solutions.com/resources/download.html

    # wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm # rpm -Uvh erlang-solutions-1.0-1.noarch.rpm # sudo yum install erlang # sudo yum install esl-erlang

    # rpm -ihv rabbitmq-server-3.6.1-1.noarch.rpm

    # rabbitmq-server start &

    访问:127.0.0.1:15672

    ----转:http://blog.csdn.net/shatty/article/details/9529463

    1,基本概念

    RabbitMQ是流行的开源消息队列系统,用erlang语言开发。我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持。 RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看 RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。 几个概念说明: Broker:简单来说就是消息队列服务器实体。   Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。   Queue:消息队列载体,每个消息都会被投入到一个或多个队列。   Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。   Routing Key:路由关键字,exchange根据这个关键字进行消息投递。   vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。   producer:消息生产者,就是投递消息的程序。   consumer:消息消费者,就是接受消息的程序。   channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 消息队列的使用过程大概如下: (1)客户端连接到消息队列服务器,打开一个channel。   (2)客户端声明一个exchange,并设置相关属性。   (3)客户端声明一个queue,并设置相关属性。   (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。   (5)客户端投递消息到exchange。 exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。 RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:   (1)exchange持久化,在声明时指定durable => 1   (2)queue持久化,在声明时指定durable => 1   (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。 2,环境搭建 因为最经工作在 Openstack的开发上,所以我直接在ubuntu上使用 devstack安装的 RabbitMQ。Ubuntu上也可以手动安装 RabbitMQ(sudo apt-get install rabbitmq-server)。 RabbitMQ官网给的教程使用的 pika连接 RabbitMQ。 Devstack 默认的是用kombu连接 RabbitMQ。所以先要安装pika。 sudo apt-get install python-pip git-core sudo pip install pika==0.9.8 环境搭建起来后就可以按照教程试验了。 3,实例教程 3.1 生产/消费者 最简单的示例,先尝试发送一个message给接受者(这里的例子和官网教程有点差别,设置连接参数和官网不同,因为devstack默认安装的Rabbitmq时修改了guest的密码为nova) receiver.py [html]  view plain  copy #!/usr/bin/env python   import pika      credentials = pika.PlainCredentials('guest', 'nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials    )   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.queue_declare(queue='hello')      print ' [*] Waiting for messages. To exit press CTRL+C'      def callback(ch, method, properties, body):       print " [x] Received %r" % (body,)      channel.basic_consume(callback,                         queue='hello',                         no_ack=True)      channel.start_consuming()   send.py [python]  view plain  copy #!/usr/bin/env python   import pika      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.queue_declare(queue='hello')      channel.basic_publish(exchange='',                         routing_key='hello',                         body='Hello World!')   print " [x] Sent 'Hello World!'"   connection.close()   python receiver.py python send.py 3.2 工作队列 生产者生产消息,放入由routing_key指定的队列。消费者绑定该queue消费message。 new_task.py [html]  view plain  copy #!/usr/bin/env python   import sys   import pika      credentials = pika.PlainCredentials('guest', 'nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.queue_declare(queue='task_queue'durable=True)      message = ' '.join(sys.argv[1:]) or "Hello World!"   channel.basic_publish(exchange='',                         routing_key='task_queue',                         body=message,                         properties=pika.BasicProperties(                            delivery_mode = 2, # make message persistent                         ))   print " [x] Sent %r" % (message,)   connection.close()   worker.py [python]  view plain  copy #!/usr/bin/env python   import time   import pika      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials    )   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.queue_declare(queue='task_queue', durable=True)   print ' [*] Waiting for messages. To exit press CTRL+C'      def callback(ch, method, properties, body):       print " [x] Received %r" % (body,)       time.sleep( body.count('.') )       print " [x] Done"       ch.basic_ack(delivery_tag = method.delivery_tag)      channel.basic_qos(prefetch_count=1)   channel.basic_consume(callback,                         queue='task_queue')      channel.start_consuming()   shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C shell3$ python new_task.py First message. shell3$ python new_task.py Second message.. shell3$ python new_task.py Third message... shell3$ python new_task.py Fourth message.... shell3$ python new_task.py Fifth message..... 3.2 发布订阅 发送者发送消息到制定的exchange,接受者绑定队列到exchange接受消息。 emit_log.py [python]  view plain  copy #!/usr/bin/env python   import sys   import pika      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.exchange_declare(exchange='logs',                            type='fanout')      message = ' '.join(sys.argv[1:]) or "info: Hello World!"   channel.basic_publish(exchange='logs',                         routing_key='',                         body=message)   print " [x] Sent %r" % (message,)   connection.close()   receive_log.py [python]  view plain  copy #!/usr/bin/env python   import time   import pika      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials    )   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.exchange_declare(exchange='logs',                            type='fanout')      result = channel.queue_declare(exclusive=True)   queue_name = result.method.queue      channel.queue_bind(exchange='logs',                      queue=queue_name)      print ' [*] Waiting for logs. To exit press CTRL+C'      def callback(ch, method, properties, body):       print " [x] %r" % (body,)      channel.basic_consume(callback,                         queue=queue_name,                         no_ack=True)      channel.start_consuming()   $ python emit_log.py $ python receive_logs.py 3.2 消息路由 发送者不但可以指定消息的exchange,还可以指定消息的routing_key。进一步增加了消息投递的灵活性。 emit_logs_direct.py [python]  view plain  copy #!/usr/bin/env python   import sys   import pika      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.exchange_declare(exchange='direct_logs',                            type='direct')   severity = sys.argv[1if len(sys.argv) > 1 else 'info'   message = ' '.join(sys.argv[2:]) or "info: Hello World!"   channel.basic_publish(exchange='direct_logs',                         routing_key=severity,                         body=message)   print " [x] Sent %r:%r" % (severity,message)   connection.close()   receive_logs_direct.py [python]  view plain  copy #!/usr/bin/env python   import sys   import time   import pika      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials    )   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.exchange_declare(exchange='direct_logs',                            type='direct')      result = channel.queue_declare(exclusive=True)   queue_name = result.method.queue   severities = sys.argv[1:]   if not severities:       print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \                            (sys.argv[0],)       sys.exit(1)      for severity in severities:       channel.queue_bind(exchange='direct_logs',                          queue=queue_name,                          routing_key=severity)      print ' [*] Waiting for logs. To exit press CTRL+C'      def callback(ch, method, properties, body):       print " [x] %r:%r" % (method.routing_key, body,)      channel.basic_consume(callback,                         queue=queue_name,                         no_ack=True)      channel.start_consuming()   $ python receive_logs_direct.py info warning error [*] Waiting for logs. To exit press CTRL+C $ python emit_log_direct.py error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.' 3.2 Topics 比exchange+routing_key更加灵活,routing_key可以支持通配符。 emit_logs_topic.py [python]  view plain  copy #!/usr/bin/env python   import pika   import sys      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.exchange_declare(exchange='topic_logs',                            type='topic')      routing_key = sys.argv[1if len(sys.argv) > 1 else 'anonymous.info'   message = ' '.join(sys.argv[2:]) or 'Hello World!'   channel.basic_publish(exchange='topic_logs',                         routing_key=routing_key,                         body=message)   print " [x] Sent %r:%r" % (routing_key, message)   connection.close()   receive_logs_topic.py [python]  view plain  copy #!/usr/bin/env python   import pika   import sys      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.exchange_declare(exchange='topic_logs',                            type='topic')      result = channel.queue_declare(exclusive=True)   queue_name = result.method.queue      binding_keys = sys.argv[1:]   if not binding_keys:       print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)       sys.exit(1)      for binding_key in binding_keys:       channel.queue_bind(exchange='topic_logs',                          queue=queue_name,                          routing_key=binding_key)      print ' [*] Waiting for logs. To exit press CTRL+C'      def callback(ch, method, properties, body):       print " [x] %r:%r" % (method.routing_key, body,)      channel.basic_consume(callback,                         queue=queue_name,                         no_ack=True)      channel.start_consuming()   python emit_log_topic.py "kern.critical" "A critical kernel error" python receive_logs_topic.py "kern.*" "*.critical" 3.2 远程过程调用 直接从一个进程调用另外一个进程。 rpc_client.py [python]  view plain  copy #!/usr/bin/env python   import pika   import uuid      class FibonacciRpcClient(object):       def __init__(self):           credentials = pika.PlainCredentials('guest''nova')           parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)           self.connection = pika.BlockingConnection(parameters)           self.channel = self.connection.channel()              result = self.channel.queue_declare(exclusive=True)           self.callback_queue = result.method.queue              self.channel.basic_consume(self.on_response, no_ack=True,                                      queue=self.callback_queue)          def on_response(self, ch, method, props, body):           if self.corr_id == props.correlation_id:               self.response = body          def call(self, n):           self.response = None           self.corr_id = str(uuid.uuid4())           self.channel.basic_publish(exchange='',                                      routing_key='rpc_queue',                                      properties=pika.BasicProperties(                                            reply_to = self.callback_queue,                                            correlation_id = self.corr_id,                                            ),                                      body=str(n))           while self.response is None:               self.connection.process_data_events()           return int(self.response)      fibonacci_rpc = FibonacciRpcClient()      print " [x] Requesting fib(30)"   response = fibonacci_rpc.call(30)   print " [.] Got %r" % (response,)   rpc_server.py [python]  view plain  copy #!/usr/bin/env python   import pika      credentials = pika.PlainCredentials('guest''nova')   parameters = pika.ConnectionParameters('localhost',5672,'/',credentials)   connection = pika.BlockingConnection(parameters)   channel = connection.channel()      channel.queue_declare(queue='rpc_queue')      def fib(n):       if n == 0:           return 0       elif n == 1:           return 1       else:           return fib(n-1) + fib(n-2)      def on_request(ch, method, props, body):       n = int(body)          print " [.] fib(%s)"  % (n,)       response = fib(n)          ch.basic_publish(exchange='',                        routing_key=props.reply_to,                        properties=pika.BasicProperties(correlation_id = \                                                        props.correlation_id),                        body=str(response))       ch.basic_ack(delivery_tag = method.delivery_tag)      channel.basic_qos(prefetch_count=1)   channel.basic_consume(on_request, queue='rpc_queue')      print " [x] Awaiting RPC requests"   channel.start_consuming()   $ python rpc_server.py [x] Awaiting RPC requests $ python rpc_client.py [x] Requesting fib(30) 3,结束 Rabbitmq的topics方式,远程过程调用方式在openstack的源码中大量使用,如果不理解这几种方式,理解openstack源码还是比较费力的。 kombu的例子直接在kombu官网可以找到,在devstack环境可以直接跑。这里就不列了。
    转载请注明原文地址: https://ju.6miu.com/read-1310079.html
    最新回复(0)