RocketMQ 月丢 387 笔订单事故复盘:零丢失零重复消费全链路修法

月度对账发现订单库少 387 笔但支付库正常,根因 RocketMQ 异步刷盘 + oneway + 无事务消息 + 消费无幂等多重叠加。两周治理:SYNC_MASTER + 同步发送 + 事务消息 half/commit + Redis setnx + 唯一索引 + DLQ 监控 + MessageTrace 全链路可查。月丢失 0,重复消费 0。

2023 年我们一个订单核对系统:每天对账 1000w 订单,某次月度对账时发现订单库少 387 笔,但支付库都正常成交。彻查后确认是消息中间件丢失:RocketMQ 主从异步刷盘 + 生产者没用事务消息 + 消费者无 idempotent + 重试上限太低,多重故障叠加把消息搞丢。投了两周做全链路消息可靠性治理,达成"消息零丢失、零重复消费"的工程目标。本文复盘 RocketMQ 消息可靠性的完整修法,覆盖生产端、Broker、消费端、监控。

事故现场

业务:支付完成 → 发 MQ → 订单服务消费 → 写订单库
TPS:平均 8000,峰值 30000
MQ:RocketMQ 4.9.x,3 Master + 3 Slave
配置(出事时):
- 异步刷盘 ASYNC_FLUSH
- 主从异步 ASYNC_MASTER
- 生产者 SendOneway(性能优先)
- 消费 RECONSUME_LATER 16 次后 → DLQ
- 没有事务消息

时间线:
事故发生时:
1. 某 Broker Master 磁盘 IO 异常,async flush 滞后 2 秒
2. 同时 Master 宕机(机柜断电)
3. Slave 没 ack 完异步同步,部分消息丢失
4. 生产者 oneway 没收到 ack,以为成功
5. 订单服务自然没收到消息,订单缺失

排查:
- 支付库:387 笔订单状态 PAID
- 订单库:对应 387 笔订单不存在
- MQ 监控:那一时段消息丢失高峰
- 业务损失:用户付款没收到货,客服赔付 12w

修复 1:Broker 同步双写 + 同步刷盘

# broker.conf(关键业务)
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
brokerRole=SYNC_MASTER           # 主从同步(写完 master + 至少 1 slave 才 ack)
flushDiskType=SYNC_FLUSH         # 同步刷盘(写磁盘 fsync 后才 ack)

# 性能 vs 可靠性权衡:
# ASYNC_FLUSH + ASYNC_MASTER:吞吐 80000 TPS,有数据丢失风险
# SYNC_FLUSH + SYNC_MASTER:吞吐 12000 TPS,极端情况下 0 丢失
# 我们选 SYNC_MASTER + ASYNC_FLUSH 折中(吞吐 50000,99.99% 可靠)

# 业务分级:
# 核心业务(支付、订单)→ SYNC_MASTER + SYNC_FLUSH
# 一般业务(日志、统计)→ ASYNC

# 集群部署
3 个 Master + 每个 Master 2 个 Slave
跨机柜跨可用区,任一 AZ 挂了不丢消息

# DLedger 模式(RocketMQ 4.5+,自动主从切换)
brokerRole=ASYNC_MASTER
enableDLegerCommitLog=true
storePathRootDir=/data/rocketmq/store
dLegerGroup=broker-1
dLegerPeers=n0-host1:40911;n1-host2:40911;n2-host3:40911
dLegerSelfId=n0

修复 2:生产端用同步发送 + 重试

// 不好:oneway 无 ack,消息丢了不知道
producer.sendOneway(msg);

// 好:同步发送 + 重试 + 检查 sendStatus
public void sendReliably(Message msg) throws Exception {
    int maxRetries = 3;
    Exception lastEx = null;

    for (int i = 0; i < maxRetries; i++) {
        try {
            SendResult result = producer.send(msg, 3000);    // 3s 超时
            switch (result.getSendStatus()) {
                case SEND_OK:
                    log.info("send ok msgId={}", result.getMsgId());
                    return;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    // 这些情况消息已写 master,但未确认 slave/disk
                    // 可能未持久,要重试(下次幂等去重)
                    log.warn("send partial ok: {}", result.getSendStatus());
                    Thread.sleep(100 * (i + 1));
                    continue;
                default:
                    throw new RuntimeException("unknown status: " + result.getSendStatus());
            }
        } catch (Exception e) {
            lastEx = e;
            Thread.sleep(100 * (i + 1));
        }
    }

    // 3 次都失败,落地到"待重发表",后台异步重发
    fallbackTable.save(msg, lastEx);
    throw lastEx;
}

// Producer 配置
DefaultMQProducer producer = new DefaultMQProducer("order-producer");
producer.setNamesrvAddr("ns1:9876;ns2:9876;ns3:9876");
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);   // 一个 broker 失败重选
producer.setCompressMsgBodyOverHowmuch(4096);          // 4KB 压缩
producer.setMaxMessageSize(4 * 1024 * 1024);
producer.start();

