2024 年我们一条 Kafka 链路出过一次很被动的故障。这条链路负责把订单事件同步给下游的数据分析系统,平时一切风平浪静。某天上游搞了一次促销,订单量翻了好几倍,运营那边过了一两个小时跑来问:为什么报表里的数据停在两小时前?我们一看 Kafka 的监控,消费组的 lag(消息堆积量)已经涨到了几百万,而且还在以肉眼可见的速度往上爬——生产者每秒往里塞几万条,消费者每秒只能啃下来几千条,差距越拉越大。这就是典型的消息积压:消费者追不上生产者。投了几天把 Kafka 消息积压的成因和应对彻底搞清并重构了消费端,本文复盘这次实战。
问题背景
业务:订单事件经 Kafka 同步给数据分析系统
事故现象:
- 上游促销,订单量翻数倍
- 下游报表数据停在 2 小时前
- 消费组 lag 涨到数百万,且持续增长
- 生产 ~3 万条/秒,消费 ~4 千条/秒
现场排查:
# 1. 看消费组的 lag
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-analysis-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-event 0 1203440 1789002 585562
order-event 1 1198330 1788540 590210
order-event 2 1205110 1790221 585111
order-event 3 1199870 1789900 590030
# 4 个分区,每个都堆了近 60 万,合计两百多万还在涨
# 2. 看消费者实例
# 消费组里只有 2 个消费者实例,4 个分区
# -> 每个消费者要扛 2 个分区
# 3. 看单条消息的消费逻辑
@KafkaListener(topics = "order-event")
public void consume(OrderEvent event) {
analysisService.process(event); // 内部有 3 次 DB 查询 + 1 次 RPC
}
# 单条处理耗时约 200ms
根因:
1. 消费能力 << 生产能力,lag 持续累积
2. topic 只有 4 个分区,消费并行度被死死卡在 4
3. 单条消息处理 200ms,且是逐条串行处理
4. 没有 lag 监控告警,积压两小时才被运营发现
修复 1:先理清分区、消费者、lag 的关系
=== Kafka 消费并行度的铁律 ===
一个 topic 分成若干 partition(分区)。
一个消费组里,一个 partition 在同一时刻
【只能被一个消费者实例】消费。
由此推出一条最重要的铁律:
一个消费组的【有效消费者数】<= topic 的【分区数】
分区数 4,你就算起 10 个消费者实例,
也只有 4 个在干活,另外 6 个【空闲待命】。
=== 我们的瓶颈算一笔账 ===
- 分区数 = 4 -> 消费并行度最多 4
- 单条处理 200ms -> 单个消费者 1 秒最多处理 5 条
- 满打满算:4 个分区 x 5 条/秒 = 20 条/秒??
(实际我们测出来 4 千/秒,是因为有批量拉取,
但核心结论不变:并行度被分区数锁死)
- 而生产是 3 万条/秒 -> 消费能力差了一个数量级
=== lag 是什么 ===
lag = LOG-END-OFFSET - CURRENT-OFFSET
= 分区里最新的位置 - 消费者已经消费到的位置
= 还有多少条没被消费
lag 持续增长 = 消费速度 < 生产速度,正在积压。
lag 稳定不变 = 两者速度持平。
lag 趋近 0 = 消费跟得上。
=== 解决积压的两个总方向 ===
方向一:提高消费并行度(更多分区 + 更多消费者)
方向二:降低单条消息的处理耗时(批量 / 异步 / 优化)
这次我们两个方向都要做。
修复 2:扩分区 + 扩消费者,把并行度提上去
# === 第一步:增加 topic 的分区数 ===
# 把 order-event 的分区数从 4 扩到 16
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic order-event --partitions 16
# === 关键警告:分区数只能增、不能减 ===
# 而且扩分区会改变 key 的分区路由:
# 分区 = hash(key) % 分区数
# 分区数从 4 变 16,同一个 key 算出来的分区就变了。
# 如果你的业务【依赖"同一个 key 的消息进同一分区"
# 来保证顺序】,扩分区会打破这个顺序保证 ——
# 扩之前必须确认:这个 topic 对分区内顺序有没有要求。
# 我们这条分析链路不要求严格顺序,所以可以放心扩。
# === 第二步:把消费者实例数提到和分区数匹配 ===
# 消费者从 2 个扩到 16 个(可以是 16 个进程,
# 也可以是 4 个进程 x 每进程 4 个消费线程)。
// 用 concurrency 让单个应用起多个消费线程
@KafkaListener(
topics = "order-event",
groupId = "order-analysis-group",
concurrency = "4") // 这个实例内起 4 个消费线程
public void consume(OrderEvent event) {
analysisService.process(event);
}
// 部署 4 个应用实例 x 每个 concurrency=4 = 16 个消费线程,
// 正好和 16 个分区一一对应,并行度拉满。
// === 注意:concurrency 不要超过分配到的分区数 ===
// 超过的那些线程会空闲,白占资源。
// 并行度的天花板,永远是分区数。
修复 3:把单条 200ms 的消费逻辑打下来
// === 优化 1:批量消费,批量处理 ===
// 原来逐条 process,每条 3 次 DB 查询。
// 改成一次拉一批,批量查 DB —— 把 N 次查询合并成 1 次。
@KafkaListener(topics = "order-event", batch = "true")
public void consumeBatch(List events) {
// 批量取出所有要查的 id,一次性查回来
List ids = events.stream()
.map(OrderEvent::getOrderId).collect(toList());
Map details = orderMapper.batchSelect(ids);
// 批量组装、批量写入分析库
List records = events.stream()
.map(e -> buildRecord(e, details.get(e.getOrderId())))
.collect(toList());
analysisMapper.batchInsert(records);
}
// 100 条消息,原来 300 次 DB 查询,现在 1~2 次。
// 这是把"消费慢"打下来最有效的一招。
// === 优化 2:消费内的慢操作异步化 ===
// 那次 RPC 调用如果不影响主流程,丢进线程池异步做,
// 不要让它阻塞消费主线程的 poll 节奏。
// === 优化 3:消费线程只做"快活",重活交给工作线程池 ===
// 消费线程负责快速 poll 消息、丢进队列就返回;
// 真正的处理由一个独立的业务线程池并发执行。
// 但这样要自己管好 offset 提交时机(处理完才提交),
// 否则消息会丢。Spring Kafka 的 batch + 手动 ack 更稳妥。
// === 优化 4:别在消费里做能离线做的事 ===
// 重新审视 analysisService.process 里每一步:
// 哪些是必须实时的?哪些其实可以攒着批量离线算?
// 把非实时的逻辑移出消费链路,消费自然就轻了。
修复 4:警惕 rebalance 和 poll 超时
=== 一个会让积压雪上加霜的坑:频繁 rebalance ===
消费组里只要有消费者"加入"或"离开",就会触发
rebalance —— 所有分区重新分配。rebalance 期间,
整个消费组【停止消费】。频繁 rebalance = 消费时断时续。
=== 什么会导致消费者被判定"离开" ===
Kafka 靠两个机制判断消费者是否还活着:
1. 心跳:后台线程定期发心跳,超过 session.timeout.ms
没心跳 -> 认为消费者挂了,踢出组
2. poll 间隔:两次 poll 之间超过 max.poll.interval.ms
-> 认为这个消费者"处理不过来 / 卡死了",踢出组
=== 我们差点踩的坑 ===
改成批量消费后,一次 poll 拉 500 条,每条 200ms,
处理一整批要 100 秒,远超默认的
max.poll.interval.ms(5 分钟)边界值。
一旦处理超时,消费者被踢 -> rebalance ->
这批消息没提交 offset,被重新分配给别人再消费一遍
-> 重复消费 + 消费更慢 -> 恶性循环。
=== 正确的参数配合 ===
max.poll.records:一次 poll 拉多少条 —— 调小它,
让"一批的处理时间"控制在 max.poll.interval.ms 内
max.poll.interval.ms:两次 poll 最大间隔 ——
若单批确实耗时长,适当调大它
原则:max.poll.records x 单条耗时 << max.poll.interval.ms
=== 还有:消费端必须幂等 ===
rebalance、重启、超时重投,都会导致消息重复消费。
用消息里的唯一 id 去重,保证重复消费不产生脏数据。
修复 5:积压已经发生,怎么快速追回来
// === 场景:lag 已经几百万,光优化代码追得太慢 ===
// 需要一个"紧急追赶"的临时方案。
// === 方案 1:临时扩分区 + 临时扩消费者(首选)===
// 前面修复 2 做的就是这个。扩到 16 分区 16 消费者后,
// 消费速度数倍提升,lag 会肉眼可见地往下掉。
// 这是最干净的方案 —— 前提是你能接受扩分区。
// === 方案 2:开一个"临时消费组"专门追旧数据 ===
// 如果分区数不能动,可以另起一个消费组,
// 它和原消费组各自独立消费、互不影响 offset。
// 让临时组用更多机器、更"糙快"的逻辑专门消化积压,
// 原组继续消费新数据。两边数据最后合并。
// === 方案 3:中转——快搬运,慢处理 ===
// 起一批"搬运工"消费者,它们什么业务都不做,
// 只飞快地把积压消息原样转发到另一个 topic,
// 这个 topic 分区数开得很大,再由大量消费者慢慢处理。
// 本质是用一次"无脑转发"解开分区数的并行度枷锁。
// === 方案 4:评估能否"跳过" ===
// 极端情况下,如果积压的是时效性很强、过期就没用的
// 消息(如实时推荐、监控打点),且业务能接受丢弃,
// 可以直接把 offset 重置到最新位置:
// kafka-consumer-groups.sh --reset-offsets --to-latest
// --group xxx --topic xxx --execute
// 【高危操作】:等于主动丢弃所有积压消息,
// 必须业务明确确认这些数据可以不要,才能做。
// === 我们的实战选择 ===
// 用方案 1(扩分区扩消费者)+ 修复 3(批量消费),
// lag 从 200 多万,在约 40 分钟内追平到接近 0。
修复 6:把积压挡在发生之前
=== 第一位的是监控告警 ===
这次最大的教训是:积压两小时,我们自己不知道,
是运营发现报表不动了才告诉我们的。
必须监控消费组的 lag:
1. 用工具(kafka-exporter + Prometheus + Grafana)
把每个消费组、每个分区的 lag 采集成曲线
2. 设两级告警:
- lag 超过阈值(如 10 万)-> 预警
- lag 持续增长 N 分钟 -> 严重告警
3. 同时监控"消费速率"和"生产速率"两条曲线,
它俩一旦持续分叉,就是积压的最早信号
=== 容量上预留余量 ===
1. 分区数别卡着当前流量设计,要按"未来峰值"预留。
分区数能加不能减,宁可一开始多设一点。
2. 消费者要能快速水平扩容 —— 提前确认扩容到
"分区数"这个上限时,机器资源是够的。
=== 给生产端也加上保护 ===
1. 若下游消费能力有明确上限,生产端要能限流,
别让上游一个促销就把下游冲垮。
2. 区分消息的重要性:核心消息和非核心消息
分不同 topic,积压时优先保核心。
=== 定期做消费能力压测 ===
知道自己这条链路"最多能消费多少条/秒",
拿它和"业务峰值预估"对比,差距不够就提前扩容,
而不是等真出事了手忙脚乱。
优化效果
指标 治理前 治理后
=============================================================
topic 分区数 4 16
消费者实例(线程) 2 16
单条/单批处理 逐条 200ms 批量,摊薄到每条 ~8ms
消费速率 ~4 千条/秒 ~5 万条/秒
积压追赶 追不上,持续增长 200万 lag 约 40 分钟追平
DB 访问 每条 3 次查询 每批 1~2 次批量查询
rebalance 改批量后险些频发 调好 max.poll,稳定
消费端幂等 无,重投会重复 msgId 去重,幂等
lag 监控告警 无,靠人肉发现 Prometheus 两级告警
治理过程:
- 定位积压根因(并行度 + 单条耗时):0.5 天
- 扩分区 + 消费者扩容:1 天
- 批量消费改造 + DB 批量化:1.5 天
- max.poll 参数调优 + 幂等改造:1 天
- lag 监控告警 + 消费能力压测:1 天
避坑清单
- 一个分区同一时刻只能被消费组里一个消费者消费,消费并行度上限就是分区数
- 消费者实例数超过分区数,多出来的实例会空闲,扩容前先确认分区数够不够
- lag = 日志末尾 offset - 已消费 offset,lag 持续增长就是消费跟不上生产
- 分区数只能增不能减,扩分区会改变 key 的分区路由,依赖分区内顺序的要谨慎
- 降低单条耗时最有效的是批量消费,把 N 次 DB 查询合并成一次批量查询
- 一批的处理时间必须远小于 max.poll.interval.ms,否则消费者被踢引发 rebalance
- rebalance 期间整个消费组停止消费,频繁 rebalance 会让积压雪上加霜
- rebalance、重启、超时重投都会导致重复消费,消费端必须用唯一 id 做幂等
- 积压紧急追赶:扩分区扩消费者为首选,也可临时消费组、中转 topic 解并行度枷锁
- 必须监控消费组 lag 并设两级告警,容量上分区数按未来峰值预留,别卡当前流量
总结
这次 Kafka 消息积压的故障,给我最直接的一个教训是关于"发现"的——我们的链路积压了整整两个小时,而我们这些维护它的人,自始至终毫不知情,最后是运营看着报表数据停摆,跑过来才把我们点醒。一个系统出了问题,你却要靠业务方来通知你,这本身就是监控的彻底失职。所以这次复盘我把"监控消费组的 lag"放在了所有改进措施的第一位:lag 这个数字,本质上就是 Kafka 替你算好的、"消费者还差多少才追上生产者"的距离,它持续往上涨,就是积压正在发生的最直白、最早的信号,根本不需要等到下游数据停摆才看得出来。把 lag 采集成曲线、配上两级告警,这条链路才算真正长出了"自己会喊疼"的神经。理解了 lag 之后,整个积压问题的脉络其实非常清晰:积压,就是消费速度长期低于生产速度,二者之差日积月累的结果。那么解法也就只有两个大方向——要么提高消费的速度,要么……没有别的了,就是提高消费速度,只不过提高的手段分两条路。第一条路是提高并行度,也就是让更多的消费者同时干活。但这里有一条 Kafka 的铁律必须先刻进脑子:在同一个消费组里,一个分区在同一时刻只能被一个消费者消费。这条铁律推导出一个残酷的结论——你的消费并行度,有一个写死的天花板,就是这个 topic 的分区数。我们最初的链路只有 4 个分区,这意味着无论我们往消费组里塞多少台机器,最多也只有 4 个消费者在真正工作,其余的全在旁边干瞪眼。所以提并行度这条路,必须分区和消费者一起扩,缺一不可。扩分区的时候有个细节差点让我栽跟头:分区数只能增不能减,而且增加分区会改变消息按 key 路由的结果,如果你的业务依赖"同一个 key 的消息落在同一分区"来保证局部顺序,扩分区就会悄悄打破这个顺序保证——好在我们这条分析链路不要求严格顺序,才得以放心地扩。第二条路是降低单条消息的处理耗时。我们最初的消费逻辑是逐条处理,每条消息内部要做三次数据库查询,单条就耗掉 200 毫秒,这种"零售式"的处理方式,在大流量面前是必然要被冲垮的。改造的核心是从"零售"切换到"批发"——一次拉取一批消息,把这一批里所有需要查的数据库记录,合并成一两次批量查询拿回来,再批量地组装、批量地写出去。这一下,原本一百条消息要打三百次数据库的开销,被压缩成了一两次,单条消息被摊薄后的处理成本直接掉了一个数量级。不过批量消费也带来了一个新的、很隐蔽的风险,我必须把它单独拎出来强调:一旦改成批量,一次 poll 拉回来一大批消息,这一整批处理完所需要的总时间,绝对不能超过 Kafka 的 max.poll.interval.ms 这个阈值,否则 Kafka 会认定这个消费者"卡死了",把它一脚踢出消费组,触发 rebalance。而 rebalance 期间整个消费组是停止消费的,被踢的那个消费者还没提交 offset,它处理到一半的那批消息会被重新分配给别人从头再消费一遍——消费变慢、又叠加重复消费,这就形成了一个会把积压越拖越深的恶性循环。所以批量消费一定要让"单批处理时间"和 max.poll.records、max.poll.interval.ms 这几个参数严丝合缝地配合好,同时消费端必须老老实实做好幂等,因为 rebalance、重启、超时重投这些情况下,消息被重复消费几乎是必然会发生的。回头看,这次积压留给我最深的体会是:消息队列从来不是一根可以无限吞咽的管子,它是一个有明确吞吐上限的系统,而这个上限,在你创建 topic、定下分区数的那一刻,就已经被悄悄写好了一半。平时流量平稳,你感觉不到这个上限的存在,可一旦上游来一次促销、来一波峰值,这个上限就会毫不留情地现出原形,把消费者死死按在地上。真正成熟的做法,不是等积压发生了再手忙脚乱地扩容追赶,而是平时就清楚地知道自己这条链路的消费能力上限是多少,把它和业务可能的峰值认真地比一比,在分区数上预留出足够的余量,在监控上装好能提前预警的探针。让系统自己在距离危险还很远的时候就开口提醒你,而不是让运营拿着一张停摆的报表来通知你——这,才是这次故障真正想教给我的东西。
—— 别看了 · 2026