2024 年我们一个 Kafka 集群,某天凌晨被业务方推了一次全量回放,3000w 条消息在 10 分钟内涌入,消费者直接撑不住,Lag 涨到 5000w 不收敛,下游业务 8 小时延迟。复盘后做了:消费并发提升 + 批量消费 + 死信队列 + 限速消费 + 监控告警,Lag 从 5000w 在 40 分钟内归零。本文复盘 Kafka 消费端的完整优化方案。
事故时间线
02:00 业务方推全量数据(3000w 行)
02:05 Kafka producer 写入 30w/s,topic 堆积
02:10 消费者 Lag 涨到 1000w,继续涨
02:30 Lag 5000w,消费速度只有 10w/s
03:00 下游业务报警:数据延迟 1 小时
05:00 Lag 还有 3000w,延迟 3 小时
07:30 投资优化消费者,扩容
10:00 Lag 归零
事故总结:
- 消费者吞吐:10w/s(理论应 100w/s)
- 单分区单消费者:并发不够
- 反序列化 + DB 写慢:消费瓶颈
- 失败重试拖累整体
原架构问题
// 原消费者(Spring Kafka)
@KafkaListener(topics = "user-events", groupId = "g1")
public void consume(ConsumerRecord record) {
UserEvent event = JSON.parseObject(record.value(), UserEvent.class);
eventService.save(event); // 单条入库
// 一次一条,慢
}
// 配置
spring:
kafka:
consumer:
max-poll-records: 500 # 一次拉 500 条
fetch-max-bytes: 1048576 # 1MB
auto-offset-reset: latest
enable-auto-commit: false
// 问题:
// 1. 单 partition 单线程,topic 12 partition → 12 并发
// 2. 单条 save,DB 写入慢(500/s)
// 3. 没有批处理
优化 1:增加 partition + 并发
# 评估:消费瓶颈是 partition 数
# 原 12 partition,改 64 partition(必须 partition 数 >= 消费者数)
# 1. 创建新 topic(不能改原 topic 分区数,会导致顺序错乱)
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic user-events-v2 \
--partitions 64 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=259200000
# 2. 生产者双写迁移
kafka-mirror-maker.sh --consumer.config old.conf --producer.config new.conf \
--whitelist 'user-events'
# 3. 消费者切换到新 topic
spring:
kafka:
listener:
concurrency: 32 # 32 个消费线程
// Java 端 concurrency 配置
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(32); // 32 个线程
factory.setBatchListener(true); // 批量监听
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
// 错误处理
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(1000L, 3) // 重试 3 次,每次间隔 1s
));
return factory;
}
}
优化 2:批量消费 + 批量写
@KafkaListener(topics = "user-events-v2", groupId = "g1")
public void consumeBatch(
List> records,
Acknowledgment ack
) {
if (records.isEmpty()) return;
try {
// 批量反序列化
List events = records.stream()
.map(r -> JSON.parseObject(r.value(), UserEvent.class))
.collect(Collectors.toList());
// 批量入库(MyBatis-Plus saveBatch)
eventService.saveBatch(events, 500);
ack.acknowledge(); // 手动提交
} catch (Exception e) {
log.error("batch consume failed", e);
// 失败不提交,Kafka 会重试
}
}
// MyBatis-Plus 批量插入
public boolean saveBatch(List list, int batchSize) {
String sqlStatement = "insertBatch";
return executeBatch(list, batchSize, (sqlSession, entity) -> {
sqlSession.insert(sqlStatement, entity);
});
}
// XML:
INSERT INTO user_events (user_id, event_type, ts, data) VALUES
(#{e.userId}, #{e.eventType}, #{e.ts}, #{e.data})
// 效果:
// 单条 save:500/s
// 批量 saveBatch:50000/s
// 100x 提升
优化 3:并发消费内部
// 单个消费线程内,把批量再切给线程池并发处理
@Component
public class BatchEventConsumer {
private final ExecutorService workerPool = Executors.newFixedThreadPool(8);
private final EventService eventService;
@KafkaListener(topics = "user-events-v2", groupId = "g1")
public void consume(List> records, Acknowledgment ack) {
// 按 key 分组,保证同 key 顺序
Map> grouped = records.stream()
.map(r -> JSON.parseObject(r.value(), UserEvent.class))
.collect(Collectors.groupingBy(UserEvent::getUserId));
// 并发处理不同分组
List> futures = grouped.entrySet().stream()
.map(entry -> CompletableFuture.runAsync(() ->
eventService.saveBatch(entry.getValue(), 100),
workerPool
))
.collect(Collectors.toList());
// 等所有完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
ack.acknowledge();
}
}
优化 4:DLQ 死信队列
// 反复失败的消息扔死信队列,不阻塞主流程
@Configuration
public class DlqConfig {
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
template,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition())
);
DefaultErrorHandler handler = new DefaultErrorHandler(
recoverer,
new FixedBackOff(2000L, 3) // 重试 3 次
);
// 部分异常不重试(直接进 DLQ)
handler.addNotRetryableExceptions(
IllegalArgumentException.class,
DeserializationException.class
);
return handler;
}
}
// 死信队列消费(单独服务监控/补偿)
@KafkaListener(topics = "user-events-v2.DLT", groupId = "dlq")
public void consumeDlq(ConsumerRecord record) {
// 报警 + 持久化
alertService.notify("DLQ message: " + record.value());
dlqRepository.save(record);
}
优化 5:Consumer 限速
// 如果下游(DB / 第三方 API)有 QPS 上限,要限速
import com.google.common.util.concurrent.RateLimiter;
@Component
public class RateLimitedConsumer {
private final RateLimiter limiter = RateLimiter.create(50000); // 5w/s
@KafkaListener(topics = "user-events-v2", groupId = "g1")
public void consume(List> records, Acknowledgment ack) {
// 限速
limiter.acquire(records.size());
// 处理...
process(records);
ack.acknowledge();
}
}
// 或在 Kafka 端限速
spring:
kafka:
consumer:
fetch-max-wait: 500ms # 最多等 500ms
max-poll-records: 500 # 单次最多 500 条
max-poll-interval-ms: 300000 # 5 分钟处理完一批
优化 6:消费者监控
# Prometheus 监控 Kafka
# kafka_exporter
kafka_consumergroup_lag{group="g1",topic="user-events-v2"}
# 告警规则
- alert: KafkaConsumerLagHigh
expr: sum(kafka_consumergroup_lag{group="g1"}) > 1000000
for: 5m
annotations:
summary: "Consumer lag > 100w 持续 5min"
- alert: KafkaConsumerSlow
expr: rate(kafka_consumergroup_current_offset[5m]) < 1000
for: 10m
annotations:
summary: "消费速度 < 1000/s"
- alert: KafkaConsumerStuck
expr: changes(kafka_consumergroup_current_offset[10m]) == 0
for: 5m
annotations:
summary: "消费偏移 10min 不动,可能卡死"
优化效果
指标 优化前 优化后
=========================================================
单消费者 QPS 500/s 50000/s
集群消费 QPS 10w/s 160w/s
Lag 恢复速度 8 小时 40 分钟
分区数 12 64
消费者并发 12 线程 32 线程
DLQ 兜底 无 有
监控告警 少 全
P99 处理延迟 5min 500ms
业务影响:
- 大流量回放时不再阻塞下游
- 监控全打通,堆积秒级发现
- 死信队列保证不丢消息
避坑清单
- topic 分区数提前规划,后增不能减(顺序保证)
- partition 数 >= 消费者并发数,否则浪费消费者
- 必须 batchListener + saveBatch,单条入库就是慢
- 同 key 落到同 partition,保证顺序(关键!)
- 失败必须 DLQ,不能无限重试阻塞
- auto-commit 必须关,改 MANUAL_IMMEDIATE
- 下游有 QPS 限制时 RateLimiter 限速
- kafka-exporter + Grafana 监控 lag
- max.poll.interval.ms 大于实际处理时间,否则被踢出 group
- 消费者扩容前先扩 partition,partition 不够白扩
总结
Kafka 是消息系统标配,但消费端优化经常被忽略。生产者写得快(几十万 QPS),消费者跟不上(几千 QPS)是常见反模式。这次事故让我深刻理解:Kafka 消费侧优化的本质是"分区并发 + 批量 + 异步",三个手段叠加才能从 1w/s 提到 100w/s。最大的认知改变:不要盲目相信"Kafka 高吞吐",高吞吐是生产者端的事,消费者要自己优化。DLQ 是必备的兜底,没有 DLQ 的消费者迟早会被一条毒消息卡死整个集群。最后,partition 数是终身决策,设小了后面追加都救不回来,初设建议 2-3x 预期消费者数。
—— 别看了 · 2026