修复 3:事务消息(强一致场景)

// 场景:DB 写订单 + 发 MQ 通知,要么都成功要么都失败
// 普通做法:先写 DB 再发 MQ,中间宕机就丢
// 事务消息:half-message + 提交/回滚

public class OrderTransactionListener implements TransactionListener {

    // 1. 执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            CreateOrderRequest req = JSON.parseObject(new String(msg.getBody()), CreateOrderRequest.class);

            // 本地事务:写订单表 + 写"事务状态表"
            orderMapper.insert(req.toEntity());
            txStateMapper.insert(new TxState(msg.getTransactionId(), "committed"));

            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            log.error("local tx failed", e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // 2. Broker 回查(如果 producer 宕机没提交)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        TxState state = txStateMapper.findByTxId(msg.getTransactionId());
        if (state == null) {
            return LocalTransactionState.ROLLBACK_MESSAGE;     // 本地事务没执行
        }
        if ("committed".equals(state.getStatus())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;                   // 继续回查
    }
}

// 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("order-tx-producer");
producer.setTransactionListener(new OrderTransactionListener());
producer.start();

Message msg = new Message("ORDER_TOPIC", "CREATE", JSON.toJSONBytes(req));
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

// 流程:
// 1. 发 half message → Broker(对消费者不可见)
// 2. Broker 返回成功
// 3. Producer 执行本地事务(写订单 + 写 tx 状态表)
// 4. Producer commit / rollback(改 half message 状态)
// 5. 若 Producer 宕机,Broker 回查 producer(checkLocalTransaction)
// 6. commit 后消息对消费者可见

修复 4:消费端幂等

// 消息可能重复(网络抖动、Broker 重投、消费失败重试)
// 消费端必须保证幂等

@Service
public class OrderConsumer {

    @Autowired private RedisTemplate<String, String> redis;
    @Autowired private OrderMapper orderMapper;

    @RocketMQMessageListener(
        topic = "ORDER_TOPIC",
        consumerGroup = "order-consumer-group",
        consumeMode = ConsumeMode.CONCURRENTLY,
        maxReconsumeTimes = 16              // 16 次后进 DLQ
    )
    public void consume(MessageExt msg) {
        String msgId = msg.getMsgId();
        String key = "msg:consumed:" + msgId;

        // 1. 用 Redis SETNX 防重复(简单粗暴)
        Boolean firstTime = redis.opsForValue().setIfAbsent(key, "1", Duration.ofDays(7));
        if (Boolean.FALSE.equals(firstTime)) {
            log.info("duplicate msg skipped: {}", msgId);
            return;
        }

        try {
            CreateOrderRequest req = JSON.parseObject(new String(msg.getBody()), CreateOrderRequest.class);

            // 2. 业务层也要幂等(用 orderNo 唯一索引)
            try {
                orderMapper.insert(req.toEntity());
            } catch (DuplicateKeyException e) {
                log.info("order already exists: {}", req.getOrderNo());
                return;
            }

            // 3. 业务幂等更通用做法:状态机
            // - 检查当前订单状态
            // - 只在允许的状态下执行
        } catch (Exception e) {
            // 失败抛异常,RocketMQ 自动重投
            redis.delete(key);              // 清除标记,允许重试
            throw e;
        }
    }
}

// 唯一索引(数据库层兜底)
CREATE UNIQUE INDEX uk_order_no ON t_order (order_no);

// 状态机
public boolean updateOrderStatus(Long orderId, OrderStatus from, OrderStatus to) {
    int affected = orderMapper.updateStatus(orderId, from, to);
    return affected > 0;        // 用 UPDATE WHERE 当前状态来保证只迁移一次
}

修复 5:死信队列(DLQ)处理

// 默认重试 16 次后进 DLQ,但没人监控就丢了
// 解决:订阅 DLQ topic,告警 + 人工处理

@RocketMQMessageListener(
    topic = "%DLQ%order-consumer-group",   // DLQ topic 命名规则
    consumerGroup = "dlq-monitor-group"
)
@Service
public class DlqMonitor {

