消息队列完全指南:从一次库存扣三次的事故看懂丢失、重复、顺序怎么治

2023 年我维护一个电商订单系统,用消息队列把下单后的扣库存/加积分/发短信改成异步——订单落库后发一条"订单已创建"消息交给各消费者处理。某次大促运营找来说爆款商品库存对不上:系统显示卖 1200 件实际出库只有 800 多,库存被扣多了。拉扣库存消费者日志顺着一个订单号查,愣住了:同一个订单的"扣库存"消息被消费了三次。我第一反应是"MQ 出 bug 了怎么会重复投递",准备提工单找 MQ 团队,直到真去翻 MQ 文档"投递语义"那节才明白错得多离谱——MQ 默认的"至少一次 at-least-once"投递本来就意味着消息会重复,重复不是 bug 是写在说明书第一页的设计。梳理:三种投递语义先认清,at-most-once 允许丢消息(提前 ack)、at-least-once 不丢但会重复(处理完再 ack 是默认语义)、exactly-once 极难实现工程主流是用 at-least-once+消费端幂等达到"效果上恰好一次"。消息丢失要在生产端(同步发送+检查 SEND_OK+失败补偿)、Broker(同步刷盘+集群多副本)、消费端(成功才 ack)三处都设防。消息重复别想消灭它,用业务唯一键(订单号非 messageId)做幂等键、靠数据库唯一约束和状态机乐观锁让重复执行 N 次和 1 次结果一样。消息顺序你要的是局部有序不是全局有序,把同一订单号的消息哈希进同一分区+消费端顺序消费即可,顺序消费时绝不能再开线程池。消息堆积要分清突发(削峰正常会自己消)和持续(故障信号),扩容消费者别忘一个分区只能被一个消费者消费的上限,应急可临时加转存消费者高速接住消息。四个工程坑:死信队列要有人管别当垃圾桶、消息回溯靠重置消费位点是后悔药、本地事务与发消息不原子要用事务消息或本地消息表、监控盯死堆积量 lag。核心一句:消息队列让系统解耦削峰异步,但不会顺便把可靠性送给你——可靠性是你一行一行设计出来的。

2023 年,我维护一个电商的订单系统。下单这个动作的链路很长:扣库存、加积分、发短信、通知物流……如果全塞在一个同步请求里做,用户点一下「提交订单」要转好几秒。所以我们用了消息队列(MQ):订单主流程只管把订单落库,然后往 MQ 发一条「订单已创建」的消息,后面那些扣库存、加积分的事,交给各自的消费者去异步处理。这套架构跑得挺好,直到一次大促。运营找过来,说有几个爆款商品的库存对不上——系统里显示卖了 1200 件,实际出库记录只有 800 多件,库存被扣多了。我把扣库存的消费者日志拉出来,顺着一个订单号查,当场就愣住了:同一个订单的「扣库存」消息,被消费了三次。我的第一反应是「MQ 出 bug 了,怎么会重复投递」。我甚至准备提工单找 MQ 团队。可当我真正去翻 MQ 的文档,翻到「投递语义」那一节,才明白自己错得有多离谱:MQ 默认的「至少一次(at-least-once)」投递,本来就意味着消息会重复——重复不是 bug,是这套机制写在说明书第一页的设计。是我,从头到尾都没想过这件事。那次事故逼着我把消息队列最核心的几个问题——消息会不会丢、会不会重、会不会乱序、堆积了怎么办——彻底搞懂了一遍。本文是这份复盘。

问题背景:一次"库存被扣了三次"的大促事故

背景:电商订单系统,用 MQ 把下单后的扣库存/加积分/
     发短信改成异步 —— 订单落库后发一条"订单已创建"消息

某次大促,出事:
- ★★ 爆款商品库存对不上:系统显示卖 1200 件,
     实际出库记录只有 800 多 —— 库存被扣多了

★ 拉扣库存消费者日志,顺着一个订单号查 —— 愣住了:
  同一个订单的"扣库存"消息,被消费了【三次】

★ 第一反应:"MQ 出 bug 了,怎么会重复投递",
  准备提工单找 MQ 团队

★★ 真去翻 MQ 文档"投递语义"那节,才明白错得多离谱:
- MQ 默认的【至少一次 at-least-once】投递,
  本来就意味着消息会重复
- 重复不是 bug,是写在说明书第一页的设计
- 是我从头到尾没想过这件事

★ 本文要做的:把消息会不会丢、会不会重、会不会乱序、
  堆积了怎么办,彻底搞懂一遍。

三种投递语义:先认清你用的 MQ"承诺"了什么

# === ★ 一切问题的源头,是没搞懂"投递语义" ===

# === ★ 语义一:at-most-once(至多一次)===
# ★ ★ 消息【最多】被投递一次 —— 可能一次,也可能零次。
#   它的潜台词是:【允许丢消息】。
# ★ ★ 怎么会丢?比如消费者一拿到消息,【立刻就告诉
#   MQ"我收到了"】(提前 ack),然后才慢慢处理。要是
#   ack 之后、处理之前崩了,这条消息就【永远丢了】——
#   MQ 以为它处理完了,不会再投。
# ★ ★ 适用:能容忍丢一点的场景(如打点日志、监控埋点)。

# === ★★ 语义二:at-least-once(至少一次)===
# ★ ★ 消息【至少】被投递一次 —— 可能一次,也可能多次。
#   它的潜台词是:【不丢,但会重复】。
# ★ ★ 怎么会重?消费者【处理完业务,再 ack】。要是
#   业务处理成功了、但 ack 还没发出去就崩了 —— MQ 没
#   收到 ack,会认为"这条没人消费",于是【重新投递】。
#   于是同一条消息被处理了两次。
# ★ ★★ 这是【绝大多数 MQ 的默认语义】,也是开头那次
#   "扣三次"事故的根源。你用 Kafka、RocketMQ、RabbitMQ,
#   默认拿到的就是它 —— 重复,是你必须自己消化的代价。

# === ★ 语义三:exactly-once(恰好一次)===
# ★ ★ 消息【不多不少,恰好】被处理一次。听起来最完美,
#   人人都想要。
# ★ ★★ 但要清醒:严格的、端到端的 exactly-once【极难
#   实现】,代价也极高。市面上号称 exactly-once 的方案
#   (如 Kafka 事务),往往只覆盖"MQ 内部"那一段,
#   一旦消息要落到【你的数据库】,这个保证就断了。
# ★ ★ 所以工程上的主流答案不是去追求 exactly-once,
#   而是:【用 at-least-once + 消费端幂等】,达到一个
#   "效果上恰好一次"的结果。

# === 小结 ===
# ★ 一切问题源头是没搞懂投递语义。★ at-most-once 至多
#   一次:消息最多投递一次可能一次也可能零次,潜台词是
#   允许丢消息 —— 消费者一拿到就提前 ack 再慢慢处理,
#   ack 后处理前崩了消息就永远丢了,适用能容忍丢的场景
#   (日志埋点)。★★ at-least-once 至少一次:消息至少
#   投递一次可能一次也可能多次,潜台词是不丢但会重复 ——
#   消费者处理完业务再 ack,业务成功但 ack 没发出就崩了
#   MQ 没收到 ack 会重新投递,同一条被处理两次;这是
#   绝大多数 MQ 的默认语义,也是开头"扣三次"事故根源,
#   重复是你必须自己消化的代价。★ exactly-once 恰好一次:
#   不多不少恰好处理一次听起来最完美,但严格端到端的
#   exactly-once 极难实现代价极高,号称的方案往往只覆盖
#   MQ 内部、消息落到你的数据库保证就断了;工程主流答案
#   不是追求 exactly-once 而是用 at-least-once+消费端
#   幂等,达到"效果上恰好一次"。
// ★ 对比:at-most-once vs at-least-once,差别只在"何时 ack"
import org.apache.rocketmq.spring.core.RocketMQListener;

// ────────── ✗ at-most-once:先 ack 再处理 —— 会丢消息 ──────────
public class AtMostOnceConsumer {
    public void onMessage(OrderMessage msg, AckCallback ack) {
        ack.confirm();                       // ★✗ 一拿到就 ack
        // ★★ 若这之后、deductStock 之前进程崩了 ->
        //    MQ 以为已消费,不再投递 -> 这条消息永久丢失
        deductStock(msg.getOrderId());
    }
}

// ────────── ✓ at-least-once:先处理再 ack —— 不丢但会重 ──────────
public class AtLeastOnceConsumer {
    public void onMessage(OrderMessage msg, AckCallback ack) {
        deductStock(msg.getOrderId());       // ★ 先把业务做完
        ack.confirm();                       // ★ 成功了才 ack
        // ★★ 若 deductStock 成功、但 ack 之前崩了 ->
        //    MQ 没收到 ack -> 认为没人消费 -> 重新投递 ->
        //    同一条消息被处理两次(开头事故就是这么来的)
    }

    private void deductStock(String orderId) { /* 扣库存 */ }
}
// ★ 结论:默认就是 at-least-once。别想着消灭重复,
//   要做的是"让重复处理多少次,结果都一样" —— 即幂等

消息丢失:生产端、Broker、消费端,三处都会丢

# === ★ "消息丢了"不是一个点,是一条链路上的三个点 ===

# === ★ 丢失点一:生产端 —— 消息没发到 Broker ===
# ★ ★ 你的代码调了 send(),但网络抖动、Broker 繁忙,
#   这条消息【根本没到】Broker。如果你用的是【异步发送】
#   且【不检查回调结果】,你的代码会以为"发成功了",
#   实际它已经丢在半路。
# ★ ★ 治:用【同步发送】并检查返回值,或异步发送但在
#   回调里处理失败 —— 失败了就【重试】或【落库补偿】。
#   关键是:发送结果,你必须【真的去确认】。

# === ★★ 丢失点二:Broker —— 消息到了但没存住 ===
# ★ ★ 消息到了 Broker,但 Broker 为了快,先把它放在
#   【内存】里,还没来得及刷到磁盘。这时 Broker 宕机,
#   内存里那批消息就【全没了】。
# ★ ★ 治两条:① 刷盘策略尽量用【同步刷盘】(消息落盘
#   了才算成功,慢一点但稳);② Broker 要做【集群 +
#   多副本】—— 一条消息要复制到多个节点都存了,才算
#   写成功,单个节点挂了不影响。

# === ★ 丢失点三:消费端 —— 消息处理失败却 ack 了 ===
# ★ ★ 这就是上一节说的:提前 ack(at-most-once),
#   或者业务抛了异常你却把它吞掉、照样 ack。MQ 一看
#   ack 了,就再也不投这条消息了 —— 等于消费端丢了它。
# ★ ★ 治:消费成功才 ack;消费失败【不要 ack】(或
#   显式返回失败),让 MQ 重新投递。处理逻辑里别用
#   catch 把异常一吞了之。

