Kafka 集群春节大促雪崩复盘:partition 均衡 + 生产消费 + 监控告警实录

Kafka 3.4 KRaft 集群 9 broker 1.5w partition,春节大促 NotLeaderForPartition 暴增,lag 2000w,Controller 切换频繁。两周治理:partition 重均衡 + idempotence + acks=all + max.poll 500 + ISR 限速 + 全链路监控。P99 50ms,lag < 1000,稳过二轮峰值。

2024 年我们的 Kafka 集群:9 个 broker,500 个 topic,15000 partition,日吞吐 80TB,某个春节大促开始集群异常 — 生产者大量 NotLeaderForPartitionException,消费者 lag 飙升到 2000w 条,部分 broker CPU 100%,Controller 切换频繁,客户端连接报错。投了两周做集群治理,生产端延迟 P99 从 5s 降到 50ms,消费 lag 稳定在 1000 条内,broker 平均 CPU 40%,扛过了第二轮峰值。本文复盘 Kafka 集群高可用、高吞吐治理的完整实战,覆盖 broker 调优、partition 规划、生产消费、监控告警。

事故现场

集群:Kafka 3.4 (KRaft 模式)
节点:9 broker,每个 32 核 / 128GB / 8TB NVMe × 4
Topic:500 个,partition 15000
副本:replication.factor=3,min.insync.replicas=2
日吞吐:写入 80TB / 读取 200TB

故障现象:
- 生产者:NotLeaderForPartitionException 大量
- 生产者:RecordTooLargeException(单条 > 1MB)
- 生产者 P99 延迟:50ms → 5s
- 消费者 lag:正常 < 1000 → 暴涨到 2000w
- Broker:个别节点 CPU 100%,磁盘 IO util 99%
- Controller:每 5min 切换一次
- ZooKeeper(老集群):session expire

排查:
1. _cat /broker-info: 个别 broker partition 数 3000+(其他 1500)
2. JMX kafka.server:type=ReplicaManager: 副本同步 ISR 抖动
3. log4j: ERROR Failed to read 'shrink isr' command from controller
4. kafka-topics --describe: 某些 topic partition 严重不均
5. kafka-log-dirs: 单 broker 磁盘占用 95%+

根因:
- partition 分配不均(手工建 topic 没指定 broker 列表)
- 大消息没分片(单条 5MB 撑爆 broker buffer)
- 消费者 max.poll.records 太大(单次拉 10000 条,处理超时)
- replication 网络带宽打满

修复 1:Broker 核心配置

# server.properties

# === 节点标识 ===
broker.id=1
node.id=1
process.roles=broker,controller       # KRaft 模式
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093

# === 网络 ===
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://host1.internal:9092

num.network.threads=8                  # = CPU 核心数
num.io.threads=16                      # 磁盘 IO 线程
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600     # 100MB(支持大消息)

# === 日志(数据)存储 ===
log.dirs=/data1/kafka,/data2/kafka,/data3/kafka,/data4/kafka
# 多目录 = 多磁盘并发,提升吞吐
num.partitions=12                      # 新 topic 默认 partition 数
num.recovery.threads.per.data.dir=4

# === 副本 ===
default.replication.factor=3
min.insync.replicas=2                  # 至少 2 副本 ack
unclean.leader.election.enable=false   # 不允许非 ISR 当 leader(避免丢数据)
replica.lag.time.max.ms=30000          # 副本落后超时
replica.socket.receive.buffer.bytes=65536
replica.fetch.max.bytes=10485760

# === 副本同步带宽限制(关键)===
# 避免 follower 同步打满网络
follower.replication.throttled.rate=100000000   # 100MB/s
leader.replication.throttled.rate=100000000

# === 日志保留 ===
log.retention.hours=72                 # 3 天
log.retention.bytes=-1
log.segment.bytes=1073741824           # 1GB/segment
log.retention.check.interval.ms=300000
log.cleaner.enable=true

