Kafka 5000w Lag 8 小时事故复盘:消费端优化全实录

Kafka 大流量回放事故复盘:3000w 消息 10min 涌入,Lag 5000w 不收敛延迟 8 小时。优化全实录:partition 12→64 + batchListener + saveBatch + 内部并发分组 + DLQ + RateLimiter + 全链路监控。Lag 从 5000w 在 40min 归零,QPS 10w→160w。

2024 年我们一个 Kafka 集群,某天凌晨被业务方推了一次全量回放,3000w 条消息在 10 分钟内涌入,消费者直接撑不住,Lag 涨到 5000w 不收敛,下游业务 8 小时延迟。复盘后做了:消费并发提升 + 批量消费 + 死信队列 + 限速消费 + 监控告警,Lag 从 5000w 在 40 分钟内归零。本文复盘 Kafka 消费端的完整优化方案。

事故时间线

02:00 业务方推全量数据(3000w 行)
02:05 Kafka producer 写入 30w/s,topic 堆积
02:10 消费者 Lag 涨到 1000w,继续涨
02:30 Lag 5000w,消费速度只有 10w/s
03:00 下游业务报警:数据延迟 1 小时
05:00 Lag 还有 3000w,延迟 3 小时
07:30 投资优化消费者,扩容
10:00 Lag 归零

事故总结:
- 消费者吞吐:10w/s(理论应 100w/s)
- 单分区单消费者:并发不够
- 反序列化 + DB 写慢:消费瓶颈
- 失败重试拖累整体

原架构问题

// 原消费者(Spring Kafka)
@KafkaListener(topics = "user-events", groupId = "g1")
public void consume(ConsumerRecord record) {
    UserEvent event = JSON.parseObject(record.value(), UserEvent.class);
    eventService.save(event);   // 单条入库
    // 一次一条,慢
}

// 配置
spring:
  kafka:
    consumer:
      max-poll-records: 500          # 一次拉 500 条
      fetch-max-bytes: 1048576       # 1MB
      auto-offset-reset: latest
      enable-auto-commit: false

// 问题:
// 1. 单 partition 单线程,topic 12 partition → 12 并发
// 2. 单条 save,DB 写入慢(500/s)
// 3. 没有批处理

优化 1:增加 partition + 并发

# 评估:消费瓶颈是 partition 数
# 原 12 partition,改 64 partition(必须 partition 数 >= 消费者数)

# 1. 创建新 topic(不能改原 topic 分区数,会导致顺序错乱)
kafka-topics.sh --create \
  --bootstrap-server kafka:9092 \
  --topic user-events-v2 \
  --partitions 64 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=259200000

# 2. 生产者双写迁移
kafka-mirror-maker.sh --consumer.config old.conf --producer.config new.conf \
  --whitelist 'user-events'

# 3. 消费者切换到新 topic
spring:
  kafka:
    listener:
      concurrency: 32         # 32 个消费线程
// Java 端 concurrency 配置
@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConsumerFactory consumerFactory) {

        var factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(32);                    // 32 个线程
        factory.setBatchListener(true);                // 批量监听
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

        // 错误处理
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            new FixedBackOff(1000L, 3)   // 重试 3 次,每次间隔 1s
        ));

        return factory;
    }
}

优化 2:批量消费 + 批量写

@KafkaListener(topics = "user-events-v2", groupId = "g1")
public void consumeBatch(
    List> records,
    Acknowledgment ack
) {
    if (records.isEmpty()) return;

    try {
        // 批量反序列化
        List events = records.stream()
            .map(r -> JSON.parseObject(r.value(), UserEvent.class))
            .collect(Collectors.toList());

        // 批量入库(MyBatis-Plus saveBatch)
        eventService.saveBatch(events, 500);

        ack.acknowledge();   // 手动提交
    } catch (Exception e) {
        log.error("batch consume failed", e);
        // 失败不提交,Kafka 会重试
    }
}

// MyBatis-Plus 批量插入
public boolean saveBatch(List list, int batchSize) {
    String sqlStatement = "insertBatch";
    return executeBatch(list, batchSize, (sqlSession, entity) -> {
        sqlSession.insert(sqlStatement, entity);
    });
}

// XML:

INSERT INTO user_events (user_id, event_type, ts, data) VALUES

    (#{e.userId}, #{e.eventType}, #{e.ts}, #{e.data})



// 效果:
// 单条 save:500/s
// 批量 saveBatch:50000/s
// 100x 提升

优化 3:并发消费内部

// 单个消费线程内,把批量再切给线程池并发处理
@Component
public class BatchEventConsumer {

    private final ExecutorService workerPool = Executors.newFixedThreadPool(8);
    private final EventService eventService;

    @KafkaListener(topics = "user-events-v2", groupId = "g1")
    public void consume(List> records, Acknowledgment ack) {
        // 按 key 分组,保证同 key 顺序
        Map> grouped = records.stream()
            .map(r -> JSON.parseObject(r.value(), UserEvent.class))
            .collect(Collectors.groupingBy(UserEvent::getUserId));

        // 并发处理不同分组
        List> futures = grouped.entrySet().stream()
            .map(entry -> CompletableFuture.runAsync(() ->
                eventService.saveBatch(entry.getValue(), 100),
                workerPool
            ))
            .collect(Collectors.toList());

        // 等所有完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        ack.acknowledge();
    }
}

优化 4:DLQ 死信队列

// 反复失败的消息扔死信队列,不阻塞主流程
@Configuration
public class DlqConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
            template,
            (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition())
        );

        DefaultErrorHandler handler = new DefaultErrorHandler(
            recoverer,
            new FixedBackOff(2000L, 3)   // 重试 3 次
        );

        // 部分异常不重试(直接进 DLQ)
        handler.addNotRetryableExceptions(
            IllegalArgumentException.class,
            DeserializationException.class
        );

        return handler;
    }
}

// 死信队列消费(单独服务监控/补偿)
@KafkaListener(topics = "user-events-v2.DLT", groupId = "dlq")
public void consumeDlq(ConsumerRecord record) {
    // 报警 + 持久化
    alertService.notify("DLQ message: " + record.value());
    dlqRepository.save(record);
}

优化 5:Consumer 限速

// 如果下游(DB / 第三方 API)有 QPS 上限,要限速
import com.google.common.util.concurrent.RateLimiter;

@Component
public class RateLimitedConsumer {

    private final RateLimiter limiter = RateLimiter.create(50000);   // 5w/s

    @KafkaListener(topics = "user-events-v2", groupId = "g1")
    public void consume(List> records, Acknowledgment ack) {
        // 限速
        limiter.acquire(records.size());

        // 处理...
        process(records);
        ack.acknowledge();
    }
}

// 或在 Kafka 端限速
spring:
  kafka:
    consumer:
      fetch-max-wait: 500ms        # 最多等 500ms
      max-poll-records: 500        # 单次最多 500 条
      max-poll-interval-ms: 300000 # 5 分钟处理完一批

优化 6:消费者监控

# Prometheus 监控 Kafka
# kafka_exporter
kafka_consumergroup_lag{group="g1",topic="user-events-v2"}

# 告警规则
- alert: KafkaConsumerLagHigh
  expr: sum(kafka_consumergroup_lag{group="g1"}) > 1000000
  for: 5m
  annotations:
    summary: "Consumer lag > 100w 持续 5min"

- alert: KafkaConsumerSlow
  expr: rate(kafka_consumergroup_current_offset[5m]) < 1000
  for: 10m
  annotations:
    summary: "消费速度 < 1000/s"

- alert: KafkaConsumerStuck
  expr: changes(kafka_consumergroup_current_offset[10m]) == 0
  for: 5m
  annotations:
    summary: "消费偏移 10min 不动,可能卡死"

优化效果

指标                    优化前          优化后
=========================================================
单消费者 QPS            500/s           50000/s
集群消费 QPS            10w/s           160w/s
Lag 恢复速度            8 小时          40 分钟
分区数                  12              64
消费者并发              12 线程         32 线程
DLQ 兜底                无              有
监控告警                少              全
P99 处理延迟            5min            500ms

业务影响:
- 大流量回放时不再阻塞下游
- 监控全打通,堆积秒级发现
- 死信队列保证不丢消息

避坑清单

  1. topic 分区数提前规划,后增不能减(顺序保证)
  2. partition 数 >= 消费者并发数,否则浪费消费者
  3. 必须 batchListener + saveBatch,单条入库就是慢
  4. 同 key 落到同 partition,保证顺序(关键!)
  5. 失败必须 DLQ,不能无限重试阻塞
  6. auto-commit 必须关,改 MANUAL_IMMEDIATE
  7. 下游有 QPS 限制时 RateLimiter 限速
  8. kafka-exporter + Grafana 监控 lag
  9. max.poll.interval.ms 大于实际处理时间,否则被踢出 group
  10. 消费者扩容前先扩 partition,partition 不够白扩

总结

Kafka 是消息系统标配,但消费端优化经常被忽略。生产者写得快(几十万 QPS),消费者跟不上(几千 QPS)是常见反模式。这次事故让我深刻理解:Kafka 消费侧优化的本质是"分区并发 + 批量 + 异步",三个手段叠加才能从 1w/s 提到 100w/s。最大的认知改变:不要盲目相信"Kafka 高吞吐",高吞吐是生产者端的事,消费者要自己优化。DLQ 是必备的兜底,没有 DLQ 的消费者迟早会被一条毒消息卡死整个集群。最后,partition 数是终身决策,设小了后面追加都救不回来,初设建议 2-3x 预期消费者数。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

PostgreSQL 2TB 慢查询治理:从 30s 到 800ms 全实录

2026-5-19 12:47:04

技术教程

Nginx 高并发调优实录:单机 10w→60w QPS 全过程

2026-5-19 12:51:08

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索