RocketMQ 生产者消息发送

还是书接上文 [RocketMQ生产者] ,我们把目光投入到消息发送中,由于篇幅限制,本文章我们只介绍同步发送,中间可能会穿插一些其余的内容。

RocketMQ 生产者消息发送

我们一般在使用RocketMQ客户端的时候一般把它分为三层:业务层、消息处理层、通信层,其中业务层一般为调用 producer.send(message) 的那一层,消息处理层主要接受业务层传递过来的消息体,做一些处理:检查消息是否合规、压缩消息等,并为通信层做准备。通信层是指Rocket MQ基于Netty封装的一个RPC通信服务。

消息发送流程的主要步骤为:验证消息、查找路由、发送消息(包括异常处理等),接下来我们会一一讲解这三个步骤。

验证消息

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    return this.defaultMQProducerImpl.send(msg);
}

这一部分主要体现在 DefaultMQProducerValidators.checkMessage(msg, this); 步骤中,其中主要包含了验证消息是否为空、检查topic是否合规、检查消息体是否为空是否大于最大长度(默认为4M)。同时为了支持 namespace 重新设置一下消息的 topic 信息。

之后又是我们的老朋友 DefaultMQProducerImpl ,上一篇文章中我们讲到了 DefaultMQProducerImpl 是生产者的默认实现,可以认为它负责生产者的所有消息发送操作。

private int sendMsgTimeout = 3000;
/**
* DEFAULT SYNC -------------------------------------------------------
*/
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    // do send
}

从上面这几段代码可以知道:消息默认以同步的方式发送,默认超时时间为3S,同时检查一下当前生产者是否处于可用状态。

Q:为什么 DefaultMQProducer 验证了一遍消息,DefaultMQProducerImpl 还需要再次验证!!!

A:实际上答案就出现在了问题中:因为他们不是同一个类,你不能保证 DefaultMQProducerImpl 只有 DefaultMQProducer 使用了,所以 DefaultMQProducerImpl 是一定需要验证消息的,至于 DefaultMQProducer 验证不验证就看个人喜好,其实不验证问题也不大

路由查找

在发送消息之前,首先需要获取主题的路由信息,只有获取到了路由信息我们才能找到消息要发送到哪个Broker 中,具体表现在 TopicPublishInfo info = this.tryToFindTopicPublishInfo(msg.getTopic()); 方法中。返回的消息就是具体的路由信息,我们首先介绍一下什么是 TopicPublishInfo

TopicPublishInfo

它存放了关于一个topic中的全部信息。我们先来看一下它的内部构造,

public class TopicPublishInfo {
    // 是否是有序消息
    private boolean orderTopic = false;
    // 是否有路由信息
    private boolean haveTopicRouterInfo = false;
    // message queue 信息
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // 用于选择消息队列,每次选择一次消息队列,该值都会自增
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // 主题元数据
    private TopicRouteData topicRouteData;
	public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // 选择与上次broker不同的队列,如果发现都是在同一个broker中则随机选择一个队列
    }
    
    public MessageQueue selectOneMessageQueue() {
        // 随机选择一个队列
    }   
}

public class TopicRouteData extends RemotingSerializable {
	// 
    private String orderTopicConf;
    // topic 队列元数据
    private List<QueueData> queueDatas;
    // broker 元数据
    private List<BrokerData> brokerDatas;
    // broker上过滤服务器的地址
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

有些小伙伴可能看到代码可能还比较模糊,那么我们再具体一点,这个类可以解决:一个Topic有多少个队列(messageQueueList.size),每个队列的信息(MessageQueue),我们可以自由选择队列(selectOneMessageQueue)、也可以排除掉一个broker来选择队列(均匀分布压力、排除异常broker)

TopicPublishInfo 中的 TopicRouteData 数据则表明了这个topic的数据分布情况,比如说topic、queue分布在哪些broker中,queue有多少个读队列多少个写队列等。

查找算法

我们主要来介绍 DefaultMQProducer.tryToFindTopicPublishInfo 方法,这个方法实现了查找具体的topic信息

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
private MQClientInstance mQClientFactory;

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

首先从当前生产者内存缓存中查询topic的信息,如果找到了topic信息则直接返回内存缓存中的信息。如果未找到信息则接着从 NameServer 中查找。注意这个 MQClientInstance 也是一个相当熟悉的类了,我们之前说 MQClientInstance 封住了RocketMQ网络处理API,是一个非常重要的类,生产者就是通过它来和 NameServer 请求数据的。而且从这段代码中我们看到生产者分别请求了两次 NameServer ,并且第二次请求多了几个参数,这是因为第一次请求不到topic信息,所以第二次请求默认主题 createTopicKey 的信息,这也就是第二个参数 isDefault:true 的含义,至于为什么需要传递defaultMQProducer是因为需要获取createTopicKey的值。

看到这里可能就有人要问了:为什么updateTopicRouteInfoFromNameServer不需要传递具体的生产者过去,之后我们就可以直接从生产者内部缓存topicPublishInfoTable直接拿取数据呢?

还记得上篇文章说的生产者注册流程吧, 每一个生产者启动的之后都会把自身注册到 MQClientInstance 中,所以 MQClientInstance 中已经包含了生产者,自然就不需要传递行参了

消息发送

找到了topic信息我们可以来进行消息发送了,首先我们可以先思考一下消息发送的流程,是不是我们直接消息发送一下就可以了呢?并不是的,我们要考虑到网络异常等偶然的原因导致的消息发送失败,一般来说解决这种问题最简单的方法也就是重试

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
	// 选择消息队列
    // 发送消息   
}

