2023 年我们公司有一套订单系统 上游产生事件 下游有 6 个消费服务 用 Kafka 做异步消息总线 一开始我接手时配置很 标准 3 个 broker 默认 partition 数 ack=1 producer 自动批 consumer auto-commit 看起来该有的都有 测试环境跑得也挺顺。但上线半年我们陆续踩了一堆坑。第一种最让我傻眼 某次机房故障 一个 broker 挂了 我以为副本机制应该自动转 结果有 200 条订单消息丢了 排查发现是 ack=1 producer 收到 leader 的 ack 就走了 leader 还没同步给 follower 就宕机消息就丢了。第二种最难缠 我们的下游消费服务用了 auto-commit 5 秒一次 某次消费到一半服务挂了 重启后 offset 已经被自动提交 那批未处理的消息再也不会被消费 业务侧少了 80 单。第三种最离谱 我们的消息有顺序要求 同一订单的支付完成 发货 完成 必须按顺序处理 但默认 partition 路由是 hash key 我们居然没设 key 消息被分散到 8 个 partition 顺序完全乱套 客户投诉发货状态早于支付状态。第四种最致命 某次促销流量 10 倍 producer 端 batch 没调好 broker 端 partition 数太少 消息堆积到 200 万 consumer 追了 6 小时才追平。第五种最莫名其妙 同一条消息被消费了 3 次 客户被扣了 3 次款 排查发现是 consumer 处理慢 超过 session.timeout.ms rebalance 触发 但原 consumer 还在处理没退出 消息被新 consumer 重新拿走 全程没有任何告警。我盯着这一连串问题想了很久才彻底想明白第一版错在一个根本的认知上我以为 Kafka 就是 push 进去 pull 出来 自带高可用和顺序 配置不用动 可这个认知是错的真正能扛业务的 Kafka 是一个 ack 与 ISR 设计 加 partition 与 key 规划 加 consumer 提交策略 加 幂等与事务 加 监控与容量规划 的整套工程方法论 任何一环没做都可能在某次峰值或故障里造成消息丢失重复或顺序错乱本文从头梳理 Kafka 的核心原理 Producer ack 与 ISR 的关系 partition 与 key 的设计 consumer offset 与 rebalance 的工作机制 exactly-once 语义 怎么实现 以及一些把 Kafka 做扎实要避开的工程坑
问题背景:为什么 Kafka 看起来简单做起来满是坑
很多人对 Kafka 的认知是 高吞吐持久化消息队列 默认配置就能跑 但生产里你会发现 同样的代码 不同的配置 同样的流量 可能 0 丢失也可能丢失 1% 同样的故障 可能 0 影响也可能业务停摆几小时。问题的根源在于:
- 消息可靠性是 producer broker consumer 三方协议:任何一方配置错都会破坏 Exactly-once 保证。
- partition 是 Kafka 并行度和顺序的根本单元:数量太少限制吞吐 太多增加 broker 负担 key 选错破坏顺序。
- consumer group rebalance 是常见隐患:慢消费触发 rebalance 处理中消息可能被重复消费 必须设 max.poll.interval.ms。
- auto-commit 是常见数据丢失源:offset 自动提交可能在消息未处理完时就推进 重启后这批消息再也不会被消费。
- Exactly-once 不是默认行为:必须显式启用 enable.idempotence 和事务 API 而且性能有一定代价。
- 监控不到位等于盲飞:必须监控 lag rebalance 次数 ISR 缩水 broker 健康度 不然故障来了才发现已经晚了。
一 Producer 端 ack 与 ISR:数据持久化的根本
Producer 端最重要的参数是 acks 和 enable.idempotence。acks=0 不等待 broker 确认 最快但消息可能丢 acks=1 等 leader 确认 折中 acks=all 或 -1 等 ISR 集合所有副本确认 最安全。配合 min.insync.replicas 保证至少 N 个副本同步 ack 才生效 否则发送失败。
// Kafka Producer 生产可靠配置 不丢消息基线
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 关键可靠性配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等所有 ISR 确认
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等开启
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // 幂等模式 <=5
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 总超时 2 分钟
// 性能与可靠性折中
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 压缩降带宽
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 批大小 64KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 攒批 20ms
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 缓冲 64MB
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // 单请求 1MB
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 同步发送适合金额这类必须确认的场景
public void sendOrderEvent(String orderId, String payload) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, payload);
try {
RecordMetadata md = producer.send(record).get(5, TimeUnit.SECONDS);
log.info("sent topic={} partition={} offset={}",
md.topic(), md.partition(), md.offset());
} catch (Exception e) {
log.error("send failed orderId={}", orderId, e);
throw e;
}
}
关键配置 acks=all + enable.idempotence=true + retries=MAX + min.insync.replicas=2 broker 端配 这是生产环境的 不丢消息基线 缺一不可。min.insync.replicas 在 broker 端配置 写命令必须有 N 个副本同步成功才返回 OK 否则报错 让 producer 重试 配合 replication.factor=3 能在单 broker 宕机时仍可写。
# broker 端关键配置 server.properties
# 默认副本数 新 topic 自动用这个值
default.replication.factor=3
# 最小同步副本 配合 acks=all 用 副本不够写就失败
min.insync.replicas=2
# 关闭 unclean leader election 否则会让落后副本当 leader 导致消息丢
unclean.leader.election.enable=false
# 强制每个分区独立 log segment 不混杂
log.segment.bytes=1073741824
log.retention.hours=168
log.retention.bytes=-1
# 副本同步参数 ISR 慢就剔除
replica.lag.time.max.ms=30000
二 Partition 与 Key:并行度与顺序的根本
partition 数量决定了 topic 的并行能力 也决定了消息顺序的边界。同一 partition 内消息有序 跨 partition 无序。key 决定消息路由到哪个 partition 默认 hash 路由 同 key 必去同 partition。
// 顺序场景必须设 key 否则会被 round-robin 分散到多 partition
public void sendOrderEventOrdered(String orderId, String eventType, String payload) {
// 用 orderId 做 key 同一订单的所有事件去同一 partition 保证顺序
String key = orderId;
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-events", key, payload);
record.headers().add("eventType", eventType.getBytes(StandardCharsets.UTF_8));
producer.send(record);
}
// partition 数量规划基于吞吐需求
// 单 partition 写入约 10-50 MB/s 单 consumer 消费约 5-20 MB/s
// 假设峰值 200 MB/s 写入 至少 8-10 个 partition
// consumer 并发数 = partition 数 partition 多余 consumer 会闲置
// 已存在 topic 动态增 partition 但不能减
// 增 partition 会让历史按 key 路由的消息打乱 必须停业务后做 或者用 sticky partition
// 创建 topic 显式设 partition 与 replication
// kafka-topics.sh --create --topic orders \
// --partitions 12 --replication-factor 3 \
// --config min.insync.replicas=2 \
// --config retention.ms=604800000
partition 设计的关键原则 第一是按峰值吞吐规划 不要按当前规划 一年后就要增 partition 是痛苦事 第二是 key 是顺序保证的唯一手段 必须用稳定的业务 ID 做 key 而不是临时 ID。第三个原则容易被忽视 partition 数量不能动态减少 增加 partition 会让按 key hash 的路由表变化 历史 key 可能跑到新 partition 导致顺序错乱 改 partition 数前必须停业务。
三 Consumer Offset 与 Rebalance:数据丢失重复的源头
Consumer 端最容易出问题的是 offset 提交策略和 rebalance 行为。auto-commit 看似省事 但会让你不知道哪些消息真正被处理。生产环境推荐手动提交 在业务处理完后再 commit。
// Consumer 生产可靠配置 不丢不重基线
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 关键参数 关闭 auto-commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 新 consumer 从头消费
// rebalance 与心跳
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 分钟 业务慢必须调
// 拉取参数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 65536);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"), new RebalanceListener());
手动提交分两种 commitSync 同步阻塞但可靠 commitAsync 异步快但失败不重试 实践中 推荐 主循环用 commitAsync 提速 关键节点比如 rebalance 前用 commitSync 保证 这是两者结合的工程做法。下面是手动提交的典型循环结构 处理完一批再提交 失败就让下次重消费 业务侧必须做幂等去重。
// 手动提交模式 处理完再提交
public void consumeLoop() {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
try {
processOrder(record);
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
} catch (Exception e) {
log.error("process failed offset={} key={}", record.offset(), record.key(), e);
// 不提交 offset 下次会重试 但要做幂等
break;
}
}
if (!offsets.isEmpty()) {
consumer.commitSync(offsets);
}
}
}
关键原则 处理完后再提交 offset 失败不提交让下次重消费 业务必须做幂等 否则重消费就是重复扣款。max.poll.interval.ms 是另一个隐患 默认 5 分钟 如果你的业务处理一个 batch 超过 5 分钟 心跳还在但 poll 没来 会被踢出 group 导致 rebalance 然后那批消息会被另一个 consumer 重新消费 必须根据业务最长处理时间设这个值。
// rebalance 监听器 在 rebalance 前提交本地未 commit 的 offset 避免重消费
public class RebalanceListener implements ConsumerRebalanceListener {
private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private final KafkaConsumer<String, String> consumer;
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 在分区被收走之前提交已处理 offset
log.warn("partitions revoked count={}", partitions.size());
if (!currentOffsets.isEmpty()) {
consumer.commitSync(currentOffsets);
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("partitions assigned count={}", partitions.size());
}
}
四 Exactly-Once 与事务:跨系统一致性
很多业务场景需要 exactly-once 比如 Kafka 消息触发数据库写入 必须 数据库写入和 Kafka offset 提交在一个原子操作 否则就会产生消息处理但 offset 没提交 重启重复消费的问题。Kafka 0.11+ 支持事务 producer 配合 consumer 的 isolation.level=read_committed 实现 exactly-once。
// Producer 事务配置
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-" + instanceId);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
// Consume-process-produce 三步事务
public void consumeAndProduce(KafkaConsumer<String, String> consumer) {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) continue;
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
String result = processOrder(record);
producer.send(new ProducerRecord<>("processed-orders", record.key(), result));
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
}
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
log.error("transaction aborted", e);
}
}
}
// Consumer 读已提交 避免读到未提交的事务消息
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
事务的代价是 性能下降 30-50% 因为有事务协调器开销 必须按业务关键度选择。订单支付这类强一致性场景必用事务 日志聚合分析这类容忍少量重复的场景用幂等 producer + 业务层去重就够了。
[mermaid]flowchart TD
A[业务请求] --> B[Producer
acks=all idempotent]
B -->|partition key=orderId| C[Broker Leader]
C -->|同步副本| D[ISR 副本1]
C -->|同步副本| E[ISR 副本2]
D -->|ack| C
E -->|ack| C
C -->|ack| B
C --> F[Consumer Group
read_committed]
F -->|按 partition 分配| G[Consumer 1]
F -->|按 partition 分配| H[Consumer 2]
G --> I[业务处理 + 幂等]
H --> I
I --> J[手动提交 offset]
J --> F
五 监控:看不到就等于没运维
Kafka 集群必须有完整监控 否则故障来了你根本不知道是哪里出了问题。核心指标包括 consumer lag broker 健康 ISR 缩水 rebalance 次数 网络带宽。
# 关键监控指标 用 kafka-exporter + Prometheus + Grafana
# 1. Consumer Lag 最重要的指标
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
--describe --group order-processor
# 输出示例
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
# orders 0 12450 12500 50 consumer-1
# orders 1 12380 12500 120 consumer-2
# 2. ISR 健康度 ISR < replication.factor 就要告警
kafka-topics.sh --bootstrap-server kafka1:9092 \
--describe --topic orders
# 3. broker 端 JMX 关键指标
# kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions // 必须为 0
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce // P99
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Fetch
# kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
# kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
# 4. 自动告警规则示例 Prometheus alertmanager
groups:
- name: kafka
rules:
- alert: KafkaConsumerLagHigh
expr: kafka_consumergroup_lag > 100000
for: 5m
annotations:
summary: "Consumer lag too high"
- alert: KafkaUnderReplicated
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 2m
annotations:
summary: "Under-replicated partitions"
- alert: KafkaRebalancesFrequent
expr: rate(kafka_consumer_coordinator_rebalance_total[5m]) > 0.1
for: 5m
annotations:
summary: "Frequent rebalances detected"
Consumer Lag 是最重要的业务指标 一定要按业务设阈值告警 不同 topic 阈值不同。订单这类强实时性业务 lag>10000 就要告警 日志聚合这类弱实时性业务 lag>100 万才告警。Rebalance 频率高一般是 consumer 处理慢导致心跳超时 必须排查处理逻辑。
六 Kafka 的工程坑:那些文档里学不到的
讲完原理来说几个真实生产里踩过的坑。第一个坑是 topic 删除不会立刻释放磁盘 配置 delete.topic.enable=true 加重启才行 一些老版本甚至彻底不能删 必须用 retention 自然过期 容量规划要预留。第二个坑是 单 partition 单 consumer 顺序消费的瓶颈 业务量大时 单 partition 处理不过来 增 partition 又破坏 key 路由 解法是用 message key 做内部并行 但要保证同 key 串行 异 key 并行 这需要应用层做调度。第三个坑是 ZooKeeper 时代 Kafka 集群是双依赖 ZK 故障 Kafka 集群不能选主 新版 KRaft 模式去掉了 ZK 但迁移成本不低 现有 ZK 集群不要急着改。第四个坑是 消息压缩选错 lz4 平衡 snappy 速度快 zstd 压缩率高 gzip 已过时 普通业务推荐 lz4 大日志类业务用 zstd。第五个坑是 Kafka 集群扩缩容 加 broker 容易 减 broker 难 数据 reassignment 是耗时操作 严重时影响业务 必须用 kafka-reassign-partitions.sh 配合限流 throttle 慢慢迁不能一刀切。
关键概念速查
| 概念 | 含义 | 工程价值 |
|---|---|---|
| acks=all | 所有 ISR 确认 | 不丢消息底线 |
| min.insync.replicas | 最小同步副本 | broker 端配 防退化 |
| enable.idempotence | 幂等 producer | 避免重试重复 |
| partition | 并行单元 | 吞吐与顺序边界 |
| message key | 路由依据 | 顺序保证唯一手段 |
| auto.offset.reset | 新 consumer 起点 | earliest 或 latest |
| max.poll.interval.ms | poll 最大间隔 | 处理慢时必须调大 |
| 事务 producer | 跨 topic 原子 | Exactly-once 基础 |
| isolation.level | 读已提交 | 避免读到未提交事务 |
| Consumer Lag | 未消费消息数 | 核心业务监控 |
避坑清单
- Producer 必须 acks=all + enable.idempotence + 无限 retries 是不丢消息的底线。
- broker 端必须 min.insync.replicas=2 + replication.factor=3 + 关闭 unclean leader 选举。
- 顺序场景必须设 message key 用业务 ID 否则消息会被分散打乱。
- partition 数量按峰值吞吐规划 一年后再扩很痛苦 增加 partition 会打乱历史 key 路由。
- Consumer 必须关闭 auto-commit 处理完再手动提交 否则可能消息没处理 offset 已推进。
- max.poll.interval.ms 必须按业务最长处理时间设 否则慢消费触发 rebalance 重复消费。
- 跨系统强一致性场景用事务 producer + isolation.level=read_committed。
- 幂等是业务层职责 Kafka 只保证至少一次或事务 业务必须做去重表或唯一约束。
- Consumer Lag 必须按业务设告警阈值 ISR 缩水 rebalance 频繁都要告警。
- 扩缩容用 kafka-reassign-partitions 加 throttle 限速 不要一刀切迁移影响业务。
总结
Kafka 这事 很多人的直觉是 push 消息 pull 消息 默认配置跑 这其实是把 我会用 KafkaProducer 和 KafkaConsumer 和 我能在生产保住消息不丢不重不错乱 混为一谈。前者是会调 API 后者是懂消息中间件工程。中间隔着的是 producer 可靠性配置 broker 副本机制 partition 与 key 设计 consumer offset 与 rebalance 事务与幂等 监控与容量规划 整整一套工程方法论。
从原型到生产 你需要做的事远不止 发送和接收。你要懂 acks ISR min.insync.replicas 的三方协议 要会规划 partition 数量和 key 要会管 offset 与 rebalance 要会用事务做 exactly-once 要会监控 lag 与副本健康 要会处理扩缩容。每一项单独看都不复杂 但它们组合在一起 才是一个能扛业务的 Kafka 体系。少任何一项 都可能在某次高峰或故障里 让你的消息丢一批 重一批 顺序乱一批 然后业务被客户追到办公桌前。
我经常用一个比喻来理解 Kafka 它有点像一个智能仓储系统。Producer 是入库车 broker 是货架仓库 ISR 是多个备份仓库 partition 是仓库的多个货架 key 是货物分类标签 consumer 是出库车 offset 是出库进度记录 事务是入库出库的统一账本。你不能因为有了仓库就觉得货物不会丢 还要管入库时是否真正入到位 多仓库是否真正同步 出库进度是否真的更新了 这才是一整套仓储工程。
这套架构最难的地方在于 它的复杂度在 normal traffic 时几乎完全暴露不了。Kafka 跑着挺好 业务也跑着挺顺 你觉得 Kafka 真省心。但真正出故障 broker 宕机 网络抖动 消费慢 promotion 流量峰值 你才发现 99% 的复杂度都在 那 1% 的极端 case 里 acks 配错丢消息 partition key 错乱 offset 提前提交 rebalance 重复消费。建议任何想用 Kafka 的团队 上线前一定要做故障演练 故意杀一个 broker 故意把 consumer 卡死 故意触发 rebalance 看业务表现如何 千万别等真实故障来教你 那时候业务损失已经造成了。
—— 别看了 · 2026