Kafka 消费幂等 + Offset 管理:2 年踩过的 7 个真实坑

Kafka 文档说 At-least-once,实际生产里 重复消费 是日常。本文 7 个真实坑:auto.offset.commit / 消费跟不上 rebalance / 幂等漏做扣款两次 / 死信队列 / 消息顺序 / 大消息 / 监控指标。每个附 Java + Go 代码 + 核心配置。

Kafka 文档反复强调"At least once" —— 至少一次,可能重复。但很多团队上线 Kafka 后才发现:这"可能重复"是日常,不是偶发。下游业务如果不做幂等,重复处理就是事故。本文把我们 2 年里 Kafka 消费踩的 7 个坑写出来,每个都附最小复现 + 正确写法。

坑 1:auto.offset.commit 自动提交

新人最爱用默认配置,因为"就工作了"。但 enable.auto.commit=true 是 Kafka 数据丢失的头号原因。

// 错:默认自动提交
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer");
props.put("enable.auto.commit", "true");      // ← 默认就是这个
props.put("auto.commit.interval.ms", "5000"); // 5 秒自动提交一次

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> r : records) {
        process(r);   // 处理耗时 30 秒
        // 处理过程中 5 秒自动提交了 offset,但记录还没处理完
        // 如果此时进程挂了,这些"已提交但未处理"的记录永久丢失
    }
}

修法:关掉自动提交,处理完再手动提交:

props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    if (records.isEmpty()) continue;

    for (ConsumerRecord<String, String> r : records) {
        try {
            process(r);
        } catch (Exception e) {
            // 错误处理:重试、入死信队列、或者跳过
            handleError(r, e);
        }
    }
    // 一批处理完后同步提交
    consumer.commitSync();
}

坑 2:消费速度跟不上,触发 rebalance

消费者每次 poll 都要在 max.poll.interval.ms(默认 5 分钟)内再次 poll,否则 broker 认为它挂了,触发 rebalance。如果你单条记录处理 30 秒,一次 poll 500 条,处理完要 4 小时 —— 早就 rebalance 几十次了。

// 错:max.poll.records 默认 500,但每条处理慢
props.put("max.poll.records", "500");
props.put("max.poll.interval.ms", "300000");  // 5 分钟,默认

// 对:压低 max.poll.records,缩短一批的处理时间
props.put("max.poll.records", "10");           // 一次只拿 10 条
props.put("max.poll.interval.ms", "600000");   // 给 10 分钟兜底

// 或者:多线程消费(消费线程只 poll,业务线程处理)
ExecutorService workerPool = Executors.newFixedThreadPool(20);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    List<Future<?>> futures = new ArrayList<>();
    for (ConsumerRecord<String, String> r : records) {
        futures.add(workerPool.submit(() -> process(r)));
    }
    // 等所有任务完成再提交 offset
    for (Future<?> f : futures) f.get();
    consumer.commitSync();
}

多线程消费要注意:必须等所有 worker 完成再 commit,否则会丢消息。

坑 3:消费幂等没做,重复扣款

Kafka 保证 at-least-once,意味着同一条消息可能被消费 2 次甚至更多。常见场景:消费者处理完业务但提交 offset 前挂掉,重启后 broker 重发这条消息。

幂等的正确做法:

@Transactional
public void process(ConsumerRecord<String, String> r) {
    OrderEvent event = parse(r.value());

    // 1. 幂等检查:Redis SET NX
    String key = "kafka:processed:" + event.getEventId();
    Boolean isNew = redis.opsForValue().setIfAbsent(key, "1", Duration.ofDays(7));
    if (Boolean.FALSE.equals(isNew)) {
        log.info("event already processed, skip: {}", event.getEventId());
        return;
    }

    // 2. 业务处理(数据库)
    try {
        orderService.handle(event);
    } catch (Exception e) {
        // 业务失败,撤销 Redis 标记,让下次重试
        redis.delete(key);
        throw e;
    }
}

更稳的做法用数据库唯一索引:

CREATE TABLE event_processed (
    event_id VARCHAR(64) PRIMARY KEY,
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;

-- 业务事务内同时 INSERT 这张表
-- 重复消费时 INSERT 会触发 Duplicate key,直接回滚整个事务
@Transactional(rollbackFor = Exception.class)
public void process(ConsumerRecord<String, String> r) {
    OrderEvent event = parse(r.value());

    // 利用 DB 唯一索引做幂等
    try {
        jdbc.update("INSERT INTO event_processed(event_id) VALUES(?)", event.getEventId());
    } catch (DuplicateKeyException e) {
        log.info("event already processed, skip");
        return;
    }

    // 业务在同一个事务里
    orderService.handle(event);
}

坑 4:消费失败的处理

消息处理失败时,常见 4 种处理策略:

  1. 跳过:记录失败 offset,继续消费下一条(数据丢失风险)
  2. 重试:本地重试 N 次,失败后跳过 → 简单但可能阻塞分区
  3. 死信队列:重试 N 次失败后转发到 DLQ topic,后续人工处理
  4. 停止消费:遇到失败就停,等人介入(数据强一致场景)
// 死信队列模式
KafkaProducer<String, String> dlqProducer = createProducer("dlq");

void processWithDLQ(ConsumerRecord<String, String> r) {
    for (int attempt = 0; attempt < 3; attempt++) {
        try {
            process(r);
            return;
        } catch (TransientException e) {
            log.warn("retry {}/3 for {}: {}", attempt + 1, r.offset(), e.getMessage());
            try { Thread.sleep(1000L << attempt); } catch (Exception ignored) {}
        } catch (Exception e) {
            // 业务异常直接转 DLQ
            break;
        }
    }
    // 3 次失败 → DLQ
    dlqProducer.send(new ProducerRecord<>(
        "orders.dlq",
        r.key(),
        buildDlqValue(r, "max retries exceeded")
    ));
}

坑 5:消费顺序问题

Kafka 单 partition 内有序,跨 partition 无序。如果业务依赖顺序(同一个 user_id 的事件按时间处理),必须保证同 key 落到同 partition:

// Producer 端:按业务 key hash 分区(Kafka 默认行为)
producer.send(new ProducerRecord<>(
    "orders",
    String.valueOf(event.getUserId()),    // ← key 是 user_id
    event.toJson()
));
// 同一 user_id 的消息永远到同一 partition,消费有序

// Consumer 端:别多线程乱序处理同一 key
// 错:用通用线程池,同 user_id 可能被不同线程并发处理
workerPool.submit(() -> process(record));

// 对:按 key 分配到固定线程
int slot = Math.abs(record.key().hashCode()) % WORKER_COUNT;
workerPools.get(slot).submit(() -> process(record));

坑 6:rebalance 期间的处理

rebalance 时,consumer 暂停消费,分区在组员之间重新分配。如果你的代码在 rebalance 时正在处理消息但没提交 offset,新拿到这个分区的消费者会重新拉这条消息。

// 用 ConsumerRebalanceListener 在 rebalance 前提交 offset
consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 这个分区马上要被分给其他消费者,提交我已处理的 offset
        log.info("partitions revoked: {}", partitions);
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("partitions assigned: {}", partitions);
        // 可以做初始化工作
    }
});

