2023 年我们一个订单核对系统:每天对账 1000w 订单,某次月度对账时发现订单库少 387 笔,但支付库都正常成交。彻查后确认是消息中间件丢失:RocketMQ 主从异步刷盘 + 生产者没用事务消息 + 消费者无 idempotent + 重试上限太低,多重故障叠加把消息搞丢。投了两周做全链路消息可靠性治理,达成"消息零丢失、零重复消费"的工程目标。本文复盘 RocketMQ 消息可靠性的完整修法,覆盖生产端、Broker、消费端、监控。
事故现场
业务:支付完成 → 发 MQ → 订单服务消费 → 写订单库
TPS:平均 8000,峰值 30000
MQ:RocketMQ 4.9.x,3 Master + 3 Slave
配置(出事时):
- 异步刷盘 ASYNC_FLUSH
- 主从异步 ASYNC_MASTER
- 生产者 SendOneway(性能优先)
- 消费 RECONSUME_LATER 16 次后 → DLQ
- 没有事务消息
时间线:
事故发生时:
1. 某 Broker Master 磁盘 IO 异常,async flush 滞后 2 秒
2. 同时 Master 宕机(机柜断电)
3. Slave 没 ack 完异步同步,部分消息丢失
4. 生产者 oneway 没收到 ack,以为成功
5. 订单服务自然没收到消息,订单缺失
排查:
- 支付库:387 笔订单状态 PAID
- 订单库:对应 387 笔订单不存在
- MQ 监控:那一时段消息丢失高峰
- 业务损失:用户付款没收到货,客服赔付 12w
修复 1:Broker 同步双写 + 同步刷盘
# broker.conf(关键业务)
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
brokerRole=SYNC_MASTER # 主从同步(写完 master + 至少 1 slave 才 ack)
flushDiskType=SYNC_FLUSH # 同步刷盘(写磁盘 fsync 后才 ack)
# 性能 vs 可靠性权衡:
# ASYNC_FLUSH + ASYNC_MASTER:吞吐 80000 TPS,有数据丢失风险
# SYNC_FLUSH + SYNC_MASTER:吞吐 12000 TPS,极端情况下 0 丢失
# 我们选 SYNC_MASTER + ASYNC_FLUSH 折中(吞吐 50000,99.99% 可靠)
# 业务分级:
# 核心业务(支付、订单)→ SYNC_MASTER + SYNC_FLUSH
# 一般业务(日志、统计)→ ASYNC
# 集群部署
3 个 Master + 每个 Master 2 个 Slave
跨机柜跨可用区,任一 AZ 挂了不丢消息
# DLedger 模式(RocketMQ 4.5+,自动主从切换)
brokerRole=ASYNC_MASTER
enableDLegerCommitLog=true
storePathRootDir=/data/rocketmq/store
dLegerGroup=broker-1
dLegerPeers=n0-host1:40911;n1-host2:40911;n2-host3:40911
dLegerSelfId=n0
修复 2:生产端用同步发送 + 重试
// 不好:oneway 无 ack,消息丢了不知道
producer.sendOneway(msg);
// 好:同步发送 + 重试 + 检查 sendStatus
public void sendReliably(Message msg) throws Exception {
int maxRetries = 3;
Exception lastEx = null;
for (int i = 0; i < maxRetries; i++) {
try {
SendResult result = producer.send(msg, 3000); // 3s 超时
switch (result.getSendStatus()) {
case SEND_OK:
log.info("send ok msgId={}", result.getMsgId());
return;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
// 这些情况消息已写 master,但未确认 slave/disk
// 可能未持久,要重试(下次幂等去重)
log.warn("send partial ok: {}", result.getSendStatus());
Thread.sleep(100 * (i + 1));
continue;
default:
throw new RuntimeException("unknown status: " + result.getSendStatus());
}
} catch (Exception e) {
lastEx = e;
Thread.sleep(100 * (i + 1));
}
}
// 3 次都失败,落地到"待重发表",后台异步重发
fallbackTable.save(msg, lastEx);
throw lastEx;
}
// Producer 配置
DefaultMQProducer producer = new DefaultMQProducer("order-producer");
producer.setNamesrvAddr("ns1:9876;ns2:9876;ns3:9876");
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true); // 一个 broker 失败重选
producer.setCompressMsgBodyOverHowmuch(4096); // 4KB 压缩
producer.setMaxMessageSize(4 * 1024 * 1024);
producer.start();
修复 3:事务消息(强一致场景)
// 场景:DB 写订单 + 发 MQ 通知,要么都成功要么都失败
// 普通做法:先写 DB 再发 MQ,中间宕机就丢
// 事务消息:half-message + 提交/回滚
public class OrderTransactionListener implements TransactionListener {
// 1. 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
CreateOrderRequest req = JSON.parseObject(new String(msg.getBody()), CreateOrderRequest.class);
// 本地事务:写订单表 + 写"事务状态表"
orderMapper.insert(req.toEntity());
txStateMapper.insert(new TxState(msg.getTransactionId(), "committed"));
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("local tx failed", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 2. Broker 回查(如果 producer 宕机没提交)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
TxState state = txStateMapper.findByTxId(msg.getTransactionId());
if (state == null) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务没执行
}
if ("committed".equals(state.getStatus())) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW; // 继续回查
}
}
// 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("order-tx-producer");
producer.setTransactionListener(new OrderTransactionListener());
producer.start();
Message msg = new Message("ORDER_TOPIC", "CREATE", JSON.toJSONBytes(req));
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
// 流程:
// 1. 发 half message → Broker(对消费者不可见)
// 2. Broker 返回成功
// 3. Producer 执行本地事务(写订单 + 写 tx 状态表)
// 4. Producer commit / rollback(改 half message 状态)
// 5. 若 Producer 宕机,Broker 回查 producer(checkLocalTransaction)
// 6. commit 后消息对消费者可见
修复 4:消费端幂等
// 消息可能重复(网络抖动、Broker 重投、消费失败重试)
// 消费端必须保证幂等
@Service
public class OrderConsumer {
@Autowired private RedisTemplate<String, String> redis;
@Autowired private OrderMapper orderMapper;
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.CONCURRENTLY,
maxReconsumeTimes = 16 // 16 次后进 DLQ
)
public void consume(MessageExt msg) {
String msgId = msg.getMsgId();
String key = "msg:consumed:" + msgId;
// 1. 用 Redis SETNX 防重复(简单粗暴)
Boolean firstTime = redis.opsForValue().setIfAbsent(key, "1", Duration.ofDays(7));
if (Boolean.FALSE.equals(firstTime)) {
log.info("duplicate msg skipped: {}", msgId);
return;
}
try {
CreateOrderRequest req = JSON.parseObject(new String(msg.getBody()), CreateOrderRequest.class);
// 2. 业务层也要幂等(用 orderNo 唯一索引)
try {
orderMapper.insert(req.toEntity());
} catch (DuplicateKeyException e) {
log.info("order already exists: {}", req.getOrderNo());
return;
}
// 3. 业务幂等更通用做法:状态机
// - 检查当前订单状态
// - 只在允许的状态下执行
} catch (Exception e) {
// 失败抛异常,RocketMQ 自动重投
redis.delete(key); // 清除标记,允许重试
throw e;
}
}
}
// 唯一索引(数据库层兜底)
CREATE UNIQUE INDEX uk_order_no ON t_order (order_no);
// 状态机
public boolean updateOrderStatus(Long orderId, OrderStatus from, OrderStatus to) {
int affected = orderMapper.updateStatus(orderId, from, to);
return affected > 0; // 用 UPDATE WHERE 当前状态来保证只迁移一次
}
修复 5:死信队列(DLQ)处理
// 默认重试 16 次后进 DLQ,但没人监控就丢了
// 解决:订阅 DLQ topic,告警 + 人工处理
@RocketMQMessageListener(
topic = "%DLQ%order-consumer-group", // DLQ topic 命名规则
consumerGroup = "dlq-monitor-group"
)
@Service
public class DlqMonitor {
public void consume(MessageExt msg) {
// 1. 告警:DLQ 来了消息就是异常
alertService.send("DLQ message: " + msg.getMsgId(), msg);
// 2. 落库:供人工排查 + 重发
DlqRecord record = new DlqRecord();
record.setMsgId(msg.getMsgId());
record.setBody(new String(msg.getBody()));
record.setOriginalTopic(msg.getProperties().get("ORIGIN_MESSAGE_TOPIC"));
record.setFailReason(msg.getProperties().get("RECONSUME_TIME"));
record.setCreatedAt(LocalDateTime.now());
dlqRecordMapper.insert(record);
// 3. 不能再 throw,DLQ 消费失败也要 ack
}
}
// 后台管理页面:看 DLQ 记录 → 修业务问题 → 一键重发到原 topic
@PostMapping("/admin/dlq/replay/{id}")
public String replay(@PathVariable Long id) {
DlqRecord r = dlqRecordMapper.findById(id);
Message msg = new Message(r.getOriginalTopic(), r.getBody().getBytes());
SendResult result = producer.send(msg);
return result.getSendStatus().name();
}
修复 6:消息轨迹(MessageTrace)
# Broker 开启 trace
$ vim broker.conf
traceTopicEnable=true
# Producer 开启 trace
DefaultMQProducer producer = new DefaultMQProducer("g", true); // true=trace
# Consumer 开启 trace
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g", true);
# 查询消息轨迹(RocketMQ Console)
$ open http://console-host:8080/#/messageTrace
# 看到:
# 1. Producer 发送时间、broker、状态
# 2. Broker 存储时间、queue offset
# 3. Consumer Group 拉取时间
# 4. 各 Consumer 实例消费时间、是否成功、重试次数
# 排查丢失:看 producer 发送日志 + broker 接收日志 + consumer 拉取日志
# 任何一环断了,就找到根因
监控告警
# Prometheus + rocketmq-exporter
- alert: BrokerDiskHighUsage
expr: rocketmq_brokeruntime_disk_ratio > 0.85
annotations:
summary: "Broker 磁盘 > 85%,可能写不进"
- alert: MasterSlaveLagHigh
expr: rocketmq_brokeruntime_master_slave_diff > 1024*1024*100 # 100MB
for: 1m
annotations:
summary: "主从延迟 > 100MB,可能丢消息"
- alert: ProducerSendFailed
expr: increase(rocketmq_producer_send_failed_total[5m]) > 10
annotations:
summary: "5min 内 producer send 失败 > 10 次"
- alert: ConsumerLag
expr: sum by(group, topic) (rocketmq_consumer_lag) > 10000
for: 2m
annotations:
summary: "消费 lag > 1w 条"
- alert: DlqMessages
expr: increase(rocketmq_message_dlq_total[1h]) > 0
annotations:
severity: critical
summary: "1h 内有 DLQ 消息"
- alert: TxMessageCheckTooMany
expr: rate(rocketmq_tx_message_check_total[5m]) > 100
annotations:
summary: "事务消息回查频繁,producer 可能有问题"
优化效果
指标 优化前 优化后
=====================================================
消息丢失数(月) 387 0
消息重复消费(月) 数千次 0(幂等保证)
Producer 发送失败 不感知 捕获 + 重试 + 兜底
Broker 主从滞后 10MB-100MB < 1MB
DLQ 消息处理 无监控 秒级告警 + 自助重发
事务消息回查 未启用 99.9% 一次成功
消息轨迹 无 全链路可查
TPS 10w(无可靠性) 5w(有可靠性)
→ 业务可接受的权衡
业务影响:
- 订单不再丢,客服赔付 0
- 对账系统数据 100% 一致
- 财务侧不再每月手工对账
- 故障期间可快速定位(消息轨迹)
避坑清单
- 核心业务 SYNC_MASTER + SYNC_FLUSH,日志类业务 ASYNC 提速
- Producer 绝不用 sendOneway,必须 send + 检查 SendStatus
- SendStatus 不是 SEND_OK 就要重试(FLUSH_DISK_TIMEOUT 等也要)
- 分布式事务用 RocketMQ 事务消息(half + commit/rollback + 回查)
- 消费端必须幂等:Redis setnx 加 DB 唯一索引双保险
- maxReconsumeTimes 设 16,DLQ 必监控
- 状态机更新用 UPDATE WHERE current=expected,天然幂等
- 开 MessageTrace,生产排查神器
- 主从延迟、磁盘使用、DLQ 都要告警
- 大消息 (> 4MB) 走 OSS,MQ 只传 ref
总结
消息中间件的"零丢失"是个工程链路,不是配置一个开关就完事。这次治理覆盖了 Broker、Producer、Consumer 三端 + 监控,缺一不可。最大的认知改变:RocketMQ 默认配置(ASYNC + oneway)是为了演示性能数据,生产业务必须按业务等级调整。最被低估的是事务消息,很多团队怕复杂不用,实际上比"先写 DB 再发 MQ + 补偿"简单太多,RocketMQ 帮你做了 half + 回查 + commit/rollback 整套机制。最容易踩的坑是消费幂等没做好,以为 Redis setnx 万事大吉,但 setnx + 消费失败 + 不清理 → 永久跳过这条消息;正确做法是 setnx 失败立即清除,让重试有机会成功。最后,DLQ 监控很多团队完全没做,消息悄悄进了 DLQ 永远丢失,客户投诉了才发现 — 必须订阅 DLQ topic 告警,这是底线。
—— 别看了 · 2026