消息队列完全指南:从一次"异步下单后积分没加、还重复扣了款"看懂可靠消息投递

2023 年我做一个电商的下单系统。下单成功后要做一连串事发短信通知给用户加积分通知仓库发货更新销量统计。第一版我做得很直接在下单接口里一件接一件同步地调结果接口慢得感人。我很快想到用消息队列下单成功后只往 MQ 里发一条订单已创建的消息就立刻返回后面那些活交给消费者异步处理接口快得飞起。我心里很踏实消息发出去了后面的事自然有人处理。可上线之后客服的投诉一条接一条第一类用户明明下单成功了积分却没加短信也没收到消息像是凭空消失了第二类有用户的积分加了两次优惠券被重复核销等于重复扣了款第三类大促那天用户下单后过了快一个小时才收到短信消息严重积压了。我才彻底想明白第一版错在我以为消息发进 MQ 就等于这件事一定会被可靠地不多不少地处理一次。可一条消息从生产者出发要经过网络到达 Broker 被存下来再投递给消费者被处理完这一路上的每一个环节都可能把消息弄丢而你为了不丢做的重试又会让消息被处理不止一次。本文从头梳理为什么发了消息不等于消息被处理消息会在哪几个环节丢生产端 Broker 消费端各自怎么保证不丢为什么必须做幂等消费以及消息顺序消息积压死信队列这些把消息队列真正做对要避开的坑。

2023 年我做一个电商的下单系统。下单成功后,要做一连串事:发短信通知、给用户加积分、通知仓库发货、更新销量统计。第一版我做得很直接:在下单接口里,一件接一件同步地调——下完单,调短信服务,再调积分服务,再调仓库……结果接口慢得感人,用户点一次"提交订单"要转好几秒圈。我很快想到用消息队列:下单成功后,我只往 MQ 里发一条"订单已创建"的消息就立刻返回,后面那些发短信、加积分的活,交给消费者慢慢异步处理。这套异步解耦加上去,效果立竿见影——下单接口快得飞起,一点就好。我心里很踏实:"消息发出去了,后面的事自然有人处理。"可上线之后,客服的投诉一条接一条,而且每次现象都不一样。第一类:用户明明下单成功了,积分却没加、短信也没收到——消息像是凭空消失了。第二类更可怕:有用户的积分加了两次,还有一笔订单的优惠券被重复核销、等于重复扣了款。第三类:大促那天,用户下单后过了快一个小时才收到短信——消息严重积压了。我盯着这三类各不相同的事故想了很久才彻底想明白,第一版错在一个根本的认知上:我以为"消息发进 MQ,就等于这件事一定会被可靠地、不多不少地处理一次"。可这个想法处处都错。一条消息从生产者出发,要经过网络、到达 Broker、被 Broker 存下来、再投递给消费者、被消费者处理完——这一路上的每一个环节,都可能把消息弄丢;而你为了"不丢"做的重试,又会让消息被处理不止一次。"消息发出去了"和"消息被恰好处理了一次",中间隔着一整套可靠性工程。这篇文章就把它梳理一遍:为什么"发了消息"不等于"消息被处理"、消息会在哪几个环节丢、生产端/Broker/消费端各自怎么保证不丢、为什么必须做幂等消费,以及消息顺序、消息积压、死信队列这些把消息队列真正做对要避开的坑。

问题背景

先把那三类事故和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。

现象:一个下单系统,把发短信、加积分等后续操作改成往 MQ 发消息、异步处理。接口确实变快了,但上线后出现三类事故:一是消息丢失(下单成功但积分没加);二是重复消费(积分加两次、优惠券重复核销);三是消息积压(大促时消费跟不上,用户一小时后才收到短信)。

我当时的错误认知:"消息只要发进了 MQ,就等于这件事一定会被可靠地、不多不少地处理一次。"

真相:一条消息要经过生产者→网络→Broker→存储→消费者→处理完这一长串环节,每个环节都可能丢消息。为了不丢而做的重试,又必然带来重复投递。所以可靠消息是一套组合工程:生产端确认、Broker 持久化、消费端手动 ack 三道关一起保证"至少一次"不丢;再用幂等消费把"至少一次"收敛成业务上的"恰好一次"。

