Flink CDC 同步 MySQL 到 Doris 半年踩坑实录:8 大坑全解

Flink CDC 同步 200+ MySQL 表到 Doris,半年踩 8 大坑:全量 OOM / 断点续传 / 增量延迟 / DDL 不同步 / JSON 精度 / exactly-once / Kafka 分区 / 主从切换。完整作业代码 + GTID 配置 + Doris Unique Key MOW + 2PC + 监控告警。

用 Flink CDC 同步 MySQL 到 Doris,跑了半年踩了一堆坑:全量阶段 OOM、增量延迟、DDL 没同步、断点续传失效、Kafka 中转背压。本文实录全过程,讲透 Flink CDC 2.x 工作原理和八大坑的修法,给做实时数仓的团队避雷。

架构背景

业务系统(MySQL 主从) → Flink CDC → Kafka → Flink ETL → Doris
                                         ↓
                                      ClickHouse(日志类)

规模:
- MySQL 表 200+,日均 binlog 量 500GB
- 同步延迟要求:p99 < 30s
- 目标库:Doris 集群(报表) + ClickHouse(日志)
- Flink 版本:1.17,Flink CDC 2.4

为什么不直接 Canal:
- Canal 配置繁琐,要按表配
- 不支持全量 + 增量无缝衔接
- Flink CDC 用 Debezium 内核,生态更现代

Flink CDC 工作原理

坑 1:全量阶段 OOM

现象:同步一张 8 亿行表,跑到 2 小时 TaskManager OOM
日志:OutOfMemoryError: Java heap space
检查点:全量 chunk 大小默认 8096,大表分了 10w 个 chunk
        每个 chunk 元数据存 JM 内存,加起来 1GB+
# 修法 1:增加 JM/TM 内存
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.4

# 修法 2:增大 chunk 大小,减少 chunk 数
table.exec.source.cdc-snapshot-chunk-size: 50000   # 默认 8096
# 8 亿行 ÷ 5 万 = 1.6w 个 chunk(JM 压力小)

# 修法 3:用 chunk key 优化
'scan.incremental.snapshot.chunk.key-column' = 'id'
# 显式指定主键,避免框架猜测

# 修法 4:并行度调高
parallelism.default: 8
# 全量阶段多个 task 并行读不同 chunk

坑 2:断点续传失效

现象:job 重启后从头跑全量,不是从 checkpoint 续
原因:Flink CDC 2.x 必须开 checkpoint,且 source 状态要存在 checkpoint
       默认 checkpoint 不开,断点续传不生效
// 修法:启用 checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);   // 60 秒一次
env.getCheckpointConfig().setCheckpointTimeout(120_000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 状态后端:大状态用 RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

// 重启策略
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
    Time.seconds(10),    // initial
    Time.minutes(5),     // max
    2.0,                 // 乘数
    Time.hours(1),       // 重置阈值
    0.1                  // jitter
));

// 重启命令(关键!从最近 checkpoint 恢复)
flink run -s hdfs:///flink/checkpoints//chk-/_metadata \
    -p 8 cdc-job.jar

坑 3:增量延迟突增

现象:Doris 数据延迟从平时的 5s 飙到 5 分钟
监控:Flink 反压 100%,sink 处理慢

原因:
1. MySQL 大事务(单事务改 10w 行)
2. binlog 一次性涌入,sink 处理不过来
3. Doris bulk import 接口慢,每个 mini-batch 写 100ms
# 修法 1:Doris 侧调整 stream load
table-name:
  jdbc-url: jdbc:doris://doris-fe:9030
  sink.label-prefix: cdc
  sink.buffer-size: 33554432              # 32MB buffer
  sink.buffer-count: 10                   # 10 个 buffer 并发
  sink.max-retries: 5
  sink.properties.format: json
  sink.properties.read_json_by_line: true
  sink.properties.merge_type: MERGE       # 配合 unique key 表
  sink.enable-2pc: true                   # 2PC 保证 exactly-once

# 修法 2:中间加 Kafka 削峰
MySQL CDC → Kafka(分区 32)→ Flink ETL → Doris
# Kafka 缓冲,Doris 慢一点不阻塞 source

