|  | 
| 22 | 22 | import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; | 
| 23 | 23 | import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy; | 
| 24 | 24 | import org.apache.flink.configuration.Configuration; | 
|  | 25 | +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; | 
| 25 | 26 | import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; | 
| 26 | 27 | import org.apache.flink.runtime.minicluster.MiniCluster; | 
| 27 | 28 | import org.apache.flink.runtime.minicluster.RpcServiceSharing; | 
| @@ -149,11 +150,22 @@ protected static void triggerFailover( | 
| 149 | 150 |         } | 
| 150 | 151 |     } | 
| 151 | 152 | 
 | 
|  | 153 | +    protected static void ensureJmLeaderServiceExists( | 
|  | 154 | +            HaLeadershipControl leadershipControl, JobID jobId) throws Exception { | 
|  | 155 | +        EmbeddedHaServices control = (EmbeddedHaServices) leadershipControl; | 
|  | 156 | + | 
|  | 157 | +        // Make sure JM leader service has been created, or an NPE might be thrown when we're | 
|  | 158 | +        // triggering JM failover later. | 
|  | 159 | +        control.getJobManagerLeaderElection(jobId).close(); | 
|  | 160 | +    } | 
|  | 161 | + | 
| 152 | 162 |     protected static void triggerJobManagerFailover( | 
| 153 | 163 |             JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { | 
| 154 | 164 |         final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); | 
|  | 165 | +        ensureJmLeaderServiceExists(haLeadershipControl, jobId); | 
| 155 | 166 |         haLeadershipControl.revokeJobMasterLeadership(jobId).get(); | 
| 156 | 167 |         afterFailAction.run(); | 
|  | 168 | +        ensureJmLeaderServiceExists(haLeadershipControl, jobId); | 
| 157 | 169 |         haLeadershipControl.grantJobMasterLeadership(jobId).get(); | 
| 158 | 170 |     } | 
| 159 | 171 | 
 | 
|  | 
0 commit comments