# === ★ 一个贯穿三处的思路:可靠性是有成本的 ===
# ★ ★ 同步发送比异步慢、同步刷盘比异步刷盘慢、多副本
#   比单副本占资源。"一条消息绝对不丢",是用吞吐和延迟
#   换来的。所以要分场景:支付、订单这种【一条都不能丢】,
#   就得付这个成本;而日志、埋点,丢一点无所谓,就别为
#   它上最重的配置。

# === 小结 ===
# ★ "消息丢了"不是一个点是一条链路上的三个点。★ 丢失点
#   一生产端:代码调了 send() 但网络抖动/Broker 繁忙消息
#   根本没到 Broker,若用异步发送且不检查回调结果代码会
#   以为发成功了实际丢在半路,治法是同步发送并检查返回值
#   或异步发送在回调里处理失败(重试或落库补偿)。★★ 丢失
#   点二 Broker:消息到了 Broker 但为了快先放内存还没刷
#   磁盘,这时 Broker 宕机内存那批全没了,治两条 —— 刷盘
#   尽量用同步刷盘(落盘才算成功)、Broker 做集群+多副本
#   (复制到多节点才算写成功)。★ 丢失点三消费端:提前
#   ack 或业务抛异常被吞掉照样 ack,MQ 一看 ack 就不再投
#   等于消费端丢了它,治法是消费成功才 ack、失败不要 ack
#   让 MQ 重投、别用 catch 把异常一吞了之。★ 贯穿三处的
#   思路:可靠性有成本 —— 同步发送比异步慢、同步刷盘比
#   异步慢、多副本比单副本占资源,"一条都不丢"是用吞吐
#   和延迟换的,支付订单付这个成本、日志埋点别上最重配置。
// ★ 防丢失:生产端同步发送 + 检查结果 + 失败补偿
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

public class ReliableProducer {

    private RocketMQTemplate rocketMQTemplate;
    private MsgBackupMapper backupMapper;     // ★ 失败消息落库表

    public void sendOrderMessage(OrderMessage msg) {
        try {
            // ★★ 用同步发送 —— 它会【等 Broker 真正确认】才返回
            SendResult result = rocketMQTemplate.syncSend(
                    "order-topic", msg);

            // ★★ 关键:发送结果必须【真的去检查】,不能调完就不管
            if (result.getSendStatus() != SendStatus.SEND_OK) {
                // ★ 没到"SEND_OK" —— 视为发送失败,走补偿
                handleSendFailure(msg);
            }
        } catch (Exception e) {
            // ★ 抛异常(网络断、超时)同样是发送失败
            handleSendFailure(msg);
        }
    }

    // ★ 补偿:把发失败的消息落库,由定时任务后续重发
    private void handleSendFailure(OrderMessage msg) {
        backupMapper.insert(new MsgBackup(
                msg.getOrderId(), toJson(msg), "PENDING_RETRY"));
        // ★★ 这样即使现在发不出去,消息也不会凭空消失 ——
        //    定时任务扫 PENDING_RETRY 的记录,重发,发成功再标记
    }

    private String toJson(OrderMessage msg) { /* 序列化 */ return ""; }
}

消息重复:别想着消灭它,让消费幂等

# === ★ 接受现实:at-least-once 下,重复无法被消灭 ===

# === ★ 为什么重复无法根除 ===
# ★ ★ 上面讲过:消费者"业务成功、ack 之前崩了",MQ
#   就会重投。除此之外,生产端发送超时后【自动重试】、
#   消费端【负载均衡 rebalance】,都会制造重复。
# ★ ★★ 你不可能堵死所有这些缝隙。所以正确的心态不是
#   "我要让消息绝不重复",而是【"我假设消息一定会重复,
#   我让我的消费逻辑,重复执行 N 次和执行 1 次,结果
#   完全一样"】—— 这就叫【幂等】。

# === ★ 幂等的核心:给每条消息一个"唯一身份" ===
# ★ ★ 要识别"这条我处理过了",前提是每条消息有一个
#   全局唯一、且【业务上稳定】的 ID。
# ★ ★★ 注意:别用 MQ 自带的 messageId 当幂等键 ——
#   同一条业务消息重投时,messageId 可能是变的。要用
#   【业务自己的唯一标识】:订单号、或生产者生成消息
#   时就带上的一个 UUID。这个 ID,重投多少次都不变。

# === ★★ 方案一:唯一约束 —— 让数据库帮你挡 ===
# ★ ★ 最稳的幂等,是借数据库的【唯一索引】。比如扣库存
#   时,往一张"扣减流水表"插一条记录,把订单号设成唯一
#   索引。第一次插入成功 -> 执行扣库存;重复消息再来,
#   插入会【因唯一键冲突而失败】-> 你捕获这个冲突,
#   直接跳过,不再扣。
# ★ ★ 好处:判重和业务在【同一个事务】里,天然原子,
#   不会出现"判重过了但业务没做"的中间态。