# 修法 3:调整 batch
sink.batch.size: 10000
sink.batch.interval: 10s

坑 4:DDL 没同步

现象:MySQL 业务表加了一列,Flink CDC 没自动同步到 Doris
       新增列的数据丢失

原因:Flink CDC 默认 DDL events 不直接给下游
       下游表结构得手动改

修法:
1. 启用 schema evolution(部分 sink 支持)
2. 加 DDL 监控告警
3. 业务 DDL 流程化:先改下游,再发 PR 改业务表
# Doris sink 启用 schema evolution
table-name:
  sink.enable-delete: true
  sink.schema-evolution: true             # 自动加列(限简单 ADD COLUMN)

# 监控 DDL 事件
- alert: MySQLDDLDetected
  expr: increase(flink_cdc_ddl_events_total[5m]) > 0
  labels: { severity: warning }
  annotations:
    summary: 'MySQL DDL detected, check downstream'

坑 5:JSON 列同步丢精度

现象:MySQL JSON 列里的 decimal 同步到 Doris 变 float,精度丢失
       金额 1234.56 → 1234.5599999...

原因:Debezium 序列化 JSON 时用 float
修法:配置 debezium 输出格式
source.debezium.properties:
  decimal.handling.mode: string         # decimal 转字符串(精确)
  bigint.unsigned.handling.mode: precise
  time.precision.mode: connect
  include.schema.changes: true
  snapshot.locking.mode: none           # MySQL 8 用 GTID

坑 6:exactly-once 失效

现象:Job 重启后下游 Doris 出现重复数据
原因:sink 没启用 2PC,只保证 at-least-once

修法:
1. sink 必须支持 2PC(Doris 1.2+ 支持)
2. 业务侧用 Unique Key 表(MOW 模式)兜底
-- Doris Unique Key 表
CREATE TABLE order_mirror (
  id BIGINT NOT NULL,
  user_id BIGINT,
  amount DECIMAL(12, 2),
  status VARCHAR(20),
  created_at DATETIME,
  updated_at DATETIME
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 32
PROPERTIES (
  "replication_num" = "3",
  "enable_unique_key_merge_on_write" = "true",       -- MOW 模式
  "store_row_column" = "true"
);

-- 同一个 id 多次写入,自动按 updated_at 取最新
-- 即使 Flink 重发也不重复

坑 7:Kafka 中转的分区策略

// 错:用默认分区(轮询),同一行变更分散到不同分区
KafkaSink sink = KafkaSink.builder()
    .setBootstrapServers(brokers)
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("cdc-orders")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .build();
// 同一个 order_id 的 UPDATE 可能分到不同分区,下游消费乱序

// 对:按主键分区
KafkaSink sink = KafkaSink.builder()
    .setBootstrapServers(brokers)
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("cdc-orders")
        .setKeySerializationSchema(e -> e.getPrimaryKey().getBytes())  // 主键作 Kafka key
        .setValueSerializationSchema(new JsonSerializationSchema<>())
        .setPartitioner(new HashPartitioner())                          // 按 key hash
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("cdc-orders-")
    .build();
// 同一个 order_id 永远在同一个分区,保证顺序

坑 8:MySQL 主从切换

现象:MySQL 主从切换后 binlog position 变了,Flink CDC 读不到
原因:Flink CDC 默认按 binlog filename + position 续读
       新主的 filename 不一样

修法:必须用 GTID 模式
MySQL 配置:
  gtid_mode = ON
  enforce_gtid_consistency = ON

Flink CDC 配置:
  scan.startup.mode: latest-offset / specific-offset
  + GTID set
source:
  hostname: mysql-cluster.internal
  port: 3306
  username: cdc_user
  password: ${MYSQL_CDC_PASSWORD}
  database-list: business
  table-list: business.orders,business.order_items
  server-id: 5400-5410                    # 范围,每个并行实例用一个
  scan.startup.mode: initial              # 第一次跑全量,后续 binlog
  scan.incremental.snapshot.enabled: true
  scan.snapshot.fetch.size: 10000
  debezium.gtid.source.includes: ''       # 用 GTID 续传

权限设置

-- MySQL 端创建 CDC 账号
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'StrongPassword!';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SHOW VIEW, EVENT, RELOAD
  ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;

-- MySQL 5.7 还要加 LOCK TABLES(初始化快照用)
-- 但 Flink CDC 2.x 用 chunk-based 不需要锁
-- 所以 LOCK TABLES 可省

-- 确认 binlog 配置
SHOW VARIABLES LIKE 'binlog_format';      -- ROW
SHOW VARIABLES LIKE 'binlog_row_image';   -- FULL
SHOW VARIABLES LIKE 'gtid_mode';           -- ON
SHOW VARIABLES LIKE 'log_bin';             -- ON

完整作业代码

public class MySQLToDorisJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
            Time.seconds(10), Time.minutes(5), 2.0,
            Time.hours(1), 0.1));

        MySqlSource source = MySqlSource.builder()
            .hostname("mysql-cluster.internal")
            .port(3306)
            .databaseList("business")
            .tableList("business.orders", "business.order_items")
            .username("cdc_user")
            .password(System.getenv("MYSQL_CDC_PASSWORD"))
            .serverId("5400-5410")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .splitSize(50_000)
            .startupOptions(StartupOptions.initial())
            .debeziumProperties(new Properties() {{
                setProperty("decimal.handling.mode", "string");
                setProperty("bigint.unsigned.handling.mode", "precise");
                setProperty("snapshot.locking.mode", "none");
            }})
            .build();

        DataStreamSource stream = env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source"
        );

        // 转换 + sink Doris
        stream
            .map(new DebeziumToRowFunction())
            .name("Transform")
            .sinkTo(DorisSink.builder()
                .setDorisOptions(DorisOptions.builder()
                    .setFenodes("doris-fe:8030")
                    .setUsername("flink")
                    .setPassword(System.getenv("DORIS_PASSWORD"))
                    .build())
                .setDorisExecutionOptions(DorisExecutionOptions.builder()
                    .setBatchSize(10000)
                    .setBatchIntervalMs(10_000L)
                    .setEnable2PC(true)
                    .setLabelPrefix("cdc-orders")
                    .build())
                .build())
            .name("Doris Sink");

        env.execute("MySQL to Doris CDC");
    }
}

