消费者追不上生产者:一次 Kafka 消息积压的复盘

上游促销订单量翻数倍,Kafka 消费组 lag 涨到数百万还在爬,下游报表停在两小时前。根子是消费并行度被 4 个分区锁死、单条消费 200ms。几天重构消费端:分区与消费者的铁律、扩分区扩消费者、批量消费、rebalance 与 poll 超时、积压紧急追赶与 lag 监控。

2024 年我们一条 Kafka 链路出过一次很被动的故障。这条链路负责把订单事件同步给下游的数据分析系统,平时一切风平浪静。某天上游搞了一次促销,订单量翻了好几倍,运营那边过了一两个小时跑来问:为什么报表里的数据停在两小时前?我们一看 Kafka 的监控,消费组的 lag(消息堆积量)已经涨到了几百万,而且还在以肉眼可见的速度往上爬——生产者每秒往里塞几万条,消费者每秒只能啃下来几千条,差距越拉越大。这就是典型的消息积压:消费者追不上生产者。投了几天把 Kafka 消息积压的成因和应对彻底搞清并重构了消费端,本文复盘这次实战。

问题背景

业务:订单事件经 Kafka 同步给数据分析系统
事故现象:
- 上游促销,订单量翻数倍
- 下游报表数据停在 2 小时前
- 消费组 lag 涨到数百万,且持续增长
- 生产 ~3 万条/秒,消费 ~4 千条/秒

现场排查:
# 1. 看消费组的 lag
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group order-analysis-group

TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-event    0          1203440         1789002         585562
order-event    1          1198330         1788540         590210
order-event    2          1205110         1790221         585111
order-event    3          1199870         1789900         590030
# 4 个分区,每个都堆了近 60 万,合计两百多万还在涨

# 2. 看消费者实例
# 消费组里只有 2 个消费者实例,4 个分区
# -> 每个消费者要扛 2 个分区

# 3. 看单条消息的消费逻辑
@KafkaListener(topics = "order-event")
public void consume(OrderEvent event) {
    analysisService.process(event);   // 内部有 3 次 DB 查询 + 1 次 RPC
}
# 单条处理耗时约 200ms

根因:
1. 消费能力 << 生产能力,lag 持续累积
2. topic 只有 4 个分区,消费并行度被死死卡在 4
3. 单条消息处理 200ms,且是逐条串行处理
4. 没有 lag 监控告警,积压两小时才被运营发现

修复 1:先理清分区、消费者、lag 的关系

=== Kafka 消费并行度的铁律 ===
一个 topic 分成若干 partition(分区)。
一个消费组里,一个 partition 在同一时刻
【只能被一个消费者实例】消费。

由此推出一条最重要的铁律:
  一个消费组的【有效消费者数】<= topic 的【分区数】
分区数 4,你就算起 10 个消费者实例,
也只有 4 个在干活,另外 6 个【空闲待命】。

=== 我们的瓶颈算一笔账 ===
- 分区数 = 4 -> 消费并行度最多 4
- 单条处理 200ms -> 单个消费者 1 秒最多处理 5 条
- 满打满算:4 个分区 x 5 条/秒 = 20 条/秒??
  (实际我们测出来 4 千/秒,是因为有批量拉取,
   但核心结论不变:并行度被分区数锁死)
- 而生产是 3 万条/秒 -> 消费能力差了一个数量级

=== lag 是什么 ===
lag = LOG-END-OFFSET - CURRENT-OFFSET
   = 分区里最新的位置 - 消费者已经消费到的位置
   = 还有多少条没被消费
lag 持续增长 = 消费速度 < 生产速度,正在积压。
lag 稳定不变 = 两者速度持平。
lag 趋近 0    = 消费跟得上。

=== 解决积压的两个总方向 ===
方向一:提高消费并行度(更多分区 + 更多消费者)
方向二:降低单条消息的处理耗时(批量 / 异步 / 优化)
这次我们两个方向都要做。

修复 2:扩分区 + 扩消费者,把并行度提上去

# === 第一步:增加 topic 的分区数 ===
# 把 order-event 的分区数从 4 扩到 16
kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic order-event --partitions 16

# === 关键警告:分区数只能增、不能减 ===
# 而且扩分区会改变 key 的分区路由:
#   分区 = hash(key) % 分区数
# 分区数从 4 变 16,同一个 key 算出来的分区就变了。
# 如果你的业务【依赖"同一个 key 的消息进同一分区"
# 来保证顺序】,扩分区会打破这个顺序保证 ——
# 扩之前必须确认:这个 topic 对分区内顺序有没有要求。
# 我们这条分析链路不要求严格顺序,所以可以放心扩。