# === 性能 ===
message.max.bytes=10485760             # 单消息上限 10MB
compression.type=lz4                    # broker 不重压缩
log.flush.interval.messages=10000      # 异步 flush
log.flush.interval.ms=1000

# === JVM(kafka-server-start.sh) ===
# KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"  # 6-8GB,不要更大(剩余给 page cache)
# KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
# 系统 page cache 才是 Kafka 性能关键(顺序读写)

修复 2:Topic / Partition 设计

# 1. partition 数计算
# 公式:max(目标吞吐/单 partition 吞吐, 消费者数)
# 单 partition 吞吐:写 10MB/s,读 30MB/s
# 100MB/s 写需要 10 partition
# 50 个消费者需要 50 partition
# 取 max,且为消费者整倍数

# 2. 创建 topic(均衡分布)
$ kafka-topics --bootstrap-server kafka:9092 \
    --create --topic orders \
    --partitions 36 \
    --replication-factor 3 \
    --config retention.ms=259200000 \
    --config segment.bytes=1073741824 \
    --config compression.type=lz4 \
    --config min.insync.replicas=2

# 3. 重平衡 partition(unbalanced 修复)
$ kafka-reassign-partitions --bootstrap-server kafka:9092 \
    --topics-to-move-json-file topics.json \
    --broker-list "1,2,3,4,5,6,7,8,9" \
    --generate > reassign.json

# 编辑 reassign.json,确认 partition 分配
$ kafka-reassign-partitions --bootstrap-server kafka:9092 \
    --reassignment-json-file reassign.json \
    --execute \
    --throttle 100000000          # 限速 100MB/s

# 验证
$ kafka-reassign-partitions --bootstrap-server kafka:9092 \
    --reassignment-json-file reassign.json --verify

# 4. 监控 partition 分布
$ kafka-topics --bootstrap-server kafka:9092 --describe | \
    awk '/Leader/ {print $4}' | sort | uniq -c | sort -rn
# 查看每个 broker 当 leader 的 partition 数,应该均衡

# 5. preferred leader election(每周跑一次)
$ kafka-leader-election --bootstrap-server kafka:9092 \
    --election-type preferred --all-topic-partitions

修复 3:生产者优化

// Producer 配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");

// === 可靠性 ===
props.put("acks", "all");                  // 所有 ISR ack
props.put("retries", 3);
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", true);     // 幂等生产(避免重复)

// === 性能 ===
props.put("batch.size", 65536);            // 64KB 批
props.put("linger.ms", 20);                // 等 20ms 凑批
props.put("compression.type", "lz4");      // 压缩
props.put("buffer.memory", 67108864);      // 64MB 缓冲

// === 大消息 ===
props.put("max.request.size", 5242880);    // 5MB(< broker message.max.bytes)

// === 超时 ===
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000);

KafkaProducer producer = new KafkaProducer<>(props);

// 异步发送 + 回调(高性能)
ProducerRecord record = new ProducerRecord<>(
    "orders",
    orderId,                               // key,保证同 key 进同 partition
    JSON.toJSONString(order)
);

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("send failed: {}", record, exception);
        // 兜底:落库后台重发
        failedQueue.offer(record);
    } else {
        log.debug("sent to {}-{}@{}", metadata.topic(), metadata.partition(), metadata.offset());
    }
});

// 关键设计:
// 1. acks=all + idempotence + replication=3: 0 数据丢失
// 2. 同 orderId 进同 partition,保证局部有序
// 3. batch + linger + compression 提升 5-10x 吞吐
// 4. callback 异步处理失败,不阻塞

修复 4:消费者优化

// Consumer 配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "order-processor");

// === 拉取 ===
props.put("max.poll.records", 500);            // 单次最多 500 条(原 10000)
props.put("max.poll.interval.ms", 300000);     // 5min 处理超时
props.put("fetch.min.bytes", 1048576);         // 1MB 最小拉取
props.put("fetch.max.wait.ms", 500);
props.put("max.partition.fetch.bytes", 5242880); // 5MB/partition

