消息积压三百万:一次 Kafka 消费积压与重复消费的复盘

大促那天 Kafka consumer lag 从几百飙到三百万,履约系统延迟数小时;扩容后又冒出重复发货短信。一边积压一边重复,是消费端最经典的一对矛盾。几天消息消费专项治理:看懂 lag 与消费模型、关自动提交做消费端幂等、防消息丢失、批量与并发提吞吐、rebalance 治理、lag 监控。

2024 年我们一个订单消息的消费链路出过一次事故:大促那天,Kafka 的 consumer lag(消费滞后)从平时的几百条,一路涨到几百万条,下游依赖这条消息流的履约系统延迟了好几个小时。我们临时扩了消费者实例,lag 降下去了,可没过两天又冒出新问题——有用户收到了两条一模一样的发货短信,排查发现是消费端在处理消息时重启,offset 没来得及提交,重启后又把同一批消息消费了一遍。一边是消费太慢积压,一边是消费"过头"重复,这两个问题几乎是 Kafka 消费端最常见的一对。投了几天做消息消费专项治理,本文复盘这次实战。

问题背景

业务:订单消息 -> Kafka -> 履约系统消费,Kafka 2.8,单 topic 12 分区
事故现象:
- 大促期间 consumer lag 从几百涨到 300 万+,履约延迟数小时
- 扩容后 lag 降了,但出现"重复发货短信"——同一消息被消费两次
- 偶尔还有 rebalance 风暴,消费者频繁掉线重连

现场排查:
# 1. 看消费组的 lag
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group fulfill-group
TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order      0          1203400         1452100         248700
order      1          1198200         1450300         252100
...        # 每个分区都积压 20 万+,总 lag 300 万+

# 2. 看消费端代码
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r) {
    fulfillService.handle(parse(r.value()));   // 单条处理,约 200ms
}
# 单线程单条消费,200ms 一条 -> 单实例 5 条/秒,12 分区也就 60 条/秒
# 而大促时生产端 QPS 是 3000+ —— 消费速度根本追不上

# 3. 重复消费的现场:
# enable.auto.commit=true,auto.commit.interval.ms=5000
# 消费者每 5 秒自动提交一次 offset
# 处理到一半实例被 kill -> 这 5 秒内已处理的消息 offset 没提交
# 重启 / rebalance 后,这批消息被重新拉取、重新消费

根因:
1. 消费能力不足:单线程单条消费,吞吐远低于生产速度 -> 积压
2. offset 自动提交,与"消息真正处理完"不同步 -> 重复消费
3. 消费端业务不是幂等的 -> 重复消费直接变成重复发短信
4. 消费者处理太慢,超过 max.poll.interval.ms -> 被踢出组 -> rebalance

修复 1:看懂 consumer lag 与消费模型

=== consumer lag:消费滞后了多少条 ===
LAG = LOG-END-OFFSET - CURRENT-OFFSET
    = 分区里最新的消息位置 - 消费组已经消费到的位置
lag 持续增大 = 生产速度 > 消费速度,消息在堆积。

=== Kafka 消费的几个核心概念 ===
- 一个 topic 分成多个 partition(分区),消息在分区内有序
- 一个 consumer group(消费组)里的多个 consumer 分摊这些分区
- 关键规则:一个分区,同一时刻只能被组内【一个】consumer 消费
  -> 所以【消费组里 consumer 数量 <= 分区数】才有意义
  -> 12 个分区,最多 12 个 consumer 能并行,第 13 个只能空转

=== 提升消费速度的两个方向 ===
方向 1:加分区 + 加 consumer 实例(横向扩展并行度)
        前提是分区数要够,12 分区就最多 12 个实例并行
方向 2:单个 consumer 内部再做并发(把拉到的一批消息丢线程池)
        不增加分区也能提速,但要小心顺序和 offset 提交

=== 一个常见误区 ===
"我加了 20 个消费者实例,怎么没快多少?"
—— 因为只有 12 个分区,多出来的 8 个实例分不到分区,纯属空转。
扩消费者之前,先确认分区数够不够。

修复 2:重复消费 —— offset 提交与幂等

// === 问题根源:自动提交 offset,与"消息处理完成"不同步 ===
// enable.auto.commit=true 时,consumer 每隔几秒自动提交一次 offset,
// 它提交的是"已经 poll 出来的"位置,不管你处理完没有。
// 处理到一半崩溃 -> 已提交的部分丢失进度 / 未提交的部分重复消费。