# === 第二步:把消费者实例数提到和分区数匹配 ===
# 消费者从 2 个扩到 16 个(可以是 16 个进程,
# 也可以是 4 个进程 x 每进程 4 个消费线程)。
// 用 concurrency 让单个应用起多个消费线程
@KafkaListener(
    topics = "order-event",
    groupId = "order-analysis-group",
    concurrency = "4")          // 这个实例内起 4 个消费线程
public void consume(OrderEvent event) {
    analysisService.process(event);
}
// 部署 4 个应用实例 x 每个 concurrency=4 = 16 个消费线程,
// 正好和 16 个分区一一对应,并行度拉满。

// === 注意:concurrency 不要超过分配到的分区数 ===
// 超过的那些线程会空闲,白占资源。
// 并行度的天花板,永远是分区数。

修复 3:把单条 200ms 的消费逻辑打下来

// === 优化 1:批量消费,批量处理 ===
// 原来逐条 process,每条 3 次 DB 查询。
// 改成一次拉一批,批量查 DB —— 把 N 次查询合并成 1 次。
@KafkaListener(topics = "order-event", batch = "true")
public void consumeBatch(List events) {
    // 批量取出所有要查的 id,一次性查回来
    List ids = events.stream()
            .map(OrderEvent::getOrderId).collect(toList());
    Map details = orderMapper.batchSelect(ids);
    // 批量组装、批量写入分析库
    List records = events.stream()
            .map(e -> buildRecord(e, details.get(e.getOrderId())))
            .collect(toList());
    analysisMapper.batchInsert(records);
}
// 100 条消息,原来 300 次 DB 查询,现在 1~2 次。
// 这是把"消费慢"打下来最有效的一招。

// === 优化 2:消费内的慢操作异步化 ===
// 那次 RPC 调用如果不影响主流程,丢进线程池异步做,
// 不要让它阻塞消费主线程的 poll 节奏。

// === 优化 3:消费线程只做"快活",重活交给工作线程池 ===
// 消费线程负责快速 poll 消息、丢进队列就返回;
// 真正的处理由一个独立的业务线程池并发执行。
// 但这样要自己管好 offset 提交时机(处理完才提交),
// 否则消息会丢。Spring Kafka 的 batch + 手动 ack 更稳妥。

// === 优化 4:别在消费里做能离线做的事 ===
// 重新审视 analysisService.process 里每一步:
// 哪些是必须实时的?哪些其实可以攒着批量离线算?
// 把非实时的逻辑移出消费链路,消费自然就轻了。

修复 4:警惕 rebalance 和 poll 超时

=== 一个会让积压雪上加霜的坑:频繁 rebalance ===
消费组里只要有消费者"加入"或"离开",就会触发
rebalance —— 所有分区重新分配。rebalance 期间,
整个消费组【停止消费】。频繁 rebalance = 消费时断时续。

=== 什么会导致消费者被判定"离开" ===
Kafka 靠两个机制判断消费者是否还活着:
1. 心跳:后台线程定期发心跳,超过 session.timeout.ms
   没心跳 -> 认为消费者挂了,踢出组
2. poll 间隔:两次 poll 之间超过 max.poll.interval.ms
   -> 认为这个消费者"处理不过来 / 卡死了",踢出组

=== 我们差点踩的坑 ===
改成批量消费后,一次 poll 拉 500 条,每条 200ms,
处理一整批要 100 秒,远超默认的
max.poll.interval.ms(5 分钟)边界值。
一旦处理超时,消费者被踢 -> rebalance ->
这批消息没提交 offset,被重新分配给别人再消费一遍
-> 重复消费 + 消费更慢 -> 恶性循环。

=== 正确的参数配合 ===
max.poll.records:一次 poll 拉多少条 —— 调小它,
   让"一批的处理时间"控制在 max.poll.interval.ms 内
max.poll.interval.ms:两次 poll 最大间隔 ——
   若单批确实耗时长,适当调大它
原则:max.poll.records x 单条耗时 << max.poll.interval.ms

=== 还有:消费端必须幂等 ===
rebalance、重启、超时重投,都会导致消息重复消费。
用消息里的唯一 id 去重,保证重复消费不产生脏数据。

修复 5:积压已经发生,怎么快速追回来

// === 场景:lag 已经几百万,光优化代码追得太慢 ===
// 需要一个"紧急追赶"的临时方案。

// === 方案 1:临时扩分区 + 临时扩消费者(首选)===
// 前面修复 2 做的就是这个。扩到 16 分区 16 消费者后,
// 消费速度数倍提升,lag 会肉眼可见地往下掉。
// 这是最干净的方案 —— 前提是你能接受扩分区。