# === ★ 方案二:状态机 —— 用业务状态天然防重 ===
# ★ ★ 很多业务自带状态。比如订单有"待支付 -> 已支付"。
#   处理"支付成功"消息时,先看订单当前状态:已经是
#   "已支付"了?那这条消息就是重复的,直接跳过。
# ★ ★ 更新时用【乐观锁】:UPDATE ... SET status='已支付'
#   WHERE status='待支付'。只有第一条消息能更新成功,
#   后到的重复消息,WHERE 条件不满足,影响行数为 0,
#   自然就被挡住了。

# === 小结 ===
# ★ 接受现实:at-least-once 下重复无法被消灭。★ 为什么
#   无法根除:消费者"业务成功 ack 前崩了"会重投,生产端
#   发送超时自动重试、消费端 rebalance 也都会制造重复,
#   你堵不死所有缝隙;正确心态不是"让消息绝不重复"而是
#   "假设消息一定会重复,让消费逻辑重复执行 N 次和 1 次
#   结果完全一样" —— 这就叫幂等。★ 幂等核心是给每条消息
#   一个唯一身份:要识别"这条处理过了"前提是每条消息有
#   全局唯一且业务上稳定的 ID,别用 MQ 自带 messageId
#   (重投时可能变),要用业务自己的唯一标识(订单号或
#   生产时就带上的 UUID)重投多少次都不变。★★ 方案一
#   唯一约束让数据库帮你挡:往"扣减流水表"插记录把订单号
#   设唯一索引,第一次插入成功就执行扣库存、重复消息再来
#   插入因唯一键冲突失败你捕获冲突直接跳过,好处是判重和
#   业务在同一事务里天然原子。★ 方案二状态机用业务状态
#   天然防重:订单有"待支付→已支付",处理"支付成功"消息
#   先看当前状态已是"已支付"就是重复直接跳过,更新用乐观锁
#   UPDATE SET status='已支付' WHERE status='待支付',只有
#   第一条能更新成功、后到的重复消息影响行数为 0 被挡住。
// ★ 消费端幂等:唯一约束 + 状态机,双保险(开头事故的正解)
import org.springframework.dao.DuplicateKeyException;
import org.springframework.transaction.annotation.Transactional;

public class IdempotentStockConsumer {

    private DeductLogMapper deductLogMapper;   // 扣减流水表,order_id 唯一索引
    private StockMapper stockMapper;

    @Transactional
    public void onMessage(OrderMessage msg) {
        // ★ 幂等键:用【业务订单号】,不要用 MQ 的 messageId
        String orderId = msg.getOrderId();

        // === ★★ 第一道:唯一约束判重 ===
        try {
            // ★ order_id 上有唯一索引;重复消息插这一步会冲突
            deductLogMapper.insert(new DeductLog(orderId, msg.getSkuId()));
        } catch (DuplicateKeyException e) {
            // ★★ 插入冲突 = 这条消息处理过了 -> 直接跳过,不再扣
            log.info("重复消息,已跳过扣库存: orderId={}", orderId);
            return;
        }

        // === ★★ 第二道:状态机 + 乐观锁扣库存 ===
        // ★ WHERE stock >= num 保证不会扣成负数;只有库存够才扣
        int affected = stockMapper.deduct(msg.getSkuId(), msg.getNum());
        if (affected == 0) {
            // ★ 一行都没更新 —— 库存不足,抛异常让事务回滚
            //   (流水表那条插入也会跟着回滚,下次重投还能再试)
            throw new IllegalStateException("库存不足: sku=" + msg.getSkuId());
        }
        log.info("扣库存成功: orderId={}, sku={}", orderId, msg.getSkuId());
    }

    private static final org.slf4j.Logger log =
            org.slf4j.LoggerFactory.getLogger(IdempotentStockConsumer.class);
}

消息顺序:为什么会乱,以及怎么保证"该有序的有序"

# === ★ 先想清楚:你真的需要"全局有序"吗 ===

# === ★ 为什么默认是乱序的 ===
# ★ ★ 为了高吞吐,MQ 的一个 Topic 会拆成多个【分区/队列】
#   (Kafka 叫 partition,RocketMQ 叫 queue)。生产者发的
#   消息,会被【打散】到这些分区里;消费者也是多个,
#   每人盯几个分区。
# ★ ★★ 于是:消息 A 进了分区 1、消息 B 进了分区 2,
#   两个分区被两个消费者【并行】处理 —— 谁先处理完,
#   完全看运气。A 比 B 先发,但 B 可能先被处理完。
#   这就是乱序的来源:【多分区 + 多消费者并行】。

# === ★ 关键认知:你几乎从不需要"全局有序" ===
# ★ ★ "全局有序" = 所有消息严格排成一队、一个一个处理。
#   要做到它,只能用【一个分区 + 一个消费者】,吞吐瞬间
#   退化成单线程 —— 代价大到几乎不可接受。
# ★ ★★ 而且你仔细想:订单 X 的"创建->支付->发货"必须
#   有序,订单 Y 的事件也必须有序 —— 但 X 和 Y 之间,
#   谁先谁后【根本无所谓】。你要的从来不是全局有序,
#   是【局部有序】:同一个 key(同一个订单)的消息有序。

# === ★★ 解法:把"同一个 key"的消息,固定发到同一个分区 ===
# ★ ★ 既然乱序来自"消息被打散到不同分区",那就别让
#   同一个订单的消息被打散。发消息时,用【订单号做
#   分区键(hash key)】:MQ 会对这个 key 做哈希,
#   保证同一个订单号的所有消息,永远落在【同一个分区】。
# ★ ★ 同一个分区内,消息是【严格先进先出】的;再让
#   这个分区【由同一个消费者、单线程】处理 —— 同一个
#   订单的消息就有序了。而不同订单仍分散在各分区里,
#   并行不受影响。局部有序,吞吐也保住了。

