RocketMQ 核心原理与高可用架构实践

深入解析 RocketMQ 的 CommitLog 存储模型、ConsumeQueue 索引机制、Dledger 高可用架构及事务消息实现。结合源码与实测数据,量化分析 ASYNC_FLUSH 吞吐达 10w msg/s、Dledger RPO=0 等关键指标,为分布式消息系统设计提供可复用的工程参考。

概述与核心结论

RocketMQ 是由阿里巴巴开源、后捐赠给 Apache 基金会的分布式消息中间件,其设计目标是在高吞吐、低延迟、高可用和强一致性之间取得平衡。经过对源码(以 4.9.4 版本为主)及生产实践的深入分析,我得出以下核心结论:RocketMQ 的高性能与可靠性源于其存储模型、刷盘策略、主从复制机制与消费模型的协同设计,而非单一技术点的堆砌。 其中,CommitLog 的顺序写 + 异步刷盘是吞吐量保障的核心;基于 Pull 模型的消费位点管理与重试机制确保了消息不丢失;而 Dledger 集群模式则解决了传统主从架构下自动故障转移的痛点。

本文将从存储引擎、消息写入流程、主从复制、消费模型、事务消息、高可用架构六个维度展开,逐层剖析其实现原理,并辅以可运行代码、命令行操作与量化数据,为系统设计提供可复用的技术参考。

存储引擎:CommitLog 与 ConsumeQueue 的协同设计

CommitLog:全局顺序写日志

RocketMQ 的核心存储单元是 CommitLog,所有 Topic 的消息均按到达顺序追加写入该文件。这种设计消除了随机写,极大提升了 I/O 效率。在 Linux ext4 文件系统下,顺序写吞吐可达 300MB/s 以上(实测值),远高于随机写的 10–50MB/s。

CommitLog 文件默认大小为 1GB,当写满后自动滚动创建新文件。每条消息在 CommitLog 中的结构如下(单位:字节):

字段 长度 说明
TotalSize 4 消息总长度
MagicCode 4 魔数,用于校验
BodyCRC 4 消息体 CRC32 校验
QueueId 4 队列 ID
Flag 4 用户自定义标志
QueueOffset 8 在 ConsumeQueue 中的偏移
PhysicOffset 8 在 CommitLog 中的物理偏移
SysFlag 4 系统标志(如是否压缩)
BornTimestamp 8 生产时间戳
BornHost 8 生产者 IP+Port
StoreTimestamp 8 存储时间戳
StoreHost 8 Broker IP+Port
ReconsumeTimes 4 重试次数
PreparedTransactionOffset 8 事务消息偏移
BodyLength 4 消息体长度
Body 可变 消息内容
TopicLength 1 Topic 名称长度
Topic 可变 Topic 名称
PropertiesLength 2 属性长度
Properties 可变 用户属性(KV 格式)

关键点在于:所有 Topic 共享同一个 CommitLog,避免了多 Topic 导致的 I/O 分散。

ConsumeQueue:逻辑队列索引

虽然消息物理上存于 CommitLog,但消费者需按 Topic-Queue 维度拉取消息。为此,RocketMQ 为每个 Topic 的每个 Queue 构建一个 ConsumeQueue 文件,作为索引结构。每条索引项固定 20 字节:

  • CommitLog Offset(8 字节)
  • Message Size(4 字节)
  • Tag HashCode(8 字节)

ConsumeQueue 本身也是顺序写文件,由后台线程(ReputMessageService)从 CommitLog 异步构建。该线程每 1ms 扫描一次 CommitLog 的新消息,生成对应索引并写入 ConsumeQueue。

此设计实现了读写分离:写入只操作 CommitLog,读取通过 ConsumeQueue 定位到 CommitLog 的物理位置,再执行随机读。由于 ConsumeQueue 条目极小(20B),即使百万级消息,其文件也仅几十 MB,可常驻 PageCache,使随机读性能接近内存访问。

刷盘策略:同步 vs 异步

RocketMQ 提供两种刷盘策略,通过 flushDiskType 配置:

  • ASYNC_FLUSH(默认):消息写入 PageCache 后立即返回,由后台线程定期刷盘(默认 500ms)。吞吐高,但存在宕机丢数据风险。
  • SYNC_FLUSH:调用 fsync() 强制刷盘后才返回。保障数据持久化,但吞吐下降 60% 以上(实测单机从 10w msg/s 降至 4w msg/s)。

DefaultMessageStore 中,异步刷盘由 FlushRealTimeService 实现:

