2024 年我们的订单系统用 RocketMQ 做异步解耦:下单后发消息,库存、积分、通知三个服务各自消费。某次对账发现 — 当天有 230 笔订单的积分没加上(消息丢了),又有 80 笔订单积分加了两次(消息重了)。投了两周做消息可靠性治理,从生产、broker、消费三端补齐,之后连续三个月对账零差异。本文复盘 RocketMQ 消息不丢、不重、不乱序的完整实战,覆盖可靠投递、持久化、消费幂等、堆积治理、顺序消息。
问题背景
消息中间件:RocketMQ 4.9,2 主 2 从
业务:订单下单 → 发消息 → 库存/积分/通知 三个消费组
日均消息:1200w
对账发现的问题:
- 消息丢失:230 笔/天,积分没加
- 消息重复:80 笔/天,积分加两次
- 偶发乱序:同一订单的"创建""支付"消息消费颠倒
逐端排查:
# 生产端
1. 用了 oneway 发送(发完不管结果),broker 没收到也不知道
producer.sendOneway(msg); ← 性能好但完全不可靠
2. 部分代码用 async 发送,回调里只打日志没重试
# broker 端
3. 刷盘策略是 ASYNC_FLUSH(异步刷盘)
→ broker 进程崩溃,page cache 里没落盘的消息全丢
4. 主从复制是 ASYNC(异步)
→ 主挂了,从还没同步的消息丢
# 消费端
5. 消费逻辑没幂等,RocketMQ 本身保证"至少一次",重复必然发生
6. 消费代码先 ACK 再处理,处理失败消息却已确认 → 丢
7. 同一订单消息发到不同 queue → 并行消费乱序
结论:消息可靠性是生产 + broker + 消费三端协同,缺一环就漏
修复 1:生产端可靠投递
// === 错误:oneway,发完不管 ===
// producer.sendOneway(msg);
// === 正确:同步发送 + 检查结果 + 重试 ===
public void sendReliable(Message msg) {
DefaultMQProducer producer = getProducer();
producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试 3 次
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setSendMsgTimeout(3000);
try {
SendResult result = producer.send(msg); // 同步发送,阻塞等结果
// 必须检查发送状态!
if (result.getSendStatus() != SendStatus.SEND_OK) {
// SLAVE_NOT_AVAILABLE / FLUSH_DISK_TIMEOUT 等 → 视为不可靠
log.warn("send not ok: {}, fallback", result.getSendStatus());
saveToLocalTable(msg); // 落本地消息表,定时补发
}
} catch (Exception e) {
// 重试耗尽仍失败 → 落本地表,绝不丢
log.error("send failed after retry", e);
saveToLocalTable(msg);
}
}
// === 业务消息防丢的终极方案:本地消息表(可靠消息最终一致)===
@Transactional
public void createOrder(Order order) {
orderDao.insert(order); // 1. 写业务数据
// 2. 同一个本地事务里,写一条"待发送消息"
LocalMessage lm = new LocalMessage(order.getId(),
buildOrderMsgBody(order), MsgStatus.PENDING);
localMessageDao.insert(lm);
// 事务提交 → 业务和"待发消息"原子性一起成功
}
// 3. 独立定时任务扫描 PENDING 消息,发 MQ,发成功改 SENT
@Scheduled(fixedDelay = 1000)
public void dispatchPendingMessages() {
for (LocalMessage lm : localMessageDao.findPending(200)) {
try {
producer.send(toMessage(lm));
localMessageDao.updateStatus(lm.getId(), MsgStatus.SENT);
} catch (Exception e) {
localMessageDao.incrRetry(lm.getId()); // 失败下次再扫
}
}
}
// 这样:只要业务事务提交了,消息一定会发出去(最终一定)
修复 2:RocketMQ 事务消息
// 本地消息表要自己维护表 + 定时任务,RocketMQ 事务消息是托管版方案
// 解决"发消息"与"本地事务"的原子性
public class OrderTransactionProducer {
private TransactionMQProducer producer;
public void init() throws MQClientException {
producer = new TransactionMQProducer("order_tx_group");
producer.setNamesrvAddr("ns1:9876;ns2:9876");
// 注册事务监听器
producer.setTransactionListener(new TransactionListener() {
// 1. 发送 half 消息成功后,执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
try {
Order order = (Order) arg;
orderService.doCreateOrder(order); // 本地事务
return LocalTransactionState.COMMIT_MESSAGE; // 提交 → 消息可投递
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚 → 消息丢弃
}
}
// 2. broker 回查:本地事务状态不明时,broker 来问
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getKeys();
// 查订单是否真的创建成功了
return orderService.exists(orderId)
? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
}
public void createOrderWithTxMsg(Order order) throws MQClientException {
Message msg = new Message("ORDER_TOPIC", "CREATE",
order.getId(), serialize(order).getBytes());
// 发送事务消息:先发 half(消费者看不到)→ 执行本地事务 → 提交/回滚
producer.sendMessageInTransaction(msg, order);
}
}
// 流程:half 消息 → 本地事务 → COMMIT 才真正投递 → 状态不明时 broker 回查
// 保证"订单创建成功"和"消息发出"要么都成,要么都不成
修复 3:broker 端持久化
# broker.conf —— 不丢消息的 broker 配置
# === 1. 刷盘策略 ===
# ASYNC_FLUSH:异步刷盘,性能高,但 broker 崩溃丢 page cache 里的消息
# SYNC_FLUSH: 同步刷盘,消息落盘才返回成功,慢但不丢
flushDiskType = SYNC_FLUSH
# 金融级订单消息用 SYNC_FLUSH;允许极小概率丢的日志类可 ASYNC
# === 2. 主从复制方式 ===
# ASYNC_MASTER:异步复制,主返回成功时从可能还没同步,主挂丢消息
# SYNC_MASTER: 同步复制,主从都写成功才返回,可靠
brokerRole = SYNC_MASTER
# 配合 SYNC_FLUSH,做到主从都落盘才算成功
# === 3. 消息保留与磁盘保护 ===
fileReservedTime = 72 # 消息文件保留 72 小时
diskMaxUsedSpaceRatio = 88 # 磁盘用到 88% 触发清理
# 磁盘满会导致 broker 拒绝写入,务必监控
# === 4. 消息大小与堆积 ===
maxMessageSize = 4194304 # 单消息上限 4MB
# === 5. 强烈建议:用 Dledger 模式(基于 Raft 的多副本)===
# RocketMQ 4.5+ 支持 Dledger,自动选主,比传统主从更可靠
# enableDLegerCommitLog = true
# dLegerGroup = order-broker-group
# dLegerPeers = n0-broker1:40911;n1-broker2:40911;n2-broker3:40911
# 可靠性等级权衡:
# 最高:SYNC_FLUSH + SYNC_MASTER(或 Dledger 3 副本)→ 订单/支付
# 平衡:ASYNC_FLUSH + SYNC_MASTER → 一般业务
# 最快:ASYNC_FLUSH + ASYNC_MASTER → 日志/埋点(可容忍丢)
修复 4:消费端幂等
// RocketMQ 只保证"至少一次"投递,重复消费是必然,消费端必须幂等
// 重复来源:生产重发、broker 重投、消费超时未 ACK 被重发、rebalance
// === 方案 1:数据库唯一索引(最可靠,强一致)===
// msg_consume_record 表对 (msg_key, consumer_group) 建唯一索引
public ConsumeResult consume(MessageExt msg) {
String msgKey = msg.getKeys(); // 业务唯一键,如订单号
try {
// 插入消费记录,唯一索引冲突 = 已消费过
consumeRecordDao.insert(new ConsumeRecord(msgKey, GROUP));
} catch (DuplicateKeyException e) {
log.info("msg {} already consumed, skip", msgKey);
return ConsumeResult.SUCCESS; // 重复 → 直接成功,不再处理
}
// 首次消费,执行业务
addPoints(msgKey);
return ConsumeResult.SUCCESS;
}
// === 方案 2:Redis SETNX 去重(高性能,适合海量消息)===
public ConsumeResult consumeWithRedis(MessageExt msg) {
String key = "mq:dedup:" + GROUP + ":" + msg.getKeys();
// setIfAbsent:不存在才设,返回 true 表示首次
Boolean first = redis.opsForValue()
.setIfAbsent(key, "1", Duration.ofDays(3)); // 去重窗口 3 天
if (Boolean.FALSE.equals(first)) {
return ConsumeResult.SUCCESS; // 重复
}
try {
addPoints(msg.getKeys());
return ConsumeResult.SUCCESS;
} catch (Exception e) {
redis.delete(key); // 业务失败,删标记允许重试
return ConsumeResult.RECONSUME_LATER;
}
}
// === 方案 3:业务状态机天然幂等(最优雅)===
// 例:订单状态 待支付→已支付,UPDATE ... WHERE status='待支付'
// 重复消费时 status 已是"已支付",UPDATE 影响行数 0,天然幂等
public void onOrderPaid(String orderId) {
int rows = orderDao.updateStatusIfMatch(orderId, "PAID", "UNPAID");
if (rows == 0) {
log.info("order {} not in UNPAID state, skip", orderId);
}
}
// === 消费监听:务必处理完再返回 SUCCESS,失败返回 RECONSUME_LATER ===
consumer.registerMessageListener((MessageListenerConcurrently)
(msgs, ctx) -> {
for (MessageExt msg : msgs) {
try {
if (consume(msg) != ConsumeResult.SUCCESS) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 失败重投
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
修复 5:消息堆积与死信
// === 消息堆积:消费速度跟不上生产 ===
// 排查:
// sh mqadmin consumerProgress -g points_consumer_group
// 看 Diff(未消费数),持续增大就是堆积
// 提升消费能力:
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64); // 加大消费线程
consumer.setPullBatchSize(32); // 一次拉多条
consumer.setConsumeMessageBatchMaxSize(1); // 但逐条处理,失败隔离
// 注意:topic 的 queue 数决定消费并行度上限
// 消费者实例数 ≤ queue 数,加机器超过 queue 数也没用 → 提前规划 queue 数
// === 死信队列(DLQ):重试 16 次仍失败的消息进死信 ===
// RocketMQ 自动建 %DLQ%consumer_group,要专门消费它
consumer.subscribe("%DLQ%points_consumer_group", "*");
// 死信消息:人工介入排查 + 补偿,绝不能无视
// 常见原因:消息体格式错、依赖服务长期不可用、业务 bug
// === 重试次数与退避 ===
// RocketMQ 默认重试 16 次,间隔递增(10s,30s,1m...2h)
consumer.setMaxReconsumeTimes(5); // 改成 5 次,快速进死信
// 太多次重试:无效消息长期占用消费资源
// 太少次重试:偶发抖动没机会自愈
// === 消费失败的正确姿势 ===
// 1. 可重试错误(下游抖动)→ RECONSUME_LATER
// 2. 不可重试错误(消息格式错)→ 记录日志 + 返回 SUCCESS 跳过,别一直重试
public ConsumeConcurrentlyStatus handle(MessageExt msg) {
try {
Order order = parse(msg); // 解析失败 = 不可重试
process(order);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (ParseException e) {
log.error("bad message, skip: {}", msg.getMsgId(), e);
alertService.notify("坏消息", msg.getMsgId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 跳过,不重试
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重试
}
}
修复 6:顺序消息
// 问题:同一订单的"创建""支付""完成"消息被分到不同 queue,并行消费乱序
// RocketMQ 顺序消息:同一业务键的消息进同一 queue,且单线程顺序消费
// === 生产端:用 MessageQueueSelector 把同订单消息发到同一 queue ===
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
// 按 orderId 取模,同一订单永远落同一个 queue → 保证入队有序
int idx = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(idx);
}
}, order.getId());
// === 消费端:用 MessageListenerOrderly,单 queue 单线程顺序消费 ===
consumer.registerMessageListener((MessageListenerOrderly)
(msgs, ctx) -> {
for (MessageExt msg : msgs) {
try {
processInOrder(msg);
} catch (Exception e) {
// 顺序消费失败:挂起当前 queue 一会再重试,不跳过
// (跳过会破坏顺序)
ctx.setSuspendCurrentQueueTimeMillis(1000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
// 顺序消息的代价:
// 1. 并行度受限于 queue 数,吞吐比并发消费低
// 2. 一条消息卡住,同 queue 后面全部阻塞
// 3. 所以:只对真正需要顺序的业务用,大部分场景"业务幂等 + 状态机"更好
// —— 与其依赖消息有序,不如让消费逻辑对乱序免疫
// 例:收到"支付"但订单还没"创建" → 暂存,等"创建"到了再处理
优化效果
指标 治理前 治理后
=============================================================
消息丢失 230 笔/天 0
消息重复导致业务异常 80 笔/天 0(消费幂等)
消息乱序 偶发 0(顺序消息 + 状态机)
对账差异 每天都有 连续 3 个月 0
生产发送方式 oneway/async 事务消息 + 本地消息表
刷盘/复制 ASYNC/ASYNC SYNC/SYNC
消费幂等 无 唯一索引 + Redis 去重
死信处理 无人管 DLQ 告警 + 人工补偿
消息堆积 偶发无告警 消费能力调优 + 堆积告警
性能代价(可接受):
- SYNC_FLUSH + SYNC_MASTER 后,发送 RT 从 1ms 升到 4ms
- 吞吐从 8w TPS 降到 5w TPS,仍远超业务需求(峰值 1.5w)
- 结论:订单级消息,可靠性远比那几毫秒重要
排查与改造:
- 三端逐一排查定位:3 天
- 生产端事务消息 + 本地消息表改造:4 天
- 消费端幂等改造(涉及 12 个消费者):5 天
- 全链路压测验证:2 天
避坑清单
- 消息可靠性是生产+broker+消费三端协同,任一端漏都会丢/重
- 生产端禁用 oneway,用同步发送并检查 SendStatus 是否 SEND_OK
- "业务事务 + 发消息"的原子性用事务消息或本地消息表保证
- broker 订单级消息用 SYNC_FLUSH + SYNC_MASTER,或 Dledger 三副本
- RocketMQ 只保证至少一次,消费端必须幂等,这是底线
- 幂等优选业务状态机,其次唯一索引,再次 Redis SETNX 去重
- 消费先处理再 ACK,失败返回 RECONSUME_LATER,绝不先 ACK
- 不可重试的坏消息要跳过 + 告警,别让它无限重试占资源
- 死信队列必须有人消费 + 告警 + 补偿,不能无视
- 顺序消息代价大,能用"业务对乱序免疫"就别依赖消息有序
总结
消息中间件最大的认知陷阱是"以为发出去就万事大吉"。这次对账事故让我们彻底明白:消息可靠性不是 MQ 单方面的承诺,而是生产、broker、消费三端共同的契约,任何一端偷懒,消息就会从那个缺口漏掉。最大的认知改变是理解了"至少一次"的真正含义 —— RocketMQ 从设计上就只保证消息至少投递一次,这意味着重复消费不是 bug 而是规格,指望"消息不重复"是缘木求鱼,正确的做法是承认重复必然发生、让消费端幂等,而幂等的最优解往往不是去重表,而是业务状态机:一个"待支付→已支付"的 UPDATE WHERE,天然对重复免疫。最被低估的是生产端的可靠投递,很多人用 oneway 或 async 发送图快,却从不检查结果,broker 没收到也浑然不觉,事务消息和本地消息表的价值就在于把"业务成功"和"消息发出"绑成一个原子操作。最容易踩的坑是 broker 的刷盘和复制配置,默认的 ASYNC 性能确实好看,但 broker 一崩、主一挂,page cache 和未同步的消息就没了 —— 对订单这种钱相关的消息,那几毫秒的性能根本不值得用可靠性去换。最后一个反直觉的结论:与其费劲维护严格的消息顺序,不如把消费逻辑写成对乱序免疫 —— 收到"支付"消息但订单还没"创建",就先暂存等一等,这比顺序消息那套"同 queue 单线程、一卡全堵"的机制健壮得多。
—— 别看了 · 2026