2023 年,我维护一个电商的订单系统。下单这个动作的链路很长:扣库存、加积分、发短信、通知物流……如果全塞在一个同步请求里做,用户点一下「提交订单」要转好几秒。所以我们用了消息队列(MQ):订单主流程只管把订单落库,然后往 MQ 发一条「订单已创建」的消息,后面那些扣库存、加积分的事,交给各自的消费者去异步处理。这套架构跑得挺好,直到一次大促。运营找过来,说有几个爆款商品的库存对不上——系统里显示卖了 1200 件,实际出库记录只有 800 多件,库存被扣多了。我把扣库存的消费者日志拉出来,顺着一个订单号查,当场就愣住了:同一个订单的「扣库存」消息,被消费了三次。我的第一反应是「MQ 出 bug 了,怎么会重复投递」。我甚至准备提工单找 MQ 团队。可当我真正去翻 MQ 的文档,翻到「投递语义」那一节,才明白自己错得有多离谱:MQ 默认的「至少一次(at-least-once)」投递,本来就意味着消息会重复——重复不是 bug,是这套机制写在说明书第一页的设计。是我,从头到尾都没想过这件事。那次事故逼着我把消息队列最核心的几个问题——消息会不会丢、会不会重、会不会乱序、堆积了怎么办——彻底搞懂了一遍。本文是这份复盘。
问题背景:一次"库存被扣了三次"的大促事故
背景:电商订单系统,用 MQ 把下单后的扣库存/加积分/
发短信改成异步 —— 订单落库后发一条"订单已创建"消息
某次大促,出事:
- ★★ 爆款商品库存对不上:系统显示卖 1200 件,
实际出库记录只有 800 多 —— 库存被扣多了
★ 拉扣库存消费者日志,顺着一个订单号查 —— 愣住了:
同一个订单的"扣库存"消息,被消费了【三次】
★ 第一反应:"MQ 出 bug 了,怎么会重复投递",
准备提工单找 MQ 团队
★★ 真去翻 MQ 文档"投递语义"那节,才明白错得多离谱:
- MQ 默认的【至少一次 at-least-once】投递,
本来就意味着消息会重复
- 重复不是 bug,是写在说明书第一页的设计
- 是我从头到尾没想过这件事
★ 本文要做的:把消息会不会丢、会不会重、会不会乱序、
堆积了怎么办,彻底搞懂一遍。
三种投递语义:先认清你用的 MQ"承诺"了什么
# === ★ 一切问题的源头,是没搞懂"投递语义" ===
# === ★ 语义一:at-most-once(至多一次)===
# ★ ★ 消息【最多】被投递一次 —— 可能一次,也可能零次。
# 它的潜台词是:【允许丢消息】。
# ★ ★ 怎么会丢?比如消费者一拿到消息,【立刻就告诉
# MQ"我收到了"】(提前 ack),然后才慢慢处理。要是
# ack 之后、处理之前崩了,这条消息就【永远丢了】——
# MQ 以为它处理完了,不会再投。
# ★ ★ 适用:能容忍丢一点的场景(如打点日志、监控埋点)。
# === ★★ 语义二:at-least-once(至少一次)===
# ★ ★ 消息【至少】被投递一次 —— 可能一次,也可能多次。
# 它的潜台词是:【不丢,但会重复】。
# ★ ★ 怎么会重?消费者【处理完业务,再 ack】。要是
# 业务处理成功了、但 ack 还没发出去就崩了 —— MQ 没
# 收到 ack,会认为"这条没人消费",于是【重新投递】。
# 于是同一条消息被处理了两次。
# ★ ★★ 这是【绝大多数 MQ 的默认语义】,也是开头那次
# "扣三次"事故的根源。你用 Kafka、RocketMQ、RabbitMQ,
# 默认拿到的就是它 —— 重复,是你必须自己消化的代价。
# === ★ 语义三:exactly-once(恰好一次)===
# ★ ★ 消息【不多不少,恰好】被处理一次。听起来最完美,
# 人人都想要。
# ★ ★★ 但要清醒:严格的、端到端的 exactly-once【极难
# 实现】,代价也极高。市面上号称 exactly-once 的方案
# (如 Kafka 事务),往往只覆盖"MQ 内部"那一段,
# 一旦消息要落到【你的数据库】,这个保证就断了。
# ★ ★ 所以工程上的主流答案不是去追求 exactly-once,
# 而是:【用 at-least-once + 消费端幂等】,达到一个
# "效果上恰好一次"的结果。
# === 小结 ===
# ★ 一切问题源头是没搞懂投递语义。★ at-most-once 至多
# 一次:消息最多投递一次可能一次也可能零次,潜台词是
# 允许丢消息 —— 消费者一拿到就提前 ack 再慢慢处理,
# ack 后处理前崩了消息就永远丢了,适用能容忍丢的场景
# (日志埋点)。★★ at-least-once 至少一次:消息至少
# 投递一次可能一次也可能多次,潜台词是不丢但会重复 ——
# 消费者处理完业务再 ack,业务成功但 ack 没发出就崩了
# MQ 没收到 ack 会重新投递,同一条被处理两次;这是
# 绝大多数 MQ 的默认语义,也是开头"扣三次"事故根源,
# 重复是你必须自己消化的代价。★ exactly-once 恰好一次:
# 不多不少恰好处理一次听起来最完美,但严格端到端的
# exactly-once 极难实现代价极高,号称的方案往往只覆盖
# MQ 内部、消息落到你的数据库保证就断了;工程主流答案
# 不是追求 exactly-once 而是用 at-least-once+消费端
# 幂等,达到"效果上恰好一次"。
// ★ 对比:at-most-once vs at-least-once,差别只在"何时 ack"
import org.apache.rocketmq.spring.core.RocketMQListener;
// ────────── ✗ at-most-once:先 ack 再处理 —— 会丢消息 ──────────
public class AtMostOnceConsumer {
public void onMessage(OrderMessage msg, AckCallback ack) {
ack.confirm(); // ★✗ 一拿到就 ack
// ★★ 若这之后、deductStock 之前进程崩了 ->
// MQ 以为已消费,不再投递 -> 这条消息永久丢失
deductStock(msg.getOrderId());
}
}
// ────────── ✓ at-least-once:先处理再 ack —— 不丢但会重 ──────────
public class AtLeastOnceConsumer {
public void onMessage(OrderMessage msg, AckCallback ack) {
deductStock(msg.getOrderId()); // ★ 先把业务做完
ack.confirm(); // ★ 成功了才 ack
// ★★ 若 deductStock 成功、但 ack 之前崩了 ->
// MQ 没收到 ack -> 认为没人消费 -> 重新投递 ->
// 同一条消息被处理两次(开头事故就是这么来的)
}
private void deductStock(String orderId) { /* 扣库存 */ }
}
// ★ 结论:默认就是 at-least-once。别想着消灭重复,
// 要做的是"让重复处理多少次,结果都一样" —— 即幂等
消息丢失:生产端、Broker、消费端,三处都会丢
# === ★ "消息丢了"不是一个点,是一条链路上的三个点 ===
# === ★ 丢失点一:生产端 —— 消息没发到 Broker ===
# ★ ★ 你的代码调了 send(),但网络抖动、Broker 繁忙,
# 这条消息【根本没到】Broker。如果你用的是【异步发送】
# 且【不检查回调结果】,你的代码会以为"发成功了",
# 实际它已经丢在半路。
# ★ ★ 治:用【同步发送】并检查返回值,或异步发送但在
# 回调里处理失败 —— 失败了就【重试】或【落库补偿】。
# 关键是:发送结果,你必须【真的去确认】。
# === ★★ 丢失点二:Broker —— 消息到了但没存住 ===
# ★ ★ 消息到了 Broker,但 Broker 为了快,先把它放在
# 【内存】里,还没来得及刷到磁盘。这时 Broker 宕机,
# 内存里那批消息就【全没了】。
# ★ ★ 治两条:① 刷盘策略尽量用【同步刷盘】(消息落盘
# 了才算成功,慢一点但稳);② Broker 要做【集群 +
# 多副本】—— 一条消息要复制到多个节点都存了,才算
# 写成功,单个节点挂了不影响。
# === ★ 丢失点三:消费端 —— 消息处理失败却 ack 了 ===
# ★ ★ 这就是上一节说的:提前 ack(at-most-once),
# 或者业务抛了异常你却把它吞掉、照样 ack。MQ 一看
# ack 了,就再也不投这条消息了 —— 等于消费端丢了它。
# ★ ★ 治:消费成功才 ack;消费失败【不要 ack】(或
# 显式返回失败),让 MQ 重新投递。处理逻辑里别用
# catch 把异常一吞了之。
# === ★ 一个贯穿三处的思路:可靠性是有成本的 ===
# ★ ★ 同步发送比异步慢、同步刷盘比异步刷盘慢、多副本
# 比单副本占资源。"一条消息绝对不丢",是用吞吐和延迟
# 换来的。所以要分场景:支付、订单这种【一条都不能丢】,
# 就得付这个成本;而日志、埋点,丢一点无所谓,就别为
# 它上最重的配置。
# === 小结 ===
# ★ "消息丢了"不是一个点是一条链路上的三个点。★ 丢失点
# 一生产端:代码调了 send() 但网络抖动/Broker 繁忙消息
# 根本没到 Broker,若用异步发送且不检查回调结果代码会
# 以为发成功了实际丢在半路,治法是同步发送并检查返回值
# 或异步发送在回调里处理失败(重试或落库补偿)。★★ 丢失
# 点二 Broker:消息到了 Broker 但为了快先放内存还没刷
# 磁盘,这时 Broker 宕机内存那批全没了,治两条 —— 刷盘
# 尽量用同步刷盘(落盘才算成功)、Broker 做集群+多副本
# (复制到多节点才算写成功)。★ 丢失点三消费端:提前
# ack 或业务抛异常被吞掉照样 ack,MQ 一看 ack 就不再投
# 等于消费端丢了它,治法是消费成功才 ack、失败不要 ack
# 让 MQ 重投、别用 catch 把异常一吞了之。★ 贯穿三处的
# 思路:可靠性有成本 —— 同步发送比异步慢、同步刷盘比
# 异步慢、多副本比单副本占资源,"一条都不丢"是用吞吐
# 和延迟换的,支付订单付这个成本、日志埋点别上最重配置。
// ★ 防丢失:生产端同步发送 + 检查结果 + 失败补偿
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
public class ReliableProducer {
private RocketMQTemplate rocketMQTemplate;
private MsgBackupMapper backupMapper; // ★ 失败消息落库表
public void sendOrderMessage(OrderMessage msg) {
try {
// ★★ 用同步发送 —— 它会【等 Broker 真正确认】才返回
SendResult result = rocketMQTemplate.syncSend(
"order-topic", msg);
// ★★ 关键:发送结果必须【真的去检查】,不能调完就不管
if (result.getSendStatus() != SendStatus.SEND_OK) {
// ★ 没到"SEND_OK" —— 视为发送失败,走补偿
handleSendFailure(msg);
}
} catch (Exception e) {
// ★ 抛异常(网络断、超时)同样是发送失败
handleSendFailure(msg);
}
}
// ★ 补偿:把发失败的消息落库,由定时任务后续重发
private void handleSendFailure(OrderMessage msg) {
backupMapper.insert(new MsgBackup(
msg.getOrderId(), toJson(msg), "PENDING_RETRY"));
// ★★ 这样即使现在发不出去,消息也不会凭空消失 ——
// 定时任务扫 PENDING_RETRY 的记录,重发,发成功再标记
}
private String toJson(OrderMessage msg) { /* 序列化 */ return ""; }
}
消息重复:别想着消灭它,让消费幂等
# === ★ 接受现实:at-least-once 下,重复无法被消灭 ===
# === ★ 为什么重复无法根除 ===
# ★ ★ 上面讲过:消费者"业务成功、ack 之前崩了",MQ
# 就会重投。除此之外,生产端发送超时后【自动重试】、
# 消费端【负载均衡 rebalance】,都会制造重复。
# ★ ★★ 你不可能堵死所有这些缝隙。所以正确的心态不是
# "我要让消息绝不重复",而是【"我假设消息一定会重复,
# 我让我的消费逻辑,重复执行 N 次和执行 1 次,结果
# 完全一样"】—— 这就叫【幂等】。
# === ★ 幂等的核心:给每条消息一个"唯一身份" ===
# ★ ★ 要识别"这条我处理过了",前提是每条消息有一个
# 全局唯一、且【业务上稳定】的 ID。
# ★ ★★ 注意:别用 MQ 自带的 messageId 当幂等键 ——
# 同一条业务消息重投时,messageId 可能是变的。要用
# 【业务自己的唯一标识】:订单号、或生产者生成消息
# 时就带上的一个 UUID。这个 ID,重投多少次都不变。
# === ★★ 方案一:唯一约束 —— 让数据库帮你挡 ===
# ★ ★ 最稳的幂等,是借数据库的【唯一索引】。比如扣库存
# 时,往一张"扣减流水表"插一条记录,把订单号设成唯一
# 索引。第一次插入成功 -> 执行扣库存;重复消息再来,
# 插入会【因唯一键冲突而失败】-> 你捕获这个冲突,
# 直接跳过,不再扣。
# ★ ★ 好处:判重和业务在【同一个事务】里,天然原子,
# 不会出现"判重过了但业务没做"的中间态。
# === ★ 方案二:状态机 —— 用业务状态天然防重 ===
# ★ ★ 很多业务自带状态。比如订单有"待支付 -> 已支付"。
# 处理"支付成功"消息时,先看订单当前状态:已经是
# "已支付"了?那这条消息就是重复的,直接跳过。
# ★ ★ 更新时用【乐观锁】:UPDATE ... SET status='已支付'
# WHERE status='待支付'。只有第一条消息能更新成功,
# 后到的重复消息,WHERE 条件不满足,影响行数为 0,
# 自然就被挡住了。
# === 小结 ===
# ★ 接受现实:at-least-once 下重复无法被消灭。★ 为什么
# 无法根除:消费者"业务成功 ack 前崩了"会重投,生产端
# 发送超时自动重试、消费端 rebalance 也都会制造重复,
# 你堵不死所有缝隙;正确心态不是"让消息绝不重复"而是
# "假设消息一定会重复,让消费逻辑重复执行 N 次和 1 次
# 结果完全一样" —— 这就叫幂等。★ 幂等核心是给每条消息
# 一个唯一身份:要识别"这条处理过了"前提是每条消息有
# 全局唯一且业务上稳定的 ID,别用 MQ 自带 messageId
# (重投时可能变),要用业务自己的唯一标识(订单号或
# 生产时就带上的 UUID)重投多少次都不变。★★ 方案一
# 唯一约束让数据库帮你挡:往"扣减流水表"插记录把订单号
# 设唯一索引,第一次插入成功就执行扣库存、重复消息再来
# 插入因唯一键冲突失败你捕获冲突直接跳过,好处是判重和
# 业务在同一事务里天然原子。★ 方案二状态机用业务状态
# 天然防重:订单有"待支付→已支付",处理"支付成功"消息
# 先看当前状态已是"已支付"就是重复直接跳过,更新用乐观锁
# UPDATE SET status='已支付' WHERE status='待支付',只有
# 第一条能更新成功、后到的重复消息影响行数为 0 被挡住。
// ★ 消费端幂等:唯一约束 + 状态机,双保险(开头事故的正解)
import org.springframework.dao.DuplicateKeyException;
import org.springframework.transaction.annotation.Transactional;
public class IdempotentStockConsumer {
private DeductLogMapper deductLogMapper; // 扣减流水表,order_id 唯一索引
private StockMapper stockMapper;
@Transactional
public void onMessage(OrderMessage msg) {
// ★ 幂等键:用【业务订单号】,不要用 MQ 的 messageId
String orderId = msg.getOrderId();
// === ★★ 第一道:唯一约束判重 ===
try {
// ★ order_id 上有唯一索引;重复消息插这一步会冲突
deductLogMapper.insert(new DeductLog(orderId, msg.getSkuId()));
} catch (DuplicateKeyException e) {
// ★★ 插入冲突 = 这条消息处理过了 -> 直接跳过,不再扣
log.info("重复消息,已跳过扣库存: orderId={}", orderId);
return;
}
// === ★★ 第二道:状态机 + 乐观锁扣库存 ===
// ★ WHERE stock >= num 保证不会扣成负数;只有库存够才扣
int affected = stockMapper.deduct(msg.getSkuId(), msg.getNum());
if (affected == 0) {
// ★ 一行都没更新 —— 库存不足,抛异常让事务回滚
// (流水表那条插入也会跟着回滚,下次重投还能再试)
throw new IllegalStateException("库存不足: sku=" + msg.getSkuId());
}
log.info("扣库存成功: orderId={}, sku={}", orderId, msg.getSkuId());
}
private static final org.slf4j.Logger log =
org.slf4j.LoggerFactory.getLogger(IdempotentStockConsumer.class);
}
消息顺序:为什么会乱,以及怎么保证"该有序的有序"
# === ★ 先想清楚:你真的需要"全局有序"吗 ===
# === ★ 为什么默认是乱序的 ===
# ★ ★ 为了高吞吐,MQ 的一个 Topic 会拆成多个【分区/队列】
# (Kafka 叫 partition,RocketMQ 叫 queue)。生产者发的
# 消息,会被【打散】到这些分区里;消费者也是多个,
# 每人盯几个分区。
# ★ ★★ 于是:消息 A 进了分区 1、消息 B 进了分区 2,
# 两个分区被两个消费者【并行】处理 —— 谁先处理完,
# 完全看运气。A 比 B 先发,但 B 可能先被处理完。
# 这就是乱序的来源:【多分区 + 多消费者并行】。
# === ★ 关键认知:你几乎从不需要"全局有序" ===
# ★ ★ "全局有序" = 所有消息严格排成一队、一个一个处理。
# 要做到它,只能用【一个分区 + 一个消费者】,吞吐瞬间
# 退化成单线程 —— 代价大到几乎不可接受。
# ★ ★★ 而且你仔细想:订单 X 的"创建->支付->发货"必须
# 有序,订单 Y 的事件也必须有序 —— 但 X 和 Y 之间,
# 谁先谁后【根本无所谓】。你要的从来不是全局有序,
# 是【局部有序】:同一个 key(同一个订单)的消息有序。
# === ★★ 解法:把"同一个 key"的消息,固定发到同一个分区 ===
# ★ ★ 既然乱序来自"消息被打散到不同分区",那就别让
# 同一个订单的消息被打散。发消息时,用【订单号做
# 分区键(hash key)】:MQ 会对这个 key 做哈希,
# 保证同一个订单号的所有消息,永远落在【同一个分区】。
# ★ ★ 同一个分区内,消息是【严格先进先出】的;再让
# 这个分区【由同一个消费者、单线程】处理 —— 同一个
# 订单的消息就有序了。而不同订单仍分散在各分区里,
# 并行不受影响。局部有序,吞吐也保住了。
# === ★ 还有一个坑:消费端的"并发"会重新打乱顺序 ===
# ★ ★ 就算消息有序地到了消费者,如果消费者拿到一批
# 消息后,【丢进线程池并发处理】—— 顺序又乱了。
# ★ ★ 所以"局部有序"要求:同一个分区的消息,消费时
# 也必须【串行】。RocketMQ 的 MessageListenerOrderly
# 就是干这个的:它保证同一队列的消息逐条处理,处理
# 完一条再下一条。
# === 小结 ===
# ★ 先想清楚你真的需要全局有序吗。★ 为什么默认乱序:为了
# 高吞吐 Topic 拆成多个分区/队列,生产者发的消息被打散
# 到这些分区、消费者也是多个,消息 A 进分区 1、B 进分区
# 2 被两个消费者并行处理谁先完看运气,乱序来源是多分区+
# 多消费者并行。★ 关键认知你几乎从不需要全局有序:全局
# 有序要所有消息排成一队一个个处理,只能用一个分区+一个
# 消费者吞吐退化成单线程代价大到不可接受;而且订单 X 的
# "创建→支付→发货"必须有序、订单 Y 也必须有序,但 X 和
# Y 之间谁先谁后无所谓,你要的是局部有序(同一个订单的
# 消息有序)。★★ 解法把同一个 key 的消息固定发到同一个
# 分区:发消息时用订单号做分区键,MQ 对 key 做哈希保证
# 同一订单号所有消息落同一分区,分区内严格先进先出、再
# 让这分区由同一消费者单线程处理,同一订单消息就有序、
# 不同订单仍分散并行。★ 还有个坑消费端并发会重新打乱:
# 消息有序到了消费者但丢进线程池并发处理顺序又乱,局部
# 有序要求同一分区消息消费时也必须串行,RocketMQ 的
# MessageListenerOrderly 保证同一队列逐条处理。
// ★ 局部有序:生产端按订单号分区 + 消费端顺序消费
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
// ────────── 生产端:用订单号做分区键(hashKey)──────────
public class OrderedProducer {
private RocketMQTemplate rocketMQTemplate;
public void sendOrderEvent(OrderEvent event) {
// ★★ 关键:syncSendOrderly 的第三个参数 = 分区键
// MQ 对它哈希 -> 同一个 orderId 的消息永远落同一队列
rocketMQTemplate.syncSendOrderly(
"order-event-topic",
event,
event.getOrderId()); // ★ 同一订单 = 同一队列 = 有序
// ★ 不同订单的消息仍会散落到不同队列,并行不受影响
}
}
// ────────── 消费端:ConsumeMode.ORDERLY 保证同队列串行 ──────────
@RocketMQMessageListener(
topic = "order-event-topic",
consumerGroup = "order-event-consumer",
consumeMode = ConsumeMode.ORDERLY) // ★★ 顺序消费,不是并发
public class OrderedConsumer
implements org.apache.rocketmq.spring.core.RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent event) {
// ★★ ORDERLY 模式下:同一个队列的消息,逐条处理,
// 处理完一条才下一条 —— 同一订单的 创建/支付/发货
// 严格按发送顺序到达
// ★ 注意:这里【绝不能】再把 event 丢进线程池异步处理,
// 那样会重新打乱顺序,前面的努力全白费
handleEvent(event);
}
private void handleEvent(OrderEvent event) { /* 处理订单事件 */ }
}
消息堆积:消费跟不上生产时,怎么救场
# === ★ 堆积:生产速度 > 消费速度,消息在 Broker 越积越多 ===
# === ★ 先判断:堆积是"突发"还是"持续" ===
# ★ ★ 突发型:大促瞬时流量打进来,生产端短时间猛发。
# 这种堆积是【正常】的 —— MQ 的核心价值之一就是
# "削峰填谷",让消息先在 Broker 里排队,消费端按
# 自己的节奏慢慢消化。只要消费端没坏,等流量过去,
# 堆积会自己消下去。
# ★ ★★ 持续型:堆积量【一直在涨】,几小时都不降。
# 这是【故障信号】—— 要么消费端挂了 / 卡了,要么
# 消费能力从根上就不够。这才是要紧急处理的。
# === ★ 排查持续堆积:先看是不是"消费端卡死了" ===
# ★ ★ 最常见的元凶:消费逻辑里有一个【慢操作】——
# 一次慢 SQL、一个慢的下游 HTTP 调用、甚至一个死锁。
# 单条消息处理从 10ms 变成 10s,消费速度直接掉 1000 倍。
# ★ ★ 所以排查第一步:看消费者线程在干嘛(jstack),
# 看消费耗时监控。十有八九是某个下游把它拖住了。
# === ★★ 紧急扩容:加消费者,但别忘了分区上限 ===
# ★ ★ 堆积了要提消费速度,最直接的是【加消费者实例】。
# 但有个硬限制:【一个分区,同时只能被一个消费者消费】。
# 如果 Topic 只有 4 个分区,你启了 10 个消费者,也只有
# 4 个在干活,另外 6 个【纯空转】。
# ★ ★ 所以:要么提前把分区数留够,要么扩容的同时也
# 【加分区】。这是堆积时最容易踩空的一脚。
# === ★ 终极手段:临时加一个"转存"消费者 ===
# ★ ★ 如果堆积已经严重到影响业务,而消费逻辑本身又快
# 不起来,有个应急招:临时写一个【极简消费者】,它
# 什么业务都不做,只把消息飞快地【转存到另一个 Topic
# 或数据库】里。先用它把堆积的消息以极高速度"接住、
# 挪走",解除 Broker 的压力;之后再不慌不忙地慢慢
# 消化那批转存的消息。
# === 认知 ===
# ★ 堆积是生产速度>消费速度消息在 Broker 越积越多。
# ★ 先判断堆积是突发还是持续:突发型是大促瞬时流量
# 生产端短时猛发,这种堆积正常 —— MQ 核心价值之一就是
# 削峰填谷让消息先在 Broker 排队消费端按自己节奏消化,
# 只要消费端没坏流量过去堆积会自己消下去;持续型是堆积
# 量一直涨几小时不降,这是故障信号(消费端挂了/卡了或
# 消费能力从根上不够)才要紧急处理。★ 排查持续堆积先看
# 是不是消费端卡死:最常见元凶是消费逻辑里有个慢操作
# (慢 SQL、慢的下游 HTTP、死锁),单条处理从 10ms 变
# 10s 消费速度掉 1000 倍,排查第一步看消费者线程在干嘛
# (jstack)看消费耗时监控。★★ 紧急扩容加消费者但别忘
# 分区上限:一个分区同时只能被一个消费者消费,Topic 只
# 4 个分区你启 10 个消费者也只 4 个干活另外 6 个空转,
# 要么提前留够分区要么扩容同时加分区。★ 终极手段临时加
# 一个转存消费者:堆积严重而消费逻辑快不起来时,临时写
# 极简消费者什么业务都不做只把消息飞快转存到另一个
# Topic 或数据库,先把堆积消息高速接住挪走解除 Broker
# 压力,之后再慢慢消化转存的消息。
// ★ 应急转存消费者:堆积时先"高速接住"消息,挪走再慢慢消化
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "emergency-rescue-consumer",
// ★★ 拉大批量 + 多线程,目标是"快",不做任何业务
consumeThreadMax = 64,
pullBatchSize = 32)
public class RescueConsumer implements RocketMQListener<OrderMessage> {
private RocketMQTemplate rocketMQTemplate;
private RescueLogMapper rescueLogMapper;
@Override
public void onMessage(OrderMessage msg) {
// ★★ 关键:这里【不碰任何业务逻辑】—— 不扣库存、不查 DB
// 只做一件事:把消息原样挪到一个"缓冲 Topic / 表"
// ★ 因为不做业务,单条处理是微秒级,消费速度极快,
// 能迅速把堆积在 Broker 的消息"抽干",解除告警
try {
rescueLogMapper.insertRaw(msg.getOrderId(), toJson(msg));
} catch (Exception e) {
// ★ 转存失败也别 ack,让它重投 —— 这批消息一条都不能再丢
throw new RuntimeException("转存失败", e);
}
// ★ 事后:再由一个正常速度的消费者,慢慢扫 rescue 表补处理
}
private String toJson(OrderMessage msg) { /* 序列化 */ return ""; }
}
工程坑:死信队列、消息回溯、事务消息、监控
# === ★ 主流程通了,还有四件事不做迟早出事 ===
# === ★ 坑一:死信队列 —— 给"反复失败的消息"一个归宿 ===
# ★ ★ 一条消息消费失败,MQ 会重投。但如果它【每次都
# 失败】(比如消息体本身就是坏的、或对应的数据被删了),
# 它会被无限重投,既消化不掉、又堵着后面的消息。
# ★ ★ MQ 的解法是【死信队列(DLQ)】:一条消息重投
# 超过 N 次仍失败,就把它【移进死信队列】,不再骚扰
# 正常流程。★★ 但死信队列【不是垃圾桶】—— 你必须
# 对它做监控和告警,定期有人去看里面积了什么、为什么
# 失败,该补数据补数据、该改代码改代码。没人管的死信
# 队列,等于把事故藏起来了。
# === ★ 坑二:消息回溯 —— 消费端有 bug 时的后悔药 ===
# ★ ★ 消费端代码有 bug,把过去一小时的消息都处理错了。
# 消息已经被 ack、"消费完"了,怎么补救?
# ★ ★ 像 Kafka / RocketMQ 这种【消息消费后不立即删除、
# 按时间保留】的 MQ,支持【重置消费位点(offset)】:
# 把消费进度【拨回】到一小时前,让修好的消费者把这段
# 消息【重新消费一遍】。这是个强大的后悔药 —— 但前提
# 是你的消费逻辑【幂等】,否则重放一遍就是重复处理一遍。
# === ★★ 坑三:本地事务与发消息,不是原子的 ===
# ★ ★ 一个经典坑:你在一个方法里"写数据库 + 发 MQ
# 消息"。这两件事【没法保证同时成功】:可能 DB 写成功
# 了、消息没发出去(下游永远收不到);也可能消息发了、
# DB 事务回滚了(下游收到一条"幽灵消息")。
# ★ ★ 解法:① MQ 的【事务消息】(RocketMQ 原生支持),
# 先发"半消息"、本地事务成功后再确认投递;② 或者更
# 通用的【本地消息表】:把"要发的消息"和业务数据写在
# 【同一个本地事务】里,再由定时任务把消息表里的记录
# 投到 MQ。后者不依赖特定 MQ,更普适。
# === ★ 坑四:监控,盯死"堆积量"和"消费延迟" ===
# ★ ★ MQ 出事,最早、最灵敏的信号就是【消息堆积量】
# (lag)。它一旦持续上涨,几乎一定有问题。
# ★ ★ 必须监控的几个指标:① 各 Topic 的堆积量;
# ② 消费延迟(消息从发出到被消费的时间差);
# ③ 消费失败率 / 死信队列消息数;④ 生产发送失败率。
# 这几个接进告警,大多数 MQ 事故能在影响业务前就发现。
# === 认知 ===
# ★ 主流程通了还有四件事不做迟早出事。★ 坑一死信队列给
# 反复失败的消息一个归宿:消息消费失败 MQ 会重投,但若
# 每次都失败(消息体本身坏了或对应数据被删)会被无限
# 重投既消化不掉又堵着后面的,MQ 解法是死信队列(重投
# 超 N 次仍失败就移进 DLQ 不再骚扰正常流程);但 DLQ
# 不是垃圾桶必须监控告警、定期有人看里面积了什么为什么
# 失败,没人管的死信队列等于把事故藏起来。★ 坑二消息
# 回溯是消费端有 bug 时的后悔药:消费端代码有 bug 把过去
# 一小时消息处理错了、消息已 ack 怎么补救,Kafka/RocketMQ
# 这种消费后不立即删除按时间保留的 MQ 支持重置消费位点
# offset,把消费进度拨回一小时前让修好的消费者重新消费
# 一遍,前提是消费逻辑幂等否则重放就是重复处理。★★ 坑三
# 本地事务与发消息不是原子的:一个方法里"写数据库+发 MQ
# 消息"没法保证同时成功 —— 可能 DB 写成功消息没发出
# (下游永远收不到)、也可能消息发了 DB 回滚了(下游收
# 到幽灵消息),解法是 MQ 事务消息(先发半消息本地事务
# 成功后再确认)或更通用的本地消息表(消息和业务数据写
# 同一本地事务再由定时任务投 MQ)。★ 坑四监控盯死堆积量
# 和消费延迟:MQ 出事最早最灵敏的信号是消息堆积量 lag,
# 必须监控各 Topic 堆积量、消费延迟、消费失败率/死信数、
# 生产发送失败率,接进告警大多数事故能在影响业务前发现。
// ★ 坑三正解:本地消息表 —— 让"业务写库"和"消息记录"同事务原子
import org.springframework.transaction.annotation.Transactional;
public class OrderServiceWithOutbox {
private OrderMapper orderMapper;
private OutboxMapper outboxMapper; // ★ 本地消息表(outbox)
private RocketMQTemplate rocketMQTemplate;
// === ① 业务写库 + 消息入表,在【同一个本地事务】里 ===
@Transactional
public void createOrder(Order order) {
orderMapper.insert(order); // ★ 业务数据
// ★★ 关键:不直接发 MQ,而是把"待发消息"写进 outbox 表
// 它和上面的 orderMapper.insert 在同一事务 ——
// 要么都成功,要么都回滚,杜绝"DB 成功但消息丢"
outboxMapper.insert(new OutboxMsg(
order.getOrderId(), "order-topic",
toJson(order), "PENDING"));
}
// === ② 定时任务:扫 outbox 表,把 PENDING 的消息投到 MQ ===
// ★ 建议 @Scheduled 每秒/每几秒跑一次
public void dispatchOutbox() {
for (OutboxMsg m : outboxMapper.selectPending(100)) {
try {
rocketMQTemplate.syncSend(m.getTopic(), m.getPayload());
outboxMapper.markSent(m.getId()); // ★ 发成功才标记
} catch (Exception e) {
// ★ 发失败不标记,下次定时任务会再扫到它、重发
// (所以消费端必须幂等 —— 这条可能被投多次)
log.warn("outbox 消息投递失败,待重试: {}", m.getId(), e);
}
}
}
private String toJson(Order order) { /* 序列化 */ return ""; }
private static final org.slf4j.Logger log =
org.slf4j.LoggerFactory.getLogger(OrderServiceWithOutbox.class);
}
一张图:一条消息从生产到消费的可靠性全链路
关键命令与配置速查
┌──────────────────────────────┬────────────────────────────────┐
│ 场景 / 命令 │ 说明 │
├──────────────────────────────┼────────────────────────────────┤
│ 投递语义 │ 默认 at-least-once = 不丢但会重复 │
│ 防丢-生产端 │ 同步发送 + 检查 SEND_OK + 失败补偿│
│ 防丢-Broker │ 同步刷盘 + 集群多副本 │
│ 防丢-消费端 │ 消费成功才 ack,失败不 ack │
│ 防重 │ 业务唯一键幂等(订单号),非 msgId │
│ 保顺序 │ 同 key 进同分区 + 消费端顺序消费 │
│ syncSendOrderly(topic,msg,key)│ RocketMQ 按 key 顺序发送 │
│ ConsumeMode.ORDERLY │ RocketMQ 顺序消费,同队列串行 │
│ 查堆积 (Kafka) │ kafka-consumer-groups --describe │
│ 重置位点 (Kafka) │ --reset-offsets --to-datetime │
│ 死信 Topic (RocketMQ) │ %DLQ%consumerGroup,需单独监控 │
│ 事务消息 / 本地消息表 │ 解决"写库 + 发消息"非原子问题 │
└──────────────────────────────┴────────────────────────────────┘
★ 排查 MQ 事故第一指标:消息堆积量 lag —— 持续上涨几乎必有问题
★ 心态:别想消灭重复,假设消息一定会重,让消费逻辑幂等
★ 顺序需求:99% 的场景只需局部有序(同订单有序),别上全局有序
避坑清单:接入消息队列前过一遍这 10 条
- 先搞清你的 MQ 是什么投递语义。主流 MQ 默认 at-least-once——不丢消息,但消息一定会重复。重复不是 bug,是写在说明书里的设计,你必须自己消化。
- 别追求 exactly-once,用"at-least-once + 幂等"。严格的端到端恰好一次极难实现且代价高。工程上的正解是接受重复投递,靠消费端幂等达到"效果上恰好一次"。
- 消息丢失要在三处都设防。生产端(同步发送+检查结果)、Broker(同步刷盘+多副本)、消费端(成功才 ack)。任何一环裸奔,消息都会丢。
- 消费端幂等键用业务 ID,不要用 MQ 的 messageId。同一条业务消息重投时 messageId 可能变。用订单号,或生产时就生成的 UUID——重投多少次都不变的那个。
- 幂等优先用数据库唯一约束。往流水表插记录、订单号设唯一索引,重复消息插入冲突即跳过。判重和业务在同一事务,天然原子,比"先查再写"可靠。
- 你要的是局部有序,不是全局有序。全局有序会把吞吐打回单线程。把同一个 key(订单号)的消息哈希到同一分区,分区内有序即可,不同 key 仍并行。
- 顺序消费时,消费端绝不能再开线程池。消息有序到达后,若丢进线程池并发处理,顺序立刻又乱。同一分区的消息,消费时必须串行。
- 分清突发堆积和持续堆积。突发堆积是 MQ 削峰的正常表现,会自己消下去。持续上涨不降才是故障信号,先排查消费端是不是被慢操作卡住了。
- 扩容消费者别忘了分区上限。一个分区同时只能被一个消费者消费。分区数若小于消费者数,多出来的消费者纯空转。扩容要连分区一起加。
- 死信队列必须有人管,监控盯死堆积量。死信队列不是垃圾桶,积压的失败消息要定期排查。堆积量 lag 是 MQ 最灵敏的故障信号,务必接告警。
总结:消息队列的可靠性,是设计出来的
那次「库存被扣三次」的事故复盘完,我最大的感受不是「学会了几个技巧」,而是一个心态上的转变。出事那几天,我满脑子想的是「MQ 怎么能重复投递,这不合理」——我把消息队列当成了一个「我发什么、它就原样、不多不少送达什么」的可靠管道。可它从来不是。MQ 给你的承诺,白纸黑字写在投递语义那一行:默认情况下,它保证「至少一次」——它不会弄丢你的消息,但它明确地、不打算瞒着你地告诉你:同一条消息,它可能送给你好几次。我以为的「bug」,只是我没读完它的说明书。
读懂了这件事,后面的一切就顺了。消息会重复,所以消费端必须幂等——用业务唯一键、用数据库唯一约束、用状态机,让同一条消息处理一次和处理十次,结果完全一样。消息会丢,而且会在生产端、Broker、消费端三个不同的地方丢,所以三个环节都要设防——同步发送并检查结果、同步刷盘加多副本、消费成功才 ack。消息会乱序,因为多分区多消费者天生并行,所以把同一个订单的消息哈希到同一分区、再让它串行消费,换来够用的局部有序。消息会堆积,所以要分清突发和持续、要监控 lag、要预留分区、要备好应急转存的手段。这些都不是「用了 MQ 就自动拥有」的,每一条,都得你自己亲手设计进去。
所以如果让我把这次复盘收成一句话,那就是:消息队列让系统解耦、削峰、异步,但它不会顺便把可靠性也送给你——可靠性,是你一行一行设计出来的。引入 MQ 的那一刻,你其实是把一次「同步的、要么成功要么失败」的简单调用,换成了一个「异步的、可能丢、可能重、可能乱、可能堆」的分布式过程。你换来了性能和解耦,代价是你必须亲自处理这个过程里的每一种「可能」。
那次事故之后,我给团队定了一条规矩:任何人接入一个新的 MQ Topic,提交评审前,必须能回答四个问题——这条消息丢了会怎样、重了会怎样、乱序了会怎样、堆积了会怎样。答不上来,就说明这个 Topic 还没设计完。库存被扣成负数的那张对账单,后来一直贴在我工位旁边。它提醒我的不是「MQ 不可靠」,而是:可靠的系统,从来不是用上了某个可靠的中间件就有的,它是被人一处一处、认认真真设计出来的。
—— 别看了 · 2026