用 Flink CDC 同步 MySQL 到 Doris,跑了半年踩了一堆坑:全量阶段 OOM、增量延迟、DDL 没同步、断点续传失效、Kafka 中转背压。本文实录全过程,讲透 Flink CDC 2.x 工作原理和八大坑的修法,给做实时数仓的团队避雷。
架构背景
业务系统(MySQL 主从) → Flink CDC → Kafka → Flink ETL → Doris
↓
ClickHouse(日志类)
规模:
- MySQL 表 200+,日均 binlog 量 500GB
- 同步延迟要求:p99 < 30s
- 目标库:Doris 集群(报表) + ClickHouse(日志)
- Flink 版本:1.17,Flink CDC 2.4
为什么不直接 Canal:
- Canal 配置繁琐,要按表配
- 不支持全量 + 增量无缝衔接
- Flink CDC 用 Debezium 内核,生态更现代
Flink CDC 工作原理
坑 1:全量阶段 OOM
现象:同步一张 8 亿行表,跑到 2 小时 TaskManager OOM
日志:OutOfMemoryError: Java heap space
检查点:全量 chunk 大小默认 8096,大表分了 10w 个 chunk
每个 chunk 元数据存 JM 内存,加起来 1GB+
# 修法 1:增加 JM/TM 内存
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.4
# 修法 2:增大 chunk 大小,减少 chunk 数
table.exec.source.cdc-snapshot-chunk-size: 50000 # 默认 8096
# 8 亿行 ÷ 5 万 = 1.6w 个 chunk(JM 压力小)
# 修法 3:用 chunk key 优化
'scan.incremental.snapshot.chunk.key-column' = 'id'
# 显式指定主键,避免框架猜测
# 修法 4:并行度调高
parallelism.default: 8
# 全量阶段多个 task 并行读不同 chunk
坑 2:断点续传失效
现象:job 重启后从头跑全量,不是从 checkpoint 续
原因:Flink CDC 2.x 必须开 checkpoint,且 source 状态要存在 checkpoint
默认 checkpoint 不开,断点续传不生效
// 修法:启用 checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // 60 秒一次
env.getCheckpointConfig().setCheckpointTimeout(120_000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 状态后端:大状态用 RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
// 重启策略
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.seconds(10), // initial
Time.minutes(5), // max
2.0, // 乘数
Time.hours(1), // 重置阈值
0.1 // jitter
));
// 重启命令(关键!从最近 checkpoint 恢复)
flink run -s hdfs:///flink/checkpoints//chk-/_metadata \
-p 8 cdc-job.jar
坑 3:增量延迟突增
现象:Doris 数据延迟从平时的 5s 飙到 5 分钟
监控:Flink 反压 100%,sink 处理慢
原因:
1. MySQL 大事务(单事务改 10w 行)
2. binlog 一次性涌入,sink 处理不过来
3. Doris bulk import 接口慢,每个 mini-batch 写 100ms
# 修法 1:Doris 侧调整 stream load
table-name:
jdbc-url: jdbc:doris://doris-fe:9030
sink.label-prefix: cdc
sink.buffer-size: 33554432 # 32MB buffer
sink.buffer-count: 10 # 10 个 buffer 并发
sink.max-retries: 5
sink.properties.format: json
sink.properties.read_json_by_line: true
sink.properties.merge_type: MERGE # 配合 unique key 表
sink.enable-2pc: true # 2PC 保证 exactly-once
# 修法 2:中间加 Kafka 削峰
MySQL CDC → Kafka(分区 32)→ Flink ETL → Doris
# Kafka 缓冲,Doris 慢一点不阻塞 source
# 修法 3:调整 batch
sink.batch.size: 10000
sink.batch.interval: 10s
坑 4:DDL 没同步
现象:MySQL 业务表加了一列,Flink CDC 没自动同步到 Doris
新增列的数据丢失
原因:Flink CDC 默认 DDL events 不直接给下游
下游表结构得手动改
修法:
1. 启用 schema evolution(部分 sink 支持)
2. 加 DDL 监控告警
3. 业务 DDL 流程化:先改下游,再发 PR 改业务表
# Doris sink 启用 schema evolution
table-name:
sink.enable-delete: true
sink.schema-evolution: true # 自动加列(限简单 ADD COLUMN)
# 监控 DDL 事件
- alert: MySQLDDLDetected
expr: increase(flink_cdc_ddl_events_total[5m]) > 0
labels: { severity: warning }
annotations:
summary: 'MySQL DDL detected, check downstream'
坑 5:JSON 列同步丢精度
现象:MySQL JSON 列里的 decimal 同步到 Doris 变 float,精度丢失
金额 1234.56 → 1234.5599999...
原因:Debezium 序列化 JSON 时用 float
修法:配置 debezium 输出格式
source.debezium.properties:
decimal.handling.mode: string # decimal 转字符串(精确)
bigint.unsigned.handling.mode: precise
time.precision.mode: connect
include.schema.changes: true
snapshot.locking.mode: none # MySQL 8 用 GTID
坑 6:exactly-once 失效
现象:Job 重启后下游 Doris 出现重复数据
原因:sink 没启用 2PC,只保证 at-least-once
修法:
1. sink 必须支持 2PC(Doris 1.2+ 支持)
2. 业务侧用 Unique Key 表(MOW 模式)兜底
-- Doris Unique Key 表
CREATE TABLE order_mirror (
id BIGINT NOT NULL,
user_id BIGINT,
amount DECIMAL(12, 2),
status VARCHAR(20),
created_at DATETIME,
updated_at DATETIME
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 32
PROPERTIES (
"replication_num" = "3",
"enable_unique_key_merge_on_write" = "true", -- MOW 模式
"store_row_column" = "true"
);
-- 同一个 id 多次写入,自动按 updated_at 取最新
-- 即使 Flink 重发也不重复
坑 7:Kafka 中转的分区策略
// 错:用默认分区(轮询),同一行变更分散到不同分区
KafkaSink sink = KafkaSink.builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("cdc-orders")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.build();
// 同一个 order_id 的 UPDATE 可能分到不同分区,下游消费乱序
// 对:按主键分区
KafkaSink sink = KafkaSink.builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("cdc-orders")
.setKeySerializationSchema(e -> e.getPrimaryKey().getBytes()) // 主键作 Kafka key
.setValueSerializationSchema(new JsonSerializationSchema<>())
.setPartitioner(new HashPartitioner()) // 按 key hash
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("cdc-orders-")
.build();
// 同一个 order_id 永远在同一个分区,保证顺序
坑 8:MySQL 主从切换
现象:MySQL 主从切换后 binlog position 变了,Flink CDC 读不到
原因:Flink CDC 默认按 binlog filename + position 续读
新主的 filename 不一样
修法:必须用 GTID 模式
MySQL 配置:
gtid_mode = ON
enforce_gtid_consistency = ON
Flink CDC 配置:
scan.startup.mode: latest-offset / specific-offset
+ GTID set
source:
hostname: mysql-cluster.internal
port: 3306
username: cdc_user
password: ${MYSQL_CDC_PASSWORD}
database-list: business
table-list: business.orders,business.order_items
server-id: 5400-5410 # 范围,每个并行实例用一个
scan.startup.mode: initial # 第一次跑全量,后续 binlog
scan.incremental.snapshot.enabled: true
scan.snapshot.fetch.size: 10000
debezium.gtid.source.includes: '' # 用 GTID 续传
权限设置
-- MySQL 端创建 CDC 账号
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'StrongPassword!';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SHOW VIEW, EVENT, RELOAD
ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
-- MySQL 5.7 还要加 LOCK TABLES(初始化快照用)
-- 但 Flink CDC 2.x 用 chunk-based 不需要锁
-- 所以 LOCK TABLES 可省
-- 确认 binlog 配置
SHOW VARIABLES LIKE 'binlog_format'; -- ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- FULL
SHOW VARIABLES LIKE 'gtid_mode'; -- ON
SHOW VARIABLES LIKE 'log_bin'; -- ON
完整作业代码
public class MySQLToDorisJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.seconds(10), Time.minutes(5), 2.0,
Time.hours(1), 0.1));
MySqlSource source = MySqlSource.builder()
.hostname("mysql-cluster.internal")
.port(3306)
.databaseList("business")
.tableList("business.orders", "business.order_items")
.username("cdc_user")
.password(System.getenv("MYSQL_CDC_PASSWORD"))
.serverId("5400-5410")
.deserializer(new JsonDebeziumDeserializationSchema())
.splitSize(50_000)
.startupOptions(StartupOptions.initial())
.debeziumProperties(new Properties() {{
setProperty("decimal.handling.mode", "string");
setProperty("bigint.unsigned.handling.mode", "precise");
setProperty("snapshot.locking.mode", "none");
}})
.build();
DataStreamSource stream = env.fromSource(
source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source"
);
// 转换 + sink Doris
stream
.map(new DebeziumToRowFunction())
.name("Transform")
.sinkTo(DorisSink.builder()
.setDorisOptions(DorisOptions.builder()
.setFenodes("doris-fe:8030")
.setUsername("flink")
.setPassword(System.getenv("DORIS_PASSWORD"))
.build())
.setDorisExecutionOptions(DorisExecutionOptions.builder()
.setBatchSize(10000)
.setBatchIntervalMs(10_000L)
.setEnable2PC(true)
.setLabelPrefix("cdc-orders")
.build())
.build())
.name("Doris Sink");
env.execute("MySQL to Doris CDC");
}
}
监控指标
# Prometheus alert
- alert: FlinkCDCLag
expr: flink_cdc_currentEmitEventTimeLag_seconds > 60
for: 2m
labels: { severity: warning }
annotations:
summary: 'CDC lag > 60s for {{ $labels.task_name }}'
- alert: FlinkBackpressureHigh
expr: flink_taskmanager_job_task_backPressureTimeMsPerSecond > 500
for: 2m
labels: { severity: warning }
- alert: FlinkCheckpointFailed
expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 2
labels: { severity: critical }
避坑清单
- 必须开 checkpoint(60s),状态后端用 RocksDB
- MySQL 必须开 GTID,否则主从切换断流
- 大表全量调整 chunk-size,默认 8096 太小
- Kafka 中转按主键 hash 分区,保证顺序
- Doris 用 Unique Key + MOW + 2PC,exactly-once
- DDL 不自动同步,流程上"先改下游再改业务"
- decimal/bigint 配置 debezium handling mode
- 权限只给 SELECT + REPLICATION,不要给 SUPER
- binlog 保留 7 天以上,Flink 宕机有恢复窗口
- 不要在 Flink 里做复杂业务逻辑,sink 端处理
反思
实时 CDC 是个看似简单实际坑深的工程。Flink CDC 把全量 + 增量统一了,比 Canal 现代,但底层是 Debezium,所有 Debezium 的坑都会遇到。最坑的是 DDL 没自动同步,需要业务流程兜底。最难调的是延迟问题,常常是 sink 端瓶颈而不是 source。这半年下来,200+ 表稳定同步,延迟 p99 控在 15 秒内,基本满足报表实时性要求。下一步在测 Flink CDC 3.0 的 pipeline 模式,简化作业开发。
—— 别看了 · 2026