# === ★ 还有一个坑:消费端的"并发"会重新打乱顺序 ===
# ★ ★ 就算消息有序地到了消费者,如果消费者拿到一批
#   消息后,【丢进线程池并发处理】—— 顺序又乱了。
# ★ ★ 所以"局部有序"要求:同一个分区的消息,消费时
#   也必须【串行】。RocketMQ 的 MessageListenerOrderly
#   就是干这个的:它保证同一队列的消息逐条处理,处理
#   完一条再下一条。

# === 小结 ===
# ★ 先想清楚你真的需要全局有序吗。★ 为什么默认乱序:为了
#   高吞吐 Topic 拆成多个分区/队列,生产者发的消息被打散
#   到这些分区、消费者也是多个,消息 A 进分区 1、B 进分区
#   2 被两个消费者并行处理谁先完看运气,乱序来源是多分区+
#   多消费者并行。★ 关键认知你几乎从不需要全局有序:全局
#   有序要所有消息排成一队一个个处理,只能用一个分区+一个
#   消费者吞吐退化成单线程代价大到不可接受;而且订单 X 的
#   "创建→支付→发货"必须有序、订单 Y 也必须有序,但 X 和
#   Y 之间谁先谁后无所谓,你要的是局部有序(同一个订单的
#   消息有序)。★★ 解法把同一个 key 的消息固定发到同一个
#   分区:发消息时用订单号做分区键,MQ 对 key 做哈希保证
#   同一订单号所有消息落同一分区,分区内严格先进先出、再
#   让这分区由同一消费者单线程处理,同一订单消息就有序、
#   不同订单仍分散并行。★ 还有个坑消费端并发会重新打乱:
#   消息有序到了消费者但丢进线程池并发处理顺序又乱,局部
#   有序要求同一分区消息消费时也必须串行,RocketMQ 的
#   MessageListenerOrderly 保证同一队列逐条处理。
// ★ 局部有序:生产端按订单号分区 + 消费端顺序消费
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

// ────────── 生产端:用订单号做分区键(hashKey)──────────
public class OrderedProducer {
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderEvent(OrderEvent event) {
        // ★★ 关键:syncSendOrderly 的第三个参数 = 分区键
        //    MQ 对它哈希 -> 同一个 orderId 的消息永远落同一队列
        rocketMQTemplate.syncSendOrderly(
                "order-event-topic",
                event,
                event.getOrderId());        // ★ 同一订单 = 同一队列 = 有序
        // ★ 不同订单的消息仍会散落到不同队列,并行不受影响
    }
}

// ────────── 消费端:ConsumeMode.ORDERLY 保证同队列串行 ──────────
@RocketMQMessageListener(
        topic = "order-event-topic",
        consumerGroup = "order-event-consumer",
        consumeMode = ConsumeMode.ORDERLY)   // ★★ 顺序消费,不是并发
public class OrderedConsumer
        implements org.apache.rocketmq.spring.core.RocketMQListener<OrderEvent> {

    @Override
    public void onMessage(OrderEvent event) {
        // ★★ ORDERLY 模式下:同一个队列的消息,逐条处理,
        //    处理完一条才下一条 —— 同一订单的 创建/支付/发货
        //    严格按发送顺序到达
        // ★ 注意:这里【绝不能】再把 event 丢进线程池异步处理,
        //    那样会重新打乱顺序,前面的努力全白费
        handleEvent(event);
    }

    private void handleEvent(OrderEvent event) { /* 处理订单事件 */ }
}

消息堆积:消费跟不上生产时,怎么救场

# === ★ 堆积:生产速度 > 消费速度,消息在 Broker 越积越多 ===

# === ★ 先判断:堆积是"突发"还是"持续" ===
# ★ ★ 突发型:大促瞬时流量打进来,生产端短时间猛发。
#   这种堆积是【正常】的 —— MQ 的核心价值之一就是
#   "削峰填谷",让消息先在 Broker 里排队,消费端按
#   自己的节奏慢慢消化。只要消费端没坏,等流量过去,
#   堆积会自己消下去。
# ★ ★★ 持续型:堆积量【一直在涨】,几小时都不降。
#   这是【故障信号】—— 要么消费端挂了 / 卡了,要么
#   消费能力从根上就不够。这才是要紧急处理的。

# === ★ 排查持续堆积:先看是不是"消费端卡死了" ===
# ★ ★ 最常见的元凶:消费逻辑里有一个【慢操作】——
#   一次慢 SQL、一个慢的下游 HTTP 调用、甚至一个死锁。
#   单条消息处理从 10ms 变成 10s,消费速度直接掉 1000 倍。
# ★ ★ 所以排查第一步:看消费者线程在干嘛(jstack),
#   看消费耗时监控。十有八九是某个下游把它拖住了。

# === ★★ 紧急扩容:加消费者,但别忘了分区上限 ===
# ★ ★ 堆积了要提消费速度,最直接的是【加消费者实例】。
#   但有个硬限制:【一个分区,同时只能被一个消费者消费】。
#   如果 Topic 只有 4 个分区,你启了 10 个消费者,也只有
#   4 个在干活,另外 6 个【纯空转】。
# ★ ★ 所以:要么提前把分区数留够,要么扩容的同时也
#   【加分区】。这是堆积时最容易踩空的一脚。

