diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index e295fb51fdb..1acbeac941b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -271,18 +271,10 @@ private void captureNewlyAddedTables() { // case 2: there are new tables to add if (!newlyAddedTables.isEmpty()) { - // if job is still in snapshot reading phase, directly add all newly added - // tables LOG.info("Found newly added tables, start capture newly added tables process"); - // add new tables remainingTables.addAll(newlyAddedTables); - if (AssignerStatus.isAssigningFinished(assignerStatus)) { - // start the newly added tables process under binlog reading phase - LOG.info( - "Found newly added tables, start capture newly added tables process under binlog reading phase"); - this.startAssignNewlyAddedTables(); - } + this.startAssignNewlyAddedTables(); } } catch (Exception e) { throw new FlinkRuntimeException(