默认创建的 Topic 读、写队列数均为 4,本文使用的是 RocketMQ 3.5.8 版本。生产者的示例源代码如下:
package com.xd.rocketmq;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
 * Demo RocketMQ producer.
 *
 * <p>Starts a {@link DefaultMQProducer}, then runs ten rounds; in each round it sends one
 * message to {@code TopicTest} and one to {@code TopicTest1}, printing every send result,
 * and sleeps one second before the next round. The producer is always shut down before
 * {@code main} returns, including when a send fails.
 *
 * <p>Created by dong on 2016/12/26.
 */
public class Producer {

    /** Number of send rounds performed by {@link #main(String[])}. */
    private static final int ROUNDS = 10;

    /** Pause between two rounds, in milliseconds. */
    private static final long PAUSE_MILLIS = 1000;

    /**
     * Entry point of the demo.
     *
     * @param args ignored
     * @throws MQClientException if the producer cannot be started or a send fails on the client side
     * @throws InterruptedException if the thread is interrupted while sending or sleeping
     * @throws RemotingException if a send fails at the remoting layer
     * @throws MQBrokerException if the broker rejects a message
     */
    public static void main(String[] args)
            throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
        final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // Set the name server address.
        // NOTE(review): both entries are the same address; presumably a second
        // name server was intended - confirm.
        producer.setNamesrvAddr("192.168.100.1:9876;192.168.100.1:9876");
        producer.setInstanceName("Producer");
        // The producer must be initialized with start() before it is used.
        // Initialize it once only; do not call start() for every message sent.
        producer.start();
        try {
            for (int i = 0; i < ROUNDS; i++) {
                Message first = newMessage(
                        "TopicTest", "Tag", "OrderId", "你好,TopicTest,my name is xxd_" + i);
                printBody(first);
                System.out.println(first);
                sendAndReport(producer, first);

                Message second = newMessage(
                        "TopicTest1", "Tag1", "OrderId1", "你好,TopicTest1,my name is xxd_" + i);
                printBody(second);
                sendAndReport(producer, second);

                TimeUnit.MILLISECONDS.sleep(PAUSE_MILLIS);
            }
            System.out.println("group name:" + producer.getProducerGroup());
        } finally {
            // Shut down on every path. Previously shutdown happened only through a
            // shutdown hook registered after the loop, so a failed send skipped it.
            producer.shutdown();
        }
    }

    /**
     * Builds a message whose body is the given text.
     *
     * <p>NOTE(review): {@code getBytes()} encodes with the platform default charset, so the
     * consumer must decode with the same charset. Consider an explicit charset on both sides.
     */
    private static Message newMessage(String topic, String tag, String key, String text) {
        return new Message(topic, tag, key, text.getBytes());
    }

    /** Prints the message body decoded with the platform default charset. */
    private static void printBody(Message msg) {
        System.out.println("message:" + (new String(msg.getBody())));
    }

    /** Sends the message synchronously and prints the result returned by the producer. */
    private static void sendAndReport(DefaultMQProducer producer, Message msg)
            throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
        SendResult sendResult = producer.send(msg);
        System.out.println("Producer.main:sendResult:" + sendResult);
    }
}
消费者的源代码DEMO如下:
package com.xd.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* Created by dong on 2016/12/26.
*/
public class Consumer {
public static void main(String[] args)
throws MQClientException {
final DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer(
"ConsumberGroupName");
consumer.setNamesrvAddr(
"192.168.100.1:9876;192.168.100.1:9876");
consumer.setInstanceName(
"Consumer");
consumer.subscribe(
"TopicTest",
"Tag");
consumer.subscribe(
"TopicTest1",
"*");
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.
out.println(Thread.
currentThread().getName() +
" Receive New Messages: " + msgs);
for (Message msg :
msgs) {
System.
out.println(
"Consumer.consumeMessage:"+(
new String(msg.getBody())));
}
return ConsumeConcurrentlyStatus.
CONSUME_SUCCESS;
}
});
System.
out.println(
"clientIP:"+consumer.getClientIP());;
consumer.start();
System.
out.println(
"Consumer Started.");
}
}
转载请注明原文地址: https://ju.6miu.com/read-664348.html