// === 方案 2:开一个"临时消费组"专门追旧数据 ===
// 如果分区数不能动,可以另起一个消费组,
// 它和原消费组各自独立消费、互不影响 offset。
// 让临时组用更多机器、更"糙快"的逻辑专门消化积压,
// 原组继续消费新数据。两边数据最后合并。

// === 方案 3:中转——快搬运,慢处理 ===
// 起一批"搬运工"消费者,它们什么业务都不做,
// 只飞快地把积压消息原样转发到另一个 topic,
// 这个 topic 分区数开得很大,再由大量消费者慢慢处理。
// 本质是用一次"无脑转发"解开分区数的并行度枷锁。

// === 方案 4:评估能否"跳过" ===
// 极端情况下,如果积压的是时效性很强、过期就没用的
// 消息(如实时推荐、监控打点),且业务能接受丢弃,
// 可以直接把 offset 重置到最新位置:
// kafka-consumer-groups.sh --reset-offsets --to-latest
//   --group xxx --topic xxx --execute
// 【高危操作】:等于主动丢弃所有积压消息,
// 必须业务明确确认这些数据可以不要,才能做。

// === 我们的实战选择 ===
// 用方案 1(扩分区扩消费者)+ 修复 3(批量消费),
// lag 从 200 多万,在约 40 分钟内追平到接近 0。

修复 6:把积压挡在发生之前

=== 第一位的是监控告警 ===
这次最大的教训是:积压两小时,我们自己不知道,
是运营发现报表不动了才告诉我们的。
必须监控消费组的 lag:
1. 用工具(kafka-exporter + Prometheus + Grafana)
   把每个消费组、每个分区的 lag 采集成曲线
2. 设两级告警:
   - lag 超过阈值(如 10 万)-> 预警
   - lag 持续增长 N 分钟 -> 严重告警
3. 同时监控"消费速率"和"生产速率"两条曲线,
   它俩一旦持续分叉,就是积压的最早信号

=== 容量上预留余量 ===
1. 分区数别卡着当前流量设计,要按"未来峰值"预留。
   分区数能加不能减,宁可一开始多设一点。
2. 消费者要能快速水平扩容 —— 提前确认扩容到
   "分区数"这个上限时,机器资源是够的。

=== 给生产端也加上保护 ===
1. 若下游消费能力有明确上限,生产端要能限流,
   别让上游一个促销就把下游冲垮。
2. 区分消息的重要性:核心消息和非核心消息
   分不同 topic,积压时优先保核心。

=== 定期做消费能力压测 ===
知道自己这条链路"最多能消费多少条/秒",
拿它和"业务峰值预估"对比,差距不够就提前扩容,
而不是等真出事了手忙脚乱。

优化效果

指标                      治理前              治理后
=============================================================
topic 分区数              4                   16
消费者实例(线程)         2                   16
单条/单批处理             逐条 200ms           批量,摊薄到每条 ~8ms
消费速率                  ~4 千条/秒           ~5 万条/秒
积压追赶                  追不上,持续增长     200万 lag 约 40 分钟追平
DB 访问                   每条 3 次查询        每批 1~2 次批量查询
rebalance                 改批量后险些频发     调好 max.poll,稳定
消费端幂等                无,重投会重复       msgId 去重,幂等
lag 监控告警              无,靠人肉发现       Prometheus 两级告警

治理过程:
- 定位积压根因(并行度 + 单条耗时):0.5 天
- 扩分区 + 消费者扩容:1 天
- 批量消费改造 + DB 批量化:1.5 天
- max.poll 参数调优 + 幂等改造:1 天
- lag 监控告警 + 消费能力压测:1 天

避坑清单

  1. 一个分区同一时刻只能被消费组里一个消费者消费,消费并行度上限就是分区数
  2. 消费者实例数超过分区数,多出来的实例会空闲,扩容前先确认分区数够不够
  3. lag = 日志末尾 offset - 已消费 offset,lag 持续增长就是消费跟不上生产
  4. 分区数只能增不能减,扩分区会改变 key 的分区路由,依赖分区内顺序的要谨慎
  5. 降低单条耗时最有效的是批量消费,把 N 次 DB 查询合并成一次批量查询
  6. 一批的处理时间必须远小于 max.poll.interval.ms,否则消费者被踢引发 rebalance
  7. rebalance 期间整个消费组停止消费,频繁 rebalance 会让积压雪上加霜
  8. rebalance、重启、超时重投都会导致重复消费,消费端必须用唯一 id 做幂等
  9. 积压紧急追赶:扩分区扩消费者为首选,也可临时消费组、中转 topic 解并行度枷锁
  10. 必须监控消费组 lag 并设两级告警,容量上分区数按未来峰值预留,别卡当前流量

