RocketMQ提供了强大的消息系统功能,RocketMQ提供了java客户端,可以提供使用。下面代码来自RocketMQ4.0.0中的example代码。
Producer消息生产端:
public class Producer { public static void main(String[] args) { // 创建一个Produer Group DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 指定NameServer地址 producer.setNamesrvAddr("192.168.1.163:9876"); try { // 启动producer producer.start(); // 创建一个Message ,并指定topic、Tag和消息主体 Message msg = new Message("ZZZZWWWW", "TagA", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)); //向broker发送一个消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } //当produer不再使用时,关闭produer producer.shutdown(); } }Consumer消息消费端代码:
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //指定一个Consumer Group 来创建一个consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); //指定NameServer 地址,consumer和NameServer建立长链接,并且获取topic信息以及broker的ip和地址 consumer.setNamesrvAddr("192.168.1.163:9876"); //指定消费offset的位置。TIMESTAMP表示从consumer建立后,producer向broker新发送的消息开始 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); //指定消费topic,和过滤的TAG consumer.subscribe("ZZZZWWWW", "*"); //注册一个回调函数,当comsumer接收到消息时,执行的动作 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动comsumer实例 consumer.start(); System.out.printf("Consumer Started.%n"); } }代码分析:
在Consumer端注册了一个MessageListener 接口实现,当Consumer端接收到异步分发的消息后,会执行MessageListener中的方法。
MessageListener提供了MessageListenerConcurrently 和MessageListenerOrderly 两种策略。
根据MessageListenerConcurrently 和MessageListenerOrderly两种策略,当pull到消息后,消息会被push到ConsumeMessageConcurrentlyService 和ConsumeMessageOrderlyService 被消费掉。
1、ConsumeMessageConcurrentlyService ,其实内部为一个ThreadPoolExecutor线程池,每一个Message会被转换成一个任务class ConsumeRequest implements Runnable .ConsumerRequest Task 会提交到ThreadPoolExecutor中等待被并行执行。
在ConsumerMessageConcurrentlyService 内部,会根据ConsumeConcurrentlyStatus 处理状态的结果,来继续处理后续的事情processConsumeResult。
2、ConsumerMessageOrderlyService 里面也是一个线程缓存池,在ConsumerRequest Task任务中对每一个MessageQueue进行加锁,从而保证顺序消费。
同时,processConsumeResult也会处理消费失败的情况。