    public void consume(MessageExt msg) {
        // 1. 告警:DLQ 来了消息就是异常
        alertService.send("DLQ message: " + msg.getMsgId(), msg);

        // 2. 落库:供人工排查 + 重发
        DlqRecord record = new DlqRecord();
        record.setMsgId(msg.getMsgId());
        record.setBody(new String(msg.getBody()));
        record.setOriginalTopic(msg.getProperties().get("ORIGIN_MESSAGE_TOPIC"));
        record.setFailReason(msg.getProperties().get("RECONSUME_TIME"));
        record.setCreatedAt(LocalDateTime.now());
        dlqRecordMapper.insert(record);

        // 3. 不能再 throw,DLQ 消费失败也要 ack
    }
}

// 后台管理页面:看 DLQ 记录 → 修业务问题 → 一键重发到原 topic
@PostMapping("/admin/dlq/replay/{id}")
public String replay(@PathVariable Long id) {
    DlqRecord r = dlqRecordMapper.findById(id);
    Message msg = new Message(r.getOriginalTopic(), r.getBody().getBytes());
    SendResult result = producer.send(msg);
    return result.getSendStatus().name();
}

修复 6:消息轨迹(MessageTrace)

# Broker 开启 trace
$ vim broker.conf
traceTopicEnable=true

# Producer 开启 trace
DefaultMQProducer producer = new DefaultMQProducer("g", true);   // true=trace

# Consumer 开启 trace
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g", true);

# 查询消息轨迹(RocketMQ Console)
$ open http://console-host:8080/#/messageTrace

# 看到:
# 1. Producer 发送时间、broker、状态
# 2. Broker 存储时间、queue offset
# 3. Consumer Group 拉取时间
# 4. 各 Consumer 实例消费时间、是否成功、重试次数

# 排查丢失:看 producer 发送日志 + broker 接收日志 + consumer 拉取日志
# 任何一环断了,就找到根因

监控告警

# Prometheus + rocketmq-exporter
- alert: BrokerDiskHighUsage
  expr: rocketmq_brokeruntime_disk_ratio > 0.85
  annotations:
    summary: "Broker 磁盘 > 85%,可能写不进"

- alert: MasterSlaveLagHigh
  expr: rocketmq_brokeruntime_master_slave_diff > 1024*1024*100   # 100MB
  for: 1m
  annotations:
    summary: "主从延迟 > 100MB,可能丢消息"

- alert: ProducerSendFailed
  expr: increase(rocketmq_producer_send_failed_total[5m]) > 10
  annotations:
    summary: "5min 内 producer send 失败 > 10 次"

- alert: ConsumerLag
  expr: sum by(group, topic) (rocketmq_consumer_lag) > 10000
  for: 2m
  annotations:
    summary: "消费 lag > 1w 条"

- alert: DlqMessages
  expr: increase(rocketmq_message_dlq_total[1h]) > 0
  annotations:
    severity: critical
    summary: "1h 内有 DLQ 消息"

- alert: TxMessageCheckTooMany
  expr: rate(rocketmq_tx_message_check_total[5m]) > 100
  annotations:
    summary: "事务消息回查频繁,producer 可能有问题"

优化效果

指标                  优化前         优化后
=====================================================
消息丢失数(月)      387            0
消息重复消费(月)    数千次        0(幂等保证)
Producer 发送失败    不感知         捕获 + 重试 + 兜底
Broker 主从滞后      10MB-100MB     < 1MB
DLQ 消息处理         无监控         秒级告警 + 自助重发
事务消息回查         未启用         99.9% 一次成功
消息轨迹             无             全链路可查

TPS                  10w(无可靠性)  5w(有可靠性)
                    → 业务可接受的权衡

业务影响:
- 订单不再丢,客服赔付 0
- 对账系统数据 100% 一致
- 财务侧不再每月手工对账
- 故障期间可快速定位(消息轨迹)

避坑清单

  1. 核心业务 SYNC_MASTER + SYNC_FLUSH,日志类业务 ASYNC 提速
  2. Producer 绝不用 sendOneway,必须 send + 检查 SendStatus
  3. SendStatus 不是 SEND_OK 就要重试(FLUSH_DISK_TIMEOUT 等也要)
  4. 分布式事务用 RocketMQ 事务消息(half + commit/rollback + 回查)
  5. 消费端必须幂等:Redis setnx 加 DB 唯一索引双保险
  6. maxReconsumeTimes 设 16,DLQ 必监控
  7. 状态机更新用 UPDATE WHERE current=expected,天然幂等
  8. 开 MessageTrace,生产排查神器
  9. 主从延迟、磁盘使用、DLQ 都要告警
  10. 大消息 (> 4MB) 走 OSS,MQ 只传 ref

总结

消息中间件的"零丢失"是个工程链路,不是配置一个开关就完事。这次治理覆盖了 Broker、Producer、Consumer 三端 + 监控,缺一不可。最大的认知改变:RocketMQ 默认配置(ASYNC + oneway)是为了演示性能数据,生产业务必须按业务等级调整。最被低估的是事务消息,很多团队怕复杂不用,实际上比"先写 DB 再发 MQ + 补偿"简单太多,RocketMQ 帮你做了 half + 回查 + commit/rollback 整套机制。最容易踩的坑是消费幂等没做好,以为 Redis setnx 万事大吉,但 setnx + 消费失败 + 不清理 → 永久跳过这条消息;正确做法是 setnx 失败立即清除,让重试有机会成功。最后,DLQ 监控很多团队完全没做,消息悄悄进了 DLQ 永远丢失,客户投诉了才发现 — 必须订阅 DLQ topic 告警,这是底线。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

Webpack 构建 9min 优化到 1.5min:SWC + 缓存 + 分包全实战

2026-5-19 13:13:37

技术教程

监控集群 8TB 失控治理:VictoriaMetrics + 降采样 + 告警去噪实录

2026-5-19 13:19:43

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索