2023 年我做一个电商的下单系统。下单成功后,要做一连串事:发短信通知、给用户加积分、通知仓库发货、更新销量统计。第一版我做得很直接:在下单接口里,一件接一件同步地调——下完单,调短信服务,再调积分服务,再调仓库……结果接口慢得感人,用户点一次"提交订单"要转好几秒圈。我很快想到用消息队列:下单成功后,我只往 MQ 里发一条"订单已创建"的消息就立刻返回,后面那些发短信、加积分的活,交给消费者慢慢异步处理。这套异步解耦加上去,效果立竿见影——下单接口快得飞起,一点就好。我心里很踏实:"消息发出去了,后面的事自然有人处理。"可上线之后,客服的投诉一条接一条,而且每次现象都不一样。第一类:用户明明下单成功了,积分却没加、短信也没收到——消息像是凭空消失了。第二类更可怕:有用户的积分加了两次,还有一笔订单的优惠券被重复核销、等于重复扣了款。第三类:大促那天,用户下单后过了快一个小时才收到短信——消息严重积压了。我盯着这三类各不相同的事故想了很久才彻底想明白,第一版错在一个根本的认知上:我以为"消息发进 MQ,就等于这件事一定会被可靠地、不多不少地处理一次"。可这个想法处处都错。一条消息从生产者出发,要经过网络、到达 Broker、被 Broker 存下来、再投递给消费者、被消费者处理完——这一路上的每一个环节,都可能把消息弄丢;而你为了"不丢"做的重试,又会让消息被处理不止一次。"消息发出去了"和"消息被恰好处理了一次",中间隔着一整套可靠性工程。这篇文章就把它梳理一遍:为什么"发了消息"不等于"消息被处理"、消息会在哪几个环节丢、生产端/Broker/消费端各自怎么保证不丢、为什么必须做幂等消费,以及消息顺序、消息积压、死信队列这些把消息队列真正做对要避开的坑。
问题背景
先把那三类事故和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。
现象:一个下单系统,把发短信、加积分等后续操作改成往 MQ 发消息、异步处理。接口确实变快了,但上线后出现三类事故:一是消息丢失(下单成功但积分没加);二是重复消费(积分加两次、优惠券重复核销);三是消息积压(大促时消费跟不上,用户一小时后才收到短信)。
我当时的错误认知:"消息只要发进了 MQ,就等于这件事一定会被可靠地、不多不少地处理一次。"
真相:一条消息要经过生产者→网络→Broker→存储→消费者→处理完这一长串环节,每个环节都可能丢消息。为了不丢而做的重试,又必然带来重复投递。所以可靠消息是一套组合工程:生产端确认、Broker 持久化、消费端手动 ack 三道关一起保证"至少一次"不丢;再用幂等消费把"至少一次"收敛成业务上的"恰好一次"。
要把消息队列做对,需要几块认知:
- 为什么"发了消息"不等于"消息被处理"——消息会在三个环节丢;
- 生产端怎么确保消息真的发到了 Broker;
- Broker 和消费端怎么确保消息不丢——持久化与手动 ack;
- 为什么"不丢"必然带来"重复",消费端为什么必须幂等;
- 消息顺序、消息积压、死信队列这些工程坑怎么处理。
一、为什么"发了消息"不等于"消息被处理"
先把这件最根本的事钉死:一条消息从生产者到被处理完,要走过生产者、Broker、消费者三大段路;这三段路上的任何一个环节崩溃或断网,消息都可能丢——"调用了发送方法"和"消息被可靠处理了一次",完全是两回事。
下面这段代码,就是我那个"消息会凭空消失"的第一版——它发完就不管了:
import pika
import json
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = conn.channel()
channel.queue_declare(queue="order_events") # 注意:没设 durable
def on_order_created(order: dict):
# 反面教材:发完消息就当万事大吉。
channel.basic_publish(
exchange="",
routing_key="order_events",
body=json.dumps(order),
# 没有 delivery_mode,消息不持久化
)
# 这行返回了,我就以为"消息稳了"。可实际上:
# 1. 这条消息可能根本没到 Broker(网络丢包),我不知道;
# 2. queue 没 durable、消息没持久化,Broker 一重启就没了;
# 3. 消费者还没处理完就崩了,这条消息也跟着没了。
def consume_naive():
# 反面教材:消费端【自动 ack】—— 收到就算处理完。
def callback(ch, method, properties, body):
order = json.loads(body)
add_points(order["user_id"]) # 万一这里抛异常?
send_sms(order["user_id"]) # 万一进程在这里崩了?
channel.basic_consume(
queue="order_events",
on_message_callback=callback,
auto_ack=True, # 致命:消息一到手就被确认
)
channel.start_consuming()
# auto_ack=True 意味着:Broker 把消息发给消费者的【那一刻】,
# 就认为它已被处理、随即从队列删除。可如果 callback 执行到
# 一半崩了,这条消息【已经从队列没了】,这件事就永远丢了。
这两段代码没有任何语法错误,在一切顺利时也工作得很好。它们的问题不在代码本身,而在一个错误的可靠性假设:它默认"调用了 basic_publish、收到了消息"就等于"这件事办成了"。可一条消息的生命线上,有三个截然不同的"断点":
- 生产端到 Broker:
basic_publish只是把消息交给了网络。网络可能丢包、Broker 可能正好宕机——消息根本没到,而你毫不知情。 - Broker 自身:消息到了 Broker,但如果队列和消息没做持久化,Broker 一重启,内存里的消息全没了。
- Broker 到消费端:消息投给了消费者,但消费者处理到一半崩了。若用了
auto_ack,消息早已从队列删除,这件事无可挽回。
这三个断点,要分三段去堵。先看第一段——生产端。
二、生产端可靠:确保消息真的到达了 Broker
第一个断点:你调了 basic_publish,但不知道消息到底有没有到 Broker。要堵住它,需要 Broker 给你一个明确的回执——这就是发送方确认机制(Publisher Confirms):开启它之后,Broker 成功接收并落盘一条消息后,会回一个 ack 给生产者;生产者收到这个 ack,才能确信"消息真的到了"。
def publish_with_confirm(order: dict) -> bool:
"""生产端可靠:开启 confirm 模式,确认 Broker 真的收到了。"""
channel.confirm_delivery() # 开启发送方确认
try:
channel.basic_publish(
exchange="",
routing_key="order_events",
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化(配合下一节)
),
mandatory=True,
)
return True # 收到 Broker 的 ack,确认到达
except pika.exceptions.UnroutableError:
return False # 没确认:消息没到,需重发或记录
但 confirm 机制还留了一个尾巴:如果 basic_publish 之后、ack 回来之前,生产者自己崩了呢?或者更现实的场景——下单这件事,本身要写数据库(创建订单),又要发消息。这两个动作怎么保证要么都成功、要么都不做?数据库事务管不到 MQ。工业界对这个问题最经典的解法,是本地消息表(事务性发件箱):把"要发的消息"当成一条数据,和订单写在同一个数据库事务里;再由一个独立的轮询任务,把这张表里还没发出去的消息捞出来发。
def create_order_with_outbox(order: dict):
"""本地消息表:订单和"待发消息"写在同一个事务里,保证原子。"""
with db.transaction() as tx:
tx.execute("INSERT INTO orders(id, user_id, amount) "
"VALUES(%s, %s, %s)",
(order["id"], order["user_id"], order["amount"]))
# 关键:消息不直接发 MQ,而是作为一行数据,
# 和订单【在同一个事务里】落库。事务成功 = 两者都成。
tx.execute("INSERT INTO outbox(msg_id, payload, status) "
"VALUES(%s, %s, 'pending')",
(order["id"], json.dumps(order)))
def outbox_poller():
"""独立轮询任务:把 outbox 里 pending 的消息真正发到 MQ。"""
rows = db.query("SELECT msg_id, payload FROM outbox "
"WHERE status='pending' LIMIT 100")
for row in rows:
if publish_with_confirm(json.loads(row["payload"])):
db.execute("UPDATE outbox SET status='sent' "
"WHERE msg_id=%s", (row["msg_id"],))
# 发送失败的,留着 status='pending',下一轮重试 —— 绝不会丢
本地消息表的精髓,是把"发消息"这个不可靠的动作,转化成了"写一行数据库记录"这个可靠且能进事务的动作。订单和消息同生共死;消息发出去之前,它稳稳地躺在表里,轮询任务会不停重试直到发成功。生产端这道关算是堵上了。消息到了 Broker,下一个问题:Broker 自己会不会把它弄丢?
三、Broker 与消费端可靠:持久化与手动 ack
第二个断点在 Broker 自身:消息到了,但 Broker 重启就没了。堵法是持久化——要让消息落到磁盘,需要两个设置同时到位:队列声明为 durable,消息本身设 delivery_mode=2。少一个都不行:队列不持久化,队列本身重启就没了;消息不持久化,队列还在、消息却没了。
def setup_durable_queue():
"""Broker 端可靠:队列和消息都要持久化,缺一不可。"""
channel.queue_declare(
queue="order_events",
durable=True, # 队列持久化:Broker 重启后队列还在
)
# 发消息时,properties 里必须带 delivery_mode=2(见上一节),
# 消息本身才会被写入磁盘。两者【同时】具备,消息才真正落盘。
# 注意:durable 属性不能改 —— 已存在的非 durable 队列
# 要先删掉再重新声明,否则 queue_declare 会报参数不一致。
第三个断点在消费端:消息投出去了,消费者处理到一半崩了。第一节的 auto_ack=True 是万恶之源——它让消息"一投出就被确认"。正确的做法是手动 ack:消费者把业务真正处理完,才手动告诉 Broker"这条我处理好了,可以删了";处理失败就 nack,让消息回到队列等待重投。
def consume_with_manual_ack():
"""消费端可靠:手动 ack —— 业务处理完才确认,失败则 nack 重回队列。"""
channel.basic_qos(prefetch_count=10) # 限流:一次最多取 10 条未确认的
def callback(ch, method, properties, body):
try:
order = json.loads(body)
handle_order_event(order) # 真正的业务处理
# 关键:业务确实做完了,才手动 ack
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
log_error(e)
# 处理失败:nack,requeue=True 让消息重回队列等重试
ch.basic_nack(delivery_tag=method.delivery_tag,
requeue=True)
channel.basic_consume(queue="order_events",
on_message_callback=callback) # 默认手动 ack
channel.start_consuming()
手动 ack 的保命逻辑是:消费者没 ack 的消息,Broker 会一直保留;一旦发现这个消费者断连了(进程崩了),Broker 会把这条未 ack 的消息重新投递给别的消费者。生产端 confirm、Broker 持久化、消费端手动 ack——三道关合起来,消息"不丢"基本做到了。可你有没有发现,这套机制为了"不丢",处处都在重试、重投——而这,恰恰带来了第二类事故的根源。
四、为什么"不丢"必然带来"重复":消费端必须幂等
上一节那套机制,有一个无法回避的副作用。设想:消费者把积分加好了,正要 ack 的前一刻,进程崩了。这条消息没被 ack,于是 Broker 认为它没被处理,重新投递。新的消费者拿到它,又把积分加了一次。结果:积分加了两次。这不是 bug,这是"为了不丢而重试"必然付出的代价。
这就是为什么,几乎所有消息队列只能承诺"至少一次"(at-least-once)投递——保证不丢,但可能重复。你想要的"恰好一次"(exactly-once),消息中间件本身给不了。它只能由消费端自己实现——办法就是幂等消费:让"同一条消息处理一次"和"处理多次"的最终结果完全一样。
def handle_order_event(order: dict):
"""幂等消费:用消息的唯一 ID 去重,重复的消息直接跳过。"""
msg_id = order["id"]
# 关键:用数据库唯一约束做"第一次处理"的标记。
# processed_messages 表的 msg_id 是主键/唯一索引。
try:
db.execute("INSERT INTO processed_messages(msg_id) "
"VALUES(%s)", (msg_id,))
except db.UniqueViolation:
# 插入失败 = 这条消息【之前已处理过】,直接跳过,保证幂等
log_info(f"消息 {msg_id} 已处理过,跳过")
return
# 能走到这,说明是【第一次】处理,放心执行业务
add_points(order["user_id"], order["amount"])
send_sms(order["user_id"])
这里的关键,是借数据库的唯一约束来做"这条消息是不是第一次见"的原子判断。INSERT 成功,说明是第一次,执行业务;INSERT 撞了唯一约束,说明处理过了,直接跳过。更稳妥的做法,是把"插入去重记录"和"业务操作"放进同一个事务,避免"去重记录插了、业务没做完就崩"的中间态。除了去重表,很多业务操作本身就能做成天然幂等——比如"把订单状态置为已支付"(执行多少次结果都一样),就比"余额加 100"这种累加操作要安全得多。消息不丢、也不会重复生效了。但要把消息队列真正用在生产上,还有几个绕不开的坑。
五、工程坑:消息顺序、积压与死信队列
"不丢"和"幂等"做好了,但还有几个工程坑,不处理就会在生产上出事。坑 1:消息顺序不是天然保证的。设想"订单创建"和"订单支付"两条消息,如果有多个消费者并行消费,"支付"消息完全可能比"创建"先被处理——业务就乱了。如果你的业务强依赖顺序,要做两件事:一是让同一个实体(比如同一个 order_id)的消息进同一个队列分区,二是该分区只让一个消费者顺序消费。但要清醒:严格保序会牺牲并发、拉低吞吐,所以——能不依赖顺序就别依赖。很多时候可以靠业务设计绕开,比如让消费者能处理乱序:
def handle_with_version(event: dict):
"""用版本号/状态机容忍乱序:旧版本的消息直接丢弃,不强求顺序。"""
order = db.query("SELECT status, version FROM orders "
"WHERE id=%s", (event["order_id"],))
# 关键:消息自带版本号。若它比数据库当前版本还旧,
# 说明是【过期的乱序消息】,直接忽略 —— 不依赖投递顺序。
if event["version"] <= order["version"]:
log_info("收到过期消息,忽略")
return
db.execute("UPDATE orders SET status=%s, version=%s WHERE id=%s",
(event["status"], event["version"], event["order_id"]))
坑 2:消息积压要能监控、能扩容。大促时,消息生产的速度远超消费的速度,队列里的消息越堆越多——这就是积压,表现就是"用户一小时后才收到短信"。应对要两手抓:一是监控队列堆积量,超过阈值就告警;二是让消费者能水平扩容——多加几个消费者实例,分摊压力。
def check_queue_backlog(threshold: int = 10000):
"""监控消息积压:队列堆积量超阈值就告警,提示扩容消费者。"""
q = channel.queue_declare(queue="order_events", durable=True,
passive=True) # passive:只查不建
pending = q.method.message_count
if pending > threshold:
alert(f"order_events 积压 {pending} 条,需扩容消费者")
return pending
坑 3:一直处理失败的消息,要进死信队列,不能无限重试。有些消息注定处理不了(比如消息体本身格式错误、关联的数据被删了)。如果它失败了就 nack 重回队列,它会被反复重投、反复失败,陷入死循环,还堵住后面正常的消息。正确做法是设一个最大重试次数,超过了就把这条"毒消息"转移到死信队列(Dead Letter Queue)——既不丢失(留着人工排查),也不再阻塞正常流程。坑 4:消息体要带版本、要尽量小。消息格式会演进,消息体里带一个版本号,消费者才能兼容新旧格式。另外,消息体里别塞大对象,只放必要的 ID,详情让消费者自己回查——消息体越小,吞吐越高。下面这张图,把一条消息从生产到可靠消费的完整链路串起来:
关键概念速查
| 概念 / 手段 | 说明 |
|---|---|
| 消息的三个断点 | 生产者到 Broker、Broker 自身、Broker 到消费者,每段都可能丢消息 |
| 发送方确认 confirm | Broker 收到并落盘后回 ack,生产者收到才能确信消息真的到达 |
| 本地消息表 | 消息当作数据和业务同事务落库,再由轮询任务发出,保证两者原子 |
| 队列与消息持久化 | 队列 durable 加消息 delivery_mode=2 同时具备,Broker 重启消息才不丢 |
| 手动 ack | 业务真正处理完才确认,失败则 nack 重回队列,杜绝处理一半丢消息 |
| 至少一次投递 | 消息队列只保证不丢但可能重复,这是为不丢而重试的必然代价 |
| 幂等消费 | 用消息唯一 ID 去重,让处理一次和处理多次的最终结果完全一样 |
| 消息顺序 | 多消费者并行不保证顺序,需保序要同实体进同分区且单消费者顺序消费 |
| 消息积压 | 生产快于消费导致堆积,要监控堆积量并支持消费者水平扩容 |
| 死信队列 | 超过最大重试次数的毒消息转入死信队列,不丢失也不阻塞正常流程 |
避坑清单
- 别以为调了发送方法消息就稳了,它要经过生产者 Broker 消费者三段路,每段都可能丢。
- 生产端要开 confirm 确认,Broker 落盘后回 ack 收到才算数,否则消息可能根本没到。
- 下单写库又发消息要用本地消息表,把消息当数据和业务同事务落库,再轮询发出。
- Broker 持久化要队列 durable 和消息 delivery_mode=2 同时具备,缺一个重启就丢。
- 消费端绝不能用 auto_ack,要手动 ack,业务处理完才确认,失败 nack 重回队列。
- 消息队列只保证至少一次会重复,exactly-once 中间件给不了,只能消费端做幂等。
- 幂等消费用消息唯一 ID 配合数据库唯一约束去重,或把业务设计成天然幂等的操作。
- 多消费者并行不保证消息顺序,强依赖顺序要同实体进同分区,但保序会牺牲吞吐。
- 大促消息会积压,要监控队列堆积量并告警,让消费者能水平扩容分摊压力。
- 注定失败的毒消息别无限重试,设最大重试次数超了转入死信队列等人工排查。
总结
回头看那三类"消息丢了、消息重复了、消息积压了"的事故,以及我后来在消息队列上接连踩的坑,最该记住的不是某一段 ack 代码,而是我动手前那个想当然的判断——"消息发进 MQ,就等于这件事一定会被不多不少地处理一次"。这句话错在它把"发送一条消息",想象成了一个瞬间完成、绝对可靠、有始有终的单一动作。可它根本不是。一条消息的一生,是一段漫长的旅程:它要离开生产者、穿过不可靠的网络、降落在 Broker、被存进磁盘、再次起飞奔向消费者、最后被处理。这一路上,每一段航程都可能失事;而你为了"不让它失事"配的那些保险(重试、重投),又会让它意外地飞了两趟。消息队列这件事想清楚的,正是这个:它不是一根"消息进去、结果出来"的魔法管道,而是一套需要你在每个环节都亲手加固的传送系统。引入 MQ 解决了"接口慢"的问题,但它同时把一个"本地的、事务性的、确定的"操作,变成了一个"跨网络的、最终一致的、可能重复的"操作——你省下的那几秒响应时间,是用接管这一整套可靠性换来的。
所以做消息队列,真正的工程量不在"basic_publish 和 basic_consume"那两行主干上。那两行,任何教程的第一页就教完了。真正的工程量,在于你要沿着消息的整条生命线,一个断点一个断点地去焊死:在生产端,你要用 confirm 和本地消息表,确保消息"发得出、且和业务同生共死";在 Broker,你要用持久化,确保消息"存得住、重启不丢";在消费端,你要用手动 ack,确保消息"处理完才算数";然后,你还要清醒地接受"至少一次"这个现实,用幂等去消化掉它必然带来的"重复"。这篇文章的几节,其实就是顺着消息这条生命线展开的:先想清楚消息会在哪三个环节丢,再逐段看生产端、Broker、消费端怎么加固,接着是为什么必须幂等,最后是顺序、积压、死信这几个把消息队列真正做对的工程细节。
你会发现,可靠消息的思路,和现实里寄一件贵重的包裹完全相通。你去寄一个重要的包裹,一个没经验的人,会把包裹往快递柜一塞就走,觉得"放进去了,自然会到"——这就是"发完就不管"的第一版。而一个有经验的人会怎么做?他会当面交给快递员,并拿到一张回执(这是生产端 confirm);他会确认快递公司有正规的仓库、包裹不会在中转时丢在路边(这是 Broker 持久化);他会要求收件人签收——而且是真的收到东西了才签,不是"快递员说放门口了"就算数(这是消费端手动 ack);他还知道,万一系统重复派送了一个包裹,收件人不会因此多付一次钱(这是幂等)。寄包裹的人都本能地明白:"我把它交出去了"和"它被完好地、且只被签收一次地送到了",是两件事,中间隔着一连串需要确认的环节。可程序员一用上 MQ,反而常常忘了这个朴素的道理。
最后想说,消息队列做没做扎实,差距永远不会在开发期暴露——开发时你发一条、收一条,网络又好、进程又不崩,丢不丢、重不重,根本看不出来。它只在真实的、有网络抖动、有进程崩溃、有大促洪峰的生产环境里才显形。那时候它会用最难堪的方式给你结账:做不好,你会像我一样,被客服的投诉逐条叫醒——这个用户下单成功了积分却没到,那个用户的优惠券被重复核销、等于多扣了钱,大促那天所有人的短信都晚了一个小时——你的系统看起来在正常运转,数据却在悄无声息地错乱。而做对了,无论网络怎么抖、进程怎么崩、流量怎么涌,每一条"订单已创建"的消息,都会不多不少、不早不晚地,最终让积分加上一次、短信发出一条——哪怕中途经历了重试、重投,业务结果依然分毫不差。所以别等客服的投诉单找上门,在你写下第一行"把它丢进 MQ"的代码时就该想清楚:这条消息发到 Broker 了吗,我怎么知道?Broker 重启了它还在不在?消费者处理一半崩了会怎样?同一条消息被处理两次,我的业务扛得住吗?这几个问题都有了答案,你的消息队列才不只是开发期那个"发得出收得到"的玩具,而是一套无论环境多恶劣都能把每件事可靠交付的传送系统。
—— 别看了 · 2026