// === 改 1:关闭自动提交,处理完再手动提交 ===
// 配置:enable.auto.commit=false
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r,
                      Acknowledgment ack) {
    fulfillService.handle(parse(r.value()));   // 1. 先把消息处理完
    ack.acknowledge();                         // 2. 再手动提交 offset
    // 这样保证:offset 提交了 == 消息一定处理完了
}
// 注意:这是"至少一次(at-least-once)"语义 ——
// 处理完了、提交前崩溃,消息仍会重复。所以幂等是【必须】的。

// === 改 2:消费端做幂等,这是重复消费的终极解法 ===
// 不管 Kafka 怎么重投,业务只要保证"同一条消息处理多次 == 处理一次"。

// 方案 A:每条消息带唯一 msgId,处理前先查"处理过没"
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r, Acknowledgment ack) {
    OrderMsg msg = parse(r.value());
    String msgId = msg.getMsgId();
    // 用 Redis SETNX 抢一个"处理标记",抢不到说明已处理过
    Boolean first = redis.setIfAbsent("msg:" + msgId, "1", 24, HOURS);
    if (Boolean.FALSE.equals(first)) {
        ack.acknowledge();                     // 已处理过,直接确认跳过
        return;
    }
    fulfillService.handle(msg);
    ack.acknowledge();
}

// 方案 B:利用数据库唯一约束做幂等(更可靠,不怕 Redis 丢)
// 履约表上对 msg_id 建唯一索引
public void handle(OrderMsg msg) {
    try {
        fulfillMapper.insert(msg.getMsgId(), ...);  // msg_id 唯一索引
    } catch (DuplicateKeyException e) {
        log.info("消息已处理过,幂等跳过: {}", msg.getMsgId());
        return;                                      // 重复,安全跳过
    }
    doFulfill(msg);
}
// 经验:不要试图让 Kafka "恰好一次",成本极高;
// 老老实实"至少一次 + 消费端幂等",简单且可靠。

修复 3:消息丢失 —— 自动提交的另一面

// === 自动提交不仅会重复,还可能丢消息 ===
// 场景:poll 出 100 条,自动提交先把 offset 提交了(提交到第 100 条),
// 然后线程池异步处理这 100 条,处理到第 30 条时实例崩溃 ——
// offset 已经停在 100,重启后从 101 开始,31~100 这 70 条【永久丢失】。

// === 防丢失:offset 必须在消息"确实处理完"之后才提交 ===
// 关闭自动提交,手动提交,且严格遵守"先处理、后提交"的顺序。

// === 生产端也要配合,否则消息根本没到 Kafka ===
// application.yml (producer)
// acks=all              所有 ISR 副本都写入才算成功(最高可靠)
// retries=3             发送失败自动重试
// 错误示范:acks=0 / acks=1,leader 还没同步给副本就宕机 -> 丢消息

// === 完整的不丢链路 ===
// 生产端:acks=all + retries  -> 保证消息到了 Kafka
// Kafka :replication-factor>=3 + min.insync.replicas=2 -> 副本不丢
// 消费端:手动提交 offset,处理完才提交 -> 保证不漏处理

// === 关键认知 ===
// "消息不丢" 和 "消息不重复" 往往不能同时低成本满足:
// - 想绝对不丢:offset 提交要晚(处理完再提交)-> 可能重复
// - 想绝对不重:offset 提交要早 -> 可能丢
// 工程上的标准选择:选"不丢 + 可能重复",再用幂等消化掉重复。
@KafkaListener(topics = "order")
public void onMessage(ConsumerRecord r, Acknowledgment ack) {
    try {
        fulfillService.handle(parse(r.value()));   // 处理
        ack.acknowledge();                         // 成功才提交 -> 不丢
    } catch (Exception e) {
        log.error("消费失败,不提交 offset,等待重试", e);
        // 不 ack,这条消息下次还会被拉到,配合幂等,安全
    }
}

修复 4:提升消费吞吐

// === 方案 A:批量消费,一次 poll 处理一批 ===
// 配置:max.poll.records=500(一次最多拉 500 条)
//       并开启 batch listener
@KafkaListener(topics = "order", batch = "true")
public void onBatch(List> records,
                    Acknowledgment ack) {
    // 批量处理:比如 500 条订单一次性批量写库,
    // 把 500 次单条 insert 合并成 1 次 batch insert,吞吐量级提升
    List msgs = records.stream().map(r -> parse(r.value())).toList();
    fulfillService.handleBatch(msgs);   // 批量写库 / 批量调用
    ack.acknowledge();                  // 整批处理完再提交
}

// === 方案 B:单 consumer 内部用线程池并发处理 ===
// 拉到一批后,丢给线程池并发跑,但要小心 offset 提交时机
private final ExecutorService pool = Executors.newFixedThreadPool(16);