要把消息队列做对,需要几块认知:

  • 为什么"发了消息"不等于"消息被处理"——消息会在三个环节丢;
  • 生产端怎么确保消息真的发到了 Broker;
  • Broker 和消费端怎么确保消息不丢——持久化与手动 ack;
  • 为什么"不丢"必然带来"重复",消费端为什么必须幂等;
  • 消息顺序、消息积压、死信队列这些工程坑怎么处理。

一、为什么"发了消息"不等于"消息被处理"

先把这件最根本的事钉死:一条消息从生产者到被处理完,要走过生产者、Broker、消费者三大段路;这三段路上的任何一个环节崩溃或断网,消息都可能丢——"调用了发送方法"和"消息被可靠处理了一次",完全是两回事。

下面这段代码,就是我那个"消息会凭空消失"的第一版——它发完就不管了:

import pika
import json

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = conn.channel()
channel.queue_declare(queue="order_events")   # 注意:没设 durable


def on_order_created(order: dict):
    # 反面教材:发完消息就当万事大吉。
    channel.basic_publish(
        exchange="",
        routing_key="order_events",
        body=json.dumps(order),
        # 没有 delivery_mode,消息不持久化
    )
    # 这行返回了,我就以为"消息稳了"。可实际上:
    # 1. 这条消息可能根本没到 Broker(网络丢包),我不知道;
    # 2. queue 没 durable、消息没持久化,Broker 一重启就没了;
    # 3. 消费者还没处理完就崩了,这条消息也跟着没了。
def consume_naive():
    # 反面教材:消费端【自动 ack】—— 收到就算处理完。
    def callback(ch, method, properties, body):
        order = json.loads(body)
        add_points(order["user_id"])      # 万一这里抛异常?
        send_sms(order["user_id"])        # 万一进程在这里崩了?

    channel.basic_consume(
        queue="order_events",
        on_message_callback=callback,
        auto_ack=True,                    # 致命:消息一到手就被确认
    )
    channel.start_consuming()
    # auto_ack=True 意味着:Broker 把消息发给消费者的【那一刻】,
    # 就认为它已被处理、随即从队列删除。可如果 callback 执行到
    # 一半崩了,这条消息【已经从队列没了】,这件事就永远丢了。

这两段代码没有任何语法错误,在一切顺利时也工作得很好。它们的问题不在代码本身,而在一个错误的可靠性假设:它默认"调用了 basic_publish、收到了消息"就等于"这件事办成了"。可一条消息的生命线上,有三个截然不同的"断点":

  • 生产端到 Broker:basic_publish 只是把消息交给了网络。网络可能丢包、Broker 可能正好宕机——消息根本没到,而你毫不知情
  • Broker 自身:消息到了 Broker,但如果队列和消息没做持久化,Broker 一重启,内存里的消息全没了
  • Broker 到消费端:消息投给了消费者,但消费者处理到一半崩了。若用了 auto_ack,消息早已从队列删除,这件事无可挽回

这三个断点,要分三段去堵。先看第一段——生产端。

二、生产端可靠:确保消息真的到达了 Broker

第一个断点:你调了 basic_publish,但不知道消息到底有没有到 Broker。要堵住它,需要 Broker 给你一个明确的回执——这就是发送方确认机制(Publisher Confirms):开启它之后,Broker 成功接收并落盘一条消息后,会回一个 ack 给生产者;生产者收到这个 ack,才能确信"消息真的到了"。

def publish_with_confirm(order: dict) -> bool:
    """生产端可靠:开启 confirm 模式,确认 Broker 真的收到了。"""
    channel.confirm_delivery()            # 开启发送方确认
    try:
        channel.basic_publish(
            exchange="",
            routing_key="order_events",
            body=json.dumps(order),
            properties=pika.BasicProperties(
                delivery_mode=2,          # 消息持久化(配合下一节)
            ),
            mandatory=True,
        )
        return True                       # 收到 Broker 的 ack,确认到达
    except pika.exceptions.UnroutableError:
        return False                      # 没确认:消息没到,需重发或记录

