由于消息存储篇幅过长,这里只指明大致的消息存储过程
大致的消息存储过程
存储文件
RocketMQ
的主要存储三个文件:
CommitLog
,在RocketMQ
的设计中,它存储了所有的消息ConsumerQueue
,每一个消息队列对应一个文件IndexFile
,加速了消息的检索速度
存储过程
- 检查消息是否可以写入,如该Broker是Slave、磁盘已满、队列没有写权限则不能写入
- 检查是否是延时消息,如果是延时消息则保存具体的Topic信息、MessageID到消息体中,并替换为延时消息的Topic和MessageID
- 获取当前可以写入的CommitLog文件
- 申请
putMessageLock
,即写入文件是串行写入的 - 设置消息的存储时间,创建全局唯一的 Message ID
- 获取该消息在消息队列中的偏移量,重新计算消息的总长度,并写入Commit Log文件中,如果无法存储所有的内容则创建新的CommitLog
- 把消息写入到CommitLog中,根据配置文件进行刷盘
- 更新逻辑偏移量,并释放
putMessageLock
- 根据
CommitLog
偏移量、消息体大小、tag的hash值写入ConsumerQueue中 - 插入数据到IndexFile中
- Consumer Queue间隔一定时间刷盘