订单消息一会丢一会重:RocketMQ 消息可靠性与消费幂等实战

订单系统用 RocketMQ 异步解耦,对账发现每天 230 笔积分消息丢失、80 笔重复。两周治理:生产端事务消息 + 本地消息表 + 同步发送检查;broker 端 SYNC_FLUSH + SYNC_MASTER;消费端唯一索引 + 状态机幂等 + 死信补偿 + 顺序消息。连续 3 个月对账零差异。

2024 年我们的订单系统用 RocketMQ 做异步解耦:下单后发消息,库存、积分、通知三个服务各自消费。某次对账发现 — 当天有 230 笔订单的积分没加上(消息丢了),又有 80 笔订单积分加了两次(消息重了)。投了两周做消息可靠性治理,从生产、broker、消费三端补齐,之后连续三个月对账零差异。本文复盘 RocketMQ 消息不丢、不重、不乱序的完整实战,覆盖可靠投递、持久化、消费幂等、堆积治理、顺序消息。

问题背景

消息中间件:RocketMQ 4.9,2 主 2 从
业务:订单下单 → 发消息 → 库存/积分/通知 三个消费组
日均消息:1200w

对账发现的问题:
- 消息丢失:230 笔/天,积分没加
- 消息重复:80 笔/天,积分加两次
- 偶发乱序:同一订单的"创建""支付"消息消费颠倒

逐端排查:
# 生产端
1. 用了 oneway 发送(发完不管结果),broker 没收到也不知道
   producer.sendOneway(msg);          ← 性能好但完全不可靠
2. 部分代码用 async 发送,回调里只打日志没重试

# broker 端
3. 刷盘策略是 ASYNC_FLUSH(异步刷盘)
   → broker 进程崩溃,page cache 里没落盘的消息全丢
4. 主从复制是 ASYNC(异步)
   → 主挂了,从还没同步的消息丢

# 消费端
5. 消费逻辑没幂等,RocketMQ 本身保证"至少一次",重复必然发生
6. 消费代码先 ACK 再处理,处理失败消息却已确认 → 丢
7. 同一订单消息发到不同 queue → 并行消费乱序

结论:消息可靠性是生产 + broker + 消费三端协同,缺一环就漏

修复 1:生产端可靠投递

// === 错误:oneway,发完不管 ===
// producer.sendOneway(msg);

// === 正确:同步发送 + 检查结果 + 重试 ===
public void sendReliable(Message msg) {
    DefaultMQProducer producer = getProducer();
    producer.setRetryTimesWhenSendFailed(3);          // 同步发送失败重试 3 次
    producer.setRetryTimesWhenSendAsyncFailed(3);
    producer.setSendMsgTimeout(3000);

    try {
        SendResult result = producer.send(msg);       // 同步发送,阻塞等结果
        // 必须检查发送状态!
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            // SLAVE_NOT_AVAILABLE / FLUSH_DISK_TIMEOUT 等 → 视为不可靠
            log.warn("send not ok: {}, fallback", result.getSendStatus());
            saveToLocalTable(msg);                     // 落本地消息表,定时补发
        }
    } catch (Exception e) {
        // 重试耗尽仍失败 → 落本地表,绝不丢
        log.error("send failed after retry", e);
        saveToLocalTable(msg);
    }
}

// === 业务消息防丢的终极方案:本地消息表(可靠消息最终一致)===
@Transactional
public void createOrder(Order order) {
    orderDao.insert(order);                            // 1. 写业务数据
    // 2. 同一个本地事务里,写一条"待发送消息"
    LocalMessage lm = new LocalMessage(order.getId(),
        buildOrderMsgBody(order), MsgStatus.PENDING);
    localMessageDao.insert(lm);
    // 事务提交 → 业务和"待发消息"原子性一起成功
}

// 3. 独立定时任务扫描 PENDING 消息,发 MQ,发成功改 SENT
@Scheduled(fixedDelay = 1000)
public void dispatchPendingMessages() {
    for (LocalMessage lm : localMessageDao.findPending(200)) {
        try {
            producer.send(toMessage(lm));
            localMessageDao.updateStatus(lm.getId(), MsgStatus.SENT);
        } catch (Exception e) {
            localMessageDao.incrRetry(lm.getId());     // 失败下次再扫
        }
    }
}
// 这样:只要业务事务提交了,消息一定会发出去(最终一定)

修复 2:RocketMQ 事务消息