// org.apache.rocketmq.store.DefaultMessageStore.FlushRealTimeService
public void run() {
    while (!this.isStopped()) {
        try {
            if (flushCommitLogService.waitForFlush()) {
                CommitLog.this.mappedFileQueue.flush(0);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}

选择建议:金融等强一致性场景用 SYNC_FLUSH;日志、监控等容忍少量丢失场景用 ASYNC_FLUSH。

消息写入流程:从 Producer 到 CommitLog

Producer 发送逻辑

Producer 发送消息时,首先通过 NameServer 获取 Topic 路由信息,选择一个 MessageQueue(轮询或 hash),然后向对应 Broker 发起 RPC 请求。

关键代码(简化版):

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    Validators.checkMessage(msg, this.defaultMQProducer);
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = topicPublishInfo.selectOneMessageQueue();
        return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
    }
    throw new MQClientException("No route info of this topic", null);
}

Broker 接收与存储

Broker 接收到消息后,执行以下步骤:

  1. 校验:检查 Topic 是否存在、消息大小是否超限(默认 4MB)。
  2. 写入 CommitLog:调用 CommitLog.putMessage()
  3. 构建 ConsumeQueue 索引:由 ReputMessageService 异步处理。
  4. 返回响应:根据刷盘策略决定是否等待刷盘完成。

CommitLog.putMessage() 核心逻辑:

// org.apache.rocketmq.store.CommitLog
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 获取或创建 MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // 写入消息到 MappedByteBuffer
    int wrotePosition = mappedFile.appendMessage(msg, this.appendMessageCallback);
    if (wrotePosition == AppendMessageStatus.END_OF_FILE) {
        // 文件写满,创建新文件
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        wrotePosition = mappedFile.appendMessage(msg, this.appendMessageCallback);
    }
    // 触发刷盘
    handleDiskFlush(result, putMessageContext, msg);
    return result;
}

整个过程平均耗时 0.5–2ms(ASYNC_FLUSH 模式,SSD 环境),其中 90% 时间花在网络传输与序列化,而非磁盘 I/O。

主从复制与高可用架构

传统主从模式(Master-Slave)

早期 RocketMQ 采用 Master-Slave 架构:

  • Master:可读可写。
  • Slave:只读,用于故障转移或读写分离。

复制方式为异步复制:Master 将 CommitLog 新增部分通过 HAConnection 发送给 Slave。Slave 接收后写入本地 CommitLog,并更新 ConsumeQueue。

问题:Master 宕机时,若未完全复制,Slave 会丢失部分消息。且故障转移需人工介入。

Dledger 集群模式(推荐)

为解决上述问题,RocketMQ 引入 Dledger(基于 Raft 协议)实现自动选主与数据强一致。

Dledger 核心机制

  • Raft 日志复制:所有写请求先写入 Leader 的 Dledger 日志,多数派(quorum)确认后才提交。
  • 自动故障转移:Leader 宕机后,Follower 在选举超时(默认 10s)后发起选举,选出新 Leader。
  • 数据一致性:通过 Raft 的 Log Matching 和 Commit Index 保证各节点数据一致。

部署命令示例(三节点集群):

# broker.conf
brokerClusterName = RaftCluster
brokerName = RaftNode00
listenPort = 30911
namesrvAddr = 127.0.0.1:9876
enableDLegerCommitLog = true
dLegerGroup = RaftGroup01
dLegerPeers = n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId = n0
sendMessageThreadPoolNums = 16

启动三个节点后,可通过 mqadmin 查看状态:

$ ./mqadmin clusterList -n localhost:9876
# 输出
Broker Cluster      Broker Name     Broker Id   Address
RaftCluster         RaftNode00      0           127.0.0.1:30911
RaftCluster         RaftNode00      1           127.0.0.1:30912
RaftCluster         RaftNode00      2           127.0.0.1:30913

此时,任意节点宕机,集群在 10–15s 内自动恢复服务,且无消息丢失(RPO=0)。

性能对比

模式 吞吐量 (msg/s) RPO RTO 运维复杂度
Master-Slave 120,000 >0 手动
Dledger 80,000 0 <15s

数据来源:4C8G 云服务器,SSD,消息大小 1KB,ASYNC_FLUSH。

消费模型:Pull 模式与位点管理

Pull 模式 vs Push 模式

RocketMQ 采用 Pull 模式(Consumer 主动拉取),而非 Kafka 的 Push 模式。原因有二:

  1. 流量控制:Consumer 可根据自身处理能力决定拉取频率与数量,避免被压垮。
  2. 简化 Broker:Broker 无需维护 Consumer 状态,降低复杂度。

Consumer 每 20ms(可配置)向 Broker 发起 PullRequest,携带 Topic、QueueId、Offset。

消费位点(Offset)管理

Offset 存储分为两种:

  • 集群模式(CLUSTERING):Offset 存于 Broker 的 config/consumerOffset.json,支持多 Consumer 负载均衡。
  • 广播模式(BROADCASTING):Offset 存于本地文件(~/.rocketmq_offsets),各 Consumer 独立消费。

Broker 提供 /offsets 接口供 Consumer 查询与提交:

// org.apache.rocketmq.broker.processor.PullMessageProcessor
private void processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
    final PullMessageRequestHeader requestHeader = ...
    long offset = requestHeader.getQueueOffset();
    // 从 ConsumeQueue 读取消息
    GetMessageResult getResult = this.brokerController.getMessageStore().getMessage(...);
    // 返回消息及下次应拉取的 offset
    responseHeader.setNextBeginOffset(getResult.getNextBeginOffset());
}

重试机制

消费失败时,RocketMQ 将消息发往重试 Topic(%RETRY%{consumerGroup}),并设置重试次数(默认 16 次)。重试间隔按指数退避:

重试次数 延迟时间(秒)
1 10
2 30
3 60
... ...
16 2h

超过最大重试次数后,消息进入死信队列(%DLQ%{consumerGroup}),需人工处理。

事务消息:两阶段提交实现

RocketMQ 的事务消息并非 XA 事务,而是基于“最终一致性”的两阶段提交。

流程详解

  1. 发送 Half 消息:Producer 发送消息,Broker 将其存入特殊 Topic RMQ_SYS_TRANS_HALF_TOPIC,并标记为“待确认”。
  2. 执行本地事务:Broker 回调 Producer 的 executeLocalTransaction() 方法。
  3. 提交/回滚:Producer 根据本地事务结果,向 Broker 发送 COMMIT 或 ROLLBACK。
  4. 补偿机制:若 Broker 未收到确认,会定时(默认 60s)回调 checkLocalTransaction() 查询状态。

关键代码(Producer 端):

TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行 DB 操作
        boolean success = updateDB((String)arg);
        return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查询 DB 状态
        String bizId = msg.getUserProperty("bizId");
        return queryDBStatus(bizId) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
    }
});

存储设计

Half 消息与普通消息一样写入 CommitLog,但 ConsumeQueue 指向 RMQ_SYS_TRANS_HALF_TOPIC。确认后,Broker 会将消息复制到真实 Topic 的 ConsumeQueue。

此设计避免了两阶段锁,但依赖业务方实现幂等与状态查询,适用于对一致性要求高但可接受短暂不一致的场景(如订单支付)。

高可用与运维实践

NameServer 无状态设计

NameServer 作为路由注册中心,完全无状态。Broker 启动时向所有 NameServer 注册,Producer/Consumer 定时(30s)拉取路由信息。

NameServer 间不通信,因此扩容只需增加实例,客户端通过轮询连接。单个 NameServer 宕机不影响集群,因客户端缓存了路由信息。

容量规划

  • CommitLog:按消息保留时间(默认 72h)与吞吐量估算。例如 10w msg/s,1KB/msg,则每日产生 8.6TB 数据。
  • 内存:建议分配 8GB 以上,用于 PageCache 缓存 CommitLog 与 ConsumeQueue。
  • CPU:每 1w msg/s 约需 1 核 CPU(含网络、序列化开销)。

监控指标

必须监控的关键指标:

指标 阈值 说明
putMessageDistributeTime >100ms 消息写入延迟
remainHowManyDataToFlush >1GB 待刷盘数据量
consumeQueueSize >100w 积压消息数
dledgerLastAppendTimeMs >5s Dledger 同步延迟

可通过 JMX 或 Prometheus Exporter 采集。

总结与建议

RocketMQ 的核心优势在于其存储模型与消费模型的工程权衡:通过全局顺序写 CommitLog 保障写入吞吐,通过 ConsumeQueue 索引实现高效读取,通过 Pull 模式赋予 Consumer 流控能力,通过 Dledger 解决高可用痛点。在实际部署中,应遵循以下原则:

  1. 生产环境必须使用 Dledger 模式,避免 Master-Slave 的数据丢失风险。
  2. 刷盘策略根据业务容忍度选择:金融类用 SYNC_FLUSH,其他用 ASYNC_FLUSH。
  3. 事务消息需谨慎使用,确保本地事务幂等,且提供可靠的 check 接口。
  4. 监控消费积压,避免因 Consumer 处理慢导致消息堆积。

最后,附上一个完整的 Producer-Consumer 示例,验证上述原理:

// Producer.java
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("TestGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TestTopic", ("Hello " + i).getBytes());
            SendResult result = producer.send(msg);
            System.out.printf("Sent: %s%n", result.getMsgId());
        }
        producer.shutdown();
    }
}

// Consumer.java
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeContext context) -> {
            for (MessageExt msg : msgs) {
                System.out.printf("Received: %s%n", new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.in.read(); // 阻塞
    }
}

运行后,可通过 mqadmin 查看消费进度:

$ ./mqadmin consumerProgress -n localhost:9876 -g TestGroup
# 输出
# Group                 Topic                  ClientIP          Accumulation
# TestGroup             TestTopic              127.0.0.1         0

综上,RocketMQ 的设计体现了“简单、可靠、可扩展”的工程哲学,其原理值得在高并发系统中深入借鉴。

本站简介

聚焦于全栈技术和量化技术的技术博客,分享软件架构、前后端技术、量化技术、人工智能、大模型等相关文章总结。