2023 年我们做一个支付对账系统 上游 Kafka 推支付成功消息 下游消费写入对账库 第一版用 Spring Kafka 默认配置 enable.auto.commit=true 跑得飞起 老板说"Kafka 这么稳" 上线第一周就出大事。一次 Pod OOM 重启 消费者重新加入 group 触发 rebalance offset 提交跟消费处理时序错位 一批已经写入对账库的消息被重复消费 财务对账多出 12 万条重复记录 一晚上人工对账到天亮。第一种最让我傻眼是 auto.commit 是"周期性提交" 默认 5 秒一次 也就是说我消费完一条 commit 不立刻发生 中间崩了这条就会被另一个消费者再吃一遍 表面看是"至少一次"实际是"经常重复";第二种最难缠是 partition rebalance 触发频率比想象高得多 消费者扩缩容 / 心跳超时 / GC 卡顿 都会触发 每次 rebalance 消费暂停 30 秒到 2 分钟 上游堆积 lag 飙到几百万;第三种最离谱是消费线程模型 一个 consumer 拿到 100 条消息 我们用线程池并发处理 然后 commit 但 commit 提交的是 batch 末尾 offset 中间某条失败 重启后从 batch 头开始重消费 已经成功的消息再处理一次 业务侧没幂等就重复落库;第四种最致命是跨 topic 的 exactly-once 我们要"消费 A topic 写 B topic + 更新 DB"三个动作必须原子 但 Kafka transaction 只能管 Kafka 自己 DB 是另一回事 必须用 outbox pattern 或两阶段提交;第五种最莫名其妙是 max.poll.records 与 max.poll.interval.ms 这两个参数的关系 我们设了 max.poll.records=500 但消费 500 条耗时 6 分钟 超过 max.poll.interval.ms=5 分钟默认值 消费者被踢出 group rebalance 死循环;第六种最坑是 consumer lag 监控 我们只看 lag 总数 没分 partition 看 结果某个 partition 因为 key 倾斜 lag 30 万其他 partition 0 总 lag 不显眼根本发现不了。真正能投产的 Kafka 消费体系是手动 commit + 消费幂等 + rebalance 友好的处理模型 + exactly-once transaction + 监控分 partition lag + 死信队列 + 流量回放的完整体系,任何一环失守都可能让你的对账系统从"实时可信"退化成"每天人工对账"。本文从踩坑视角梳理 Kafka 消费者组的工程要点,offset 怎么管 rebalance 怎么避 幂等怎么做 exactly-once 怎么落地 监控怎么建 死信怎么处理,以及一些把消息消费系统做扎实要避开的工程坑。
问题背景:为什么 enable.auto.commit=true 远远不够
很多 Java/Python 团队上手 Kafka 都从 Spring Kafka 或 kafka-python 默认配置起步 但生产化消费远比 demo 复杂:
- Offset 提交时机:auto vs manual / sync vs async / before vs after processing 直接决定语义。
- Rebalance 抖动:扩缩容 / GC / 网络抖动都会触发 处理不当消费整体暂停。
- 消费幂等:Kafka 至少一次语义 业务侧必须幂等否则重复落库。
- Exactly-once:跨 topic / 跨系统的原子性需要 transaction + outbox。
- 背压与流控:max.poll.records 与处理时长配比错就触发踢出 group。
- 监控告警:lag 必须分 partition 看 否则 key 倾斜被掩盖。
一 Offset 管理:手动 commit 是底线
auto.commit 是 Kafka 给"我不关心重复消费"的场景准备的快捷开关 任何业务关键消息必须手动 commit 并控制 commit 时机在"处理完成之后"。
// 1 关闭 auto.commit
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-reconcile");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 关键
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 分钟
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30 秒
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10 秒
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 配合 transaction
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("payment-events"));
// 2 处理后再 commit(每条精细控制)
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record); // 业务处理 必须幂等
// 累积 offset+1(注意是 offset+1 不是 offset)
offsetsToCommit.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
} catch (Exception e) {
log.error("process failed at offset={}", record.offset(), e);
// 关键决策 失败要么死信 要么停在这里不前进
sendToDeadLetterQueue(record);
offsetsToCommit.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
}
// 3 同步 commit(强一致 但慢)
if (!offsetsToCommit.isEmpty()) {
try {
consumer.commitSync(offsetsToCommit);
} catch (CommitFailedException e) {
log.error("commit failed - likely rebalance", e);
// 这批可能被另一个 consumer 重消费 业务必须幂等
}
}
}
// 4 异步 commit(高吞吐 但需 fallback)
consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
if (exception != null) {
log.warn("async commit failed, will retry sync next round", exception);
}
});
// 5 优雅停机 必须最后 commitSync
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
running.set(false);
try {
consumer.commitSync();
consumer.close(Duration.ofSeconds(30));
} catch (Exception e) {
log.error("graceful shutdown failed", e);
}
}));
// 6 错误的 commit 时机(常见反模式)
// 反模式 A 处理前 commit -> 处理失败消息丢失
for (ConsumerRecord r : records) {
consumer.commitSync(); // 错 还没处理就提交
process(r); // 这里失败消息永远丢
}
// 反模式 B 批次中间 commit -> 部分成功重启后从头
for (int i = 0; i < records.size(); i++) {
process(records.get(i));
if (i % 10 == 0) consumer.commitAsync(); // 错 batch 切断
}
实战经验:auto.commit 在任何业务关键消息上必须关 不要省事;commitSync 保证 commit 失败可见 commitAsync 配合下一轮 sync 是常用兼顾吞吐与可靠的模式;commit 的 offset 是"下一条要读的位置"必须 +1 这个一直有人写错;关闭前一定 commitSync 否则最后一批必丢;CommitFailedException 几乎总是 rebalance 导致的 看到这个日志说明这批可能被其他 consumer 再消费一次 业务必须幂等。
二 Rebalance 友好的消费模型
rebalance 是 Kafka 消费组最大的稳定性杀手 触发条件多 影响大 必须从消费模型层面减少 rebalance 频率与影响。
// 1 rebalance 触发条件(全要避免)
/*
- 消费者数量变化(扩缩容 / Pod OOM 重启)
- topic partition 数量变化(扩容 topic)
- 消费者心跳超时(GC 卡顿 / 网络抖动)
- 消费者 poll 间隔超过 max.poll.interval.ms
*/
// 2 减小 batch + 提高 poll 频率
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 一次少拿 别贪
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 处理慢就调大 10 分钟
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 5 * 1024 * 1024);
// 3 监听 rebalance 事件 优雅交接
consumer.subscribe(Collections.singletonList("payment-events"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// rebalance 即将发生 commit 当前进度
log.info("partitions revoked: {}", partitions);
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 分配到新 partition seek 到上次提交位置
log.info("partitions assigned: {}", partitions);
for (TopicPartition tp : partitions) {
OffsetAndMetadata committed = consumer.committed(Set.of(tp)).get(tp);
if (committed != null) {
consumer.seek(tp, committed.offset());
}
}
}
});
// 4 Static membership 避免 rolling restart 触发 rebalance
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-pod-1");
// 同名 instance.id 重启 group 认为还是它 不触发 rebalance
// 配合 K8s StatefulSet 用
// 5 Cooperative rebalance(Kafka 2.4+)避免 stop-the-world
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
// 增量 rebalance 不影响其他 partition 消费
// 6 心跳与会话超时(谨慎调)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 45 秒
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000); // 15 秒(session 的 1/3)
// session 太短 GC 卡顿就踢
// 太长 真挂了发现慢
// 7 消费线程模型(单线程消费 + 业务线程池处理)
ExecutorService workers = Executors.newFixedThreadPool(20);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<>();
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 按 partition 串行(保证同 partition 顺序)
Map<TopicPartition, List<ConsumerRecord>> byPartition = new HashMap<>();
for (ConsumerRecord r : records) {
byPartition.computeIfAbsent(
new TopicPartition(r.topic(), r.partition()),
k -> new ArrayList<>()
).add(r);
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (Map.Entry<TopicPartition, List<ConsumerRecord>> entry : byPartition.entrySet()) {
TopicPartition tp = entry.getKey();
List<ConsumerRecord> partRecords = entry.getValue();
futures.add(CompletableFuture.runAsync(() -> {
for (ConsumerRecord r : partRecords) {
processRecord(r);
currentOffsets.put(tp, new OffsetAndMetadata(r.offset() + 1));
}
}, workers));
}
// 等所有 partition 处理完再 commit
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
实战经验:max.poll.records 50 是大多数业务的甜点 别贪 100 以上;static membership + cooperative rebalance 是 K8s 部署必上;ConsumerRebalanceListener 必须实现 否则 rebalance 期间 offset 提交错乱;同 partition 内消息必须串行处理 否则破坏顺序保证;poll 循环里不要做耗时 IO 全放业务线程池 否则 max.poll.interval.ms 超时。我们消费模型重构后 rebalance 从每天 20 次降到每月 2 次 P99 端到端延迟从 8 秒降到 1.5 秒。
三 消费幂等:业务侧最后一道防线
Kafka 至少一次语义意味着重复消费是常态 业务侧必须做幂等否则重复落库灾难。
// 1 基于唯一键的幂等(最常用)
@Service
public class PaymentReconcileService {
@Autowired private JdbcTemplate jdbc;
@Transactional
public void processPayment(PaymentEvent event) {
// 用消息的业务唯一键(payment_id)做去重
String paymentId = event.getPaymentId();
// INSERT ... ON DUPLICATE KEY 是 MySQL 幂等利器
int affected = jdbc.update(
"INSERT INTO payment_reconcile (payment_id, amount, status, created_at) " +
"VALUES (?, ?, ?, NOW()) " +
"ON DUPLICATE KEY UPDATE status = VALUES(status), updated_at = NOW()",
paymentId, event.getAmount(), event.getStatus()
);
if (affected == 1) {
log.info("new payment recorded: {}", paymentId);
} else if (affected == 2) {
log.info("payment updated (duplicate): {}", paymentId);
}
}
}
// 2 基于 Redis SET NX 的快速去重
public boolean tryProcess(String messageId) {
Boolean acquired = redis.opsForValue().setIfAbsent(
"msg:" + messageId,
"1",
Duration.ofHours(24)
);
return Boolean.TRUE.equals(acquired);
}
// 在消费侧
for (ConsumerRecord r : records) {
String msgId = extractMessageId(r);
if (!tryProcess(msgId)) {
log.info("duplicate message skipped: {}", msgId);
continue;
}
processRecord(r);
}
// 注意 Redis 不是强一致 在 master-slave 切换时仍可能重复
// 必须 DB 层兜底
// 3 状态机校验(强校验)
public void updateOrderStatus(String orderId, String newStatus) {
// 只有合法状态转移才更新 重复消息会被状态机拒绝
int affected = jdbc.update(
"UPDATE orders SET status = ?, updated_at = NOW() " +
"WHERE id = ? AND status = ?",
newStatus,
orderId,
getValidPreviousStatus(newStatus)
);
if (affected == 0) {
log.info("status transition rejected (already processed): {} -> {}", orderId, newStatus);
}
}
// 4 版本号/时间戳防过期(乱序保护)
public void updateBalance(String accountId, BigDecimal balance, long eventTimestamp) {
// 只接受比当前更新的事件 防止旧消息覆盖新数据
jdbc.update(
"UPDATE accounts SET balance = ?, last_event_ts = ? " +
"WHERE id = ? AND last_event_ts < ?",
balance, eventTimestamp, accountId, eventTimestamp
);
}
// 5 业务层幂等表(独立追踪)
@Transactional
public void process(String messageId, Runnable bizLogic) {
try {
// 先插幂等记录 唯一约束保证只处理一次
jdbc.update(
"INSERT INTO message_idempotent (message_id, processed_at) VALUES (?, NOW())",
messageId
);
bizLogic.run();
} catch (DuplicateKeyException e) {
log.info("idempotent skip: {}", messageId);
}
}
// 6 跨服务幂等 token + 业务唯一键双重保险
// 生产者侧
record.headers().add("idempotent-token", UUID.randomUUID().toString().getBytes());
record.headers().add("business-key", payment.getId().getBytes());
// 消费者侧 双重去重
String token = new String(record.headers().lastHeader("idempotent-token").value());
String bizKey = new String(record.headers().lastHeader("business-key").value());
if (redis.hasKey("token:" + token) || dbHasProcessed(bizKey)) {
return;
}
实战经验:幂等必须从业务层做 不能依赖 Kafka 自身;唯一键约束 + INSERT ON DUPLICATE 是最简单可靠的;Redis 去重快但不强一致 必须 DB 层兜底;状态机校验是最强的 但需要业务设计配合;版本号/时间戳防止旧消息覆盖新数据 在乱序消费场景必备;幂等表方案最规范 但每条多一次 DB 写入 高吞吐场景酌情。我们对账系统加幂等后 即使 rebalance 重复消费 也不再出现重复入库。
四 Exactly-once:Kafka Transaction 与 Outbox Pattern
跨 topic 或跨系统的原子性必须用 Kafka transaction(同 Kafka 内)或 outbox pattern(跨 Kafka 与 DB)实现 exactly-once。
// 1 Kafka Transaction(同 Kafka 内 exactly-once)
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-tx-1");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
// 消费 + 处理 + 发送 全部在 transaction 内
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
producer.beginTransaction();
try {
for (ConsumerRecord r : records) {
// 业务处理
ProcessedEvent processed = transform(r);
// 发送到下游 topic
producer.send(new ProducerRecord<>("reconcile-result",
processed.getKey(),
processed.toJson()));
}
// 关键 把 consumer offset 也提交到 transaction
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
long lastOffset = records.records(tp).get(records.records(tp).size() - 1).offset();
offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
// 不 commit offset 下次重新消费
}
}
// 2 Outbox Pattern(跨 DB + Kafka exactly-once)
// 步骤 1 业务写 DB 同事务写 outbox 表
@Transactional
public void createOrder(Order order) {
orderRepo.save(order);
outboxRepo.save(new OutboxEvent(
UUID.randomUUID(),
"OrderCreated",
toJson(order),
Instant.now()
));
// DB transaction commit 保证 order 与 outbox 原子
}
// 步骤 2 后台轮询 outbox 发到 Kafka
@Scheduled(fixedDelay = 1000)
public void publishOutbox() {
List<OutboxEvent> events = outboxRepo.findUnpublishedTop100();
for (OutboxEvent event : events) {
try {
producer.send(new ProducerRecord<>("orders",
event.getId().toString(),
event.getPayload())).get();
outboxRepo.markPublished(event.getId());
} catch (Exception e) {
log.error("publish failed for event {}", event.getId(), e);
}
}
}
// 3 Debezium CDC 自动 outbox(推荐)
// 用 Debezium 监听 outbox 表 binlog 自动推 Kafka
// 配置 io.debezium.transforms.outbox.EventRouter
// outbox 表只需 (id, aggregate_id, type, payload, created_at)
// Debezium 自动按 aggregate_id 路由到对应 topic
// 4 消费侧配合 read_committed 隔离
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// 只读已 commit 的 transaction 消息
// 否则可能读到 abort 的"中间态"消息
// 5 性能影响
/*
Transaction 开销
- 每个 transaction 至少 2 次 broker 通信(begin + commit)
- transaction.timeout.ms 默认 1 分钟 太长资源浪费
- 推荐 batch 100 条 / commit 一次 而不是 1 条 1 commit
*/
// 6 监控未完成 transaction
// kafka-transactions.sh --bootstrap-server kafka:9092 --list
// 长时间 ONGOING 的 transaction 说明有 producer 挂了
// transaction.timeout.ms 到了会自动 abort
实战经验:Kafka transaction 是 Kafka -> Kafka 的 exactly-once 标准方案 不适合跨系统;Outbox pattern 是跨 DB + Kafka 的最佳实践 + Debezium CDC 几乎零运维;消费侧必须 read_committed 否则 transaction 形同虚设;transaction 不是免费午餐 吞吐降 30-50% 评估业务是否真需要 exactly-once;at-least-once + 业务幂等是 90% 场景的更优解 不要为了 exactly-once 一刀切。我们对账系统用 outbox + Debezium 上线后 数据一致性问题彻底消失。
[mermaid]
flowchart TD
A[订单服务] --> B[写 orders 表]
A --> C[同事务写 outbox 表]
B --> D{DB Transaction}
C --> D
D -->|commit 成功| E[Debezium 监听 binlog]
E --> F[发到 Kafka orders topic]
F --> G[消费者 read_committed]
G --> H{幂等检查}
H -->|已处理| I[skip]
H -->|未处理| J[业务处理]
J --> K[INSERT ON DUPLICATE]
K --> L[commit offset]
J -->|失败| M[死信队列]
M --> N[人工或定时重放]
五 监控、死信与流量回放
消费链路监控必须分 partition 看 lag 否则 key 倾斜被掩盖。失败消息要进死信队列 而不是阻塞整个消费组。
# 1 Prometheus 指标采集(Kafka Exporter)
# docker-compose.yml
services:
kafka-exporter:
image: danielqsj/kafka-exporter:latest
command:
- --kafka.server=kafka:9092
- --group.filter=^payment-.*
- --topic.filter=^payment-.*
ports:
- 9308:9308
# 关键指标
# kafka_consumergroup_lag 消费组在某 partition 的 lag
# kafka_consumergroup_lag_sum 整个消费组总 lag
# kafka_topic_partitions topic partition 数
# kafka_consumergroup_current_offset 当前消费 offset
# kafka_topic_partition_current_offset topic 最新 offset
# 2 Grafana Dashboard 告警规则
groups:
- name: kafka-consumer-lag
rules:
- alert: ConsumerLagHighPerPartition
# 关键 按 partition 看 不只看 sum
expr: kafka_consumergroup_lag{consumergroup="payment-reconcile"} > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "Partition {{$labels.partition}} of group {{$labels.consumergroup}} lag > 10k"
description: "可能 key 倾斜或消费速度不够"
- alert: ConsumerLagGrowing
expr: rate(kafka_consumergroup_lag{consumergroup="payment-reconcile"}[5m]) > 0
for: 10m
labels:
severity: critical
annotations:
summary: "Consumer lag is growing - cannot catch up"
- alert: ConsumerStuck
expr: rate(kafka_consumergroup_current_offset{consumergroup="payment-reconcile"}[10m]) == 0
for: 5m
labels:
severity: critical
annotations:
summary: "Consumer is not advancing - likely stuck or rebalancing"
// 3 死信队列 失败消息隔离
public class DeadLetterHandler {
private final KafkaProducer<String, String> producer;
private final int MAX_RETRY = 3;
public void handleFailedRecord(ConsumerRecord<String, String> record, Exception error) {
int retryCount = getRetryCount(record);
if (retryCount < MAX_RETRY) {
// 重试 topic 延迟一段时间再投递
ProducerRecord retry = new ProducerRecord<>(
"payment-events-retry-" + retryCount,
record.key(),
record.value()
);
retry.headers().add("retry-count", String.valueOf(retryCount + 1).getBytes());
retry.headers().add("original-topic", record.topic().getBytes());
retry.headers().add("error-message", error.getMessage().getBytes());
producer.send(retry);
} else {
// 进死信 人工 review
ProducerRecord dlq = new ProducerRecord<>(
"payment-events-dlq",
record.key(),
record.value()
);
dlq.headers().add("final-error", error.toString().getBytes());
dlq.headers().add("failed-at", Instant.now().toString().getBytes());
producer.send(dlq);
log.error("message moved to DLQ: key={} offset={}", record.key(), record.offset());
}
}
private int getRetryCount(ConsumerRecord r) {
Header h = r.headers().lastHeader("retry-count");
return h == null ? 0 : Integer.parseInt(new String(h.value()));
}
}
// 4 延迟重试 topic 链
// payment-events-retry-1 立即重试
// payment-events-retry-2 消费者延迟 1 分钟后处理
// payment-events-retry-3 延迟 10 分钟
// payment-events-dlq 不再重试 进入人工审查
public class DelayedRetryConsumer {
public void consume(ConsumerRecord r) {
String topic = r.topic();
long delay = parseDelay(topic); // 按 topic 名解析延迟
long age = System.currentTimeMillis() - r.timestamp();
if (age < delay) {
// 还没到延迟时间 sleep 一下
try { Thread.sleep(delay - age); } catch (InterruptedException e) {}
}
try {
processRecord(r);
} catch (Exception e) {
deadLetterHandler.handleFailedRecord(r, e);
}
}
}
// 5 死信回放工具
public class DLQReplayer {
public void replayDLQ(String dlqTopic, String originalTopic, Predicate<ConsumerRecord> filter) {
KafkaConsumer<String, String> dlqConsumer = createConsumer("dlq-replay");
KafkaProducer<String, String> producer = createProducer();
dlqConsumer.subscribe(List.of(dlqTopic));
while (true) {
ConsumerRecords<String, String> records = dlqConsumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) break;
for (ConsumerRecord r : records) {
if (filter.test(r)) {
producer.send(new ProducerRecord<>(originalTopic, r.key().toString(), r.value().toString()));
}
}
dlqConsumer.commitSync();
}
}
}
// 6 端到端追踪(traceId 贯穿)
record.headers().add("trace-id", MDC.get("traceId").getBytes());
// 消费侧
String traceId = new String(record.headers().lastHeader("trace-id").value());
MDC.put("traceId", traceId);
try {
processRecord(record);
} finally {
MDC.clear();
}
// 配合 Skywalking / Jaeger 可以从生产者到消费者全链路追踪
实战经验:lag 监控必须 by partition 否则 key 倾斜永远发现不了;告警必须包含 lag 增长率 而不只是绝对值 因为正常消费时绝对值也会暂时跳;死信队列必须有 retry-count 与 original-topic header 否则回放找不到回家的路;延迟重试 topic 链是处理瞬时故障(下游 DB 抖动)的最佳实践;DLQ 回放工具必备 没有人工干预手段就只能眼睁睁看数据卡在 DLQ。我们对账系统加上死信链后 临时故障自动恢复率 95% 真正进 DLQ 的不到 0.01%。
六 性能调优与背压
消费吞吐与背压控制需要从 fetch / process / commit 三个阶段同时优化 单点优化容易顾此失彼。
// 1 Fetch 参数(取多少)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 至少 1KB 才返回
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 最多等 500ms
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 单次 fetch 50MB 上限
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 单 partition 1MB
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536); // socket buffer 64KB
// 2 处理并发(按 partition 并发)
public class ParallelConsumer {
private final ExecutorService pool;
private final Map<TopicPartition, CompletableFuture<Void>> inflight = new ConcurrentHashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new ConcurrentHashMap<>();
public ParallelConsumer(int parallelism) {
this.pool = new ThreadPoolExecutor(
parallelism, parallelism,
0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(parallelism * 2),
new ThreadPoolExecutor.CallerRunsPolicy() // 背压关键
);
}
public void run() {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 按 partition 提交 保证顺序
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords = records.records(tp);
inflight.compute(tp, (k, prev) -> {
CompletableFuture<Void> chain = prev == null ? CompletableFuture.completedFuture(null) : prev;
return chain.thenRunAsync(() -> {
for (ConsumerRecord r : partRecords) {
processRecord(r);
pendingOffsets.put(tp, new OffsetAndMetadata(r.offset() + 1));
}
}, pool);
});
}
// 异步提交已完成的 offset
commitCompleted();
}
}
private void commitCompleted() {
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
inflight.entrySet().removeIf(e -> {
if (e.getValue().isDone()) {
OffsetAndMetadata om = pendingOffsets.remove(e.getKey());
if (om != null) toCommit.put(e.getKey(), om);
return true;
}
return false;
});
if (!toCommit.isEmpty()) {
consumer.commitAsync(toCommit, null);
}
}
}
// 3 Pause/Resume 背压(下游慢时暂停拉取)
public class BackpressureConsumer {
public void run() {
while (running.get()) {
// 检查下游是否健康
if (downstreamLagTooHigh()) {
Set<TopicPartition> assigned = consumer.assignment();
consumer.pause(assigned);
log.warn("downstream slow, pausing consumer");
} else if (!consumer.paused().isEmpty()) {
consumer.resume(consumer.paused());
log.info("downstream recovered, resuming");
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 注意 paused partition 不会返回数据 但 poll 必须继续调否则触发 max.poll.interval.ms
processRecords(records);
}
}
private boolean downstreamLagTooHigh() {
return dbConnectionPool.getActiveCount() > dbConnectionPool.getMaxPoolSize() * 0.9;
}
}
// 4 批量处理优化(减少 DB IO)
public void processBatch(List<ConsumerRecord> records) {
// 单条 INSERT 改成 batch INSERT
List<Object[]> batchArgs = records.stream()
.map(r -> new Object[]{r.key(), r.value(), Instant.now()})
.collect(Collectors.toList());
jdbc.batchUpdate(
"INSERT INTO events (id, payload, processed_at) VALUES (?, ?, ?) " +
"ON DUPLICATE KEY UPDATE processed_at = VALUES(processed_at)",
batchArgs
);
// 单批 100 条 比单条 INSERT 快 10-30 倍
}
// 5 消费者数量与 partition 数关系
/*
- 消费者数 > partition 数:多余消费者闲置
- 消费者数 = partition 数:1:1 最优
- 消费者数 < partition 数:每个消费者多 partition 资源受限
- 经验:partition 数 = 预期峰值消费者数 * 1.5
- 如 8 个消费者实例,topic 设 12 个 partition
*/
// 6 监控消费速率
@Scheduled(fixedDelay = 60000)
public void reportConsumerMetrics() {
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
double recordsConsumedRate = (double) metrics.get(
new MetricName("records-consumed-rate", "consumer-fetch-manager-metrics", "", new HashMap<>())
).metricValue();
double bytesConsumedRate = (double) metrics.get(
new MetricName("bytes-consumed-rate", "consumer-fetch-manager-metrics", "", new HashMap<>())
).metricValue();
log.info("consume rate: {} records/s, {} bytes/s", recordsConsumedRate, bytesConsumedRate);
}
实战经验:fetch 参数 fetch.min.bytes + fetch.max.wait.ms 是 throughput vs latency 权衡 实时场景 min=1 wait=10 批处理场景 min=1MB wait=500;按 partition 并发 + 顺序保证是消费并发的标准范式 不要全局乱并发破坏顺序;pause/resume 背压是下游能力变化时的弹性手段 必须配合下游健康检测;batch DB 写入是吞吐倍数提升的关键 单条 INSERT 是性能杀手;partition 数与消费者数比例 1.5x 留扩容空间。我们把消费吞吐从 5k QPS 优化到 50k QPS 没换硬件 只是把这些参数和模型调对。
关键概念速查
| 问题 | 关键参数/工具 | 推荐 | 备注 |
|---|---|---|---|
| Offset 提交 | enable.auto.commit=false | 必关 | auto commit 等于重复 |
| Rebalance 减少 | static membership + cooperative | 必上 | K8s 部署标配 |
| 消费幂等 | INSERT ON DUPLICATE KEY | 必做 | 业务侧最后防线 |
| Exactly-once | Kafka transaction 或 Outbox | 按场景 | 跨系统用 Outbox |
| Poll 配置 | max.poll.records=50 interval=5min | 调优 | 处理时长决定 |
| 分区监控 | by partition lag 告警 | 必上 | 避免 key 倾斜掩盖 |
| 死信队列 | retry-1 retry-2 retry-3 dlq | 必上 | 失败不阻塞 |
| 背压控制 | pause/resume + 下游健康 | 推荐 | 下游慢时主动减速 |
| 批量写入 | jdbc.batchUpdate | 必做 | 10-30 倍提升 |
| 分区数 | 1.5x 消费者实例数 | 容量规划 | 留扩容空间 |
避坑清单
- 不要 enable.auto.commit=true 业务关键消息必须手动 commit。
- 不要不实现 ConsumerRebalanceListener rebalance 时 offset 提交错乱。
- 不要业务不幂等 Kafka 至少一次就是会重复。
- 不要 commit offset 不 +1 commit 的是"下一条要读"的位置。
- 不要 max.poll.records 设太大 处理时长超过 max.poll.interval.ms 必被踢。
- 不要单线程串行消费 慢 IO 拖死 partition 用按 partition 并发。
- 不要监控只看总 lag 必须 by partition 否则 key 倾斜永远发现不了。
- 不要失败消息阻塞消费 必须死信队列隔离。
- 不要为了 exactly-once 一刀切 at-least-once + 业务幂等是 90% 场景的更优解。
- 不要消费侧不设 read_committed Kafka transaction 形同虚设。
总结
把 Kafka 消费者组从我们踩过的所有坑里反过来看 你会发现真正能稳定服务的消费链路不是 default 配置加个 try-catch 就够 而是一个 offset 管理 + rebalance 友好 + 消费幂等 + exactly-once + 背压 + 监控告警 + 死信回放的完整工程体系。同一个 topic 同一份代码 配置错了一天 100 万消息重复入库 配置对了能稳定跑 10 亿条数据零差错。Kafka 不是"send / receive"那么简单 它是分布式系统中最容易踩坑也最值得投入工程化的中间件。
另一个常见的认知误区是把 Kafka 的"至少一次"理解成"经常会重复" 把"exactly-once"当成"配置一下就行" 实际上"至少一次"在正常情况下不会重复 只有 rebalance 与 commit 失败时才重复 而"exactly-once"是有性能代价的 不是免费午餐。真正生产系统的语义模型 大多数是 at-least-once + 业务幂等 这个组合最简单可靠 性能也最好 只有金融对账 / 库存扣减 / 资金清算这种场景才需要真正的 exactly-once。
打个比方 Kafka 消费链路像一个大型物流分拣中心。Topic 是传送带(分多条 parallel 提速)Partition 是分拣槽(按目的地编号路由)Consumer Group 是一组分拣员(各自负责几个槽)Offset 是每个槽已分拣到第几个包裹的记号(必须准确否则漏分或重分)Rebalance 是分拣员换班(老员工把没分完的进度交接给新员工 不能有遗漏也不能重复)幂等 是业务台账(同一个订单号录两遍只生效一次)死信 是异常件暂存区(损坏的包裹放一边 不阻塞正常流水线)Exactly-once 是高价值件的双签流程(发件方和收件方都要确认才算完成)。哪一环没做 这个分拣中心可能能跑通 但要么经常丢件 要么重复派送 要么遇到异常包裹整条流水线停摆。
所以下一次再有人跟你说"用 Spring Kafka 默认配置就行" 你可以反问他 auto.commit 关了吗 ConsumerRebalanceListener 实现了吗 业务侧幂等做了吗 死信队列有吗 lag 分 partition 看吗 max.poll.records 与处理时长配比验证过吗 outbox pattern 评估过吗。这些工作没做完 你的 Kafka 消费只是一个能跑通 demo 的玩具 不是一个能在生产 7x24 稳定服务的消息系统。从踩坑到投产 中间隔着一整套消息消费工程方法论 这条路没有捷径 但走完之后 你的 Kafka 链路会从"经常出对账问题"变成"老板说 Kafka 真稳" 从每月几次 P1 故障变成连续半年零故障 从每天人工对账变成业务无感全自动。
—— 别看了 · 2026