Kafka 文档反复强调"At least once" —— 至少一次,可能重复。但很多团队上线 Kafka 后才发现:这"可能重复"是日常,不是偶发。下游业务如果不做幂等,重复处理就是事故。本文把我们 2 年里 Kafka 消费踩的 7 个坑写出来,每个都附最小复现 + 正确写法。
坑 1:auto.offset.commit 自动提交
新人最爱用默认配置,因为"就工作了"。但 enable.auto.commit=true 是 Kafka 数据丢失的头号原因。
// 错:默认自动提交
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer");
props.put("enable.auto.commit", "true"); // ← 默认就是这个
props.put("auto.commit.interval.ms", "5000"); // 5 秒自动提交一次
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records) {
process(r); // 处理耗时 30 秒
// 处理过程中 5 秒自动提交了 offset,但记录还没处理完
// 如果此时进程挂了,这些"已提交但未处理"的记录永久丢失
}
}
修法:关掉自动提交,处理完再手动提交:
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) continue;
for (ConsumerRecord<String, String> r : records) {
try {
process(r);
} catch (Exception e) {
// 错误处理:重试、入死信队列、或者跳过
handleError(r, e);
}
}
// 一批处理完后同步提交
consumer.commitSync();
}
坑 2:消费速度跟不上,触发 rebalance
消费者每次 poll 都要在 max.poll.interval.ms(默认 5 分钟)内再次 poll,否则 broker 认为它挂了,触发 rebalance。如果你单条记录处理 30 秒,一次 poll 500 条,处理完要 4 小时 —— 早就 rebalance 几十次了。
// 错:max.poll.records 默认 500,但每条处理慢
props.put("max.poll.records", "500");
props.put("max.poll.interval.ms", "300000"); // 5 分钟,默认
// 对:压低 max.poll.records,缩短一批的处理时间
props.put("max.poll.records", "10"); // 一次只拿 10 条
props.put("max.poll.interval.ms", "600000"); // 给 10 分钟兜底
// 或者:多线程消费(消费线程只 poll,业务线程处理)
ExecutorService workerPool = Executors.newFixedThreadPool(20);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, String> r : records) {
futures.add(workerPool.submit(() -> process(r)));
}
// 等所有任务完成再提交 offset
for (Future<?> f : futures) f.get();
consumer.commitSync();
}
多线程消费要注意:必须等所有 worker 完成再 commit,否则会丢消息。
坑 3:消费幂等没做,重复扣款
Kafka 保证 at-least-once,意味着同一条消息可能被消费 2 次甚至更多。常见场景:消费者处理完业务但提交 offset 前挂掉,重启后 broker 重发这条消息。
幂等的正确做法:
@Transactional
public void process(ConsumerRecord<String, String> r) {
OrderEvent event = parse(r.value());
// 1. 幂等检查:Redis SET NX
String key = "kafka:processed:" + event.getEventId();
Boolean isNew = redis.opsForValue().setIfAbsent(key, "1", Duration.ofDays(7));
if (Boolean.FALSE.equals(isNew)) {
log.info("event already processed, skip: {}", event.getEventId());
return;
}
// 2. 业务处理(数据库)
try {
orderService.handle(event);
} catch (Exception e) {
// 业务失败,撤销 Redis 标记,让下次重试
redis.delete(key);
throw e;
}
}
更稳的做法用数据库唯一索引:
CREATE TABLE event_processed (
event_id VARCHAR(64) PRIMARY KEY,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
-- 业务事务内同时 INSERT 这张表
-- 重复消费时 INSERT 会触发 Duplicate key,直接回滚整个事务
@Transactional(rollbackFor = Exception.class)
public void process(ConsumerRecord<String, String> r) {
OrderEvent event = parse(r.value());
// 利用 DB 唯一索引做幂等
try {
jdbc.update("INSERT INTO event_processed(event_id) VALUES(?)", event.getEventId());
} catch (DuplicateKeyException e) {
log.info("event already processed, skip");
return;
}
// 业务在同一个事务里
orderService.handle(event);
}
坑 4:消费失败的处理
消息处理失败时,常见 4 种处理策略:
- 跳过:记录失败 offset,继续消费下一条(数据丢失风险)
- 重试:本地重试 N 次,失败后跳过 → 简单但可能阻塞分区
- 死信队列:重试 N 次失败后转发到 DLQ topic,后续人工处理
- 停止消费:遇到失败就停,等人介入(数据强一致场景)
// 死信队列模式
KafkaProducer<String, String> dlqProducer = createProducer("dlq");
void processWithDLQ(ConsumerRecord<String, String> r) {
for (int attempt = 0; attempt < 3; attempt++) {
try {
process(r);
return;
} catch (TransientException e) {
log.warn("retry {}/3 for {}: {}", attempt + 1, r.offset(), e.getMessage());
try { Thread.sleep(1000L << attempt); } catch (Exception ignored) {}
} catch (Exception e) {
// 业务异常直接转 DLQ
break;
}
}
// 3 次失败 → DLQ
dlqProducer.send(new ProducerRecord<>(
"orders.dlq",
r.key(),
buildDlqValue(r, "max retries exceeded")
));
}
坑 5:消费顺序问题
Kafka 单 partition 内有序,跨 partition 无序。如果业务依赖顺序(同一个 user_id 的事件按时间处理),必须保证同 key 落到同 partition:
// Producer 端:按业务 key hash 分区(Kafka 默认行为)
producer.send(new ProducerRecord<>(
"orders",
String.valueOf(event.getUserId()), // ← key 是 user_id
event.toJson()
));
// 同一 user_id 的消息永远到同一 partition,消费有序
// Consumer 端:别多线程乱序处理同一 key
// 错:用通用线程池,同 user_id 可能被不同线程并发处理
workerPool.submit(() -> process(record));
// 对:按 key 分配到固定线程
int slot = Math.abs(record.key().hashCode()) % WORKER_COUNT;
workerPools.get(slot).submit(() -> process(record));
坑 6:rebalance 期间的处理
rebalance 时,consumer 暂停消费,分区在组员之间重新分配。如果你的代码在 rebalance 时正在处理消息但没提交 offset,新拿到这个分区的消费者会重新拉这条消息。
// 用 ConsumerRebalanceListener 在 rebalance 前提交 offset
consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 这个分区马上要被分给其他消费者,提交我已处理的 offset
log.info("partitions revoked: {}", partitions);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("partitions assigned: {}", partitions);
// 可以做初始化工作
}
});
Kafka 2.4+ 引入了 Cooperative Sticky Assignment 协议,可以让 rebalance 更平滑(不停顿所有消费者,只迁移必要的分区):
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
坑 7:消息体太大,producer 报错
Kafka 默认单条消息限制 1MB(message.max.bytes)。业务直接把大对象塞进去:
// 错:把订单详情整个序列化
producer.send(new ProducerRecord<>("orders", orderId, BIG_ORDER_JSON_5MB));
// RecordTooLargeException: The message is 5242880 bytes which is larger than 1048576
// 对:Kafka 只发"事件",大对象存对象存储,事件里只放 URL
ObjectStorage.upload("orders/" + orderId, orderJson);
producer.send(new ProducerRecord<>("orders", orderId,
String.format("{\"order_id\":\"%s\",\"s3_url\":\"s3://bucket/orders/%s\"}", orderId, orderId)
));
// 或者拆分大消息为多条,加 chunk_id / total_chunks 字段
// 消费端按 order_id 聚合
Go 版的消费者实现
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
GroupID: "my-consumer",
Topic: "orders",
MinBytes: 10e3, // 10KB,批量 fetch 提效
MaxBytes: 10e6, // 10MB
CommitInterval: 0, // 0 = 手动提交
})
defer r.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
log.Printf("fetch error: %v", err)
continue
}
if err := processMessage(m); err != nil {
log.Printf("process error: %v, msg=%s", err, string(m.Value))
// 错误处理:重试 / 死信 / 跳过
continue
}
// 处理成功才提交 offset
if err := r.CommitMessages(ctx, m); err != nil {
log.Printf("commit failed: %v", err)
}
}
}
func processMessage(m kafka.Message) error {
// 幂等:用 Redis 或 DB 唯一索引
// 业务处理
return nil
}
监控指标
# Prometheus 必备指标
- alert: ConsumerLagHigh
expr: kafka_consumergroup_lag > 10000
for: 5m
annotations:
summary: '{{ $labels.consumergroup }} 滞后 {{ $value }} 条'
- alert: ConsumerRebalanceFrequent
expr: rate(kafka_consumer_rebalance_total[5m]) > 0.1
for: 5m
annotations:
summary: 'consumer 频繁 rebalance,可能是 max.poll.interval 设错'
- alert: DLQGrowing
expr: kafka_topic_partition_oldest_offset{topic="orders.dlq"} > 100
annotations:
summary: '死信队列堆积,需要人工处理'
核心配置清单
# Consumer 配置
enable.auto.commit=false # 手动提交
max.poll.records=10 # 一批 10 条,处理时间可控
max.poll.interval.ms=600000 # 10 分钟兜底
session.timeout.ms=30000
heartbeat.interval.ms=3000
auto.offset.reset=earliest # 新消费者从最早开始(还是 latest 看场景)
isolation.level=read_committed # 事务消息只读 committed
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# Producer 配置
acks=all # 等所有 ISR 副本确认
retries=10
enable.idempotence=true # producer 幂等(避免网络重试导致重复)
max.in.flight.requests.per.connection=5
compression.type=zstd # zstd 压缩率高速度快
linger.ms=10 # 攒 10ms 再发,提升批量效率
batch.size=32768
消费者代码 review 6 问
- 是不是关了 auto.commit?
- 幂等怎么做?Redis SET NX 还是 DB 唯一索引?
- 失败如何处理?重试 / DLQ / 跳过 哪一种?
- rebalance listener 提交 offset 了吗?
- 消息顺序要求是什么?用 key 分区?
- 消费延迟监控有没有?>1 万条要告警吗?
把这 6 问当 code review checklist,Kafka 消费侧 bug 能砍掉 90%。剩下 10% 是真正的疑难杂症,值得专门复盘。
—— 别看了 · 2026