// 本地消息表要自己维护表 + 定时任务,RocketMQ 事务消息是托管版方案
// 解决"发消息"与"本地事务"的原子性

public class OrderTransactionProducer {

    private TransactionMQProducer producer;

    public void init() throws MQClientException {
        producer = new TransactionMQProducer("order_tx_group");
        producer.setNamesrvAddr("ns1:9876;ns2:9876");
        // 注册事务监听器
        producer.setTransactionListener(new TransactionListener() {
            // 1. 发送 half 消息成功后,执行本地事务
            @Override
            public LocalTransactionState executeLocalTransaction(
                    Message msg, Object arg) {
                try {
                    Order order = (Order) arg;
                    orderService.doCreateOrder(order);     // 本地事务
                    return LocalTransactionState.COMMIT_MESSAGE;   // 提交 → 消息可投递
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚 → 消息丢弃
                }
            }
            // 2. broker 回查:本地事务状态不明时,broker 来问
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String orderId = msg.getKeys();
                // 查订单是否真的创建成功了
                return orderService.exists(orderId)
                    ? LocalTransactionState.COMMIT_MESSAGE
                    : LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });
        producer.start();
    }

    public void createOrderWithTxMsg(Order order) throws MQClientException {
        Message msg = new Message("ORDER_TOPIC", "CREATE",
            order.getId(), serialize(order).getBytes());
        // 发送事务消息:先发 half(消费者看不到)→ 执行本地事务 → 提交/回滚
        producer.sendMessageInTransaction(msg, order);
    }
}
// 流程:half 消息 → 本地事务 → COMMIT 才真正投递 → 状态不明时 broker 回查
// 保证"订单创建成功"和"消息发出"要么都成,要么都不成

修复 3:broker 端持久化

# broker.conf —— 不丢消息的 broker 配置

# === 1. 刷盘策略 ===
# ASYNC_FLUSH:异步刷盘,性能高,但 broker 崩溃丢 page cache 里的消息
# SYNC_FLUSH: 同步刷盘,消息落盘才返回成功,慢但不丢
flushDiskType = SYNC_FLUSH
# 金融级订单消息用 SYNC_FLUSH;允许极小概率丢的日志类可 ASYNC

# === 2. 主从复制方式 ===
# ASYNC_MASTER:异步复制,主返回成功时从可能还没同步,主挂丢消息
# SYNC_MASTER: 同步复制,主从都写成功才返回,可靠
brokerRole = SYNC_MASTER
# 配合 SYNC_FLUSH,做到主从都落盘才算成功

# === 3. 消息保留与磁盘保护 ===
fileReservedTime = 72                  # 消息文件保留 72 小时
diskMaxUsedSpaceRatio = 88             # 磁盘用到 88% 触发清理
# 磁盘满会导致 broker 拒绝写入,务必监控

# === 4. 消息大小与堆积 ===
maxMessageSize = 4194304               # 单消息上限 4MB
# === 5. 强烈建议:用 Dledger 模式(基于 Raft 的多副本)===
# RocketMQ 4.5+ 支持 Dledger,自动选主,比传统主从更可靠
# enableDLegerCommitLog = true
# dLegerGroup = order-broker-group
# dLegerPeers = n0-broker1:40911;n1-broker2:40911;n2-broker3:40911

# 可靠性等级权衡:
# 最高:SYNC_FLUSH + SYNC_MASTER(或 Dledger 3 副本)→ 订单/支付
# 平衡:ASYNC_FLUSH + SYNC_MASTER → 一般业务
# 最快:ASYNC_FLUSH + ASYNC_MASTER → 日志/埋点(可容忍丢)

修复 4:消费端幂等

// RocketMQ 只保证"至少一次"投递,重复消费是必然,消费端必须幂等
// 重复来源:生产重发、broker 重投、消费超时未 ACK 被重发、rebalance

// === 方案 1:数据库唯一索引(最可靠,强一致)===
// msg_consume_record 表对 (msg_key, consumer_group) 建唯一索引
public ConsumeResult consume(MessageExt msg) {
    String msgKey = msg.getKeys();                     // 业务唯一键,如订单号
    try {
        // 插入消费记录,唯一索引冲突 = 已消费过
        consumeRecordDao.insert(new ConsumeRecord(msgKey, GROUP));
    } catch (DuplicateKeyException e) {
        log.info("msg {} already consumed, skip", msgKey);
        return ConsumeResult.SUCCESS;                  // 重复 → 直接成功,不再处理
    }
    // 首次消费,执行业务
    addPoints(msgKey);
    return ConsumeResult.SUCCESS;
}