但 confirm 机制还留了一个尾巴:如果 basic_publish 之后、ack 回来之前,生产者自己崩了呢?或者更现实的场景——下单这件事,本身要写数据库(创建订单),又要发消息。这两个动作怎么保证要么都成功、要么都不做?数据库事务管不到 MQ。工业界对这个问题最经典的解法,是本地消息表(事务性发件箱):把"要发的消息"当成一条数据,和订单写在同一个数据库事务里;再由一个独立的轮询任务,把这张表里还没发出去的消息捞出来发

def create_order_with_outbox(order: dict):
    """本地消息表:订单和"待发消息"写在同一个事务里,保证原子。"""
    with db.transaction() as tx:
        tx.execute("INSERT INTO orders(id, user_id, amount) "
                   "VALUES(%s, %s, %s)",
                   (order["id"], order["user_id"], order["amount"]))
        # 关键:消息不直接发 MQ,而是作为一行数据,
        # 和订单【在同一个事务里】落库。事务成功 = 两者都成。
        tx.execute("INSERT INTO outbox(msg_id, payload, status) "
                   "VALUES(%s, %s, 'pending')",
                   (order["id"], json.dumps(order)))


def outbox_poller():
    """独立轮询任务:把 outbox 里 pending 的消息真正发到 MQ。"""
    rows = db.query("SELECT msg_id, payload FROM outbox "
                    "WHERE status='pending' LIMIT 100")
    for row in rows:
        if publish_with_confirm(json.loads(row["payload"])):
            db.execute("UPDATE outbox SET status='sent' "
                       "WHERE msg_id=%s", (row["msg_id"],))
        # 发送失败的,留着 status='pending',下一轮重试 —— 绝不会丢

本地消息表的精髓,是把"发消息"这个不可靠的动作,转化成了"写一行数据库记录"这个可靠且能进事务的动作。订单和消息同生共死;消息发出去之前,它稳稳地躺在表里,轮询任务会不停重试直到发成功。生产端这道关算是堵上了。消息到了 Broker,下一个问题:Broker 自己会不会把它弄丢?

三、Broker 与消费端可靠:持久化与手动 ack

第二个断点在 Broker 自身:消息到了,但 Broker 重启就没了。堵法是持久化——要让消息落到磁盘,需要两个设置同时到位:队列声明为 durable,消息本身设 delivery_mode=2少一个都不行:队列不持久化,队列本身重启就没了;消息不持久化,队列还在、消息却没了。

def setup_durable_queue():
    """Broker 端可靠:队列和消息都要持久化,缺一不可。"""
    channel.queue_declare(
        queue="order_events",
        durable=True,             # 队列持久化:Broker 重启后队列还在
    )
    # 发消息时,properties 里必须带 delivery_mode=2(见上一节),
    # 消息本身才会被写入磁盘。两者【同时】具备,消息才真正落盘。
    # 注意:durable 属性不能改 —— 已存在的非 durable 队列
    # 要先删掉再重新声明,否则 queue_declare 会报参数不一致。

第三个断点在消费端:消息投出去了,消费者处理到一半崩了。第一节的 auto_ack=True万恶之源——它让消息"一投出就被确认"。正确的做法是手动 ack:消费者把业务真正处理完,手动告诉 Broker"这条我处理好了,可以删了";处理失败就 nack,让消息回到队列等待重投。

def consume_with_manual_ack():
    """消费端可靠:手动 ack —— 业务处理完才确认,失败则 nack 重回队列。"""
    channel.basic_qos(prefetch_count=10)  # 限流:一次最多取 10 条未确认的

    def callback(ch, method, properties, body):
        try:
            order = json.loads(body)
            handle_order_event(order)         # 真正的业务处理
            # 关键:业务确实做完了,才手动 ack
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            log_error(e)
            # 处理失败:nack,requeue=True 让消息重回队列等重试
            ch.basic_nack(delivery_tag=method.delivery_tag,
                          requeue=True)

    channel.basic_consume(queue="order_events",
                          on_message_callback=callback)  # 默认手动 ack
    channel.start_consuming()

