2024 年我们的 Kafka 集群:9 个 broker,500 个 topic,15000 partition,日吞吐 80TB,某个春节大促开始集群异常 — 生产者大量 NotLeaderForPartitionException,消费者 lag 飙升到 2000w 条,部分 broker CPU 100%,Controller 切换频繁,客户端连接报错。投了两周做集群治理,生产端延迟 P99 从 5s 降到 50ms,消费 lag 稳定在 1000 条内,broker 平均 CPU 40%,扛过了第二轮峰值。本文复盘 Kafka 集群高可用、高吞吐治理的完整实战,覆盖 broker 调优、partition 规划、生产消费、监控告警。
事故现场
集群:Kafka 3.4 (KRaft 模式)
节点:9 broker,每个 32 核 / 128GB / 8TB NVMe × 4
Topic:500 个,partition 15000
副本:replication.factor=3,min.insync.replicas=2
日吞吐:写入 80TB / 读取 200TB
故障现象:
- 生产者:NotLeaderForPartitionException 大量
- 生产者:RecordTooLargeException(单条 > 1MB)
- 生产者 P99 延迟:50ms → 5s
- 消费者 lag:正常 < 1000 → 暴涨到 2000w
- Broker:个别节点 CPU 100%,磁盘 IO util 99%
- Controller:每 5min 切换一次
- ZooKeeper(老集群):session expire
排查:
1. _cat /broker-info: 个别 broker partition 数 3000+(其他 1500)
2. JMX kafka.server:type=ReplicaManager: 副本同步 ISR 抖动
3. log4j: ERROR Failed to read 'shrink isr' command from controller
4. kafka-topics --describe: 某些 topic partition 严重不均
5. kafka-log-dirs: 单 broker 磁盘占用 95%+
根因:
- partition 分配不均(手工建 topic 没指定 broker 列表)
- 大消息没分片(单条 5MB 撑爆 broker buffer)
- 消费者 max.poll.records 太大(单次拉 10000 条,处理超时)
- replication 网络带宽打满
修复 1:Broker 核心配置
# server.properties
# === 节点标识 ===
broker.id=1
node.id=1
process.roles=broker,controller # KRaft 模式
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
# === 网络 ===
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://host1.internal:9092
num.network.threads=8 # = CPU 核心数
num.io.threads=16 # 磁盘 IO 线程
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600 # 100MB(支持大消息)
# === 日志(数据)存储 ===
log.dirs=/data1/kafka,/data2/kafka,/data3/kafka,/data4/kafka
# 多目录 = 多磁盘并发,提升吞吐
num.partitions=12 # 新 topic 默认 partition 数
num.recovery.threads.per.data.dir=4
# === 副本 ===
default.replication.factor=3
min.insync.replicas=2 # 至少 2 副本 ack
unclean.leader.election.enable=false # 不允许非 ISR 当 leader(避免丢数据)
replica.lag.time.max.ms=30000 # 副本落后超时
replica.socket.receive.buffer.bytes=65536
replica.fetch.max.bytes=10485760
# === 副本同步带宽限制(关键)===
# 避免 follower 同步打满网络
follower.replication.throttled.rate=100000000 # 100MB/s
leader.replication.throttled.rate=100000000
# === 日志保留 ===
log.retention.hours=72 # 3 天
log.retention.bytes=-1
log.segment.bytes=1073741824 # 1GB/segment
log.retention.check.interval.ms=300000
log.cleaner.enable=true
# === 性能 ===
message.max.bytes=10485760 # 单消息上限 10MB
compression.type=lz4 # broker 不重压缩
log.flush.interval.messages=10000 # 异步 flush
log.flush.interval.ms=1000
# === JVM(kafka-server-start.sh) ===
# KAFKA_HEAP_OPTS="-Xms8g -Xmx8g" # 6-8GB,不要更大(剩余给 page cache)
# KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
# 系统 page cache 才是 Kafka 性能关键(顺序读写)
修复 2:Topic / Partition 设计
# 1. partition 数计算
# 公式:max(目标吞吐/单 partition 吞吐, 消费者数)
# 单 partition 吞吐:写 10MB/s,读 30MB/s
# 100MB/s 写需要 10 partition
# 50 个消费者需要 50 partition
# 取 max,且为消费者整倍数
# 2. 创建 topic(均衡分布)
$ kafka-topics --bootstrap-server kafka:9092 \
--create --topic orders \
--partitions 36 \
--replication-factor 3 \
--config retention.ms=259200000 \
--config segment.bytes=1073741824 \
--config compression.type=lz4 \
--config min.insync.replicas=2
# 3. 重平衡 partition(unbalanced 修复)
$ kafka-reassign-partitions --bootstrap-server kafka:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3,4,5,6,7,8,9" \
--generate > reassign.json
# 编辑 reassign.json,确认 partition 分配
$ kafka-reassign-partitions --bootstrap-server kafka:9092 \
--reassignment-json-file reassign.json \
--execute \
--throttle 100000000 # 限速 100MB/s
# 验证
$ kafka-reassign-partitions --bootstrap-server kafka:9092 \
--reassignment-json-file reassign.json --verify
# 4. 监控 partition 分布
$ kafka-topics --bootstrap-server kafka:9092 --describe | \
awk '/Leader/ {print $4}' | sort | uniq -c | sort -rn
# 查看每个 broker 当 leader 的 partition 数,应该均衡
# 5. preferred leader election(每周跑一次)
$ kafka-leader-election --bootstrap-server kafka:9092 \
--election-type preferred --all-topic-partitions
修复 3:生产者优化
// Producer 配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
// === 可靠性 ===
props.put("acks", "all"); // 所有 ISR ack
props.put("retries", 3);
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", true); // 幂等生产(避免重复)
// === 性能 ===
props.put("batch.size", 65536); // 64KB 批
props.put("linger.ms", 20); // 等 20ms 凑批
props.put("compression.type", "lz4"); // 压缩
props.put("buffer.memory", 67108864); // 64MB 缓冲
// === 大消息 ===
props.put("max.request.size", 5242880); // 5MB(< broker message.max.bytes)
// === 超时 ===
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000);
KafkaProducer producer = new KafkaProducer<>(props);
// 异步发送 + 回调(高性能)
ProducerRecord record = new ProducerRecord<>(
"orders",
orderId, // key,保证同 key 进同 partition
JSON.toJSONString(order)
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("send failed: {}", record, exception);
// 兜底:落库后台重发
failedQueue.offer(record);
} else {
log.debug("sent to {}-{}@{}", metadata.topic(), metadata.partition(), metadata.offset());
}
});
// 关键设计:
// 1. acks=all + idempotence + replication=3: 0 数据丢失
// 2. 同 orderId 进同 partition,保证局部有序
// 3. batch + linger + compression 提升 5-10x 吞吐
// 4. callback 异步处理失败,不阻塞
修复 4:消费者优化
// Consumer 配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "order-processor");
// === 拉取 ===
props.put("max.poll.records", 500); // 单次最多 500 条(原 10000)
props.put("max.poll.interval.ms", 300000); // 5min 处理超时
props.put("fetch.min.bytes", 1048576); // 1MB 最小拉取
props.put("fetch.max.wait.ms", 500);
props.put("max.partition.fetch.bytes", 5242880); // 5MB/partition
// === offset 提交 ===
props.put("enable.auto.commit", false); // 手动提交
props.put("auto.offset.reset", "latest"); // 新 group 从最新开始
// === 心跳 ===
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 10000);
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
while (running) {
ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
// 批量处理(高吞吐)
List batch = new ArrayList<>();
for (ConsumerRecord r : records) {
batch.add(JSON.parseObject(r.value(), Order.class));
}
try {
orderService.batchProcess(batch);
consumer.commitSync(); // 处理完才 commit
} catch (Exception e) {
log.error("batch process failed", e);
// 不 commit,下次重新拉
}
}
// 多线程消费(单 consumer 慢的情况)
// 方案 1:多消费者实例(推荐,Kafka 原生)
// 方案 2:单 consumer 拉,提交线程池处理(注意 commit 时机)
// 关键陷阱:
// 1. max.poll.records 太大 → 处理超时 → consumer 被踢出
// 2. 处理过程中 rebalance → 没 commit 的数据重复消费(需幂等)
// 3. CommitFailedException 必须捕获(已 rebalance)
修复 5:Consumer Group 监控 + lag 告警
# 1. 查看消费组状态
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
--describe --group order-processor
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG HOST
order-proc orders 0 1234567 1234890 323 consumer-1
order-proc orders 1 1234500 1234890 390 consumer-2
...
# 2. 找 lag 最大的 partition
$ kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group order-processor | \
awk 'NR>1 {print $5, $1, $2, $3}' | sort -rn | head -10
# 3. lag 告警(Prometheus + kafka-exporter)
- alert: KafkaConsumerLagHigh
expr: sum by(consumergroup, topic) (kafka_consumergroup_lag) > 100000
for: 5m
annotations:
summary: "{{ $labels.consumergroup }} lag {{ $value }} > 10w"
- alert: KafkaConsumerLagGrowing
expr: deriv(kafka_consumergroup_lag[10m]) > 100
for: 10m
annotations:
summary: "{{ $labels.consumergroup }} lag 持续增长"
# 4. 紧急处理 lag 飙升
# 方案 1:增加消费者(< partition 数)
$ kubectl scale deployment order-processor --replicas=36
# 方案 2:重置 offset 到最新(放弃旧数据)
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
--group order-processor --reset-offsets --to-latest \
--topic orders --execute
# 方案 3:dump 到独立 topic 慢慢处理
# kafka-mirror-maker 或自己写小工具
修复 6:Broker 监控告警
# Prometheus + kafka-exporter + JMX exporter
# Broker 健康
- alert: KafkaBrokerDown
expr: up{job="kafka"} == 0
for: 1m
labels: { severity: critical }
annotations:
summary: "Broker {{ $labels.instance }} down"
# Controller 切换(异常信号)
- alert: KafkaControllerFlap
expr: changes(kafka_controller_active[10m]) > 2
annotations:
summary: "Controller 切换 > 2 次/10min"
# 副本未同步
- alert: KafkaUnderReplicated
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 5m
annotations:
summary: "{{ $value }} 个 partition 副本不足"
# ISR 抖动
- alert: KafkaISRShrink
expr: rate(kafka_server_replicamanager_isrshrinkspersec[5m]) > 0.1
for: 5m
annotations:
summary: "ISR 频繁收缩"
# 磁盘
- alert: KafkaDiskHigh
expr: 1 - node_filesystem_avail_bytes{mountpoint=~"/data.*"} / node_filesystem_size_bytes > 0.85
annotations:
summary: "{{ $labels.instance }} 磁盘 > 85%"
# 网络
- alert: KafkaNetworkIdleHigh
expr: avg(kafka_network_socketserver_networkprocessoravgidlepercent) < 0.3
for: 5m
annotations:
summary: "网络线程繁忙,idle < 30%"
# Request 队列
- alert: KafkaRequestQueueHigh
expr: kafka_network_requestchannel_requestqueuesize > 100
annotations:
summary: "请求队列 > 100,broker 压力大"
# 生产消费速率
- alert: KafkaProduceRateAnomaly
expr: abs(rate(kafka_server_brokertopicmetrics_messagesinpersec[5m]) -
avg_over_time(rate(kafka_server_brokertopicmetrics_messagesinpersec[5m])[1h:5m])) > 100000
annotations:
summary: "生产速率异常波动"
优化效果
指标 优化前 优化后
=========================================================
生产 P50 延迟 200ms 10ms
生产 P99 延迟 5s 50ms
消费 lag 2000w < 1000
Broker CPU 平均 80% (不均) 40%
Partition 分布 最大-最小:3000 最大-最小:50
Controller 切换 5min/次 0(7 天)
NotLeaderForPart 错误 数万/min 0
ISR 抖动 频繁 稳定
吞吐:
- 写入:60TB/天 → 80TB/天(集群扛得住)
- 读取:150TB/天 → 200TB/天
业务影响:
- 春节大促零事故
- 消费者实时性回归 < 1s
- 业务方信任度 ↑
- SRE oncall 频率 ↓
成本:
- 集群扩容 9 broker → 12 broker(+33%)
- 但吞吐 +33% 同时 + 稳定性 ↑↑↑
- 真正的"花小钱办大事"
避坑清单
- acks=all + min.insync.replicas=2 + unclean.leader.election=false(0 数据丢失)
- enable.idempotence=true(生产端不重复)
- partition 数 = max(吞吐需求, 消费者数),且为消费者整倍数
- partition 分布不均必须 reassign,preferred leader election 周跑
- broker JVM heap 6-8GB,剩余 RAM 留给 page cache
- max.poll.records 不要太大(500-1000),max.poll.interval > 处理耗时
- 消费失败不 commit,但要消费幂等(可能重复)
- follower 同步限速,避免打满网络
- 大消息(> 1MB)走 OSS,Kafka 只传 ref;实在不行 message.max.bytes 调大
- lag 告警 + ISR 告警 + Controller 切换告警三件套
总结
Kafka 集群治理的核心是 partition 均衡 + 副本同步 + 生产消费配对。最大的认知改变:Kafka 的高吞吐不靠 JVM heap,靠 OS page cache,heap 给 6-8GB 就够,剩下的 100GB+ 都让操作系统拿去缓存日志文件 — 顺序读写 + page cache 才是 Kafka 性能的真正密码。最被低估的是 partition 重平衡,运行半年的集群手工建了无数 topic 后 partition 分布严重不均,某些 broker 累死某些闲死,定期跑 reassign + preferred leader election 是必修课。最容易踩的坑是消费者 max.poll.records 太大,新人为了"高吞吐"设成 10000,结果业务处理超过 max.poll.interval,consumer 被踢出,rebalance 风暴,lag 越拉越大 — 调到 500 + 批量处理就稳了。最后,acks=all + idempotence + min.insync.replicas=2 是金融级业务的底线,牺牲一点点延迟换来 0 数据丢失,绝对值得;只有日志类、indicator 类业务才适合 acks=1 / acks=0 提速。
—— 别看了 · 2026