2024 年我们的用户行为日志走 Kafka 异步处理,某天监控告警:消费组 Lag(积压)从平时的几百条暴涨到 1800 万条,而且还在持续涨。登上去看消费者日志,满屏都是 "Attempt to heartbeat failed" 和 rebalance 记录 — 消费组陷入了"重平衡风暴",消费者反复被踢出、重新加入,几乎没在真正消费消息。投了一周做 Kafka 消费端专项治理,之后 Lag 稳定在千级,再没出现 rebalance 风暴。本文复盘 Kafka 消费端的完整实战。
问题背景
业务:用户行为日志,Kafka 2.8,topic 24 分区,8 个消费者实例
日均消息:6 亿条
事故现象:
- 消费组 Lag 从几百暴涨到 1800 万,持续增长
- 消费者日志刷屏 rebalance,消费几乎停滞
- 部分消息被重复处理(下游统计数据偏高)
# 看消费组状态
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group behavior_log_group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
behavior_log 0 120033421 120921550 888129
behavior_log 1 119820114 120700233 880119
... (24 个分区都在堆)
# 消费者日志
[Consumer] Attempt to heartbeat failed since group is rebalancing
[Consumer] Revoke previously assigned partitions
[Consumer] (Re-)joining group
[Consumer] Member ... sending LeaveGroup request
根因:
1. 单次 poll 拉 500 条,但每条处理要写 ES + 调下游,处理一批要 6 分钟
2. max.poll.interval.ms 默认 5 分钟,处理超时 → 消费者被判定"死亡"
3. 被踢出 → 触发 rebalance → 分区重新分配 → 又超时 → 再 rebalance
4. rebalance 用默认的 eager 策略,每次全部分区暂停,雪上加霜
5. 自动提交 offset,rebalance 时已处理的消息 offset 没提交 → 重复消费
修复 1:理解 rebalance 触发机制
# === Kafka 消费者有两个独立的"存活判定" ===
# 判定 1:心跳线程(后台线程,独立于消费线程)
# heartbeat.interval.ms —— 多久发一次心跳(默认 3s)
# session.timeout.ms —— 多久没收到心跳判定消费者死亡(默认 45s)
# → 这个判定的是"消费者进程/网络是否还活着"
# 判定 2:poll 间隔(消费线程)
# max.poll.interval.ms —— 两次 poll() 之间最大间隔(默认 5min)
# → 这个判定的是"消费者是否还在正常消费,有没有卡在处理上"
# === 我们踩的坑:判定 2 ===
# poll 拉回 500 条 → 处理这 500 条要 6 分钟 → 超过 5min 没调下一次 poll
# → 协调者认为这个消费者"卡死了" → 踢出 → rebalance
# === rebalance 的连锁反应 ===
# 消费者A被踢 → 它的分区要分给别人 → 全组 rebalance
# rebalance 期间所有消费者暂停消费(eager 模式)
# 重新分配后,接手分区的消费者又因为同样原因超时 → 又 rebalance
# → 风暴形成,消费组陷入"一直在 rebalance,几乎不消费"
# 触发 rebalance 的三类事件:
# 1. 消费者加入/离开(扩缩容、重启、被踢)
# 2. 消费者订阅的 topic 分区数变化
# 3. 消费者处理超时被判定死亡(我们的情况)
修复 2:消费参数调优
// === 核心:让"一次 poll 的处理时间" < max.poll.interval.ms ===
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "behavior_log_group");
// === 方案 A:减小单次拉取量,让一批处理更快 ===
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
// 500 → 50,一批处理从 6min 降到 ~40s,远小于 poll 间隔
// === 方案 B:同时把 poll 间隔上限调大,留足余量 ===
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5min
// 处理慢的业务可适当调大,但治本还是要让处理变快
// === 心跳相关:三者要协调 ===
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 会话超时 45s
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 心跳 3s
// 经验:heartbeat.interval.ms 设为 session.timeout.ms 的 1/3
// === 方案 C:换 Cooperative rebalance,避免"全部暂停" ===
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// eager 策略:rebalance 时所有消费者放弃所有分区,全组停顿
// cooperative 策略:只重新分配"需要变动的分区",其余分区继续消费
// → rebalance 影响面大幅缩小,这是 Kafka 2.4+ 的推荐做法
// === 关闭自动提交(下面修复 4 详述)===
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
修复 3:提升消费能力
// 调参只是"别超时",真正消化积压要靠提升吞吐
// === 1. 消费并行度上限 = 分区数 ===
// 一个分区同一时刻只能被组内一个消费者消费
// 24 分区 → 最多 24 个消费者并行,加到第 25 个只会空转
// 积压严重且分区不够 → 先扩分区(注意:分区只能增不能减)
// $ kafka-topics.sh --alter --topic behavior_log --partitions 48
// === 2. 消费线程与处理线程分离 ===
// 让 poll 线程只管拉取(快),处理丢给业务线程池(慢的部分并行)
public class ParallelConsumer {
private final KafkaConsumer<String, String> consumer;
private final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
16, 16, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(200), // 有界队列,防 OOM
new ThreadPoolExecutor.CallerRunsPolicy()); // 满了让 poll 线程自己处理 → 自然反压
public void run() {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, String> r : records) {
futures.add(workerPool.submit(() -> process(r)));
}
// 关键:等这批全部处理完,再 poll 下一批 + 提交 offset
// 否则 poll 线程跑太快、offset 提交了但消息还没处理完
for (Future<?> f : futures) {
f.get();
}
consumer.commitSync(); // 整批处理完才提交
}
}
}
// 注意:多线程处理打破了"分区内顺序",若业务依赖顺序,
// 要按 key 路由到固定 worker,保证同 key 串行
// === 3. 批量写下游,减少 IO 次数 ===
// 50 条消息攒成一个 ES bulk 请求 / 一条批量 INSERT
// 单条写 → 批量写,下游 IO 从 50 次降到 1 次,吞吐数量级提升
修复 4:offset 提交与重复消费
// === 自动提交的问题 ===
// enable.auto.commit=true 时,Kafka 每 5s 自动提交一次 offset
// 风险:提交的是"已 poll 的 offset",不是"已处理完的"
// poll 了 500 条,处理到第 100 条时 rebalance → offset 可能已提交到 500
// → 后 400 条丢失(没处理却被标记已消费)
// 或反过来:处理完没来得及提交 → 重复消费
// === 正确:关自动提交,处理完再手动提交 ===
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
process(record); // 先处理
}
consumer.commitSync(); // 整批处理完,同步提交
}
// === 更精细:按分区提交,rebalance 时更可控 ===
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
process(record);
}
// 提交"下一条要消费的 offset" = 当前最后一条 + 1
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(
partition, new OffsetAndMetadata(lastOffset + 1)));
}
// === 但 Kafka 仍只保证"至少一次",重复消费必然存在,消费端必须幂等 ===
// rebalance 瞬间:A 处理完没提交就被踢 → B 接手会重新处理
public void process(ConsumerRecord<String, String> record) {
String msgId = extractMsgId(record);
// Redis SETNX 去重,3 天窗口
Boolean first = redis.opsForValue()
.setIfAbsent("kafka:dedup:" + msgId, "1", Duration.ofDays(3));
if (Boolean.FALSE.equals(first)) {
return; // 重复,跳过
}
doBusiness(record);
}
// === rebalance 监听器:rebalance 前提交当前进度,减少重复 ===
consumer.subscribe(List.of("behavior_log"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(); // 失去分区前,把进度提交掉
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});
修复 5:积压应急处理
# 1800 万积压不会自己消失,要主动消化。几种应急手段:
# === 手段 1:临时扩容消费者(分区够的前提下)===
# 分区 48 个,平时 8 个消费者 → 临时拉到 48 个,并行度拉满
# K8s: kubectl scale deployment log-consumer --replicas=48
# 注意:扩容本身会触发 rebalance,要用 cooperative 策略减少抖动
# === 手段 2:看积压是"持续产生"还是"一次性堆积" ===
$ kafka-consumer-groups.sh --describe --group behavior_log_group \
| awk '{sum+=$5} END {print "total lag:", sum}'
# 持续涨 → 消费能力 < 生产能力,必须提升消费能力或限流生产端
# 不涨了 → 一次性积压,扩容消化完即可
# === 手段 3:非核心消息可"跳过积压",直接消费最新 ===
# 行为日志这种,丢一段历史数据可接受 → 把 offset 重置到最新
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group behavior_log_group --topic behavior_log \
--reset-offsets --to-latest --execute
# 危险操作:会跳过未消费的消息,只对"可丢"的 topic 用
# 订单等关键消息绝不能这么干
# === 手段 4:积压消息转储,慢慢消化 ===
# 起一个临时消费组,把积压消息原样搬到另一个 topic
# 主链路消费最新消息保证实时,积压的在旁路慢慢处理
// === 治本:生产端削峰,别让生产长期快于消费 ===
// 生产端按 key 分区,避免数据倾斜(某几个分区特别热,积压全在它们身上)
ProducerRecord<String, String> record = new ProducerRecord<>(
"behavior_log",
userId, // 用 userId 做 key,均匀散到各分区
logJson);
producer.send(record);
// 检查分区是否倾斜:各分区 LOG-END-OFFSET 差距大 = 倾斜
// 倾斜会导致"个别消费者忙死,其余闲死",整体并行度形同虚设
修复 6:监控告警
# kafka_exporter + Prometheus
groups:
- name: kafka-consumer
rules:
# 1. 消费积压(最核心指标)
- alert: KafkaConsumerLagHigh
expr: sum(kafka_consumergroup_lag{group="behavior_log_group"}) > 100000
for: 5m
annotations:
summary: "{{ $labels.group }} 积压 > 10w,消费能力不足"
# 2. 积压持续增长(比绝对值更重要 —— 趋势)
- alert: KafkaLagGrowing
expr: deriv(sum(kafka_consumergroup_lag{group="behavior_log_group"})[15m:]) > 0
for: 15m
annotations:
summary: "{{ $labels.group }} 积压持续 15min 增长,生产 > 消费"
# 3. 消费速率掉零(消费者卡死/全在 rebalance)
- alert: KafkaConsumeRateZero
expr: rate(kafka_consumergroup_current_offset{group="behavior_log_group"}[5m]) == 0
for: 3m
annotations:
summary: "{{ $labels.group }} 消费速率为 0,排查 rebalance 风暴"
# 4. 分区数据倾斜
- alert: KafkaPartitionSkew
expr: |
max(kafka_consumergroup_lag{group="behavior_log_group"}) by (group)
/ avg(kafka_consumergroup_lag{group="behavior_log_group"}) by (group) > 5
for: 10m
annotations:
summary: "{{ $labels.group }} 分区积压倾斜 > 5 倍,检查 key 分布"
# 5. broker 离线
- alert: KafkaBrokerDown
expr: kafka_brokers < 3
annotations:
summary: "Kafka 存活 broker < 3,集群副本风险"
优化效果
指标 治理前 治理后
=============================================================
消费组 Lag 1800 万 < 2000(稳定)
rebalance 频率 每分钟数次 基本为 0
单批处理耗时 6 分钟 38 秒
消费吞吐 ~3000 条/s 8.5 万条/s
重复消费 大量 0(幂等去重)
rebalance 影响面 全组暂停 仅变动分区(cooperative)
分区数 24 48
offset 提交 自动(会丢/重) 手动,处理完才提交
压测(生产 8 万条/s 持续):
- 治理前:消费跟不上,Lag 无限增长
- 治理后:消费 8.5 万条/s,Lag 稳定低位,无 rebalance 风暴
排查与改造:
- 定位 rebalance 风暴根因:0.5 天
- 消费参数调优 + cooperative 策略:1 天
- 消费/处理线程分离 + 批量写:2 天
- 手动提交 + 幂等改造:1.5 天
- 扩分区 + 积压消化 + 压测:2 天
避坑清单
- rebalance 有两套存活判定:心跳判进程死活,poll 间隔判处理是否卡住
- 一批消息的处理时间必须远小于 max.poll.interval.ms,否则被踢引发风暴
- 处理慢就减小 max.poll.records,而不是无脑调大 poll 间隔
- 用 CooperativeStickyAssignor,rebalance 只动变动分区,不全组暂停
- 消费并行度上限 = 分区数,加消费者超过分区数纯属空转
- poll 线程与处理线程分离,但要等一批处理完再提交 offset
- 关自动提交,处理完手动 commitSync,避免丢消息或扩大重复
- Kafka 只保证至少一次,消费端必须幂等,rebalance 必然带来重复
- 积压可丢的非核心消息能 reset offset 到最新,关键消息绝不能
- 监控 Lag 要看趋势不只看绝对值,还要监控分区倾斜与消费速率
总结
这次 Kafka 的 rebalance 风暴事故,让我对"消费者组"这个机制有了远比之前深刻的理解。最大的认知误区是把 rebalance 当成一个偶发的、无害的小事,实际上它是一个会自我强化的恶性循环:一个消费者因为处理太慢被判定死亡,触发 rebalance,它的分区被分给别的消费者,而别的消费者因为同样的原因也处理不过来、也超时被踢,于是 rebalance 套娃,整个消费组陷入"一直在重新分配、几乎不消费"的瘫痪状态。打破这个循环的关键,是理解 Kafka 有两套独立的存活判定——心跳线程判断的是消费者进程和网络还活不活着,而 poll 间隔判断的是消费者有没有卡在消息处理上,我们栽的就是后者:一次拉 500 条、处理要 6 分钟,远超 5 分钟的 poll 间隔上限。解药不是把超时阈值无脑调大,而是让"一批消息的处理时间"真正变短——减小单次拉取量、把拉取线程和处理线程分离让慢的部分并行、批量写下游减少 IO。第二个深刻的体会是 offset 提交的语义:自动提交提交的是"已拉取"的位置而非"已处理完"的位置,这个错位在 rebalance 瞬间会直接导致消息丢失或重复,必须关掉自动提交、处理完再手动提交;但即便如此,Kafka 也只能保证至少一次投递,重复消费在 rebalance 时不可避免,所以消费端的幂等不是可选项而是必选项。最后,面对已经堆积的千万级消息,要冷静判断它是一次性堆积还是持续产生——持续产生说明消费能力根本追不上生产,扩容只是拖时间,必须从根上提升吞吐或给生产端限流;而对于行为日志这类可以接受丢失一段历史的非核心数据,把 offset 直接重置到最新、保住实时性,反而是更务实的选择。Kafka 用起来容易,但要让消费组在高吞吐下长期稳定,得真正理解它每一个参数背后的判定逻辑。
—— 别看了 · 2026