手动 ack 的保命逻辑是:消费者没 ack 的消息,Broker 会一直保留;一旦发现这个消费者断连了(进程崩了),Broker 会把这条未 ack 的消息重新投递给别的消费者。生产端 confirm、Broker 持久化、消费端手动 ack——三道关合起来,消息"不丢"基本做到了。可你有没有发现,这套机制为了"不丢",处处都在重试、重投——而这,恰恰带来了第二类事故的根源。

四、为什么"不丢"必然带来"重复":消费端必须幂等

上一节那套机制,有一个无法回避的副作用。设想:消费者把积分加好了,正要 ack前一刻,进程崩了。这条消息没被 ack,于是 Broker 认为它没被处理,重新投递。新的消费者拿到它,又把积分加了一次。结果:积分加了两次。这不是 bug,这是"为了不丢而重试"必然付出的代价。

这就是为什么,几乎所有消息队列只能承诺"至少一次"(at-least-once)投递——保证不丢,但可能重复。你想要的"恰好一次"(exactly-once),消息中间件本身给不了。它只能消费端自己实现——办法就是幂等消费:让"同一条消息处理一次"和"处理多次"的最终结果完全一样

def handle_order_event(order: dict):
    """幂等消费:用消息的唯一 ID 去重,重复的消息直接跳过。"""
    msg_id = order["id"]
    # 关键:用数据库唯一约束做"第一次处理"的标记。
    # processed_messages 表的 msg_id 是主键/唯一索引。
    try:
        db.execute("INSERT INTO processed_messages(msg_id) "
                   "VALUES(%s)", (msg_id,))
    except db.UniqueViolation:
        # 插入失败 = 这条消息【之前已处理过】,直接跳过,保证幂等
        log_info(f"消息 {msg_id} 已处理过,跳过")
        return
    # 能走到这,说明是【第一次】处理,放心执行业务
    add_points(order["user_id"], order["amount"])
    send_sms(order["user_id"])

这里的关键,是借数据库的唯一约束来做"这条消息是不是第一次见"的原子判断INSERT 成功,说明是第一次,执行业务;INSERT 撞了唯一约束,说明处理过了,直接跳过。更稳妥的做法,是把"插入去重记录"和"业务操作"放进同一个事务,避免"去重记录插了、业务没做完就崩"的中间态。除了去重表,很多业务操作本身就能做成天然幂等——比如"把订单状态置为已支付"(执行多少次结果都一样),就比"余额加 100"这种累加操作要安全得多。消息不丢、也不会重复生效了。但要把消息队列真正用在生产上,还有几个绕不开的坑。

五、工程坑:消息顺序、积压与死信队列

"不丢"和"幂等"做好了,但还有几个工程坑,不处理就会在生产上出事。坑 1:消息顺序不是天然保证的。设想"订单创建"和"订单支付"两条消息,如果有多个消费者并行消费,"支付"消息完全可能比"创建"先被处理——业务就乱了。如果你的业务强依赖顺序,要做两件事:一是让同一个实体(比如同一个 order_id)的消息进同一个队列分区,二是该分区只让一个消费者顺序消费。但要清醒:严格保序会牺牲并发、拉低吞吐,所以——能不依赖顺序就别依赖。很多时候可以靠业务设计绕开,比如让消费者能处理乱序:

def handle_with_version(event: dict):
    """用版本号/状态机容忍乱序:旧版本的消息直接丢弃,不强求顺序。"""
    order = db.query("SELECT status, version FROM orders "
                     "WHERE id=%s", (event["order_id"],))
    # 关键:消息自带版本号。若它比数据库当前版本还旧,
    # 说明是【过期的乱序消息】,直接忽略 —— 不依赖投递顺序。
    if event["version"] <= order["version"]:
        log_info("收到过期消息,忽略")
        return
    db.execute("UPDATE orders SET status=%s, version=%s WHERE id=%s",
               (event["status"], event["version"], event["order_id"]))