// === 方案 2:Redis SETNX 去重(高性能,适合海量消息)===
public ConsumeResult consumeWithRedis(MessageExt msg) {
    String key = "mq:dedup:" + GROUP + ":" + msg.getKeys();
    // setIfAbsent:不存在才设,返回 true 表示首次
    Boolean first = redis.opsForValue()
        .setIfAbsent(key, "1", Duration.ofDays(3));     // 去重窗口 3 天
    if (Boolean.FALSE.equals(first)) {
        return ConsumeResult.SUCCESS;                  // 重复
    }
    try {
        addPoints(msg.getKeys());
        return ConsumeResult.SUCCESS;
    } catch (Exception e) {
        redis.delete(key);                             // 业务失败,删标记允许重试
        return ConsumeResult.RECONSUME_LATER;
    }
}

// === 方案 3:业务状态机天然幂等(最优雅)===
// 例:订单状态 待支付→已支付,UPDATE ... WHERE status='待支付'
// 重复消费时 status 已是"已支付",UPDATE 影响行数 0,天然幂等
public void onOrderPaid(String orderId) {
    int rows = orderDao.updateStatusIfMatch(orderId, "PAID", "UNPAID");
    if (rows == 0) {
        log.info("order {} not in UNPAID state, skip", orderId);
    }
}

// === 消费监听:务必处理完再返回 SUCCESS,失败返回 RECONSUME_LATER ===
consumer.registerMessageListener((MessageListenerConcurrently)
    (msgs, ctx) -> {
        for (MessageExt msg : msgs) {
            try {
                if (consume(msg) != ConsumeResult.SUCCESS) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;  // 失败重投
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });

修复 5:消息堆积与死信

// === 消息堆积:消费速度跟不上生产 ===
// 排查:
// sh mqadmin consumerProgress -g points_consumer_group
// 看 Diff(未消费数),持续增大就是堆积

// 提升消费能力:
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);                      // 加大消费线程
consumer.setPullBatchSize(32);                         // 一次拉多条
consumer.setConsumeMessageBatchMaxSize(1);             // 但逐条处理,失败隔离
// 注意:topic 的 queue 数决定消费并行度上限
// 消费者实例数 ≤ queue 数,加机器超过 queue 数也没用 → 提前规划 queue 数

// === 死信队列(DLQ):重试 16 次仍失败的消息进死信 ===
// RocketMQ 自动建 %DLQ%consumer_group,要专门消费它
consumer.subscribe("%DLQ%points_consumer_group", "*");
// 死信消息:人工介入排查 + 补偿,绝不能无视
// 常见原因:消息体格式错、依赖服务长期不可用、业务 bug

// === 重试次数与退避 ===
// RocketMQ 默认重试 16 次,间隔递增(10s,30s,1m...2h)
consumer.setMaxReconsumeTimes(5);                      // 改成 5 次,快速进死信
// 太多次重试:无效消息长期占用消费资源
// 太少次重试:偶发抖动没机会自愈

// === 消费失败的正确姿势 ===
// 1. 可重试错误(下游抖动)→ RECONSUME_LATER
// 2. 不可重试错误(消息格式错)→ 记录日志 + 返回 SUCCESS 跳过,别一直重试
public ConsumeConcurrentlyStatus handle(MessageExt msg) {
    try {
        Order order = parse(msg);                      // 解析失败 = 不可重试
        process(order);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (ParseException e) {
        log.error("bad message, skip: {}", msg.getMsgId(), e);
        alertService.notify("坏消息", msg.getMsgId());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;   // 跳过,不重试
    } catch (Exception e) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;   // 重试
    }
}

修复 6:顺序消息

// 问题:同一订单的"创建""支付""完成"消息被分到不同 queue,并行消费乱序
// RocketMQ 顺序消息:同一业务键的消息进同一 queue,且单线程顺序消费

// === 生产端:用 MessageQueueSelector 把同订单消息发到同一 queue ===
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String orderId = (String) arg;
        // 按 orderId 取模,同一订单永远落同一个 queue → 保证入队有序
        int idx = Math.abs(orderId.hashCode()) % mqs.size();
        return mqs.get(idx);
    }
}, order.getId());