这里是同步方法的重试次数计算公式,异步重试机制在收到消息发送结构之后执行回执回调之前进行重试。接下来其实也很简单,就是选择消息队列、发送消息、发送消息成功就返回,失败就重试。

选择消息队列

// sendDefaultImpl 方法内部,上一段代码的for循环之中
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

// MQFaultStrategy.java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // do something
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

可以看出消息队列的选择主要由 MQFaultStrategy 实现,我们直接看上段代码的第11行, 是否开启Broker的故障延迟机制。

如果不开启broker的发送延时故障机制的话,默认就是消息队列轮询投递,如果某一个broker发送一场就排除掉这个broker,这是一个比较简单的算法,也基本能够使得队列消息数分布均匀,但是这也暴露了一个问题,就是有一些队列可能因为自身数量积压等原因,可能投递的时间比较长,对于这样的队列会影响后续投递的效果。

那么如何解决这个问题呢,我们可以统计出每次消息发送的投递时长,根据这个时长不就可以知道哪个队列投递较快了吗?这也就是 sendLatencyFaultEnable 的作用。

首先我们按顺序选择一个投递时间较短的、基本可用的队列,如果找到了就直接返回

int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    if (pos < 0)
        pos = 0;
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
        return mq;
}

如果找不到的话就选择一个延时较低的队列,

final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

具体的消息发送

调用 sendKernelImpl 的重载方法

private SendResult sendKernelImpl(final Message msg, // 待发送消息
        final MessageQueue mq, // 要发送的mq
        final CommunicationMode communicationMode, // 消息发送模式
        final SendCallback sendCallback, // 异步消息回调函数
        final TopicPublishInfo topicPublishInfo, // topic路由信息
        final long timeout /*消息发送超时时间*/ ){
    // 消息发送
}

开坑,有时间再写

2021.7.1 17.40

接坑,继续写。。。。

找到Broker地址

首先我们必须要找到要发送到哪个Broker,

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

// MQClientInstance.java
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
public String findBrokerAddressInPublish(final String brokerName) {
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}

这里我们之前确定的消息队列信息来获取Broker的网络地址,这里为什么不从 TopicPublishInfo 中获取 Broker 的地址呢,这是因为 TopicPublishInfo 存放了Broker的地址,但是并没有存放具体的MQ和Broker的对应关系,所以还是要从 MQClientInstance 中获取,可以看到我们从 MQClientInstance 中是获取Broker中的主服务器的地址。如果发现本地没有缓存 Broker 的信息,就从 NameServer 中拉去一次,并再次获取

为消息分配Message ID

首先我们来看一下Message ID的分配吧

if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

如果该消息是批量消息的话,就会为这一批消息配置一个全局的Message ID,并且尝试压缩消息体并更改状态,如果是事务Prepared消息也更改状态。注意一下这个算法,很明显是使用sysFlag的位来充当一定的信息。这样的话就不必要使用多个状态,而且接下来只要再重复或运算一次就可以得出标识位的状态。

钩子方法

还记得我们之前提到的 Producer 启动时的 RPCHook 对象吧。只是我们当时传递的 null 进去的。在这里就使用到啦。

构造消息体

在这里构造消息体,需要包含一定的信息。比如说生产者组名、Topic名称、默认Topic名称等等。

消息发送

这里会根据消息传递方式的不同选择不同的消息传递方式进行网络传输

switch (communicationMode) {
    case ASYNC:
        // do async
    case ONEWAY:
    case SYNC:
        // do sync
    default:
        assert false;
        break;
}

注意异步是在这里进行重试的,原因是因为没有接受到response,但是调用的入口是在收到服务端响应包的时候进行的。

钩子方法

同上

写在结尾

2021.6.30 20:16

没想到这玩意这么难写,写了快两个小时了还没写完,而且还有很多地方没有写好,比如说:LatencyFaultTolerance 延时策略啊、默认主题啊等等,感觉写一个TODO会好一点,不然都会忘记

今天就这样吧,明天补上

2021.7.1 18:34

结束生产者,虽然没写什么orz。。。

请用钱砸死我!!!