# === ★ 终极手段:临时加一个"转存"消费者 ===
# ★ ★ 如果堆积已经严重到影响业务,而消费逻辑本身又快
#   不起来,有个应急招:临时写一个【极简消费者】,它
#   什么业务都不做,只把消息飞快地【转存到另一个 Topic
#   或数据库】里。先用它把堆积的消息以极高速度"接住、
#   挪走",解除 Broker 的压力;之后再不慌不忙地慢慢
#   消化那批转存的消息。

# === 认知 ===
# ★ 堆积是生产速度>消费速度消息在 Broker 越积越多。
#   ★ 先判断堆积是突发还是持续:突发型是大促瞬时流量
#   生产端短时猛发,这种堆积正常 —— MQ 核心价值之一就是
#   削峰填谷让消息先在 Broker 排队消费端按自己节奏消化,
#   只要消费端没坏流量过去堆积会自己消下去;持续型是堆积
#   量一直涨几小时不降,这是故障信号(消费端挂了/卡了或
#   消费能力从根上不够)才要紧急处理。★ 排查持续堆积先看
#   是不是消费端卡死:最常见元凶是消费逻辑里有个慢操作
#   (慢 SQL、慢的下游 HTTP、死锁),单条处理从 10ms 变
#   10s 消费速度掉 1000 倍,排查第一步看消费者线程在干嘛
#   (jstack)看消费耗时监控。★★ 紧急扩容加消费者但别忘
#   分区上限:一个分区同时只能被一个消费者消费,Topic 只
#   4 个分区你启 10 个消费者也只 4 个干活另外 6 个空转,
#   要么提前留够分区要么扩容同时加分区。★ 终极手段临时加
#   一个转存消费者:堆积严重而消费逻辑快不起来时,临时写
#   极简消费者什么业务都不做只把消息飞快转存到另一个
#   Topic 或数据库,先把堆积消息高速接住挪走解除 Broker
#   压力,之后再慢慢消化转存的消息。
// ★ 应急转存消费者:堆积时先"高速接住"消息,挪走再慢慢消化
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "emergency-rescue-consumer",
        // ★★ 拉大批量 + 多线程,目标是"快",不做任何业务
        consumeThreadMax = 64,
        pullBatchSize = 32)
public class RescueConsumer implements RocketMQListener<OrderMessage> {

    private RocketMQTemplate rocketMQTemplate;
    private RescueLogMapper rescueLogMapper;

    @Override
    public void onMessage(OrderMessage msg) {
        // ★★ 关键:这里【不碰任何业务逻辑】—— 不扣库存、不查 DB
        //    只做一件事:把消息原样挪到一个"缓冲 Topic / 表"
        // ★ 因为不做业务,单条处理是微秒级,消费速度极快,
        //   能迅速把堆积在 Broker 的消息"抽干",解除告警
        try {
            rescueLogMapper.insertRaw(msg.getOrderId(), toJson(msg));
        } catch (Exception e) {
            // ★ 转存失败也别 ack,让它重投 —— 这批消息一条都不能再丢
            throw new RuntimeException("转存失败", e);
        }
        // ★ 事后:再由一个正常速度的消费者,慢慢扫 rescue 表补处理
    }

    private String toJson(OrderMessage msg) { /* 序列化 */ return ""; }
}

工程坑:死信队列、消息回溯、事务消息、监控

# === ★ 主流程通了,还有四件事不做迟早出事 ===

# === ★ 坑一:死信队列 —— 给"反复失败的消息"一个归宿 ===
# ★ ★ 一条消息消费失败,MQ 会重投。但如果它【每次都
#   失败】(比如消息体本身就是坏的、或对应的数据被删了),
#   它会被无限重投,既消化不掉、又堵着后面的消息。
# ★ ★ MQ 的解法是【死信队列(DLQ)】:一条消息重投
#   超过 N 次仍失败,就把它【移进死信队列】,不再骚扰
#   正常流程。★★ 但死信队列【不是垃圾桶】—— 你必须
#   对它做监控和告警,定期有人去看里面积了什么、为什么
#   失败,该补数据补数据、该改代码改代码。没人管的死信
#   队列,等于把事故藏起来了。

# === ★ 坑二:消息回溯 —— 消费端有 bug 时的后悔药 ===
# ★ ★ 消费端代码有 bug,把过去一小时的消息都处理错了。
#   消息已经被 ack、"消费完"了,怎么补救?
# ★ ★ 像 Kafka / RocketMQ 这种【消息消费后不立即删除、
#   按时间保留】的 MQ,支持【重置消费位点(offset)】:
#   把消费进度【拨回】到一小时前,让修好的消费者把这段
#   消息【重新消费一遍】。这是个强大的后悔药 —— 但前提
#   是你的消费逻辑【幂等】,否则重放一遍就是重复处理一遍。

# === ★★ 坑三:本地事务与发消息,不是原子的 ===
# ★ ★ 一个经典坑:你在一个方法里"写数据库 + 发 MQ
#   消息"。这两件事【没法保证同时成功】:可能 DB 写成功
#   了、消息没发出去(下游永远收不到);也可能消息发了、
#   DB 事务回滚了(下游收到一条"幽灵消息")。
# ★ ★ 解法:① MQ 的【事务消息】(RocketMQ 原生支持),
#   先发"半消息"、本地事务成功后再确认投递;② 或者更
#   通用的【本地消息表】:把"要发的消息"和业务数据写在
#   【同一个本地事务】里,再由定时任务把消息表里的记录
#   投到 MQ。后者不依赖特定 MQ,更普适。