坑 2:消息积压要能监控、能扩容。大促时,消息生产的速度远超消费的速度,队列里的消息越堆越多——这就是积压,表现就是"用户一小时后才收到短信"。应对要两手抓:一是监控队列堆积量,超过阈值就告警;二是让消费者能水平扩容——多加几个消费者实例,分摊压力。

def check_queue_backlog(threshold: int = 10000):
    """监控消息积压:队列堆积量超阈值就告警,提示扩容消费者。"""
    q = channel.queue_declare(queue="order_events", durable=True,
                              passive=True)        # passive:只查不建
    pending = q.method.message_count
    if pending > threshold:
        alert(f"order_events 积压 {pending} 条,需扩容消费者")
    return pending

坑 3:一直处理失败的消息,要进死信队列,不能无限重试。有些消息注定处理不了(比如消息体本身格式错误、关联的数据被删了)。如果它失败了就 nack 重回队列,它会被反复重投、反复失败,陷入死循环,还堵住后面正常的消息。正确做法是设一个最大重试次数,超过了就把这条"毒消息"转移到死信队列(Dead Letter Queue)——既不丢失(留着人工排查),也不再阻塞正常流程。坑 4:消息体要带版本、要尽量小。消息格式会演进,消息体里带一个版本号,消费者才能兼容新旧格式。另外,消息体里别塞大对象,只放必要的 ID,详情让消费者自己回查——消息体越小,吞吐越高。下面这张图,把一条消息从生产到可靠消费的完整链路串起来:

关键概念速查

概念 / 手段 说明
消息的三个断点 生产者到 Broker、Broker 自身、Broker 到消费者,每段都可能丢消息
发送方确认 confirm Broker 收到并落盘后回 ack,生产者收到才能确信消息真的到达
本地消息表 消息当作数据和业务同事务落库,再由轮询任务发出,保证两者原子
队列与消息持久化 队列 durable 加消息 delivery_mode=2 同时具备,Broker 重启消息才不丢
手动 ack 业务真正处理完才确认,失败则 nack 重回队列,杜绝处理一半丢消息
至少一次投递 消息队列只保证不丢但可能重复,这是为不丢而重试的必然代价
幂等消费 用消息唯一 ID 去重,让处理一次和处理多次的最终结果完全一样
消息顺序 多消费者并行不保证顺序,需保序要同实体进同分区且单消费者顺序消费
消息积压 生产快于消费导致堆积,要监控堆积量并支持消费者水平扩容
死信队列 超过最大重试次数的毒消息转入死信队列,不丢失也不阻塞正常流程

避坑清单

  1. 别以为调了发送方法消息就稳了,它要经过生产者 Broker 消费者三段路,每段都可能丢。
  2. 生产端要开 confirm 确认,Broker 落盘后回 ack 收到才算数,否则消息可能根本没到。
  3. 下单写库又发消息要用本地消息表,把消息当数据和业务同事务落库,再轮询发出。
  4. Broker 持久化要队列 durable 和消息 delivery_mode=2 同时具备,缺一个重启就丢。
  5. 消费端绝不能用 auto_ack,要手动 ack,业务处理完才确认,失败 nack 重回队列。
  6. 消息队列只保证至少一次会重复,exactly-once 中间件给不了,只能消费端做幂等。
  7. 幂等消费用消息唯一 ID 配合数据库唯一约束去重,或把业务设计成天然幂等的操作。
  8. 多消费者并行不保证消息顺序,强依赖顺序要同实体进同分区,但保序会牺牲吞吐。
  9. 大促消息会积压,要监控队列堆积量并告警,让消费者能水平扩容分摊压力。
  10. 注定失败的毒消息别无限重试,设最大重试次数超了转入死信队列等人工排查。

总结

回头看那三类"消息丢了、消息重复了、消息积压了"的事故,以及我后来在消息队列上接连踩的坑,最该记住的不是某一段 ack 代码,而是我动手前那个想当然的判断——"消息发进 MQ,就等于这件事一定会被不多不少地处理一次"。这句话错在它把"发送一条消息",想象成了一个瞬间完成、绝对可靠、有始有终单一动作。可它根本不是。一条消息的一生,是一段漫长的旅程:它要离开生产者、穿过不可靠的网络、降落在 Broker、被存进磁盘、再次起飞奔向消费者、最后被处理。这一路上,每一段航程都可能失事;而你为了"不让它失事"配的那些保险(重试、重投),又会让它意外地飞了两趟。消息队列这件事想清楚的,正是这个:它不是一根"消息进去、结果出来"的魔法管道,而是一套需要你在每个环节都亲手加固传送系统。引入 MQ 解决了"接口慢"的问题,但它同时把一个"本地的、事务性的、确定的"操作,变成了一个"跨网络的、最终一致的、可能重复的"操作——你省下的那几秒响应时间,是用接管这一整套可靠性换来的。

所以做消息队列,真正的工程量不在"basic_publishbasic_consume"那两行主干上。那两行,任何教程的第一页就教完了。真正的工程量,在于你要沿着消息的整条生命线,一个断点一个断点地去焊死:在生产端,你要用 confirm 和本地消息表,确保消息"发得出、且和业务同生共死";在 Broker,你要用持久化,确保消息"存得住、重启不丢";在消费端,你要用手动 ack,确保消息"处理完才算数";然后,你还要清醒地接受"至少一次"这个现实,用幂等去消化掉它必然带来的"重复"。这篇文章的几节,其实就是顺着消息这条生命线展开的:先想清楚消息会在哪三个环节丢,再逐段看生产端、Broker、消费端怎么加固,接着是为什么必须幂等,最后是顺序、积压、死信这几个把消息队列真正做对的工程细节。

你会发现,可靠消息的思路,和现实里寄一件贵重的包裹完全相通。你去寄一个重要的包裹,一个没经验的人,会把包裹往快递柜一塞就走,觉得"放进去了,自然会到"——这就是"发完就不管"的第一版。而一个有经验的人会怎么做?他会当面交给快递员,并拿到一张回执(这是生产端 confirm);他会确认快递公司有正规的仓库、包裹不会在中转时丢在路边(这是 Broker 持久化);他会要求收件人签收——而且是真的收到东西了才签,不是"快递员说放门口了"就算数(这是消费端手动 ack);他还知道,万一系统重复派送了一个包裹,收件人不会因此多付一次钱(这是幂等)。寄包裹的人都本能地明白:"我把它交出去了"和"它被完好地、且只被签收一次地送到了",是两件事,中间隔着一连串需要确认的环节。可程序员一用上 MQ,反而常常忘了这个朴素的道理。

最后想说,消息队列做没做扎实,差距永远不会在开发期暴露——开发时你发一条、收一条,网络又好、进程又不崩,丢不丢、重不重,根本看不出来。它只在真实的、有网络抖动、有进程崩溃、有大促洪峰的生产环境里才显形。那时候它会用最难堪的方式给你结账:做不好,你会像我一样,被客服的投诉逐条叫醒——这个用户下单成功了积分却没到,那个用户的优惠券被重复核销、等于多扣了钱,大促那天所有人的短信都晚了一个小时——你的系统看起来在正常运转,数据却在悄无声息地错乱。而做了,无论网络怎么抖、进程怎么崩、流量怎么涌,每一条"订单已创建"的消息,都会不多不少、不早不晚地,最终让积分加上一次、短信发出一条——哪怕中途经历了重试、重投,业务结果依然分毫不差。所以别等客服的投诉单找上门,在你写下第一行"把它丢进 MQ"的代码时就该想清楚:这条消息发到 Broker 了吗,我怎么知道?Broker 重启了它还在不在?消费者处理一半崩了会怎样?同一条消息被处理两次,我的业务扛得住吗?这几个问题都有了答案,你的消息队列才不只是开发期那个"发得出收得到"的玩具,而是一套无论环境多恶劣都能把每件事可靠交付的传送系统。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

Prompt 注入完全指南:从一次"用户一句忽略以上所有指令、AI 就被策反"看懂大模型安全

2026-5-21 22:04:26

技术教程

大模型上下文管理完全指南:从一次"对话越聊越久、AI 突然忘了开头还报 token 超限"看懂上下文窗口

2026-5-21 22:16:20

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索