RocketMQ 生产者

主要介绍了RocketMQ生产者最佳实践代码,以Java为例

RocketMQ 最佳实践

本文章的程序运行环境如下:

JKD1.8

RocketMQ-Client 4.3.0

RocketMQ-Server 4.8.0

NameServer Addr: 127.0.0.1:9876

前期准备

首先我们新建一个Demo类来存储所有的代码,我们使用默认的生产者实现 DefaultMQProducer 来操作,并且制定默认的生产者组,该构造方法的第二个参数是 RPCHookRPCHook 是一个接口,具体实现交由业务端实现,两个方法分别是:doBeforeRequestdoAfterResponse,表示在执行请求之前和接收返回之后分别执行相关逻辑

@Slf4j
public class ProducerDemo {
    private DefaultMQProducer producer;
    private String topicName = "test-topic";

    public ProducerDemo(String namesrvAdder, String producerGroupName) throws MQClientException {
        // 实例化消息生产者Producer
        // 这里是因为方便测试才这样做的,实际上应该由Ioc传递进来的
        producer = new DefaultMQProducer(producerGroupName);
        producer.setNamesrvAddr(namesrvAdder);
        producer.start();
    }

    public ProducerDemo(String namesrvAdder, String producerGroupName, String topicName) throws MQClientException {
        this(namesrvAdder, producerGroupName);
        this.topicName = topicName;
    }
    
	public void closeProducers() {
        producer.shutdown();
    }
}

我们再来新建一个测试类来测试我们的代码,并指定默认行为,以后测试的时候我们只需要使用producer就可以了

public class ProducerDemoTest {
    private ProducerDemo producer;

    @Before
    public void init() throws MQClientException {
        producer = new ProducerDemo("localhost:9876", "test_producer");
    }
    
 	@After
    public void close() {
        producer.closeProducers();
    }
}

同步发送消息

同步发送消息,根据消息类型不同 增加了 tag、keys等

public void sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {

    Message message = new Message(topicName, "sync,no tag".getBytes(Charset.defaultCharset()));
    SendResult result = producer.send(message);
    log.info("sync :{}\n", result);

    Message hasTagMessage = new Message(topicName, "test_tag", "sync,has tag".getBytes(StandardCharsets.UTF_8));
    result = producer.send(hasTagMessage);
    log.info("has tag{}\n", result);

    Message hasTagAndKeysMessage = new Message(topicName, "test_tag", "test_keys", "sync,has tag and keys".getBytes(StandardCharsets.UTF_8));
    result = producer.send(hasTagAndKeysMessage);
    log.info("has tag and key result:{}\n", result);
}

异步发送消息

异步发送消息,在 send 方法执行之后程序会继续往下执行

public void async() throws RemotingException, InterruptedException, MQClientException {
    Message message = new Message(topicName, "async, no tag".getBytes(StandardCharsets.UTF_8));

    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
        	log.info("async result:{}", sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
        	log.error("async got exception:{}", throwable.toString());
        }
    });

    Thread.sleep(2000);
}

批量发送消息

public void batchSend() throws Exception {
    ArrayList<Message> list = new ArrayList<>();
    list.add(new Message(topicName, "batch message no.1".getBytes(StandardCharsets.UTF_8)));
    list.add(new Message(topicName, "batch message no.2".getBytes(StandardCharsets.UTF_8)));
    list.add(new Message(topicName, "batch message no.3".getBytes(StandardCharsets.UTF_8)));
    SendResult result = producer.send(list);
    log.info("batch send got :{}", result);
}

指定消息队列发送消息

public void sendToMessageQueue() throws Exception {
    Message message = new Message(topicName, "send 2 message queue".getBytes(StandardCharsets.UTF_8));
    List<MessageQueue> queues = producer.fetchPublishMessageQueues(topicName);
    MessageQueue queue = queues.get(queues.size() - 1);
    producer.send(message, queue);
}
请用钱砸死我!!!