From 7a85c5eefca624d1c6c81dd33db188554f0e4a59 Mon Sep 17 00:00:00 2001 From: Dustin Washington Date: Fri, 19 Sep 2025 08:02:46 -0400 Subject: [PATCH] [FLINK-38279] Add SnapshotPendingSplitsState to MySQL checkpoint state restoration [FLINK-38279] This issue currently prevents restoring snapshot jobs from being restart-able from save points/checkpoints which means that large table and database replication must be done in one shot. Any thing that crashes the snapshotting process creates an entirely non-recoverable situation which is highly inconvenient and un-workable in many set ups. --- .../flink/cdc/connectors/mysql/source/MySqlSource.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index cb06cc45a6d..9dfd9db5cb7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -36,6 +36,7 @@ import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner; import org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState; import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState; +import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState; import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; @@ -251,6 +252,13 @@ public SplitEnumerator restoreEnumerator( enumContext.currentParallelism(), (HybridPendingSplitsState) checkpoint, enumContext); + } else if (checkpoint instanceof SnapshotPendingSplitsState) { + splitAssigner = + new MySqlSnapshotSplitAssigner( + sourceConfig, + enumContext.currentParallelism(), + (SnapshotPendingSplitsState) checkpoint, + enumContext); } else if (checkpoint instanceof BinlogPendingSplitsState) { splitAssigner = new MySqlBinlogSplitAssigner(