// === offset 提交 ===
props.put("enable.auto.commit", false);        // 手动提交
props.put("auto.offset.reset", "latest");      // 新 group 从最新开始

// === 心跳 ===
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 10000);

KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));

while (running) {
    ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));

    // 批量处理(高吞吐)
    List batch = new ArrayList<>();
    for (ConsumerRecord r : records) {
        batch.add(JSON.parseObject(r.value(), Order.class));
    }

    try {
        orderService.batchProcess(batch);
        consumer.commitSync();                  // 处理完才 commit
    } catch (Exception e) {
        log.error("batch process failed", e);
        // 不 commit,下次重新拉
    }
}

// 多线程消费(单 consumer 慢的情况)
// 方案 1:多消费者实例(推荐,Kafka 原生)
// 方案 2:单 consumer 拉,提交线程池处理(注意 commit 时机)

// 关键陷阱:
// 1. max.poll.records 太大 → 处理超时 → consumer 被踢出
// 2. 处理过程中 rebalance → 没 commit 的数据重复消费(需幂等)
// 3. CommitFailedException 必须捕获(已 rebalance)

修复 5:Consumer Group 监控 + lag 告警

# 1. 查看消费组状态
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
    --describe --group order-processor

GROUP          TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    HOST
order-proc     orders   0          1234567         1234890         323    consumer-1
order-proc     orders   1          1234500         1234890         390    consumer-2
...

# 2. 找 lag 最大的 partition
$ kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group order-processor | \
    awk 'NR>1 {print $5, $1, $2, $3}' | sort -rn | head -10

# 3. lag 告警(Prometheus + kafka-exporter)
- alert: KafkaConsumerLagHigh
  expr: sum by(consumergroup, topic) (kafka_consumergroup_lag) > 100000
  for: 5m
  annotations:
    summary: "{{ $labels.consumergroup }} lag {{ $value }} > 10w"

- alert: KafkaConsumerLagGrowing
  expr: deriv(kafka_consumergroup_lag[10m]) > 100
  for: 10m
  annotations:
    summary: "{{ $labels.consumergroup }} lag 持续增长"

# 4. 紧急处理 lag 飙升
# 方案 1:增加消费者(< partition 数)
$ kubectl scale deployment order-processor --replicas=36

# 方案 2:重置 offset 到最新(放弃旧数据)
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
    --group order-processor --reset-offsets --to-latest \
    --topic orders --execute

# 方案 3:dump 到独立 topic 慢慢处理
# kafka-mirror-maker 或自己写小工具

修复 6:Broker 监控告警

# Prometheus + kafka-exporter + JMX exporter

# Broker 健康
- alert: KafkaBrokerDown
  expr: up{job="kafka"} == 0
  for: 1m
  labels: { severity: critical }
  annotations:
    summary: "Broker {{ $labels.instance }} down"

# Controller 切换(异常信号)
- alert: KafkaControllerFlap
  expr: changes(kafka_controller_active[10m]) > 2
  annotations:
    summary: "Controller 切换 > 2 次/10min"

# 副本未同步
- alert: KafkaUnderReplicated
  expr: kafka_server_replicamanager_underreplicatedpartitions > 0
  for: 5m
  annotations:
    summary: "{{ $value }} 个 partition 副本不足"

# ISR 抖动
- alert: KafkaISRShrink
  expr: rate(kafka_server_replicamanager_isrshrinkspersec[5m]) > 0.1
  for: 5m
  annotations:
    summary: "ISR 频繁收缩"

# 磁盘
- alert: KafkaDiskHigh
  expr: 1 - node_filesystem_avail_bytes{mountpoint=~"/data.*"} / node_filesystem_size_bytes > 0.85
  annotations:
    summary: "{{ $labels.instance }} 磁盘 > 85%"

