使用RocketMQ的客户端使用

    xiaoxiao2021-03-25  87

    RocketMQ提供了强大的消息系统功能,RocketMQ提供了java客户端,可以提供使用。下面代码来自RocketMQ4.0.0中的example代码。

    Producer消息生产端:

    public class Producer { public static void main(String[] args) { // 创建一个Produer Group DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 指定NameServer地址 producer.setNamesrvAddr("192.168.1.163:9876"); try { // 启动producer producer.start(); // 创建一个Message ,并指定topic、Tag和消息主体 Message msg = new Message("ZZZZWWWW", "TagA", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)); //向broker发送一个消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } //当produer不再使用时,关闭produer producer.shutdown(); } }

    Consumer消息消费端代码:

    public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //指定一个Consumer Group 来创建一个consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); //指定NameServer 地址,consumer和NameServer建立长链接,并且获取topic信息以及broker的ip和地址 consumer.setNamesrvAddr("192.168.1.163:9876"); //指定消费offset的位置。TIMESTAMP表示从consumer建立后,producer向broker新发送的消息开始 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); //指定消费topic,和过滤的TAG consumer.subscribe("ZZZZWWWW", "*"); //注册一个回调函数,当comsumer接收到消息时,执行的动作 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动comsumer实例 consumer.start(); System.out.printf("Consumer Started.%n"); } }

    代码分析:

    在Consumer端注册了一个MessageListener 接口实现,当Consumer端接收到异步分发的消息后,会执行MessageListener中的方法。

    MessageListener提供了MessageListenerConcurrently 和MessageListenerOrderly 两种策略。

    根据MessageListenerConcurrently 和MessageListenerOrderly两种策略,当pull到消息后,消息会被push到ConsumeMessageConcurrentlyService 和ConsumeMessageOrderlyService 被消费掉。

    1、ConsumeMessageConcurrentlyService  ,其实内部为一个ThreadPoolExecutor线程池,每一个Message会被转换成一个任务class ConsumeRequest implements Runnable .ConsumerRequest Task 会提交到ThreadPoolExecutor中等待被并行执行。

    在ConsumerMessageConcurrentlyService 内部,会根据ConsumeConcurrentlyStatus 处理状态的结果,来继续处理后续的事情processConsumeResult。

    2、ConsumerMessageOrderlyService 里面也是一个线程缓存池,在ConsumerRequest  Task任务中对每一个MessageQueue进行加锁,从而保证顺序消费。

    同时,processConsumeResult也会处理消费失败的情况。

    转载请注明原文地址: https://ju.6miu.com/read-20372.html

    最新回复(0)