[FLINK-37963] Fix potential NPE when triggering JobManager failover prematurely #4044

Merged 2 commits on Aug 10, 2025.
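
The fix is the same in each of the connector test utilities touched below (MongoDB, MySQL, Oracle, and Postgres): before revoking or granting JobMaster leadership, `triggerJobManagerFailover` now calls a new `ensureJmLeaderServiceExists` helper that casts the `HaLeadershipControl` to `EmbeddedHaServices` and touches the per-job JobManager leader election service, so the service is guaranteed to exist and a prematurely triggered failover no longer throws a NullPointerException. The two MongoDB IT cases additionally share a single static `StreamExecutionEnvironment` instead of creating one per test method. A minimal consolidated sketch of the pattern, assembled from the diff below (the class name `JmFailoverSketch` is illustrative and not part of this PR):

```java
// Consolidated sketch of the fix, assuming Flink's embedded (non-HA) services are in use.
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;

public final class JmFailoverSketch {

    /** Force creation of the per-job JM leader election service before leadership is touched. */
    static void ensureJmLeaderServiceExists(HaLeadershipControl leadershipControl, JobID jobId)
            throws Exception {
        EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;
        // The handle is closed right away; the call only makes sure the service has been
        // created, otherwise revoking leadership prematurely can throw an NPE.
        control.getJobManagerLeaderElection(jobId).close();
    }

    static void triggerJobManagerFailover(
            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
        HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
        ensureJmLeaderServiceExists(haLeadershipControl, jobId);
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();

        afterFailAction.run();

        ensureJmLeaderServiceExists(haLeadershipControl, jobId);
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }
}
```
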
@@ -77,6 +77,9 @@ class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

private static final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

@Test
void testGetMongoDBVersion() {
MongoDBSourceConfig config =
@@ -470,7 +473,6 @@ private List<String> testBackfillWhenWritingEvents(
customerDatabase);
MONGO_CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);

@@ -580,7 +582,6 @@ private void testMongoDBParallelSourceWithMetadataColumns(
customerDatabase);
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setParallelism(parallelism);
@@ -752,7 +753,6 @@ private void testMongoDBParallelSource(
customerDatabase);
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setParallelism(parallelism);
@@ -71,6 +71,9 @@ class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

private static final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

@Test
void testReadSingleCollectionWithSingleParallelism() throws Exception {
testMongoDBParallelSource(
@@ -406,7 +409,6 @@ private List<String> testBackfillWhenWritingEvents(
boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions)
throws Exception {
String customerDatabase = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);

@@ -517,7 +519,6 @@ private void testMongoDBParallelSource(

String customerDatabase = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("customer");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setParallelism(parallelism);
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.mongodb.utils;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.data.RowData;
@@ -126,11 +127,24 @@ public static void triggerFailover(
}
}

public static void ensureJmLeaderServiceExists(
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;

// Make sure JM leader service has been created, or an NPE might be thrown when we're
// triggering JM failover later.
control.getJobManagerLeaderElection(jobId).close();
}

public static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.revokeJobMasterLeadership(jobId).get();

afterFailAction.run();

ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}

@@ -22,6 +22,7 @@
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -149,11 +150,22 @@ protected static void triggerFailover(
}
}

protected static void ensureJmLeaderServiceExists(
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;

// Make sure JM leader service has been created, or an NPE might be thrown when we're
// triggering JM failover later.
control.getJobManagerLeaderElection(jobId).close();
}

protected static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}

@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.oracle.testutils;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -61,11 +62,24 @@ public static void triggerFailover(
}
}

public static void ensureJmLeaderServiceExists(
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;

// Make sure JM leader service has been created, or an NPE might be thrown when we're
// triggering JM failover later.
control.getJobManagerLeaderElection(jobId).close();
}

public static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.revokeJobMasterLeadership(jobId).get();

afterFailAction.run();

ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}

@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.postgres.testutils;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.api.TableResult;
@@ -71,11 +72,24 @@ public static void triggerFailover(
}
}

protected static void triggerJobManagerFailover(
public static void ensureJmLeaderServiceExists(
HaLeadershipControl leadershipControl, JobID jobId) throws Exception {
EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl;

// Make sure JM leader service has been created, or an NPE might be thrown when we're
// triggering JM failover later.
control.getJobManagerLeaderElection(jobId).close();
}

public static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.revokeJobMasterLeadership(jobId).get();

afterFailAction.run();

ensureJmLeaderServiceExists(haLeadershipControl, jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}

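
For context, a hedged sketch of how an IT case might drive the hardened helper; the job name, the `miniClusterResource` field, and the callback body are assumptions and do not come from this PR:

```java
// Hypothetical usage inside a test; miniClusterResource and the job name are assumptions.
JobClient jobClient = env.executeAsync("jm-failover-test");
triggerJobManagerFailover(
        jobClient.getJobID(),
        miniClusterResource.getMiniCluster(),
        () -> {
            // e.g. produce more change events while the JobMaster holds no leadership
        });
```
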