总结

这次 Kafka 消息积压的故障,给我最直接的一个教训是关于"发现"的——我们的链路积压了整整两个小时,而我们这些维护它的人,自始至终毫不知情,最后是运营看着报表数据停摆,跑过来才把我们点醒。一个系统出了问题,你却要靠业务方来通知你,这本身就是监控的彻底失职。所以这次复盘我把"监控消费组的 lag"放在了所有改进措施的第一位:lag 这个数字,本质上就是 Kafka 替你算好的、"消费者还差多少才追上生产者"的距离,它持续往上涨,就是积压正在发生的最直白、最早的信号,根本不需要等到下游数据停摆才看得出来。把 lag 采集成曲线、配上两级告警,这条链路才算真正长出了"自己会喊疼"的神经。理解了 lag 之后,整个积压问题的脉络其实非常清晰:积压,就是消费速度长期低于生产速度,二者之差日积月累的结果。那么解法也就只有两个大方向——要么提高消费的速度,要么……没有别的了,就是提高消费速度,只不过提高的手段分两条路。第一条路是提高并行度,也就是让更多的消费者同时干活。但这里有一条 Kafka 的铁律必须先刻进脑子:在同一个消费组里,一个分区在同一时刻只能被一个消费者消费。这条铁律推导出一个残酷的结论——你的消费并行度,有一个写死的天花板,就是这个 topic 的分区数。我们最初的链路只有 4 个分区,这意味着无论我们往消费组里塞多少台机器,最多也只有 4 个消费者在真正工作,其余的全在旁边干瞪眼。所以提并行度这条路,必须分区和消费者一起扩,缺一不可。扩分区的时候有个细节差点让我栽跟头:分区数只能增不能减,而且增加分区会改变消息按 key 路由的结果,如果你的业务依赖"同一个 key 的消息落在同一分区"来保证局部顺序,扩分区就会悄悄打破这个顺序保证——好在我们这条分析链路不要求严格顺序,才得以放心地扩。第二条路是降低单条消息的处理耗时。我们最初的消费逻辑是逐条处理,每条消息内部要做三次数据库查询,单条就耗掉 200 毫秒,这种"零售式"的处理方式,在大流量面前是必然要被冲垮的。改造的核心是从"零售"切换到"批发"——一次拉取一批消息,把这一批里所有需要查的数据库记录,合并成一两次批量查询拿回来,再批量地组装、批量地写出去。这一下,原本一百条消息要打三百次数据库的开销,被压缩成了一两次,单条消息被摊薄后的处理成本直接掉了一个数量级。不过批量消费也带来了一个新的、很隐蔽的风险,我必须把它单独拎出来强调:一旦改成批量,一次 poll 拉回来一大批消息,这一整批处理完所需要的总时间,绝对不能超过 Kafka 的 max.poll.interval.ms 这个阈值,否则 Kafka 会认定这个消费者"卡死了",把它一脚踢出消费组,触发 rebalance。而 rebalance 期间整个消费组是停止消费的,被踢的那个消费者还没提交 offset,它处理到一半的那批消息会被重新分配给别人从头再消费一遍——消费变慢、又叠加重复消费,这就形成了一个会把积压越拖越深的恶性循环。所以批量消费一定要让"单批处理时间"和 max.poll.records、max.poll.interval.ms 这几个参数严丝合缝地配合好,同时消费端必须老老实实做好幂等,因为 rebalance、重启、超时重投这些情况下,消息被重复消费几乎是必然会发生的。回头看,这次积压留给我最深的体会是:消息队列从来不是一根可以无限吞咽的管子,它是一个有明确吞吐上限的系统,而这个上限,在你创建 topic、定下分区数的那一刻,就已经被悄悄写好了一半。平时流量平稳,你感觉不到这个上限的存在,可一旦上游来一次促销、来一波峰值,这个上限就会毫不留情地现出原形,把消费者死死按在地上。真正成熟的做法,不是等积压发生了再手忙脚乱地扩容追赶,而是平时就清楚地知道自己这条链路的消费能力上限是多少,把它和业务可能的峰值认真地比一比,在分区数上预留出足够的余量,在监控上装好能提前预警的探针。让系统自己在距离危险还很远的时候就开口提醒你,而不是让运营拿着一张停摆的报表来通知你——这,才是这次故障真正想教给我的东西。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

加了索引还是全表扫描:一次 MySQL 索引失效排查的复盘

2026-5-20 16:47:24

技术教程

线上内存慢慢涨到 OOM:一次 ThreadLocal 内存泄漏的复盘

2026-5-20 16:53:27

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索