Kafka 2.4+ 引入了 Cooperative Sticky Assignment 协议,可以让 rebalance 更平滑(不停顿所有消费者,只迁移必要的分区):

partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

坑 7:消息体太大,producer 报错

Kafka 默认单条消息限制 1MB(message.max.bytes)。业务直接把大对象塞进去:

// 错:把订单详情整个序列化
producer.send(new ProducerRecord<>("orders", orderId, BIG_ORDER_JSON_5MB));
// RecordTooLargeException: The message is 5242880 bytes which is larger than 1048576

// 对:Kafka 只发"事件",大对象存对象存储,事件里只放 URL
ObjectStorage.upload("orders/" + orderId, orderJson);
producer.send(new ProducerRecord<>("orders", orderId,
    String.format("{\"order_id\":\"%s\",\"s3_url\":\"s3://bucket/orders/%s\"}", orderId, orderId)
));

// 或者拆分大消息为多条,加 chunk_id / total_chunks 字段
// 消费端按 order_id 聚合

Go 版的消费者实现

package main

import (
    "context"
    "log"
    "github.com/segmentio/kafka-go"
)

func main() {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{"kafka:9092"},
        GroupID:        "my-consumer",
        Topic:          "orders",
        MinBytes:       10e3,                  // 10KB,批量 fetch 提效
        MaxBytes:       10e6,                  // 10MB
        CommitInterval: 0,                     // 0 = 手动提交
    })
    defer r.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            log.Printf("fetch error: %v", err)
            continue
        }

        if err := processMessage(m); err != nil {
            log.Printf("process error: %v, msg=%s", err, string(m.Value))
            // 错误处理:重试 / 死信 / 跳过
            continue
        }

        // 处理成功才提交 offset
        if err := r.CommitMessages(ctx, m); err != nil {
            log.Printf("commit failed: %v", err)
        }
    }
}

func processMessage(m kafka.Message) error {
    // 幂等:用 Redis 或 DB 唯一索引
    // 业务处理
    return nil
}

监控指标

# Prometheus 必备指标
- alert: ConsumerLagHigh
  expr: kafka_consumergroup_lag > 10000
  for: 5m
  annotations:
    summary: '{{ $labels.consumergroup }} 滞后 {{ $value }} 条'

- alert: ConsumerRebalanceFrequent
  expr: rate(kafka_consumer_rebalance_total[5m]) > 0.1
  for: 5m
  annotations:
    summary: 'consumer 频繁 rebalance,可能是 max.poll.interval 设错'

- alert: DLQGrowing
  expr: kafka_topic_partition_oldest_offset{topic="orders.dlq"} > 100
  annotations:
    summary: '死信队列堆积,需要人工处理'

核心配置清单

# Consumer 配置
enable.auto.commit=false                  # 手动提交
max.poll.records=10                       # 一批 10 条,处理时间可控
max.poll.interval.ms=600000               # 10 分钟兜底
session.timeout.ms=30000
heartbeat.interval.ms=3000
auto.offset.reset=earliest                # 新消费者从最早开始(还是 latest 看场景)
isolation.level=read_committed            # 事务消息只读 committed
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# Producer 配置
acks=all                                  # 等所有 ISR 副本确认
retries=10
enable.idempotence=true                   # producer 幂等(避免网络重试导致重复)
max.in.flight.requests.per.connection=5
compression.type=zstd                     # zstd 压缩率高速度快
linger.ms=10                              # 攒 10ms 再发,提升批量效率
batch.size=32768

消费者代码 review 6 问

  1. 是不是关了 auto.commit?
  2. 幂等怎么做?Redis SET NX 还是 DB 唯一索引?
  3. 失败如何处理?重试 / DLQ / 跳过 哪一种?
  4. rebalance listener 提交 offset 了吗?
  5. 消息顺序要求是什么?用 key 分区?
  6. 消费延迟监控有没有?>1 万条要告警吗?

把这 6 问当 code review checklist,Kafka 消费侧 bug 能砍掉 90%。剩下 10% 是真正的疑难杂症,值得专门复盘。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

pandas 内存从 8GB 压到 800MB:60 万行 CSV 处理的 7 步优化

2026-5-19 10:45:55

技术教程

Spring @Transactional 失效的 7 种真实场景 + 修法

2026-5-19 10:50:18

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索