2024 年我们的积分服务出过一次很憋屈的故障:运营反馈说,有一批用户该到账的积分始终没到。查下来,这些积分是通过 RabbitMQ 的消息异步发放的,而问题出在消费端——某条消息因为一个偶发的下游异常处理失败了,我们的代码对失败消息的处理方式是"直接 basicNack 并且 requeue=true",于是这条坏消息被打回队列、立刻又被投递、又失败……它就这样在队列头部疯狂打转,把一个消费线程死死占住,后面正常的消息全被堵在身后。更糟的是另一个队列,失败消息被设成了 requeue=false,结果它直接被 RabbitMQ 丢弃了,消息就这么无声无息地消失了。这两种极端,正是因为我们从来没有正经设计过"消息消费失败之后该去哪"。投了几天把死信队列和消费端的可靠性补齐了一遍,本文复盘这次实战。
问题背景
业务:积分服务,通过 RabbitMQ 异步发放用户积分
事故现象:
- 一批用户积分迟迟不到账
- 监控发现某队列消息严重堆积,消费速度几乎为 0
- 另一个队列则相反:消息"消费成功"了,但业务没生效
现场排查:
# 1. 看堆积队列的消费代码
channel.basicConsume(queue, false, (tag, msg) -> {
try {
addPoints(msg); // 处理消息
channel.basicAck(...); // 成功确认
} catch (Exception e) {
// 失败:打回队列重新投递
channel.basicNack(tag, false, true); // requeue=true
}
});
# 一条坏消息处理失败 -> nack requeue -> 立刻又被投回来
# -> 又失败 -> 又 requeue ... 死循环,卡住整个消费
# 2. 看"丢消息"队列的代码
channel.basicNack(tag, false, false); // requeue=false
# 失败后 requeue=false -> 消息被 RabbitMQ 直接丢弃,人间蒸发
根因:
1. 消费失败的消息,只有"无限重投"和"直接丢弃"两条路
2. 无限重投:一条坏消息卡死队列,形成消费死循环
3. 直接丢弃:消息悄无声息地没了,无法追溯、无法补偿
4. 从来没有为"失败消息"设计一个专门的归宿
修复 1:先理解什么是死信和死信队列
=== 什么样的消息会变成"死信"(Dead Letter)===
RabbitMQ 里,一条消息在以下三种情况会变成死信:
1. 消息被消费者 nack / reject,且 requeue = false
(消费失败,且明确表示"不要再放回队列了")
2. 消息在队列里存活超过了 TTL(过期了),还没被消费
3. 队列达到了最大长度上限,最先入队的消息被挤出
=== 死信去哪了 —— 死信交换机 DLX ===
死信不会凭空消失。如果给队列配置了
"死信交换机"(Dead Letter Exchange, DLX),
那么这个队列里产生的所有死信,
都会被自动转发到这个 DLX,再路由进一个专门的队列 ——
这个队列,就是我们常说的【死信队列】。
=== 死信队列的价值 ===
它给"处理失败的消息"提供了一个明确的、可控的归宿:
- 消息不会无限重投卡死正常队列
- 消息也不会被静默丢弃、无法追溯
- 失败消息集中堆在死信队列里,可以被监控、
被人工排查、被重新处理
一句话:死信队列把"消息失败"这件事,从一个
失控的意外,变成了一个可观测、可处理的常规流程。
修复 2:给业务队列配上死信交换机
// === 声明:业务队列 + 它的死信交换机 + 死信队列 ===
@Configuration
public class PointsMqConfig {
// --- 1. 死信交换机和死信队列(失败消息的归宿)---
@Bean
public DirectExchange pointsDlx() {
return new DirectExchange("points.dlx");
}
@Bean
public Queue pointsDlQueue() {
return new Queue("points.dl.queue"); // 死信队列
}
@Bean
public Binding dlBinding() {
return BindingBuilder.bind(pointsDlQueue())
.to(pointsDlx()).with("points.dead");
}
// --- 2. 业务队列,关键是把 DLX 挂上去 ---
@Bean
public Queue pointsQueue() {
Map args = new HashMap<>();
// 指定该队列产生的死信,转发到哪个交换机
args.put("x-dead-letter-exchange", "points.dlx");
// 死信转发时用的路由键
args.put("x-dead-letter-routing-key", "points.dead");
// 可选:消息在本队列的存活上限,超时变死信
args.put("x-message-ttl", 600000); // 10 分钟
return new Queue("points.queue", true, false, false, args);
}
@Bean
public DirectExchange pointsExchange() {
return new DirectExchange("points.exchange");
}
@Bean
public Binding pointsBinding() {
return BindingBuilder.bind(pointsQueue())
.to(pointsExchange()).with("points.add");
}
}
// === 配好之后 ===
// points.queue 里任何一条消息,只要被 nack(requeue=false)、
// 或超时、或队列满被挤出,都会自动流进 points.dl.queue。
// 失败消息有了明确的归宿,不再失控。
修复 3:消费失败不要无脑 requeue
// === 消费失败时,先分清是哪种失败 ===
@RabbitListener(queues = "points.queue")
public void onMessage(PointsMessage msg, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
try {
addPoints(msg);
channel.basicAck(tag, false); // 成功,确认
} catch (BusinessException e) {
// === 业务异常:消息本身有问题,重试多少次都没用 ===
// 例:用户不存在、参数非法、积分规则不匹配
// -> 直接进死信队列,不要浪费重试
log.error("消息业务异常,转入死信队列, msg={}", msg, e);
channel.basicNack(tag, false, false); // requeue=false -> 死信
} catch (Exception e) {
// === 系统/临时异常:下游抖动、超时、网络问题 ===
// 这种"重试可能会成功",但绝不能无脑立刻重投
log.warn("消息处理临时失败,准备重试, msg={}", msg, e);
// 交给"有限次重试"机制处理(见修复 4)
retryHandler.handle(msg, tag, channel);
}
}
// === 核心原则 ===
// 1. 永远不要 requeue=true 无限重投 ——
// 一条坏消息会立刻打回、立刻重消费,形成死循环卡死队列。
// 2. 区分"可重试"和"不可重试":
// - 业务错误(数据本身不对)-> 重试无意义,直接进死信
// - 系统错误(下游临时抖动)-> 值得有限次重试
// 3. 无论走哪条路,消息都不能"消失",要么被处理、
// 要么进死信队列留档。
修复 4:用延迟重试队列做"有限次退避重试"
=== 想要的重试效果 ===
临时失败的消息,不要立刻重投(立刻重投往往还是失败),
而是【等一会儿再重试】,并且【最多重试 N 次】,
N 次都失败,才真正进死信队列。
=== 怎么实现"等一会儿再重试" —— 重试队列 ===
搞一个专门的"重试队列",它有两个特点:
1. 没有消费者(消息进去不会被立即消费)
2. 设置了 TTL(比如 30 秒)+ 死信交换机指向【业务队列】
消息流转:
业务队列消费失败 -> 把消息发到【重试队列】
-> 消息在重试队列里待满 30 秒 TTL,变成死信
-> 死信被转发回【业务队列】-> 被重新消费一次
这样就实现了"延迟 30 秒后自动重投"。
=== 重试次数怎么记 ===
在消息的 header 里放一个 retryCount 计数:
- 每次重试前 +1
- 消费时先读 retryCount,达到上限(如 3 次)
就不再进重试队列,直接进【最终死信队列】
- header 还能记下每次失败的原因,方便排查
=== 退避(backoff)===
进阶做法:让重试间隔逐次拉长 ——
第 1 次等 10s,第 2 次等 1 分钟,第 3 次等 10 分钟。
给下游更充分的恢复时间,避免"重试风暴"。
可以用多个不同 TTL 的重试队列来实现分级延迟。
// === 重试处理器:带次数上限的延迟重试 ===
@Component
public class RetryHandler {
private static final int MAX_RETRY = 3;
public void handle(PointsMessage msg, long tag, Channel channel)
throws IOException {
int retryCount = msg.getRetryCount();
if (retryCount >= MAX_RETRY) {
// 重试次数耗尽 -> 进最终死信队列,人工介入
log.error("消息重试 {} 次仍失败,转入死信队列, msg={}",
MAX_RETRY, msg);
rabbitTemplate.convertAndSend(
"points.dlx", "points.dead", msg);
channel.basicAck(tag, false); // 原消息确认掉
} else {
// 还能重试 -> 计数 +1,发往延迟重试队列
msg.setRetryCount(retryCount + 1);
rabbitTemplate.convertAndSend(
"points.retry.exchange", "points.retry", msg);
channel.basicAck(tag, false); // 原消息先确认
}
}
}
修复 5:消费端必须保证幂等
// === 为什么重试机制下,幂等是【必须】的 ===
// 一条消息在重试链路里会被消费【多次】。
// 还有一种隐蔽情况:消息处理其实成功了,
// 但 basicAck 还没发出去,消费者就宕机了 ——
// RabbitMQ 没收到 ack,会把这条消息再投递一次。
// 所以消费端必须假定:同一条消息可能被处理不止一次。
// === 幂等做法:用消息的全局唯一 ID 去重 ===
@RabbitListener(queues = "points.queue")
public void onMessage(PointsMessage msg, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
String msgId = msg.getMsgId(); // 生产者生成的全局唯一 ID
// 用 Redis SET NX 抢占:抢到才处理,抢不到说明处理过了
Boolean first = redis.opsForValue()
.setIfAbsent("mq:done:" + msgId, "1", Duration.ofDays(3));
if (!Boolean.TRUE.equals(first)) {
log.info("消息已处理过,幂等跳过, msgId={}", msgId);
channel.basicAck(tag, false);
return;
}
try {
addPoints(msg);
channel.basicAck(tag, false);
} catch (Exception e) {
// 处理失败:把幂等标记删掉,允许后续重试
redis.delete("mq:done:" + msgId);
throw e;
}
}
// === 更可靠:幂等校验放进业务数据库事务 ===
// 比如积分发放,建一张"积分流水表",msgId 加唯一索引,
// 发积分和插流水在同一个事务里完成。
// 同一条消息再来,插流水时唯一索引冲突 -> 天然幂等。
// 这比单独用 Redis 标记更可靠(不怕标记和业务不一致)。
修复 6:死信队列的监控与处理流程
# 死信队列监控:死信队列里有消息,本身就是一种告警
groups:
- name: rabbitmq-dlq
rules:
# 1. 死信队列出现消息(有消息彻底处理失败了)
- alert: DeadLetterQueueNotEmpty
expr: rabbitmq_queue_messages{queue="points.dl.queue"} > 0
for: 1m
annotations:
summary: "死信队列有 {{ $value }} 条消息,存在处理失败的业务消息"
# 2. 业务队列消息堆积(消费跟不上)
- alert: BusinessQueueBacklog
expr: rabbitmq_queue_messages{queue="points.queue"} > 1000
for: 5m
annotations:
summary: "points.queue 堆积超 1000 条,消费能力不足"
# 3. 消费端无消费者(消费全挂了)
- alert: QueueNoConsumer
expr: rabbitmq_queue_consumers{queue="points.queue"} == 0
for: 1m
annotations:
summary: "points.queue 没有任何消费者,消息无人处理"
=== 死信队列的处理流程(必须有人负责)===
死信队列不是"垃圾桶",消息进去了不能没人管。
要建立一套明确的处理流程:
1. 死信队列一有消息,立刻告警到负责人
2. 排查每条死信的失败原因(消息 header 里记着):
- 数据问题(用户不存在等)-> 修数据 / 确认可丢弃
- 代码 bug -> 修复代码后重新投递
- 下游长时间故障 -> 等下游恢复后批量重投
3. 提供一个"死信重投"工具/接口:
能把死信队列里的消息,修正后重新发回业务队列
4. 确认无需处理的死信,人工归档后再清理,留下记录
=== 一个重要心态 ===
死信队列里【有】消息,不是系统的失败,而是系统的成熟 ——
它说明失败的消息被妥善接住了,而不是丢了或卡死了。
真正可怕的,是没有死信队列时,你根本不知道有消息失败了。
优化效果
指标 治理前 治理后
=============================================================
消费失败处理 无限重投 或 直接丢弃 死信队列 + 有限次重试
坏消息影响 一条卡死整个队列 单条隔离,不阻塞正常消息
消息丢失 requeue=false 即丢失 失败消息全进死信队列留档
重试方式 立刻重投,大概率再失败 延迟退避重试,最多 3 次
消费幂等 无,重试导致重复发放 msgId 去重 + 流水唯一索引
失败可追溯 无法追溯 死信 header 记录失败原因
失败可观测 无 死信/堆积/无消费者监控
失败消息处理 没有流程 告警 + 排查 + 重投工具
治理过程:
- 定位无限重投与消息丢失根因:0.5 天
- 业务队列配置死信交换机:0.5 天
- 失败分类 + 延迟重试队列:1.5 天
- 消费端幂等改造:1 天
- 死信监控 + 重投工具 + 流程:1 天
避坑清单
- 消费失败只有"无限重投"和"直接丢弃"两条路时,前者卡死队列、后者丢消息
- 死信是被 nack 且 requeue=false、或超时 TTL、或队列满被挤出的消息
- 给业务队列配 x-dead-letter-exchange,失败消息会自动流入死信队列,不再失控
- 永远不要 requeue=true 无限重投,一条坏消息会立刻打回形成消费死循环
- 区分失败类型:业务错误重试无意义直接进死信,系统临时错误才值得重试
- 用带 TTL 的重试队列实现"延迟后自动重投",header 里记 retryCount 控制次数
- 重试间隔最好逐次拉长(退避),给下游恢复时间,避免重试风暴
- 重试机制下消息会被消费多次,消费端必须幂等,用 msgId 去重
- 幂等校验放进业务事务、靠唯一索引兜底,比单用 Redis 标记更可靠
- 死信队列必须监控并配人工重投流程,死信堆积本身就是一种告警
总结
这次死信队列的治理,让我重新认识了一个一直被我们想得太简单的问题:一条消息消费失败之后,它到底应该去哪里。在出事之前,我们消费端对失败的处理粗暴到近乎没有处理——要么 nack 时把 requeue 设成 true,把消息打回队列,要么设成 false,让它消失。这两种做法看起来是"二选一",可它们其实是同一个根本缺陷的两个极端表现:我们从来没有给"失败的消息"准备一个真正的归宿。先说 requeue=true 的那条路,它的致命之处在于"立刻"二字——一条因为数据本身有问题而注定失败的坏消息,被打回队列后会被几乎瞬间地重新投递、重新消费、重新失败,如此往复,它就像卡在唱片上的一根针,在队列头部疯狂空转,死死占住消费线程,排在它后面那些本来完全正常的消息,全部被它堵在身后动弹不得。一条坏消息,瘫痪一整条队列。再说 requeue=false 的那条路,它没有死循环,但它更阴险——消息失败之后被 RabbitMQ 直接丢弃,干净利落,不留痕迹,以至于我们是靠运营反馈"用户积分没到账"才知道出了事,而那时消息早已无从追溯。这两条路一对比就能看清:问题的核心不在于"重投还是丢弃"这个选择本身,而在于我们的系统里根本不存在第三个选项——一个让失败消息既不会卡死正常流程、也不会无声消失的安全归宿。死信队列,就是这个第三选项。它的机制其实很朴素:给业务队列挂上一个死信交换机,那么这个队列里所有"消费失败且明确不再重投""存活超时""被队列挤出"的消息,都会自动地、可靠地流进一个专门的死信队列里待命。有了这个归宿,整个失败处理的链路才真正变得可设计:面对一条失败的消息,我们终于可以从容地判断它属于哪一种失败——如果是业务错误,比如消息里的用户压根不存在、参数本身就不合法,那这种消息重试一万次也不会成功,就让它直接进死信队列,等人来排查,绝不浪费一次重试;如果是系统性的临时错误,比如下游服务正好在抖动、一次网络超时,那它"重试可能会成功",值得给它机会,但这个机会必须是"有限次"且"延迟"的——用一个带 TTL 的重试队列让消息先冷静几十秒再回到业务队列,并在消息头里用一个计数器记录重试了几次,次数耗尽,才最终落入死信队列。这套机制还逼出了一个无法回避的配套要求:幂等。因为一旦引入了重试,同一条消息被消费多次就成了常态,再加上"业务处理成功了但 ack 还没发出去消费者就宕机"这种经典情况,消费端就必须假定每一条消息都可能被重复处理,于是用消息的全局唯一 ID 去重、甚至把这个去重直接靠数据库流水表的唯一索引来兜底,就成了必修课。这次复盘最终改变我观念的,是对"死信队列里有消息"这件事的态度。过去我会本能地觉得,死信队列里堆着消息,是系统出问题了、是一种失败。但现在我明白,恰恰相反——死信队列里有消息,说明那些原本会卡死队列或凭空消失的失败消息,被系统稳稳地接住了、留存了、并且变得可观测可处理了,这是系统健壮和成熟的标志。真正可怕的从来不是"看到失败",而是像我们出事之前那样,消息已经失败了、用户已经受影响了,而我们这套系统对此一无所知。
—— 别看了 · 2026