监控指标

# Prometheus alert
- alert: FlinkCDCLag
  expr: flink_cdc_currentEmitEventTimeLag_seconds > 60
  for: 2m
  labels: { severity: warning }
  annotations:
    summary: 'CDC lag > 60s for {{ $labels.task_name }}'

- alert: FlinkBackpressureHigh
  expr: flink_taskmanager_job_task_backPressureTimeMsPerSecond > 500
  for: 2m
  labels: { severity: warning }

- alert: FlinkCheckpointFailed
  expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 2
  labels: { severity: critical }

避坑清单

  1. 必须开 checkpoint(60s),状态后端用 RocksDB
  2. MySQL 必须开 GTID,否则主从切换断流
  3. 大表全量调整 chunk-size,默认 8096 太小
  4. Kafka 中转按主键 hash 分区,保证顺序
  5. Doris 用 Unique Key + MOW + 2PC,exactly-once
  6. DDL 不自动同步,流程上"先改下游再改业务"
  7. decimal/bigint 配置 debezium handling mode
  8. 权限只给 SELECT + REPLICATION,不要给 SUPER
  9. binlog 保留 7 天以上,Flink 宕机有恢复窗口
  10. 不要在 Flink 里做复杂业务逻辑,sink 端处理

反思

实时 CDC 是个看似简单实际坑深的工程。Flink CDC 把全量 + 增量统一了,比 Canal 现代,但底层是 Debezium,所有 Debezium 的坑都会遇到。最坑的是 DDL 没自动同步,需要业务流程兜底。最难调的是延迟问题,常常是 sink 端瓶颈而不是 source。这半年下来,200+ 表稳定同步,延迟 p99 控在 15 秒内,基本满足报表实时性要求。下一步在测 Flink CDC 3.0 的 pipeline 模式,简化作业开发。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

ClickHouse 生产 3 年实战:日 5 亿日志 + 200ms 聚合 + Doris 对比

2026-5-19 12:08:47

技术教程

eBPF 性能诊断实战:Go 服务 sys 60% 的隐藏 fsync 案

2026-5-19 12:13:08

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索