@KafkaListener(topics = "order", batch = "true")
public void onBatchConcurrent(List> records,
                              Acknowledgment ack) {
    List> futures = records.stream()
        .map(r -> CompletableFuture.runAsync(
            () -> fulfillService.handle(parse(r.value())), pool))
        .toList();
    // 必须等这一整批都处理完,才能提交 offset
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    ack.acknowledge();
}
// ⚠ 并发处理打破了分区内的顺序性。
//   若业务要求"同一订单的消息按序处理",要按订单 id 取模分发到固定线程,
//   保证同一 key 的消息始终串行。

// === 方案 C:扩分区 + 扩实例(根本性扩容)===
// 12 分区不够,先把分区扩到 36,再把消费者实例扩到 36 个,
// 并行度直接翻 3 倍。
// $ kafka-topics.sh --alter --topic order --partitions 36
// ⚠ 分区只能加不能减;加分区会改变 key 的分区路由,
//   对"按 key 保证顺序"的场景要评估影响。

// === 方案 D:慢消息隔离,别让一条慢消息拖垮整批 ===
// 处理特别慢 / 反复失败的消息,转投到"重试 topic"或死信队列,
// 主流程不被它阻塞,保持高吞吐。

修复 5:rebalance 治理

# === rebalance:消费组成员变化时,重新分配分区的过程 ===
# rebalance 期间,整个消费组【暂停消费】,频繁 rebalance 严重拖慢消费。
# 常见诱因:消费者处理太慢,超时被判定"死了",踢出组触发 rebalance。

# === 关键参数 1:max.poll.interval.ms ===
# 两次 poll() 之间的最大间隔,默认 5 分钟。
# 如果一批消息处理时间超过它,Kafka 认为这个 consumer 卡死了,
# 把它踢出组 -> rebalance。
max.poll.interval.ms=600000        # 处理慢就调大,给足处理时间

# === 关键参数 2:max.poll.records ===
# 一次 poll 拉多少条。拉太多 + 处理慢 -> 容易超过上面的 interval。
max.poll.records=200               # 拉的量和处理速度要匹配

# === 关键参数 3:session.timeout.ms + heartbeat.interval.ms ===
# consumer 靠后台心跳告诉 Kafka "我还活着"。
session.timeout.ms=45000           # 多久没心跳判定掉线
heartbeat.interval.ms=15000        # 心跳间隔,一般为 session 的 1/3

# === 经验法则 ===
# 一次 poll 出来的消息,要能在 max.poll.interval.ms 内处理完。
# 估算:max.poll.records * 单条处理耗时 < max.poll.interval.ms
# 处理不完,要么调小 max.poll.records,要么调大 interval,要么提速处理。

# === 减少 rebalance 影响:用 CooperativeStickyAssignor ===
# 默认的 rebalance 是"全部分区收回再重分",期间全组停消费;
# 协作式粘性分配只挪动需要变动的分区,影响小很多
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

修复 6:消费监控告警

# 消息消费的核心是 lag,围绕它建监控
groups:
- name: kafka-consumer
  rules:
  # 1. consumer lag 过高(消费跟不上生产)
  - alert: ConsumerLagHigh
    expr: sum(kafka_consumergroup_lag{group="fulfill-group"}) > 100000
    for: 5m
    annotations:
      summary: "消费组 lag > 10 万,消费能力不足,考虑扩容/批量"

  # 2. lag 持续增长(积压在恶化,而非偶发抖动)
  - alert: ConsumerLagGrowing
    expr: deriv(sum(kafka_consumergroup_lag{group="fulfill-group"})[10m:]) > 0
    for: 15m
    annotations:
      summary: "消费 lag 持续 15 分钟上涨,生产已稳定超过消费速度"

  # 3. rebalance 频繁(消费组不稳定)
  - alert: FrequentRebalance
    expr: increase(kafka_consumer_rebalance_total[10m]) > 3
    annotations:
      summary: "10 分钟内 rebalance 超过 3 次,排查消费者处理超时"

  # 4. 消费失败率(消息处理异常)
  - alert: ConsumeErrorRate
    expr: rate(consume_error_total[5m]) / rate(consume_total[5m]) > 0.01
    for: 5m
    annotations:
      summary: "消息消费失败率 > 1%,排查下游依赖或脏消息"

优化效果

指标                      治理前              治理后
=============================================================
消费模型                  单线程单条          批量 + 线程池并发
单实例消费速度            约 5 条/秒          约 600 条/秒
分区数 / 消费实例         12 / 12             36 / 36
大促峰值 lag              300 万+             峰值 < 2 万,分钟级消化
offset 提交              自动提交(5s)       处理完手动提交
重复消费                 重复发货短信        幂等拦截,处理多次 = 一次
消息丢失风险             自动提交可能丢      acks=all + 手动提交,不丢
rebalance                偶发风暴            协作式分配,影响极小
消费可观测性             无                  lag/rebalance/失败率监控

