电商订单系统遇到一个奇怪的 bug:用户先下单后取消,数据库里却出现"已取消的订单又回到了已支付状态"。日志里两个事件顺序明明是对的,问题出在 Kafka 消费端顺序错乱。本文把 Kafka 顺序消费的"七层防线"讲清楚 —— 从生产端到消费端到业务层,每一层都可能出错。
故障还原
2026-03-15 14:23:01.123 订单服务 → Kafka: order_event {orderId=A, event=PAID, ts=14:23:01.000}
2026-03-15 14:23:05.456 订单服务 → Kafka: order_event {orderId=A, event=CANCELED, ts=14:23:05.000}
消费端:
2026-03-15 14:23:05.500 消费 CANCELED,update orders set status=CANCELED
2026-03-15 14:23:05.520 消费 PAID,update orders set status=PAID
# ← 颠倒了!订单又变成已支付
典型的"晚到的消息后处理"导致的状态错乱。Kafka 文档说"分区内有序",但实际生产里这条"分区内有序"也能被破坏。
Kafka 顺序的层级理解
防线 1:Producer 内乱序
# 错的配置:可能乱序
max.in.flight.requests.per.connection=5 # 允许 5 个 in-flight
retries=10 # 重试,某个失败会被后面消息追上
enable.idempotence=false
# Producer 发 [M1, M2, M3, M4, M5]
# M1 失败,producer 重试 M1
# 此时 M2 M3 已经在 broker 上,M1 重试成功后排在 M4 后面
# broker 上的顺序变成: M2, M3, M4, M1(乱了)
# 正确配置
enable.idempotence=true # 开幂等,自动处理重试顺序
max.in.flight.requests.per.connection=5 # idempotence 开启后仍可设 5
acks=all # 等所有 ISR 副本确认
retries=Integer.MAX_VALUE
delivery.timeout.ms=120000
# 开 idempotence 后,producer 会给每个消息加 sequence number
# broker 检查 sequence 是否连续,不连续则拒绝写入并要求 producer 重试整个 batch
# 保证 partition 内严格顺序
防线 2:Partition 路由
跨 partition 的消息没有顺序保证。需要让"同一业务对象"的所有事件落在同一个 partition:
// 错:无 key,Kafka 用 round-robin 分配 partition
producer.send(new ProducerRecord<>("order_event", null, event));
// 同一订单的 PAID 和 CANCELED 可能落到不同 partition,完全无序
// 对:用业务 key,Kafka 用 hash(key) % partitions 路由
producer.send(new ProducerRecord<>("order_event", String.valueOf(orderId), event));
// 同一 orderId 永远落在同一 partition
key 的选择直接影响"哪些消息有序"。常见模式:
业务 Key 选择 意义
=======================================================
订单状态变更 orderId 同订单的事件有序
用户操作流水 userId 同用户的操作有序
支付状态机 paymentId 同支付单的事件有序
商品库存变更 skuId 同 SKU 的库存变更有序
聊天消息 chatRoomId 同房间的消息有序
分布式追踪 span traceId 同链路的 span 在一起(顺便,不需有序)
防线 3:扩 partition 后的乱序
这个坑很大很多人不知道。Kafka topic 的 partition 数可以扩容,扩容之后同一个 key 可能落到不同的 partition:
扩容前:3 个 partition
hash("order_A") % 3 = 1 → P1
扩容后:6 个 partition
hash("order_A") % 6 = 4 → P4
后续 order_A 的事件去 P4,P1 上还有它的旧事件没消费完
消费 P4 时可能赶在 P1 上的旧事件之前,顺序乱了
// 自定义 partitioner,扩容后仍保证旧 key 落原 partition
public class StablePartitioner implements Partitioner {
private static final int FIXED_PARTITIONS = 32; // 预留足够多,以后不再扩
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int partitionCount = cluster.partitionCountForTopic(topic);
if (partitionCount < FIXED_PARTITIONS) {
throw new IllegalStateException("topic " + topic + " needs " + FIXED_PARTITIONS + " partitions");
}
// 用 32 作为 mod,即使后续扩到 64 也不会影响
return Utils.toPositive(Utils.murmur2(keyBytes)) % FIXED_PARTITIONS;
}
}
// Producer 配置
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, StablePartitioner.class.getName());
建议:topic 创建时一次性给足 partition 数,以后不扩。32 / 64 / 128 都行,够用 5 年。
防线 4:消费端并发模型
// 错:单 consumer 拉一批,扔线程池并发处理
@KafkaListener(topics = "order_event")
public void consume(ConsumerRecord<String, String> record) {
threadPool.submit(() -> { // ← 线程池里乱序了!
processOrderEvent(record.value());
});
}
// 对:同一 partition 内单线程顺序处理
@KafkaListener(topics = "order_event", concurrency = "6")
public void consume(ConsumerRecord<String, String> record) {
// Spring Kafka concurrency=6 启 6 个 consumer 实例
// 每个 consumer 对应 1 个 partition,单线程处理本 partition 的消息
// 不同 partition 之间并行,同 partition 内串行
processOrderEvent(record.value());
}
进阶:同 partition 内但不同 key 也想并发?
// key-based 并发:同 key 串行,跨 key 并行
public class KeyOrderedExecutor {
private final ExecutorService[] executors;
public KeyOrderedExecutor(int parallelism) {
executors = new ExecutorService[parallelism];
for (int i = 0; i < parallelism; i++) {
executors[i] = Executors.newSingleThreadExecutor();
}
}
public void submit(String key, Runnable task) {
int idx = Math.abs(key.hashCode()) % executors.length;
executors[idx].submit(task);
}
}
@KafkaListener(topics = "order_event")
public void consume(ConsumerRecord<String, String> record) {
keyOrderedExecutor.submit(record.key(), () -> processOrderEvent(record.value()));
// 同 orderId 永远去同一个单线程 executor,顺序保证
// 不同 orderId 走不同 executor,并行
}
防线 5:offset 提交时机
// 错:自动提交 offset(默认 5 秒一次)
// 处理失败但 offset 已提交 → 消息丢失
// 处理成功但 offset 没提交 → 重启后重复消费
spring.kafka.consumer.enable-auto-commit=true
// 对:手动提交,处理完才提交
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
@KafkaListener(topics = "order_event")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
processOrderEvent(record.value());
ack.acknowledge(); // 只在成功后才提交 offset
} catch (Exception e) {
// 不提交 offset,消息会被重新消费
log.error("process failed, will retry", e);
throw e; // 抛出让 Spring Kafka 进入错误处理流程
}
}
防线 6:失败重试时的乱序
// 错:消息 M1 失败,跳过去消费 M2,后台重试 M1
// 业务上 M2 已经处理,但 M1 失败的回滚发生时 M2 已经入库
// 顺序错乱
// 对:暂停 partition,直到 M1 成功
@Component
public class OrderErrorHandler implements ConsumerAwareErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) message.getPayload();
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
// 暂停这个 partition,seek 回失败位置
consumer.pause(Collections.singletonList(tp));
consumer.seek(tp, record.offset());
// 间隔后恢复重试
scheduler.schedule(() -> consumer.resume(Collections.singletonList(tp)),
Duration.ofSeconds(10));
return null;
}
}
// Spring Kafka 推荐用 SeekToCurrentErrorHandler(已重命名 DefaultErrorHandler)
@Bean
public DefaultErrorHandler errorHandler() {
BackOff backOff = new FixedBackOff(1000L, 5); // 重试 5 次,每次间隔 1 秒
DefaultErrorHandler handler = new DefaultErrorHandler((record, exception) -> {
// 重试 5 次后还失败,扔死信队列
kafkaTemplate.send("order_event.DLT", record.key().toString(), record.value());
}, backOff);
return handler;
}
防线 7:消费端业务层的乱序兜底
就算前面 6 层都做对了,生产环境总会出现"先到的消息处理慢,后到的快"导致乱序。业务层兜底:
// 用版本号 / 时间戳防"后到的消息被旧的覆盖"
@Transactional
public void processOrderEvent(OrderEvent event) {
Order order = orderRepo.findById(event.orderId).orElse(null);
if (order == null) {
order = new Order(event.orderId);
}
// 关键:只有事件时间戳 > 当前订单的 last_event_ts 才更新
if (event.timestamp <= order.getLastEventTs()) {
log.warn("stale event ignored: orderId={} eventTs={} currentTs={}",
event.orderId, event.timestamp, order.getLastEventTs());
return; // 忽略旧事件
}
// 应用状态变更
applyStateTransition(order, event);
order.setLastEventTs(event.timestamp);
orderRepo.save(order);
}
// SQL 层面的乐观锁
// UPDATE orders SET status=?, last_event_ts=?
// WHERE id=? AND last_event_ts < ?
// 受影响行数 = 0 说明事件已被更新的版本覆盖,记日志告警
压测 + 故障注入验证
# chaos-monkey 模拟 Kafka broker 网络抖动
$ kafka-chaos --broker kafka-0 --action partition --duration 30s
# 在故障期间发 1 万条订单事件,观察:
# - producer 重试次数
# - 消费端是否出现乱序
# - 业务层 last_event_ts 兜底拦截了几条
# - DLT 死信队列有多少消息
# 用 kafka-consumer-groups 看 lag
$ kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 \
--group order-consumer --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-consumer order_event 0 12345 12500 155
order-consumer order_event 1 12200 12500 300
监控告警
- alert: KafkaConsumerLagHigh
expr: kafka_consumergroup_lag > 1000
for: 5m
annotations:
summary: '消费组 {{ $labels.group }} 在 {{ $labels.topic }} 上落后 > 1000'
- alert: KafkaProducerRetryHigh
expr: rate(kafka_producer_record_retry_total[5m]) > 10
annotations:
summary: 'Producer 重试率高,可能 broker 不稳'
- alert: OrderEventStaleHigh
expr: rate(order_event_stale_ignored[5m]) > 1
annotations:
summary: '订单事件 last_event_ts 兜底拦截 > 1 / 秒,顺序问题可能严重'
- alert: KafkaDeadLetterHigh
expr: rate(kafka_dlt_messages[5m]) > 0
annotations:
summary: '出现死信消息,有消息无法处理'
顺序消费 checklist
- Producer:
enable.idempotence=true+acks=all - Producer: 业务消息必须设 key
- Topic: partition 数一次性给足,以后不扩
- Consumer: 单 partition 单线程,跨 partition 才并发
- Consumer: 手动 offset,处理完才提交
- Consumer: 失败必须暂停 partition 而不是跳过
- 业务层: 用 last_event_ts 或 version 防"晚到消息覆盖"
- 监控: lag / 重试率 / 兜底拦截 / 死信全要告警
Kafka 的"分区内有序"是个最低限度承诺,顺序消费是个工程问题,要从 Producer 到 Consumer 到业务层一层一层把关。我们当年那个订单 bug 就是因为 Producer 没开 idempotence + 业务层没 last_event_ts 双重保险,事故之后整套防线建好,后面再没出现过类似乱序问题。
—— 别看了 · 2026