diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java similarity index 91% rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java rename to flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java index adb37e7c27..5f5392d581 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.config; +package org.apache.flink.kubernetes.operator.api; -import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; /** The mode of {@link FlinkDeployment}. */ @@ -45,7 +44,7 @@ public static Mode getMode(FlinkDeployment flinkApp) { : getMode(lastReconciledSpec); } - private static Mode getMode(FlinkDeploymentSpec spec) { + public static Mode getMode(FlinkDeploymentSpec spec) { return spec.getJob() != null ? APPLICATION : SESSION; } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 136d3415f3..fd2780f37d 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -21,6 +21,7 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.fabric8.kubernetes.api.model.Condition; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -28,7 +29,9 @@ import lombok.ToString; import lombok.experimental.SuperBuilder; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** Last observed status of the Flink deployment. */ @@ -55,4 +58,7 @@ public class FlinkDeploymentStatus extends CommonStatus { /** Information about the TaskManagers for the scale subresource. */ private TaskManagerInfo taskManager; + + /** Condition of the CR . */ + private List conditions = new ArrayList<>(); } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java index 54a0181bc0..faecf29f85 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java @@ -21,18 +21,36 @@ public enum JobManagerDeploymentStatus { /** JobManager is running and ready to receive REST API calls. */ - READY, + READY("JobManagerReady", "JobManager is running and ready to receive REST API calls"), /** JobManager is running but not ready yet to receive REST API calls. */ - DEPLOYED_NOT_READY, + DEPLOYED_NOT_READY( + "DeployedNotReady", + "JobManager is running but not yet ready to receive REST API calls"), /** JobManager process is starting up. */ - DEPLOYING, + DEPLOYING("JobManagerIsDeploying", "JobManager process is starting up"), /** JobManager deployment not found, probably not started or killed by user. */ // TODO: currently a mix of SUSPENDED and ERROR, needs cleanup - MISSING, + MISSING("JobManagerDeploymentMissing", "JobManager deployment not found"), /** Deployment in terminal error, requires spec change for reconciliation to continue. */ - ERROR; + ERROR("Error", "JobManager deployment failed"); + + private String reason; + private String message; + + JobManagerDeploymentStatus(String reason, String message) { + this.reason = reason; + this.message = message; + } + + public String getReason() { + return reason; + } + + public String getMessage() { + return message; + } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java new file mode 100644 index 0000000000..8428b70a52 --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.utils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.Mode; +import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; + +import io.fabric8.kubernetes.api.model.Condition; +import io.fabric8.kubernetes.api.model.ConditionBuilder; + +import java.time.Instant; +import java.util.List; + +import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus.READY; + +/** Creates a condition object with the type, status, message and reason. */ +public class ConditionUtils { + public static final String CONDITION_TYPE_RUNNING = "Running"; + + /** + * Creates a List of Condition object based on the provided FlinkDeploymentStatus. + * + * @param flinkDeploymentStatus the FlinkDeploymentStatus object containing job status + * information + * @return a list of Condition object representing the current status of the Flink deployment + */ + public static List createConditionFromStatus( + FlinkDeploymentStatus flinkDeploymentStatus) { + + FlinkDeploymentReconciliationStatus reconciliationStatus = + flinkDeploymentStatus.getReconciliationStatus(); + Condition conditionToAdd = null; + + if (reconciliationStatus != null) { + FlinkDeploymentSpec deploymentSpec = + reconciliationStatus.deserializeLastReconciledSpec(); + + if (deploymentSpec != null) { + switch (Mode.getMode(deploymentSpec)) { + case APPLICATION: + conditionToAdd = + getApplicationModeCondition( + flinkDeploymentStatus.getJobStatus().getState()); + break; + case SESSION: + conditionToAdd = + getSessionModeCondition( + flinkDeploymentStatus.getJobManagerDeploymentStatus()); + } + updateLastTransitionTime(flinkDeploymentStatus.getConditions(), conditionToAdd); + } + } + return conditionToAdd == null ? List.of() : List.of(conditionToAdd); + } + + private static void updateLastTransitionTime(List conditions, Condition condition) { + if (condition == null) { + return; + } + Condition existingCondition = + conditions.stream() + .filter(c -> c.getType().equals(condition.getType())) + .findFirst() + .orElse(null); + condition.setLastTransitionTime(getLastTransitionTimeStamp(existingCondition, condition)); + } + + private static Condition getApplicationModeCondition(JobStatus jobStatus) { + return new ConditionBuilder() + .withType(CONDITION_TYPE_RUNNING) + .withStatus(jobStatus == RUNNING ? "True" : "False") + .withReason(toCamelCase(jobStatus.name())) + .withMessage("Job status " + jobStatus.name()) + .build(); + } + + private static Condition getSessionModeCondition(JobManagerDeploymentStatus jmStatus) { + return new ConditionBuilder() + .withType(CONDITION_TYPE_RUNNING) + .withStatus(jmStatus == READY ? "True" : "False") + .withReason(jmStatus.getReason()) + .withMessage(jmStatus.getMessage()) + .build(); + } + + /** + * Reason in the condition object should be a CamelCase string, so need to convert JobStatus as + * all the keywords are one noun, so we only need to upper case the first letter. + * + * @return CamelCase reason as String + */ + private static String toCamelCase(String reason) { + reason = reason.toLowerCase(); + return reason.substring(0, 1).toUpperCase() + reason.substring(1); + } + + /** + * get the last transition time for the condition , returns the current time if there is no + * existing condition or if the condition status has changed, otherwise returns existing + * condition LastTransitionTime. + * + * @param existingCondition The current condition object, may be null. + * @param condition The new condition object to compare against the existing one. + * @return A string representing the last transition time in ISO 8601 format with nanosecond + * precision (e.g., "2025-10-30T07:35:35.189752790Z"). Returns a new timestamp if the + * existing condition is null or the status has changed, otherwise returns the last + * transition time of the existing condition. + */ + private static String getLastTransitionTimeStamp( + Condition existingCondition, Condition condition) { + String lastTransitionTime; + if (existingCondition == null + || !existingCondition.getStatus().equals(condition.getStatus())) { + lastTransitionTime = Instant.now().toString(); + } else { + lastTransitionTime = existingCondition.getLastTransitionTime(); + } + return lastTransitionTime; + } +} diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java index f3bd54f2f2..5a47bbd207 100644 --- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java +++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.api.utils; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -39,8 +40,10 @@ import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.CheckpointType; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; @@ -49,6 +52,7 @@ import io.fabric8.kubernetes.api.model.PodTemplateSpec; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -268,4 +272,43 @@ public static FlinkStateSnapshot buildFlinkStateSnapshotCheckpoint( return snapshot; } + + public static FlinkDeploymentStatus createApplicationModeStatus(JobStatus jobStatus) { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + org.apache.flink.kubernetes.operator.api.status.JobStatus flinkJobStatus = + new org.apache.flink.kubernetes.operator.api.status.JobStatus(); + flinkJobStatus.setState(jobStatus); + status.setJobStatus(flinkJobStatus); + status.setConditions(new ArrayList<>()); + + FlinkDeploymentReconciliationStatus reconciliationStatus = + new FlinkDeploymentReconciliationStatus(); + + FlinkDeployment deployment = BaseTestUtils.buildApplicationCluster(); + String serializedSpec = SpecUtils.writeSpecWithMeta(deployment.getSpec(), deployment); + reconciliationStatus.setLastReconciledSpec(serializedSpec); + + status.setReconciliationStatus(reconciliationStatus); + status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + + return status; + } + + public static FlinkDeploymentStatus createSessionModeStatus( + JobManagerDeploymentStatus jmStatus) { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + status.setJobManagerDeploymentStatus(jmStatus); + status.setConditions(new ArrayList<>()); + + FlinkDeploymentReconciliationStatus reconciliationStatus = + new FlinkDeploymentReconciliationStatus(); + + FlinkDeployment deployment = BaseTestUtils.buildSessionCluster(); + String serializedSpec = SpecUtils.writeSpecWithMeta(deployment.getSpec(), deployment); + reconciliationStatus.setLastReconciledSpec(serializedSpec); + + status.setReconciliationStatus(reconciliationStatus); + + return status; + } } diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtilsTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtilsTest.java new file mode 100644 index 0000000000..d510d121d1 --- /dev/null +++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtilsTest.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.utils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; + +import io.fabric8.kubernetes.api.model.Condition; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link ConditionUtils}. */ +class ConditionUtilsTest { + + /** + * Helper method to get a condition by type from a list of conditions. + * + * @param conditions the list of conditions + * @param type the condition type to find + * @return the condition with the specified type, or null if not found + */ + private Condition getConditionByType(List conditions, String type) { + return conditions.stream().filter(c -> type.equals(c.getType())).findFirst().orElse(null); + } + + @Test + void testCreateConditionFromStatusWithNullReconciliationStatus() { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + status.setReconciliationStatus(null); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertTrue( + conditions.isEmpty(), + "Should return empty list when reconciliation status is null"); + } + + @Test + void testCreateConditionFromStatusWithNullDeploymentSpec() { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + FlinkDeploymentReconciliationStatus reconciliationStatus = + new FlinkDeploymentReconciliationStatus(); + status.setReconciliationStatus(reconciliationStatus); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertTrue(conditions.isEmpty(), "Should return empty list when deployment spec is null"); + } + + @Test + void testApplicationModeConditionWithRunningJob() { + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size(), "Should return one condition"); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, condition.getType()); + assertEquals("True", condition.getStatus()); + assertEquals("Running", condition.getReason()); + assertEquals("Job status RUNNING", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithFailedJob() { + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.FAILED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, condition.getType()); + assertEquals("False", condition.getStatus()); + assertEquals("Failed", condition.getReason()); + assertEquals("Job status FAILED", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithCanceledJob() { + FlinkDeploymentStatus status = + BaseTestUtils.createApplicationModeStatus(JobStatus.CANCELED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals("Canceled", condition.getReason()); + assertEquals("Job status CANCELED", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithFinishedJob() { + FlinkDeploymentStatus status = + BaseTestUtils.createApplicationModeStatus(JobStatus.FINISHED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals("Finished", condition.getReason()); + assertEquals("Job status FINISHED", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithCreatedJob() { + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.CREATED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals("Created", condition.getReason()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testSessionModeConditionWithReadyJobManager() { + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, condition.getType()); + assertEquals("True", condition.getStatus()); + assertEquals(JobManagerDeploymentStatus.READY.getReason(), condition.getReason()); + assertEquals(JobManagerDeploymentStatus.READY.getMessage(), condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testSessionModeConditionWithDeployingJobManager() { + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.DEPLOYING); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals(JobManagerDeploymentStatus.DEPLOYING.getReason(), condition.getReason()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testSessionModeConditionWithMissingJobManager() { + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.MISSING); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testConditionTypeIsAlwaysRunning() { + // Test application mode + FlinkDeploymentStatus appStatus = + BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + List appConditions = ConditionUtils.createConditionFromStatus(appStatus); + Condition appCondition = + getConditionByType(appConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(appCondition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, appCondition.getType()); + + // Test session mode + FlinkDeploymentStatus sessionStatus = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY); + List sessionConditions = ConditionUtils.createConditionFromStatus(sessionStatus); + Condition sessionCondition = + getConditionByType(sessionConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(sessionCondition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, sessionCondition.getType()); + } + + @Test + void testGetLastTransitionTimeStamp_StatusUnchanged() throws InterruptedException { + // Test that timestamp is preserved when status doesn't change + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + + // First call - creates initial condition with timestamp + List firstConditions = ConditionUtils.createConditionFromStatus(status); + Condition firstCondition = + getConditionByType(firstConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(firstCondition); + String firstTimestamp = firstCondition.getLastTransitionTime(); + + // Add the condition to status + status.getConditions().add(firstCondition); + + // Second call - status unchanged (still RUNNING) + List secondConditions = ConditionUtils.createConditionFromStatus(status); + Condition secondCondition = + getConditionByType(secondConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(secondCondition); + String secondTimestamp = secondCondition.getLastTransitionTime(); + + // Timestamp should be preserved since status didn't change + assertEquals(firstTimestamp, secondTimestamp); + } + + @Test + void testGetLastTransitionTimeStamp_StatusChanged() throws InterruptedException { + // Test that timestamp is updated when status changes + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + + // First call - creates initial condition with RUNNING status + List firstConditions = ConditionUtils.createConditionFromStatus(status); + Condition firstCondition = + getConditionByType(firstConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(firstCondition); + String firstTimestamp = firstCondition.getLastTransitionTime(); + + // Add the condition to status + status.getConditions().add(firstCondition); + + // Change status to FAILED + status.getJobStatus().setState(JobStatus.FAILED); + + // Second call - status changed to FAILED + List secondConditions = ConditionUtils.createConditionFromStatus(status); + Condition secondCondition = + getConditionByType(secondConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(secondCondition); + String secondTimestamp = secondCondition.getLastTransitionTime(); + + // Timestamp should be different since status changed + assertTrue( + !firstTimestamp.equals(secondTimestamp), + "Timestamp should be updated when status changes"); + } + + @Test + void testGetLastTransitionTimeStamp_SessionMode() throws InterruptedException { + // Test timestamp behavior in session mode + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY); + + // First call + List firstConditions = ConditionUtils.createConditionFromStatus(status); + Condition firstCondition = + getConditionByType(firstConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(firstCondition); + String firstTimestamp = firstCondition.getLastTransitionTime(); + + // Add condition to status + status.getConditions().add(firstCondition); + + // Second call with same status + List secondConditions = ConditionUtils.createConditionFromStatus(status); + Condition secondCondition = + getConditionByType(secondConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(secondCondition); + String secondTimestamp = secondCondition.getLastTransitionTime(); + // Timestamp should be preserved + assertEquals(firstTimestamp, secondTimestamp); + + // Change to DEPLOYING + status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + + // Third call with changed status + List thirdConditions = ConditionUtils.createConditionFromStatus(status); + Condition thirdCondition = + getConditionByType(thirdConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(thirdCondition); + String thirdTimestamp = thirdCondition.getLastTransitionTime(); + // Timestamp should be updated + assertTrue( + !secondTimestamp.equals(thirdTimestamp), + "Timestamp should be updated when JobManager status changes"); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 32a3109d12..2fa6cae1ec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; @@ -165,6 +166,8 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex throw new ReconciliationException(e); } + flinkApp.getStatus() + .setConditions(ConditionUtils.createConditionFromStatus(flinkApp.getStatus())); LOG.debug("End of reconciliation"); statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient()); return ReconciliationUtils.toUpdateControl( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java index c5e1124184..6facfb16ec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java @@ -19,8 +19,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.utils.EventRecorder; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java index 429caaa657..3a389a79e7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java @@ -20,10 +20,10 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.reconciler.Reconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java index 7c483f7f42..ac3dab3382 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java @@ -27,10 +27,10 @@ import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient; import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 4d379139b6..65cdfaf050 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -130,7 +130,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - assertEquals(7, testController.getInternalStatusUpdateCount()); + assertEquals(8, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); FlinkDeploymentReconciliationStatus reconciliationStatus = @@ -277,6 +277,8 @@ public void verifyFailedDeployment() throws Exception { validatingResponseProvider.assertValidated(); + validateConditionStatus(appCluster, "Reconciling"); + // Validate status assertNotNull(appCluster.getStatus().getError()); @@ -359,7 +361,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro appCluster.getStatus().getJobManagerDeploymentStatus()); assertEquals( "savepoint_1", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); - + validateConditionStatus(appCluster, "Finished"); // Resume from last savepoint appCluster.getSpec().getJob().setState(JobState.RUNNING); testController.reconcile(appCluster, context); @@ -618,6 +620,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception // jobStatus has not been set at this time assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, jobStatus.getState()); + validateConditionStatus(appCluster, "Reconciling"); // Switches operator mode to SESSION appCluster.getSpec().setJob(null); // Validation fails and JobObserver should still be used @@ -639,6 +642,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId()); assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName()); assertEquals(expectedJobStatus.getJobState(), jobStatus.getState()); + validateConditionStatus(appCluster, "Running"); } @Test @@ -652,6 +656,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); + validateConditionStatus(appCluster, "JobManagerIsDeploying"); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -662,6 +667,8 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except // jobStatus has not been set at this time assertNull(jobStatus.getState()); + validateConditionStatus(appCluster, "DeployedNotReady"); + // Switches operator mode to APPLICATION appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob()); // Validation fails and JobObserver should still be used @@ -676,6 +683,8 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except .getError() .contains("Cannot switch from session to job cluster")); assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob()); + + validateConditionStatus(appCluster, "JobManagerReady"); } private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception { @@ -1169,7 +1178,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - assertEquals(6, testController.getInternalStatusUpdateCount()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( @@ -1184,7 +1193,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - assertEquals(6, testController.getInternalStatusUpdateCount()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( @@ -1254,4 +1263,12 @@ private String getIngressHost(HasMetadata ingress) { return ingressRuleV1beta1.getHost(); } } + + private void validateConditionStatus(FlinkDeployment appCluster, String reason) { + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) + .extracting("reason") + .contains(reason); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java index fde2b12421..a871466bd0 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java @@ -24,11 +24,11 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; import org.apache.flink.util.concurrent.Executors; diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index 33ab49e82d..c81b2b10f2 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -10685,6 +10685,23 @@ spec: additionalProperties: type: string type: object + conditions: + items: + properties: + lastTransitionTime: + type: string + message: + type: string + observedGeneration: + type: integer + reason: + type: string + status: + type: string + type: + type: string + type: object + type: array error: type: string jobManagerDeploymentStatus: