主要介绍了RocketMQ生产者最佳实践代码,以Java为例
RocketMQ 最佳实践
本文章的程序运行环境如下:
JKD1.8
RocketMQ-Client 4.3.0
RocketMQ-Server 4.8.0
NameServer Addr: 127.0.0.1:9876
前期准备
首先我们新建一个Demo类来存储所有的代码,我们使用默认的生产者实现 DefaultMQProducer
来操作,并且制定默认的生产者组,该构造方法的第二个参数是 RPCHook
,RPCHook
是一个接口,具体实现交由业务端实现,两个方法分别是:doBeforeRequest
和doAfterResponse
,表示在执行请求之前和接收返回之后分别执行相关逻辑
@Slf4j
public class ProducerDemo {
private DefaultMQProducer producer;
private String topicName = "test-topic";
public ProducerDemo(String namesrvAdder, String producerGroupName) throws MQClientException {
// 实例化消息生产者Producer
// 这里是因为方便测试才这样做的,实际上应该由Ioc传递进来的
producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr(namesrvAdder);
producer.start();
}
public ProducerDemo(String namesrvAdder, String producerGroupName, String topicName) throws MQClientException {
this(namesrvAdder, producerGroupName);
this.topicName = topicName;
}
public void closeProducers() {
producer.shutdown();
}
}
我们再来新建一个测试类来测试我们的代码,并指定默认行为,以后测试的时候我们只需要使用producer就可以了
public class ProducerDemoTest {
private ProducerDemo producer;
@Before
public void init() throws MQClientException {
producer = new ProducerDemo("localhost:9876", "test_producer");
}
@After
public void close() {
producer.closeProducers();
}
}
同步发送消息
同步发送消息,根据消息类型不同 增加了 tag、keys等
public void sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
Message message = new Message(topicName, "sync,no tag".getBytes(Charset.defaultCharset()));
SendResult result = producer.send(message);
log.info("sync :{}\n", result);
Message hasTagMessage = new Message(topicName, "test_tag", "sync,has tag".getBytes(StandardCharsets.UTF_8));
result = producer.send(hasTagMessage);
log.info("has tag{}\n", result);
Message hasTagAndKeysMessage = new Message(topicName, "test_tag", "test_keys", "sync,has tag and keys".getBytes(StandardCharsets.UTF_8));
result = producer.send(hasTagAndKeysMessage);
log.info("has tag and key result:{}\n", result);
}
异步发送消息
异步发送消息,在 send
方法执行之后程序会继续往下执行
public void async() throws RemotingException, InterruptedException, MQClientException {
Message message = new Message(topicName, "async, no tag".getBytes(StandardCharsets.UTF_8));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("async result:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("async got exception:{}", throwable.toString());
}
});
Thread.sleep(2000);
}
批量发送消息
public void batchSend() throws Exception {
ArrayList<Message> list = new ArrayList<>();
list.add(new Message(topicName, "batch message no.1".getBytes(StandardCharsets.UTF_8)));
list.add(new Message(topicName, "batch message no.2".getBytes(StandardCharsets.UTF_8)));
list.add(new Message(topicName, "batch message no.3".getBytes(StandardCharsets.UTF_8)));
SendResult result = producer.send(list);
log.info("batch send got :{}", result);
}
指定消息队列发送消息
public void sendToMessageQueue() throws Exception {
Message message = new Message(topicName, "send 2 message queue".getBytes(StandardCharsets.UTF_8));
List<MessageQueue> queues = producer.fetchPublishMessageQueues(topicName);
MessageQueue queue = queues.get(queues.size() - 1);
producer.send(message, queue);
}