From 2564ad53a2818da852d594d2f4f9ec082100c7b6 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 17 Jun 2025 13:15:23 +0800 Subject: [PATCH 1/2] [FLINK-37963] Fix potential NPE when triggering JobManager failover prematurely --- .../source/MongoDBFullChangelogITCase.java | 6 +++--- .../source/MongoDBParallelSourceITCase.java | 5 +++-- .../mongodb/utils/MongoDBTestUtils.java | 14 ++++++++++++++ .../oracle/testutils/OracleTestUtils.java | 14 ++++++++++++++ .../postgres/testutils/PostgresTestUtils.java | 16 +++++++++++++++- 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index 659c68604a5..788c324c044 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -77,6 +77,9 @@ class MongoDBFullChangelogITCase extends MongoDBSourceTestBase { private static final int USE_PRE_HIGHWATERMARK_HOOK = 2; private static final int USE_POST_HIGHWATERMARK_HOOK = 3; + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + @Test void testGetMongoDBVersion() { MongoDBSourceConfig config = @@ -470,7 +473,6 @@ private List testBackfillWhenWritingEvents( customerDatabase); MONGO_CONTAINER.executeCommandFileInDatabase("customer", customerDatabase); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.setParallelism(1); @@ -580,7 +582,6 @@ private void testMongoDBParallelSourceWithMetadataColumns( customerDatabase); } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); env.setParallelism(parallelism); @@ -752,7 +753,6 @@ private void testMongoDBParallelSource( customerDatabase); } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); env.setParallelism(parallelism); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java index 6830e966df2..e755b43886e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java @@ -71,6 +71,9 @@ class MongoDBParallelSourceITCase extends MongoDBSourceTestBase { private static final int USE_PRE_HIGHWATERMARK_HOOK = 2; private static final int USE_POST_HIGHWATERMARK_HOOK = 3; + private static final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + @Test void testReadSingleCollectionWithSingleParallelism() throws Exception { testMongoDBParallelSource( @@ -406,7 +409,6 @@ private List testBackfillWhenWritingEvents( boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions) throws Exception { String customerDatabase = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("customer"); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.setParallelism(1); @@ -517,7 +519,6 @@ private void testMongoDBParallelSource( String customerDatabase = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("customer"); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); env.setParallelism(parallelism); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java index 3f4f7032d79..c760c3f86f9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.mongodb.utils; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.table.data.RowData; @@ -126,11 +127,24 @@ public static void triggerFailover( } } + public static void ensureJmLeaderServiceExists( + HaLeadershipControl leadershipControl, JobID jobId) throws Exception { + EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl; + + // Make sure JM leader service has been created, or an NPE might be thrown when we're + // triggering JM failover later. + control.getJobManagerLeaderElection(jobId).close(); + } + public static void triggerJobManagerFailover( JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.revokeJobMasterLeadership(jobId).get(); + afterFailAction.run(); + + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.grantJobMasterLeadership(jobId).get(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java index b43927712d0..4b070ab644d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.oracle.testutils; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.table.planner.factories.TestValuesTableFactory; @@ -61,11 +62,24 @@ public static void triggerFailover( } } + public static void ensureJmLeaderServiceExists( + HaLeadershipControl leadershipControl, JobID jobId) throws Exception { + EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl; + + // Make sure JM leader service has been created, or an NPE might be thrown when we're + // triggering JM failover later. + control.getJobManagerLeaderElection(jobId).close(); + } + public static void triggerJobManagerFailover( JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.revokeJobMasterLeadership(jobId).get(); + afterFailAction.run(); + + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.grantJobMasterLeadership(jobId).get(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java index fce9ece5e3a..f57333e59c9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.postgres.testutils; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.table.api.TableResult; @@ -71,11 +72,24 @@ public static void triggerFailover( } } - protected static void triggerJobManagerFailover( + public static void ensureJmLeaderServiceExists( + HaLeadershipControl leadershipControl, JobID jobId) throws Exception { + EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl; + + // Make sure JM leader service has been created, or an NPE might be thrown when we're + // triggering JM failover later. + control.getJobManagerLeaderElection(jobId).close(); + } + + public static void triggerJobManagerFailover( JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.revokeJobMasterLeadership(jobId).get(); + afterFailAction.run(); + + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.grantJobMasterLeadership(jobId).get(); } From 31f25f937935412c2c249a2d4b6397b820b1c1a4 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 23 Jun 2025 09:51:27 +0800 Subject: [PATCH 2/2] Also fix MySQL NewlyAddedTableITCase --- .../connectors/mysql/source/MySqlSourceTestBase.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 08ccfdcce4d..52c5f1ce9b8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.RpcServiceSharing; @@ -149,11 +150,22 @@ protected static void triggerFailover( } } + protected static void ensureJmLeaderServiceExists( + HaLeadershipControl leadershipControl, JobID jobId) throws Exception { + EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl; + + // Make sure JM leader service has been created, or an NPE might be thrown when we're + // triggering JM failover later. + control.getJobManagerLeaderElection(jobId).close(); + } + protected static void triggerJobManagerFailover( JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.revokeJobMasterLeadership(jobId).get(); afterFailAction.run(); + ensureJmLeaderServiceExists(haLeadershipControl, jobId); haLeadershipControl.grantJobMasterLeadership(jobId).get(); }