简单介绍
RocketMQ
生产者启动流程
RocketMQ 生产者启动流程
还记得上一个章节的生产者发送消息的代码吗?我们在构造函数中新建了 DefaultMQProducer
类,并且设置了 NamesrvAddr
属性,它标志着 Namesrv
的地址,随后我们调用了 prodeucer.start()
来启动这个生产者,接下来我们就要探究 start()
方法究竟做了什么。
在书写本章时的代码版本为
RocketMQ-Client 4.3.0
, git节点在b4240d5cea8
,如果代码有所不同请见谅
DefaultMQProducer
类
DefaultMQProducer
类是 RocketMQ
生产者的默认实现,它继承了 ClientConfig
类 并间接实现了了 MQAdmin
类,类图见下图,接下来我们来讲一下这一个类一个接口。
ClientConfig
在这里我列举了 ClientConfig
的几个重要属性,和本章中比较重要的方法:buildMQClientId
,这里ClientID
主要是为了区分多台服务器的不同JVM实例,可以看到 ClientID
由 ClientIP
、InstanceName
、 unitName
组成,其中 ClientIP
分区了不同的服务器, Instancename
区分了不同JVM实例,unitName
区分了一个JVM实例中的不同子实例。如果我们不设置 InstanceName
和unitName
,就可以使得 在JVM实例中只存在一个ClientID
。
private String namesrvAddr
private String clientIP;
private String instanceName;
private boolean unitMode = false;
private String unitName;
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
MQAdmin
MQAdmin
接口定义了一些非常基本的方法,比如说createTopic
、searchOffset
之类的。
生产者启动流程
Step1
我们从 DefaultMQProducer.start()
方法开始追踪
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
public DefaultMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
首先我们重新设置了生产者的生产者组名称,这里是为了兼容 NameSpace
,其中 NameSpace
在 ClientConfig
中配置,如果我们未配置的话则为原样的ProducerGroup
,之后我们调用了 defaultMQProducerImpl
来启动生产者,那么这个defaultMQProducerImpl
是什么呢。我们可以从上述代码中的第10行看出,他是一个DefaultMQProducerImpl
类,也就是默认的生产者实现类,实际上生产者的消息发送都是通过这个类来实现的 ,注意:我们通过构造方法把我们自身传递给了DefaultMQProducerImpl
,这也是说在DefaultMQProducerImpl
可以访问生产者中的任何公开信息。
Step2
那么我们切换目标来跟踪defaultMQProducerImpl.start();
方法
private ServiceState serviceState = ServiceState.CREATE_JUST;
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 具体的生产者启动流程
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
这一大段代码是为了保证每一个生产者只启动一次。接下来我们就开始看看具体的流程吧
Step3
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
首先检查prodeucerGroup是否符合要求,如果 producerGroup
不是 CLIENT_INNER_PRODUCER_GROUP
并且没有手动设置 InstanceName
的话就会重新设置一下 InstanceName
为 当前JVM程序运行的 PID
。(这里其实可以思考一下为什么要这样做?)
Step4
# DefaultMQProducerImpl.java
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
# MQClientManager.java
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
这里可以跳转到 附录 查看MQClientManager
类的介绍。这段代码就是为了创建 MQClientInstance
对象,这里我们可以看出其实我们是基于ClientID
来获取 MQClientInstance
对象的。而根据上述代码我们可以轻松的知道:如果我们未手动配置 InstanceName
和 unitName
的话,一个JVM实例只会有一个 ClientID
(ClientID=ClientIP+PID
) ,并且通过代码我们知道这个实例是缓存在了 factoryTable
中,也就是说,在一个JVM实例中一般只会存在一个MQClientInstance
实例。
MQClientInstance
封住了RocketMQ网络处理API,是Producer、Consumer与NamServer、Broker打交道的网络对象
Step5
我们获取到了 MQClientInstance
之后,我们就要开始往 MQClientInstance
中注册我们的生产者。
mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
注册的过程也很简单,只是更新了一下 MQClientInstance
的 producerTable
生产者列表而已。这里我们可以得出一个简单的结论:一个JVM实例中只存在一个 ProducerGroup
的生产者,这是为了使得一个JVM程序崩溃之后不会影响一个生产者组中的多个生产者。(其实)
Step6
增加默认的Topic信息。并且启动 MQClientInstance
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
MQClientManager类
MQClientManager
是用来维护 MQClientInstance
的。一个JVM实例中只会存在一个 MQClientInstance
实例,同时也为 MQClientManager
的实现是在是太轻量化了,所以RocketMQ
的作者yukong
大佬也是直接使用了饿汉式的单例模式。
public class MQClientManager {
private static MQClientManager instance = new MQClientManager();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
private MQClientManager() {
}
public static MQClientManager getInstance() {
return instance;
}
}
同时内部维护了一套 ClientID
和 MQClientInstance
的对应关系,并且提供了方法去添加、移除,这里不做更多的介绍。
写在最后
讲道理写生产者启动流程还是蛮累的。本以为会比较轻松,因为相对于一些其他流程来说应该是相对比较简单的,没想到还是花了两天的时间陆陆续续的写完。
现在是6月27日的晚上9点45分,周日,没想到一周唯一的休息日就这样在睡觉中度过去了(是真的睡了一天),最后到了晚上吃完饭、吃完水果才想到博客没有写完,今天施队还说我吃饭吃的太少,水果吃的多,怕是会的糖尿病,整个人估计是废了吧。