# 网络
- alert: KafkaNetworkIdleHigh
  expr: avg(kafka_network_socketserver_networkprocessoravgidlepercent) < 0.3
  for: 5m
  annotations:
    summary: "网络线程繁忙,idle < 30%"

# Request 队列
- alert: KafkaRequestQueueHigh
  expr: kafka_network_requestchannel_requestqueuesize > 100
  annotations:
    summary: "请求队列 > 100,broker 压力大"

# 生产消费速率
- alert: KafkaProduceRateAnomaly
  expr: abs(rate(kafka_server_brokertopicmetrics_messagesinpersec[5m]) -
            avg_over_time(rate(kafka_server_brokertopicmetrics_messagesinpersec[5m])[1h:5m])) > 100000
  annotations:
    summary: "生产速率异常波动"

优化效果

指标                  优化前         优化后
=========================================================
生产 P50 延迟         200ms          10ms
生产 P99 延迟         5s             50ms
消费 lag              2000w          < 1000
Broker CPU 平均       80% (不均)    40%
Partition 分布        最大-最小:3000 最大-最小:50
Controller 切换       5min/次       0(7 天)
NotLeaderForPart 错误  数万/min      0
ISR 抖动              频繁          稳定

吞吐:
- 写入:60TB/天 → 80TB/天(集群扛得住)
- 读取:150TB/天 → 200TB/天

业务影响:
- 春节大促零事故
- 消费者实时性回归 < 1s
- 业务方信任度 ↑
- SRE oncall 频率 ↓

成本:
- 集群扩容 9 broker → 12 broker(+33%)
- 但吞吐 +33% 同时 + 稳定性 ↑↑↑
- 真正的"花小钱办大事"

避坑清单

  1. acks=all + min.insync.replicas=2 + unclean.leader.election=false(0 数据丢失)
  2. enable.idempotence=true(生产端不重复)
  3. partition 数 = max(吞吐需求, 消费者数),且为消费者整倍数
  4. partition 分布不均必须 reassign,preferred leader election 周跑
  5. broker JVM heap 6-8GB,剩余 RAM 留给 page cache
  6. max.poll.records 不要太大(500-1000),max.poll.interval > 处理耗时
  7. 消费失败不 commit,但要消费幂等(可能重复)
  8. follower 同步限速,避免打满网络
  9. 大消息(> 1MB)走 OSS,Kafka 只传 ref;实在不行 message.max.bytes 调大
  10. lag 告警 + ISR 告警 + Controller 切换告警三件套

总结

Kafka 集群治理的核心是 partition 均衡 + 副本同步 + 生产消费配对。最大的认知改变:Kafka 的高吞吐不靠 JVM heap,靠 OS page cache,heap 给 6-8GB 就够,剩下的 100GB+ 都让操作系统拿去缓存日志文件 — 顺序读写 + page cache 才是 Kafka 性能的真正密码。最被低估的是 partition 重平衡,运行半年的集群手工建了无数 topic 后 partition 分布严重不均,某些 broker 累死某些闲死,定期跑 reassign + preferred leader election 是必修课。最容易踩的坑是消费者 max.poll.records 太大,新人为了"高吞吐"设成 10000,结果业务处理超过 max.poll.interval,consumer 被踢出,rebalance 风暴,lag 越拉越大 — 调到 500 + 批量处理就稳了。最后,acks=all + idempotence + min.insync.replicas=2 是金融级业务的底线,牺牲一点点延迟换来 0 数据丢失,绝对值得;只有日志类、indicator 类业务才适合 acks=1 / acks=0 提速。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

Elasticsearch 集群 50TB 治理:索引合并 + 冷热分层 + JVM 调优实录

2026-5-19 13:22:06

技术教程

Rust 所有权 ownership · 5 个常见坑与对策 完全指南:速查、踩坑与最佳实践

2026-5-19 0:14:46

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索