# === ★ 坑四:监控,盯死"堆积量"和"消费延迟" ===
# ★ ★ MQ 出事,最早、最灵敏的信号就是【消息堆积量】
#   (lag)。它一旦持续上涨,几乎一定有问题。
# ★ ★ 必须监控的几个指标:① 各 Topic 的堆积量;
#   ② 消费延迟(消息从发出到被消费的时间差);
#   ③ 消费失败率 / 死信队列消息数;④ 生产发送失败率。
#   这几个接进告警,大多数 MQ 事故能在影响业务前就发现。

# === 认知 ===
# ★ 主流程通了还有四件事不做迟早出事。★ 坑一死信队列给
#   反复失败的消息一个归宿:消息消费失败 MQ 会重投,但若
#   每次都失败(消息体本身坏了或对应数据被删)会被无限
#   重投既消化不掉又堵着后面的,MQ 解法是死信队列(重投
#   超 N 次仍失败就移进 DLQ 不再骚扰正常流程);但 DLQ
#   不是垃圾桶必须监控告警、定期有人看里面积了什么为什么
#   失败,没人管的死信队列等于把事故藏起来。★ 坑二消息
#   回溯是消费端有 bug 时的后悔药:消费端代码有 bug 把过去
#   一小时消息处理错了、消息已 ack 怎么补救,Kafka/RocketMQ
#   这种消费后不立即删除按时间保留的 MQ 支持重置消费位点
#   offset,把消费进度拨回一小时前让修好的消费者重新消费
#   一遍,前提是消费逻辑幂等否则重放就是重复处理。★★ 坑三
#   本地事务与发消息不是原子的:一个方法里"写数据库+发 MQ
#   消息"没法保证同时成功 —— 可能 DB 写成功消息没发出
#   (下游永远收不到)、也可能消息发了 DB 回滚了(下游收
#   到幽灵消息),解法是 MQ 事务消息(先发半消息本地事务
#   成功后再确认)或更通用的本地消息表(消息和业务数据写
#   同一本地事务再由定时任务投 MQ)。★ 坑四监控盯死堆积量
#   和消费延迟:MQ 出事最早最灵敏的信号是消息堆积量 lag,
#   必须监控各 Topic 堆积量、消费延迟、消费失败率/死信数、
#   生产发送失败率,接进告警大多数事故能在影响业务前发现。
// ★ 坑三正解:本地消息表 —— 让"业务写库"和"消息记录"同事务原子
import org.springframework.transaction.annotation.Transactional;

public class OrderServiceWithOutbox {

    private OrderMapper orderMapper;
    private OutboxMapper outboxMapper;        // ★ 本地消息表(outbox)
    private RocketMQTemplate rocketMQTemplate;

    // === ① 业务写库 + 消息入表,在【同一个本地事务】里 ===
    @Transactional
    public void createOrder(Order order) {
        orderMapper.insert(order);            // ★ 业务数据

        // ★★ 关键:不直接发 MQ,而是把"待发消息"写进 outbox 表
        //    它和上面的 orderMapper.insert 在同一事务 ——
        //    要么都成功,要么都回滚,杜绝"DB 成功但消息丢"
        outboxMapper.insert(new OutboxMsg(
                order.getOrderId(), "order-topic",
                toJson(order), "PENDING"));
    }

    // === ② 定时任务:扫 outbox 表,把 PENDING 的消息投到 MQ ===
    // ★ 建议 @Scheduled 每秒/每几秒跑一次
    public void dispatchOutbox() {
        for (OutboxMsg m : outboxMapper.selectPending(100)) {
            try {
                rocketMQTemplate.syncSend(m.getTopic(), m.getPayload());
                outboxMapper.markSent(m.getId());      // ★ 发成功才标记
            } catch (Exception e) {
                // ★ 发失败不标记,下次定时任务会再扫到它、重发
                //   (所以消费端必须幂等 —— 这条可能被投多次)
                log.warn("outbox 消息投递失败,待重试: {}", m.getId(), e);
            }
        }
    }

    private String toJson(Order order) { /* 序列化 */ return ""; }
    private static final org.slf4j.Logger log =
            org.slf4j.LoggerFactory.getLogger(OrderServiceWithOutbox.class);
}

一张图:一条消息从生产到消费的可靠性全链路

关键命令与配置速查

┌──────────────────────────────┬────────────────────────────────┐
│ 场景 / 命令                    │ 说明                            │
├──────────────────────────────┼────────────────────────────────┤
│ 投递语义                       │ 默认 at-least-once = 不丢但会重复 │
│ 防丢-生产端                    │ 同步发送 + 检查 SEND_OK + 失败补偿│
│ 防丢-Broker                   │ 同步刷盘 + 集群多副本             │
│ 防丢-消费端                    │ 消费成功才 ack,失败不 ack        │
│ 防重                          │ 业务唯一键幂等(订单号),非 msgId │
│ 保顺序                        │ 同 key 进同分区 + 消费端顺序消费   │
│ syncSendOrderly(topic,msg,key)│ RocketMQ 按 key 顺序发送          │
│ ConsumeMode.ORDERLY           │ RocketMQ 顺序消费,同队列串行      │
│ 查堆积 (Kafka)                 │ kafka-consumer-groups --describe │
│ 重置位点 (Kafka)               │ --reset-offsets --to-datetime    │
│ 死信 Topic (RocketMQ)          │ %DLQ%consumerGroup,需单独监控    │
│ 事务消息 / 本地消息表          │ 解决"写库 + 发消息"非原子问题      │
└──────────────────────────────┴────────────────────────────────┘

