2024 年我们一个订单消息的消费链路出过一次事故:大促那天,Kafka 的 consumer lag(消费滞后)从平时的几百条,一路涨到几百万条,下游依赖这条消息流的履约系统延迟了好几个小时。我们临时扩了消费者实例,lag 降下去了,可没过两天又冒出新问题——有用户收到了两条一模一样的发货短信,排查发现是消费端在处理消息时重启,offset 没来得及提交,重启后又把同一批消息消费了一遍。一边是消费太慢积压,一边是消费"过头"重复,这两个问题几乎是 Kafka 消费端最常见的一对。投了几天做消息消费专项治理,本文复盘这次实战。
问题背景
业务:订单消息 -> Kafka -> 履约系统消费,Kafka 2.8,单 topic 12 分区
事故现象:
- 大促期间 consumer lag 从几百涨到 300 万+,履约延迟数小时
- 扩容后 lag 降了,但出现"重复发货短信"——同一消息被消费两次
- 偶尔还有 rebalance 风暴,消费者频繁掉线重连
现场排查:
# 1. 看消费组的 lag
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group fulfill-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order 0 1203400 1452100 248700
order 1 1198200 1450300 252100
... # 每个分区都积压 20 万+,总 lag 300 万+
# 2. 看消费端代码
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r) {
fulfillService.handle(parse(r.value())); // 单条处理,约 200ms
}
# 单线程单条消费,200ms 一条 -> 单实例 5 条/秒,12 分区也就 60 条/秒
# 而大促时生产端 QPS 是 3000+ —— 消费速度根本追不上
# 3. 重复消费的现场:
# enable.auto.commit=true,auto.commit.interval.ms=5000
# 消费者每 5 秒自动提交一次 offset
# 处理到一半实例被 kill -> 这 5 秒内已处理的消息 offset 没提交
# 重启 / rebalance 后,这批消息被重新拉取、重新消费
根因:
1. 消费能力不足:单线程单条消费,吞吐远低于生产速度 -> 积压
2. offset 自动提交,与"消息真正处理完"不同步 -> 重复消费
3. 消费端业务不是幂等的 -> 重复消费直接变成重复发短信
4. 消费者处理太慢,超过 max.poll.interval.ms -> 被踢出组 -> rebalance
修复 1:看懂 consumer lag 与消费模型
=== consumer lag:消费滞后了多少条 ===
LAG = LOG-END-OFFSET - CURRENT-OFFSET
= 分区里最新的消息位置 - 消费组已经消费到的位置
lag 持续增大 = 生产速度 > 消费速度,消息在堆积。
=== Kafka 消费的几个核心概念 ===
- 一个 topic 分成多个 partition(分区),消息在分区内有序
- 一个 consumer group(消费组)里的多个 consumer 分摊这些分区
- 关键规则:一个分区,同一时刻只能被组内【一个】consumer 消费
-> 所以【消费组里 consumer 数量 <= 分区数】才有意义
-> 12 个分区,最多 12 个 consumer 能并行,第 13 个只能空转
=== 提升消费速度的两个方向 ===
方向 1:加分区 + 加 consumer 实例(横向扩展并行度)
前提是分区数要够,12 分区就最多 12 个实例并行
方向 2:单个 consumer 内部再做并发(把拉到的一批消息丢线程池)
不增加分区也能提速,但要小心顺序和 offset 提交
=== 一个常见误区 ===
"我加了 20 个消费者实例,怎么没快多少?"
—— 因为只有 12 个分区,多出来的 8 个实例分不到分区,纯属空转。
扩消费者之前,先确认分区数够不够。
修复 2:重复消费 —— offset 提交与幂等
// === 问题根源:自动提交 offset,与"消息处理完成"不同步 ===
// enable.auto.commit=true 时,consumer 每隔几秒自动提交一次 offset,
// 它提交的是"已经 poll 出来的"位置,不管你处理完没有。
// 处理到一半崩溃 -> 已提交的部分丢失进度 / 未提交的部分重复消费。
// === 改 1:关闭自动提交,处理完再手动提交 ===
// 配置:enable.auto.commit=false
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r,
Acknowledgment ack) {
fulfillService.handle(parse(r.value())); // 1. 先把消息处理完
ack.acknowledge(); // 2. 再手动提交 offset
// 这样保证:offset 提交了 == 消息一定处理完了
}
// 注意:这是"至少一次(at-least-once)"语义 ——
// 处理完了、提交前崩溃,消息仍会重复。所以幂等是【必须】的。
// === 改 2:消费端做幂等,这是重复消费的终极解法 ===
// 不管 Kafka 怎么重投,业务只要保证"同一条消息处理多次 == 处理一次"。
// 方案 A:每条消息带唯一 msgId,处理前先查"处理过没"
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r, Acknowledgment ack) {
OrderMsg msg = parse(r.value());
String msgId = msg.getMsgId();
// 用 Redis SETNX 抢一个"处理标记",抢不到说明已处理过
Boolean first = redis.setIfAbsent("msg:" + msgId, "1", 24, HOURS);
if (Boolean.FALSE.equals(first)) {
ack.acknowledge(); // 已处理过,直接确认跳过
return;
}
fulfillService.handle(msg);
ack.acknowledge();
}
// 方案 B:利用数据库唯一约束做幂等(更可靠,不怕 Redis 丢)
// 履约表上对 msg_id 建唯一索引
public void handle(OrderMsg msg) {
try {
fulfillMapper.insert(msg.getMsgId(), ...); // msg_id 唯一索引
} catch (DuplicateKeyException e) {
log.info("消息已处理过,幂等跳过: {}", msg.getMsgId());
return; // 重复,安全跳过
}
doFulfill(msg);
}
// 经验:不要试图让 Kafka "恰好一次",成本极高;
// 老老实实"至少一次 + 消费端幂等",简单且可靠。
修复 3:消息丢失 —— 自动提交的另一面
// === 自动提交不仅会重复,还可能丢消息 ===
// 场景:poll 出 100 条,自动提交先把 offset 提交了(提交到第 100 条),
// 然后线程池异步处理这 100 条,处理到第 30 条时实例崩溃 ——
// offset 已经停在 100,重启后从 101 开始,31~100 这 70 条【永久丢失】。
// === 防丢失:offset 必须在消息"确实处理完"之后才提交 ===
// 关闭自动提交,手动提交,且严格遵守"先处理、后提交"的顺序。
// === 生产端也要配合,否则消息根本没到 Kafka ===
// application.yml (producer)
// acks=all 所有 ISR 副本都写入才算成功(最高可靠)
// retries=3 发送失败自动重试
// 错误示范:acks=0 / acks=1,leader 还没同步给副本就宕机 -> 丢消息
// === 完整的不丢链路 ===
// 生产端:acks=all + retries -> 保证消息到了 Kafka
// Kafka :replication-factor>=3 + min.insync.replicas=2 -> 副本不丢
// 消费端:手动提交 offset,处理完才提交 -> 保证不漏处理
// === 关键认知 ===
// "消息不丢" 和 "消息不重复" 往往不能同时低成本满足:
// - 想绝对不丢:offset 提交要晚(处理完再提交)-> 可能重复
// - 想绝对不重:offset 提交要早 -> 可能丢
// 工程上的标准选择:选"不丢 + 可能重复",再用幂等消化掉重复。
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r, Acknowledgment ack) {
try {
fulfillService.handle(parse(r.value())); // 处理
ack.acknowledge(); // 成功才提交 -> 不丢
} catch (Exception e) {
log.error("消费失败,不提交 offset,等待重试", e);
// 不 ack,这条消息下次还会被拉到,配合幂等,安全
}
}
修复 4:提升消费吞吐
// === 方案 A:批量消费,一次 poll 处理一批 ===
// 配置:max.poll.records=500(一次最多拉 500 条)
// 并开启 batch listener
@KafkaListener(topics = "order", batch = "true")
public void onBatch(List> records,
Acknowledgment ack) {
// 批量处理:比如 500 条订单一次性批量写库,
// 把 500 次单条 insert 合并成 1 次 batch insert,吞吐量级提升
List msgs = records.stream().map(r -> parse(r.value())).toList();
fulfillService.handleBatch(msgs); // 批量写库 / 批量调用
ack.acknowledge(); // 整批处理完再提交
}
// === 方案 B:单 consumer 内部用线程池并发处理 ===
// 拉到一批后,丢给线程池并发跑,但要小心 offset 提交时机
private final ExecutorService pool = Executors.newFixedThreadPool(16);
@KafkaListener(topics = "order", batch = "true")
public void onBatchConcurrent(List> records,
Acknowledgment ack) {
List> futures = records.stream()
.map(r -> CompletableFuture.runAsync(
() -> fulfillService.handle(parse(r.value())), pool))
.toList();
// 必须等这一整批都处理完,才能提交 offset
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
ack.acknowledge();
}
// ⚠ 并发处理打破了分区内的顺序性。
// 若业务要求"同一订单的消息按序处理",要按订单 id 取模分发到固定线程,
// 保证同一 key 的消息始终串行。
// === 方案 C:扩分区 + 扩实例(根本性扩容)===
// 12 分区不够,先把分区扩到 36,再把消费者实例扩到 36 个,
// 并行度直接翻 3 倍。
// $ kafka-topics.sh --alter --topic order --partitions 36
// ⚠ 分区只能加不能减;加分区会改变 key 的分区路由,
// 对"按 key 保证顺序"的场景要评估影响。
// === 方案 D:慢消息隔离,别让一条慢消息拖垮整批 ===
// 处理特别慢 / 反复失败的消息,转投到"重试 topic"或死信队列,
// 主流程不被它阻塞,保持高吞吐。
修复 5:rebalance 治理
# === rebalance:消费组成员变化时,重新分配分区的过程 ===
# rebalance 期间,整个消费组【暂停消费】,频繁 rebalance 严重拖慢消费。
# 常见诱因:消费者处理太慢,超时被判定"死了",踢出组触发 rebalance。
# === 关键参数 1:max.poll.interval.ms ===
# 两次 poll() 之间的最大间隔,默认 5 分钟。
# 如果一批消息处理时间超过它,Kafka 认为这个 consumer 卡死了,
# 把它踢出组 -> rebalance。
max.poll.interval.ms=600000 # 处理慢就调大,给足处理时间
# === 关键参数 2:max.poll.records ===
# 一次 poll 拉多少条。拉太多 + 处理慢 -> 容易超过上面的 interval。
max.poll.records=200 # 拉的量和处理速度要匹配
# === 关键参数 3:session.timeout.ms + heartbeat.interval.ms ===
# consumer 靠后台心跳告诉 Kafka "我还活着"。
session.timeout.ms=45000 # 多久没心跳判定掉线
heartbeat.interval.ms=15000 # 心跳间隔,一般为 session 的 1/3
# === 经验法则 ===
# 一次 poll 出来的消息,要能在 max.poll.interval.ms 内处理完。
# 估算:max.poll.records * 单条处理耗时 < max.poll.interval.ms
# 处理不完,要么调小 max.poll.records,要么调大 interval,要么提速处理。
# === 减少 rebalance 影响:用 CooperativeStickyAssignor ===
# 默认的 rebalance 是"全部分区收回再重分",期间全组停消费;
# 协作式粘性分配只挪动需要变动的分区,影响小很多
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
修复 6:消费监控告警
# 消息消费的核心是 lag,围绕它建监控
groups:
- name: kafka-consumer
rules:
# 1. consumer lag 过高(消费跟不上生产)
- alert: ConsumerLagHigh
expr: sum(kafka_consumergroup_lag{group="fulfill-group"}) > 100000
for: 5m
annotations:
summary: "消费组 lag > 10 万,消费能力不足,考虑扩容/批量"
# 2. lag 持续增长(积压在恶化,而非偶发抖动)
- alert: ConsumerLagGrowing
expr: deriv(sum(kafka_consumergroup_lag{group="fulfill-group"})[10m:]) > 0
for: 15m
annotations:
summary: "消费 lag 持续 15 分钟上涨,生产已稳定超过消费速度"
# 3. rebalance 频繁(消费组不稳定)
- alert: FrequentRebalance
expr: increase(kafka_consumer_rebalance_total[10m]) > 3
annotations:
summary: "10 分钟内 rebalance 超过 3 次,排查消费者处理超时"
# 4. 消费失败率(消息处理异常)
- alert: ConsumeErrorRate
expr: rate(consume_error_total[5m]) / rate(consume_total[5m]) > 0.01
for: 5m
annotations:
summary: "消息消费失败率 > 1%,排查下游依赖或脏消息"
优化效果
指标 治理前 治理后
=============================================================
消费模型 单线程单条 批量 + 线程池并发
单实例消费速度 约 5 条/秒 约 600 条/秒
分区数 / 消费实例 12 / 12 36 / 36
大促峰值 lag 300 万+ 峰值 < 2 万,分钟级消化
offset 提交 自动提交(5s) 处理完手动提交
重复消费 重复发货短信 幂等拦截,处理多次 = 一次
消息丢失风险 自动提交可能丢 acks=all + 手动提交,不丢
rebalance 偶发风暴 协作式分配,影响极小
消费可观测性 无 lag/rebalance/失败率监控
治理过程:
- 定位积压与重复消费根因:0.5 天
- 关自动提交 + 消费端幂等改造:2 天
- 批量消费 + 线程池并发改造:2 天
- 扩分区扩实例 + rebalance 参数调优:1.5 天
- 消费监控接入:1 天
避坑清单
- consumer lag = 最新 offset - 已消费 offset,持续增大说明消费跟不上生产
- 一个分区同时只能被组内一个 consumer 消费,消费实例数超过分区数纯属空转
- 扩消费者前先确认分区数够,分区不够加再多实例也不提速
- enable.auto.commit 自动提交与"消息真正处理完"不同步,会重复也会丢
- 关闭自动提交,严格"先处理完、后手动提交 offset",保证不漏处理
- Kafka 是"至少一次"语义,消费端幂等是重复消费的终极解法,必须做
- 幂等用唯一索引或 Redis SETNX,保证同一消息处理多次等于处理一次
- 提升吞吐:批量消费合并写库、单 consumer 内线程池并发、扩分区扩实例
- 并发消费会打破分区内顺序,需要保序就按 key 取模分发到固定线程
- 处理慢超过 max.poll.interval.ms 会被踢出组触发 rebalance,参数要和处理速度匹配
总结
这次 Kafka 消费链路的治理,让我对消息队列有了一个更清醒的认识:消息队列从来不是"把消息扔进去就万事大吉"的黑盒,它的生产端、Broker、消费端三段,每一段都有自己要操心的可靠性问题,而我们这次集中爆发的,是消费端最经典的一对矛盾——积压和重复。先说积压。大促那天 lag 从几百飙到三百万,本质原因朴素得近乎尴尬:我们的消费速度,根本就追不上生产速度。消费端是单线程、一条一条处理,每条要两百毫秒,一个实例满打满算每秒也就消费五条,十二个分区十二个实例,峰值也才六十多条每秒,而生产端大促时每秒涌进来三千多条——这是一道小学算术题,积压是必然的。想明白这一点,提速的方向也就清晰了:要么把一次拉到的一批消息合并起来批量处理,把五百次单条写库压缩成一次批量写库;要么在单个消费者内部用线程池并发地跑;要么干脆从根上扩容,把分区数和消费者实例数一起翻几倍——但这里有个一定要记住的前提,一个分区同一时刻只能被消费组里的一个消费者消费,所以消费者实例数超过分区数,多出来的实例就是纯粹空转,扩消费者之前一定要先看分区数够不够。再说重复。重复消费的根,在于 offset 的提交时机和消息"真正处理完"这两件事没有对齐。我们开着自动提交,消费者每隔五秒不管三七二十一就把 offset 往前推,于是处理到一半崩溃重启,这五秒内的消息就被重新消费一遍,用户就收到了第二条发货短信。这件事让我彻底接受了一个工程上的现实:想让 Kafka 做到"恰好一次"投递,代价高得不切实际,正确的姿势是坦然接受"至少一次"——也就是消息可能重复——然后把保证正确性的责任,从消息中间件转移到消费端自己身上,用幂等去消化掉重复。具体做法就是关掉自动提交、严格遵守"先把消息处理完、再手动提交 offset"的顺序,这保证了消息绝不会漏;同时给消费逻辑加上幂等,用数据库唯一索引或者 Redis 的 SETNX,确保同一条消息哪怕被投递十次,业务上也只生效一次。"不丢"靠手动提交的顺序来保证,"不重复造成影响"靠幂等来兜底,这两条加在一起,才构成一个真正可靠的消费链路。最后还有一个容易被忽略的隐患是 rebalance:消费者处理一批消息太慢,超过了 max.poll.interval.ms,Kafka 就以为它死了,把它踢出消费组,触发整个组的分区重新分配,而 rebalance 期间整个组是停止消费的——处理慢导致被踢、被踢导致 rebalance、rebalance 导致更慢,这是一个恶性循环。所以拉取参数 max.poll.records 和处理速度必须匹配,一次拉出来的消息量,要能在超时时间内从容处理完。这次治理之后,我们在消费链路上立了一条朴素的规矩:永远盯着 lag 这个指标,它是消费健康度最直接的体温计;一旦它持续上涨,就说明生产已经稳定地超过了消费,该扩容扩容、该提速提速,绝不能等它涨到几百万、等下游系统延迟了几个小时,才被动地去救火。
—— 别看了 · 2026