治理过程:
- 定位积压与重复消费根因:0.5 天
- 关自动提交 + 消费端幂等改造:2 天
- 批量消费 + 线程池并发改造:2 天
- 扩分区扩实例 + rebalance 参数调优:1.5 天
- 消费监控接入:1 天

避坑清单

  1. consumer lag = 最新 offset - 已消费 offset,持续增大说明消费跟不上生产
  2. 一个分区同时只能被组内一个 consumer 消费,消费实例数超过分区数纯属空转
  3. 扩消费者前先确认分区数够,分区不够加再多实例也不提速
  4. enable.auto.commit 自动提交与"消息真正处理完"不同步,会重复也会丢
  5. 关闭自动提交,严格"先处理完、后手动提交 offset",保证不漏处理
  6. Kafka 是"至少一次"语义,消费端幂等是重复消费的终极解法,必须做
  7. 幂等用唯一索引或 Redis SETNX,保证同一消息处理多次等于处理一次
  8. 提升吞吐:批量消费合并写库、单 consumer 内线程池并发、扩分区扩实例
  9. 并发消费会打破分区内顺序,需要保序就按 key 取模分发到固定线程
  10. 处理慢超过 max.poll.interval.ms 会被踢出组触发 rebalance,参数要和处理速度匹配

总结

这次 Kafka 消费链路的治理,让我对消息队列有了一个更清醒的认识:消息队列从来不是"把消息扔进去就万事大吉"的黑盒,它的生产端、Broker、消费端三段,每一段都有自己要操心的可靠性问题,而我们这次集中爆发的,是消费端最经典的一对矛盾——积压和重复。先说积压。大促那天 lag 从几百飙到三百万,本质原因朴素得近乎尴尬:我们的消费速度,根本就追不上生产速度。消费端是单线程、一条一条处理,每条要两百毫秒,一个实例满打满算每秒也就消费五条,十二个分区十二个实例,峰值也才六十多条每秒,而生产端大促时每秒涌进来三千多条——这是一道小学算术题,积压是必然的。想明白这一点,提速的方向也就清晰了:要么把一次拉到的一批消息合并起来批量处理,把五百次单条写库压缩成一次批量写库;要么在单个消费者内部用线程池并发地跑;要么干脆从根上扩容,把分区数和消费者实例数一起翻几倍——但这里有个一定要记住的前提,一个分区同一时刻只能被消费组里的一个消费者消费,所以消费者实例数超过分区数,多出来的实例就是纯粹空转,扩消费者之前一定要先看分区数够不够。再说重复。重复消费的根,在于 offset 的提交时机和消息"真正处理完"这两件事没有对齐。我们开着自动提交,消费者每隔五秒不管三七二十一就把 offset 往前推,于是处理到一半崩溃重启,这五秒内的消息就被重新消费一遍,用户就收到了第二条发货短信。这件事让我彻底接受了一个工程上的现实:想让 Kafka 做到"恰好一次"投递,代价高得不切实际,正确的姿势是坦然接受"至少一次"——也就是消息可能重复——然后把保证正确性的责任,从消息中间件转移到消费端自己身上,用幂等去消化掉重复。具体做法就是关掉自动提交、严格遵守"先把消息处理完、再手动提交 offset"的顺序,这保证了消息绝不会漏;同时给消费逻辑加上幂等,用数据库唯一索引或者 Redis 的 SETNX,确保同一条消息哪怕被投递十次,业务上也只生效一次。"不丢"靠手动提交的顺序来保证,"不重复造成影响"靠幂等来兜底,这两条加在一起,才构成一个真正可靠的消费链路。最后还有一个容易被忽略的隐患是 rebalance:消费者处理一批消息太慢,超过了 max.poll.interval.ms,Kafka 就以为它死了,把它踢出消费组,触发整个组的分区重新分配,而 rebalance 期间整个组是停止消费的——处理慢导致被踢、被踢导致 rebalance、rebalance 导致更慢,这是一个恶性循环。所以拉取参数 max.poll.records 和处理速度必须匹配,一次拉出来的消息量,要能在超时时间内从容处理完。这次治理之后,我们在消费链路上立了一条朴素的规矩:永远盯着 lag 这个指标,它是消费健康度最直接的体温计;一旦它持续上涨,就说明生产已经稳定地超过了消费,该扩容扩容、该提速提速,绝不能等它涨到几百万、等下游系统延迟了几个小时,才被动地去救火。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

加了 @Transactional 却不回滚:一次 Spring 事务失效的复盘

2026-5-20 13:16:46

技术教程

单表一亿三千万行:一次订单表分库分表落地的复盘

2026-5-20 13:22:07

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索