Vertx, RabbitMq (1)

    xiaoxiao2021-03-25  132

    安装RabbitMq

    1.安装erlang

    2.官网下载压缩包,解压,用的win10,默认的配置路径是在C盘 ../user/AppData/Roaming/RabbitMq下

    3.安装http插件,rabbitMq默认启用amqp,其他协议需要安装插件。进入sbin,cmd运行rabbitmq-plugins enable rabbitmq_management,其他协议启用方式参照官网 插件安装

    4.配置端口号,将etc目录下的config文件改名为rabbitmq.config,放到上边提到的配置路径,ampq默认端口号是5672,http默认端口号忘了,修改rabbitmq_management节点,插入{listener, [{port,12345}]},就可以用http://localhost:12345登录查看Mq信息了,默认用户密码是guest,启动rabbitMq的时候什么鬼信息提示都没有,差评。

    5.启动mq,进入sbin目录下,运行rabbitmq-server.bat

    创建生产者和消费者

    这里用的是vertx的RabbitMqClient,vertx-RabbitMq-client 大部分Api使用和rabbitMq官网给的java client差不多,有区别的就是vertx没有channel,大概是因为,channel是要在单线程内使用的,一个vertx对应着一个线程池,所以给移掉了吧。说下遇到的几个坑

    1.关于queue的持久化,和RabbitMq官网Api一样,创建queue的时候,通过参数指定。但是消息持久化却不是,看了下api文档,没有跟这个参数有关的api,后来试着发送了下消息,打印接受到的消息,才发现是要在传送的json消息里边配置properties.deliveryMode,等于2表示将消息持久化。

    def message = [:] message.body = "test deliveryMode" def property = [:] property.contentType = "text/plain" property.deliveryMode = 2 message.properties = property client.basicPublish("exchangeName", "queueName",message, {rs -> println "sent" })

    Direct ExchangeType

    RabbitMq支持4种类型的exchange, 1.direct 2.fanout 3.headers 4.topic Exchange类似于一个适配器,根据不同的type,将routeKey匹配到对应的queue,在将queue绑定到exchange上时候需要一个routeKey参数,direct类型则表示将消息发送到routeKey完全匹配的queue上。例如

    client.queueBind("queue1", "exchange1", "route1", null)

    当消费者发送消息时,

    channel.basicPublish("exchange1", "route1", null, message)

    中间件接受到该消息,到id为exchange1的Exchange上,找到routeKey为route1的所有queue,将消息投放到这些队列上。 下边是完整的例子

    class MessageConsumer { static void main(String[] args) { def config = config() def vertx = Vertx.vertx() def client = RabbitMQClient.create(vertx, config) def queueFuture = Future.future() client.start({ conn -> if (conn.succeeded()) { client.exchangeDeclare("exchange1", "direct", true, false, null) client.exchangeDeclare("exchange2", "direct", true, false, null) client.queueDeclare("queue1", true, false, false, null) client.queueDeclare("queue2", true, false, false, null) client.queueBind("queue1", "exchange1", "route1", null) createConsumer(vertx, "address1", client) client.basicConsume("queue1", "address1", true, null) client.queueBind("queue2", "exchange2", "route2", null) createConsumer(vertx, "address2", client) client.basicConsume("queue2", "address2", true, null) queueFuture.complete() } else queueFuture.fail(conn.cause()) }) queueFuture.setHandler({ rs -> if (rs.succeeded()) println "consumer complete" else rs.cause().printStackTrace() }) } static JsonObject config() { def config = [:] config.host = "localhost" config.port = 5672 config } static void createConsumer(Vertx vertx, String name, RabbitMQClient client) { vertx.eventBus().consumer(name, { handler -> def body = handler.body() println body handler.reply(null) }) } }

    生产者

    class MessageSender { static void main(String[] args) { def config = config() def vertx = Vertx.vertx() def client = RabbitMQClient.create(vertx, config) def message = [:] message.body = "test deliveryMode" def property = [:] property.contentType = "text/plain" property.deliveryMode = 2 message.properties = property client.start({ conn -> if (conn.succeeded()) { client.basicPublish("exchange1", "route1", message, { send -> if (send.succeeded()) { println "sent" } else send.cause().printStackTrace() }) } else conn.cause().printStackTrace() }) } static JsonObject config() { def config = [:] config.host = "localhost" config.port = 5672 config.includeProperties = true def json = new JsonObject(config) return json } }
    转载请注明原文地址: https://ju.6miu.com/read-4249.html

    最新回复(0)