★ 排查 MQ 事故第一指标:消息堆积量 lag —— 持续上涨几乎必有问题
★ 心态:别想消灭重复,假设消息一定会重,让消费逻辑幂等
★ 顺序需求:99% 的场景只需局部有序(同订单有序),别上全局有序

避坑清单:接入消息队列前过一遍这 10 条

  1. 先搞清你的 MQ 是什么投递语义。主流 MQ 默认 at-least-once——不丢消息,但消息一定会重复。重复不是 bug,是写在说明书里的设计,你必须自己消化。
  2. 别追求 exactly-once,用"at-least-once + 幂等"。严格的端到端恰好一次极难实现且代价高。工程上的正解是接受重复投递,靠消费端幂等达到"效果上恰好一次"。
  3. 消息丢失要在三处都设防。生产端(同步发送+检查结果)、Broker(同步刷盘+多副本)、消费端(成功才 ack)。任何一环裸奔,消息都会丢。
  4. 消费端幂等键用业务 ID,不要用 MQ 的 messageId。同一条业务消息重投时 messageId 可能变。用订单号,或生产时就生成的 UUID——重投多少次都不变的那个。
  5. 幂等优先用数据库唯一约束。往流水表插记录、订单号设唯一索引,重复消息插入冲突即跳过。判重和业务在同一事务,天然原子,比"先查再写"可靠。
  6. 你要的是局部有序,不是全局有序。全局有序会把吞吐打回单线程。把同一个 key(订单号)的消息哈希到同一分区,分区内有序即可,不同 key 仍并行。
  7. 顺序消费时,消费端绝不能再开线程池。消息有序到达后,若丢进线程池并发处理,顺序立刻又乱。同一分区的消息,消费时必须串行。
  8. 分清突发堆积和持续堆积。突发堆积是 MQ 削峰的正常表现,会自己消下去。持续上涨不降才是故障信号,先排查消费端是不是被慢操作卡住了。
  9. 扩容消费者别忘了分区上限。一个分区同时只能被一个消费者消费。分区数若小于消费者数,多出来的消费者纯空转。扩容要连分区一起加。
  10. 死信队列必须有人管,监控盯死堆积量。死信队列不是垃圾桶,积压的失败消息要定期排查。堆积量 lag 是 MQ 最灵敏的故障信号,务必接告警。

总结:消息队列的可靠性,是设计出来的

那次「库存被扣三次」的事故复盘完,我最大的感受不是「学会了几个技巧」,而是一个心态上的转变。出事那几天,我满脑子想的是「MQ 怎么能重复投递,这不合理」——我把消息队列当成了一个「我发什么、它就原样、不多不少送达什么」的可靠管道。可它从来不是。MQ 给你的承诺,白纸黑字写在投递语义那一行:默认情况下,它保证「至少一次」——它不会弄丢你的消息,但它明确地、不打算瞒着你地告诉你:同一条消息,它可能送给你好几次。我以为的「bug」,只是我没读完它的说明书。

读懂了这件事,后面的一切就顺了。消息会重复,所以消费端必须幂等——用业务唯一键、用数据库唯一约束、用状态机,让同一条消息处理一次和处理十次,结果完全一样。消息会丢,而且会在生产端、Broker、消费端三个不同的地方丢,所以三个环节都要设防——同步发送并检查结果、同步刷盘加多副本、消费成功才 ack。消息会乱序,因为多分区多消费者天生并行,所以把同一个订单的消息哈希到同一分区、再让它串行消费,换来够用的局部有序。消息会堆积,所以要分清突发和持续、要监控 lag、要预留分区、要备好应急转存的手段。这些都不是「用了 MQ 就自动拥有」的,每一条,都得你自己亲手设计进去。

所以如果让我把这次复盘收成一句话,那就是:消息队列让系统解耦、削峰、异步,但它不会顺便把可靠性也送给你——可靠性,是你一行一行设计出来的。引入 MQ 的那一刻,你其实是把一次「同步的、要么成功要么失败」的简单调用,换成了一个「异步的、可能丢、可能重、可能乱、可能堆」的分布式过程。你换来了性能和解耦,代价是你必须亲自处理这个过程里的每一种「可能」。

那次事故之后,我给团队定了一条规矩:任何人接入一个新的 MQ Topic,提交评审前,必须能回答四个问题——这条消息丢了会怎样、重了会怎样、乱序了会怎样、堆积了会怎样。答不上来,就说明这个 Topic 还没设计完。库存被扣成负数的那张对账单,后来一直贴在我工位旁边。它提醒我的不是「MQ 不可靠」,而是:可靠的系统,从来不是用上了某个可靠的中间件就有的,它是被人一处一处、认认真真设计出来的。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

大模型 Function Calling 完全指南:从一次 AI 客服乱调工具看懂工具编排

2026-5-21 16:25:25

技术教程

大模型上下文窗口完全指南:为什么 AI 对话越聊越贵、越聊越笨

2026-5-21 16:38:54

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索