// === 消费端:用 MessageListenerOrderly,单 queue 单线程顺序消费 ===
consumer.registerMessageListener((MessageListenerOrderly)
    (msgs, ctx) -> {
        for (MessageExt msg : msgs) {
            try {
                processInOrder(msg);
            } catch (Exception e) {
                // 顺序消费失败:挂起当前 queue 一会再重试,不跳过
                // (跳过会破坏顺序)
                ctx.setSuspendCurrentQueueTimeMillis(1000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    });

// 顺序消息的代价:
// 1. 并行度受限于 queue 数,吞吐比并发消费低
// 2. 一条消息卡住,同 queue 后面全部阻塞
// 3. 所以:只对真正需要顺序的业务用,大部分场景"业务幂等 + 状态机"更好
//    —— 与其依赖消息有序,不如让消费逻辑对乱序免疫
//    例:收到"支付"但订单还没"创建" → 暂存,等"创建"到了再处理

优化效果

指标                    治理前          治理后
=============================================================
消息丢失                230 笔/天        0
消息重复导致业务异常    80 笔/天         0(消费幂等)
消息乱序                偶发             0(顺序消息 + 状态机)
对账差异                每天都有         连续 3 个月 0
生产发送方式            oneway/async     事务消息 + 本地消息表
刷盘/复制               ASYNC/ASYNC      SYNC/SYNC
消费幂等                无               唯一索引 + Redis 去重
死信处理                无人管           DLQ 告警 + 人工补偿
消息堆积                偶发无告警       消费能力调优 + 堆积告警

性能代价(可接受):
- SYNC_FLUSH + SYNC_MASTER 后,发送 RT 从 1ms 升到 4ms
- 吞吐从 8w TPS 降到 5w TPS,仍远超业务需求(峰值 1.5w)
- 结论:订单级消息,可靠性远比那几毫秒重要

排查与改造:
- 三端逐一排查定位:3 天
- 生产端事务消息 + 本地消息表改造:4 天
- 消费端幂等改造(涉及 12 个消费者):5 天
- 全链路压测验证:2 天

避坑清单

  1. 消息可靠性是生产+broker+消费三端协同,任一端漏都会丢/重
  2. 生产端禁用 oneway,用同步发送并检查 SendStatus 是否 SEND_OK
  3. "业务事务 + 发消息"的原子性用事务消息或本地消息表保证
  4. broker 订单级消息用 SYNC_FLUSH + SYNC_MASTER,或 Dledger 三副本
  5. RocketMQ 只保证至少一次,消费端必须幂等,这是底线
  6. 幂等优选业务状态机,其次唯一索引,再次 Redis SETNX 去重
  7. 消费先处理再 ACK,失败返回 RECONSUME_LATER,绝不先 ACK
  8. 不可重试的坏消息要跳过 + 告警,别让它无限重试占资源
  9. 死信队列必须有人消费 + 告警 + 补偿,不能无视
  10. 顺序消息代价大,能用"业务对乱序免疫"就别依赖消息有序

总结

消息中间件最大的认知陷阱是"以为发出去就万事大吉"。这次对账事故让我们彻底明白:消息可靠性不是 MQ 单方面的承诺,而是生产、broker、消费三端共同的契约,任何一端偷懒,消息就会从那个缺口漏掉。最大的认知改变是理解了"至少一次"的真正含义 —— RocketMQ 从设计上就只保证消息至少投递一次,这意味着重复消费不是 bug 而是规格,指望"消息不重复"是缘木求鱼,正确的做法是承认重复必然发生、让消费端幂等,而幂等的最优解往往不是去重表,而是业务状态机:一个"待支付→已支付"的 UPDATE WHERE,天然对重复免疫。最被低估的是生产端的可靠投递,很多人用 oneway 或 async 发送图快,却从不检查结果,broker 没收到也浑然不觉,事务消息和本地消息表的价值就在于把"业务成功"和"消息发出"绑成一个原子操作。最容易踩的坑是 broker 的刷盘和复制配置,默认的 ASYNC 性能确实好看,但 broker 一崩、主一挂,page cache 和未同步的消息就没了 —— 对订单这种钱相关的消息,那几毫秒的性能根本不值得用可靠性去换。最后一个反直觉的结论:与其费劲维护严格的消息顺序,不如把消费逻辑写成对乱序免疫 —— 收到"支付"消息但订单还没"创建",就先暂存等一等,这比顺序消息那套"同 queue 单线程、一卡全堵"的机制健壮得多。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

一个共用线程池拖垮全站:线程池隔离与参数调优实录

2026-5-20 12:09:08

技术教程

生产服务器 Too many open files:文件描述符与连接泄漏排查实录

2026-5-20 12:14:34

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索