Skip to content

Commit 6e70fd2

Browse files
authored
[Fix] Directly fail if RayJob metadata is invalid (#3981)
* fix: directly raise error if validation fail Signed-off-by: machichima <[email protected]>
* feat: directly return if any validation failed Signed-off-by: machichima <[email protected]>
* test: validation fail and rayjob fail Signed-off-by: machichima <[email protected]>
* test: invalid yaml fail rayjob Signed-off-by: machichima <[email protected]>
* fix: wrap validate func into func in struct Signed-off-by: machichima <[email protected]>
* refactor: validationErrors to validationRules Signed-off-by: machichima <[email protected]>
* refactor: move validation to function Signed-off-by: machichima <[email protected]>
* test: use exact error message match Signed-off-by: machichima <[email protected]>
* fix: return error if return bool is false Signed-off-by: machichima <[email protected]>
* feat: add validation fail status&keep update status in one place Signed-off-by: machichima <[email protected]>
* test: use new status Signed-off-by: machichima <[email protected]>
* fix: directly update status after validation fail Signed-off-by: machichima <[email protected]>
* fix: move err msg prefix into the validation func Signed-off-by: machichima <[email protected]>
* refactor: update status outside validation func
* test: fix test fail
* Trigger CI Signed-off-by: Nary <[email protected]>
* refactor: update comment for updateJobStatus
* test: add e2e test for rayjob validate
* Trigger CI Signed-off-by: machichima <[email protected]>
* refactor: use string.Repeat in test for better readability
* test: remove unit test with too much details
--------- Signed-off-by: machichima <[email protected]> Signed-off-by: Nary <[email protected]>
1 parent aae423a commit 6e70fd2

File tree

4 files changed

+91
-55
lines changed

4 files changed

+91
-55
lines changed

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,16 @@ func IsJobTerminal(status JobStatus) bool {
4545
type JobDeploymentStatus string
4646

4747
const (
48-
JobDeploymentStatusNew JobDeploymentStatus = ""
49-
JobDeploymentStatusInitializing JobDeploymentStatus = "Initializing"
50-
JobDeploymentStatusRunning JobDeploymentStatus = "Running"
51-
JobDeploymentStatusComplete JobDeploymentStatus = "Complete"
52-
JobDeploymentStatusFailed JobDeploymentStatus = "Failed"
53-
JobDeploymentStatusSuspending JobDeploymentStatus = "Suspending"
54-
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
55-
JobDeploymentStatusRetrying JobDeploymentStatus = "Retrying"
56-
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
48+
JobDeploymentStatusNew JobDeploymentStatus = ""
49+
JobDeploymentStatusInitializing JobDeploymentStatus = "Initializing"
50+
JobDeploymentStatusRunning JobDeploymentStatus = "Running"
51+
JobDeploymentStatusComplete JobDeploymentStatus = "Complete"
52+
JobDeploymentStatusFailed JobDeploymentStatus = "Failed"
53+
JobDeploymentStatusValidationFailed JobDeploymentStatus = "ValidationFailed"
54+
JobDeploymentStatusSuspending JobDeploymentStatus = "Suspending"
55+
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
56+
JobDeploymentStatusRetrying JobDeploymentStatus = "Retrying"
57+
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
5758
)
5859

5960
// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
@@ -74,6 +75,7 @@ const (
7475
DeadlineExceeded JobFailedReason = "DeadlineExceeded"
7576
AppFailed JobFailedReason = "AppFailed"
7677
JobDeploymentStatusTransitionGracePeriodExceeded JobFailedReason = "JobDeploymentStatusTransitionGracePeriodExceeded"
78+
ValidationFailed JobFailedReason = "ValidationFailed"
7779
)
7880

7981
type JobSubmissionMode string

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -135,29 +135,28 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
135135
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
136136
}
137137

138-
if err := utils.ValidateRayJobMetadata(rayJobInstance.ObjectMeta); err != nil {
139-
logger.Error(err, "The RayJob metadata is invalid")
140-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.InvalidRayJobMetadata),
141-
"The RayJob metadata is invalid %s/%s: %v", rayJobInstance.Namespace, rayJobInstance.Name, err)
142-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
143-
}
138+
// Please do NOT modify `originalRayJobInstance` in the following code.
139+
originalRayJobInstance := rayJobInstance.DeepCopy()
144140

145-
if err := utils.ValidateRayJobSpec(rayJobInstance); err != nil {
146-
logger.Error(err, "The RayJob spec is invalid")
147-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.InvalidRayJobSpec),
148-
"The RayJob spec is invalid %s/%s: %v", rayJobInstance.Namespace, rayJobInstance.Name, err)
149-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
150-
}
141+
// Perform all validations and directly fail the RayJob if any validation fails
142+
errType, err := validateRayJob(ctx, rayJobInstance)
143+
// Immediately update the status after validation
144+
if err != nil {
145+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(errType),
146+
"%s/%s: %v", rayJobInstance.Namespace, rayJobInstance.Name, err)
151147

152-
if err := utils.ValidateRayJobStatus(rayJobInstance); err != nil {
153-
logger.Error(err, "The RayJob status is invalid")
154-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.InvalidRayJobStatus),
155-
"The RayJob status is invalid %s/%s: %v", rayJobInstance.Namespace, rayJobInstance.Name, err)
156-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
157-
}
148+
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusValidationFailed
149+
rayJobInstance.Status.Reason = rayv1.ValidationFailed
150+
rayJobInstance.Status.Message = err.Error()
158151

159-
// Please do NOT modify `originalRayJobInstance` in the following code.
160-
originalRayJobInstance := rayJobInstance.DeepCopy()
152+
// This is one of the only two places where we update the RayJob status. This will directly
153+
// update the JobDeploymentStatus to ValidationFailed if there's a validation error
154+
if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
155+
logger.Info("Failed to update RayJob status", "error", err)
156+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
157+
}
158+
return ctrl.Result{}, nil
159+
}
161160

162161
logger.Info("RayJob", "JobStatus", rayJobInstance.Status.JobStatus, "JobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus, "SubmissionMode", rayJobInstance.Spec.SubmissionMode)
163162
switch rayJobInstance.Status.JobDeploymentStatus {
@@ -454,7 +453,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
454453
}
455454
checkBackoffLimitAndUpdateStatusIfNeeded(ctx, rayJobInstance)
456455

457-
// This is the only place where we update the RayJob status. Please do NOT add any code
456+
// This is one of the only two places where we update the RayJob status. Please do NOT add any code
458457
// between `checkBackoffLimitAndUpdateStatusIfNeeded` and the following code.
459458
if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
460459
logger.Info("Failed to update RayJob status", "error", err)
@@ -464,6 +463,26 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
464463
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
465464
}
466465

466+
func validateRayJob(ctx context.Context, rayJobInstance *rayv1.RayJob) (utils.K8sEventType, error) {
467+
logger := ctrl.LoggerFrom(ctx)
468+
validationRules := []struct {
469+
validate func() error
470+
errType utils.K8sEventType
471+
}{
472+
{func() error { return utils.ValidateRayJobMetadata(rayJobInstance.ObjectMeta) }, utils.InvalidRayJobMetadata},
473+
{func() error { return utils.ValidateRayJobSpec(rayJobInstance) }, utils.InvalidRayJobSpec},
474+
{func() error { return utils.ValidateRayJobStatus(rayJobInstance) }, utils.InvalidRayJobStatus},
475+
}
476+
477+
for _, validation := range validationRules {
478+
if err := validation.validate(); err != nil {
479+
logger.Error(err, err.Error())
480+
return validation.errType, err
481+
}
482+
}
483+
return "", nil
484+
}
485+
467486
func emitRayJobMetrics(rayJobMetricsManager *metrics.RayJobMetricsManager, rayJobName, rayJobNamespace string, rayJobUID types.UID, originalRayJobStatus, rayJobStatus rayv1.RayJobStatus) {
468487
if rayJobMetricsManager == nil {
469488
return

ray-operator/controllers/ray/utils/validation.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,17 @@ func ValidateRayClusterSpec(spec *rayv1.RayClusterSpec, annotations map[string]s
134134

135135
func ValidateRayJobStatus(rayJob *rayv1.RayJob) error {
136136
if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusWaiting && rayJob.Spec.SubmissionMode != rayv1.InteractiveMode {
137-
return fmt.Errorf("invalid RayJob State: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not InteractiveMode")
137+
return fmt.Errorf("The RayJob status is invalid: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not InteractiveMode")
138138
}
139139
return nil
140140
}
141141

142142
func ValidateRayJobMetadata(metadata metav1.ObjectMeta) error {
143143
if len(metadata.Name) > MaxRayJobNameLength {
144-
return fmt.Errorf("RayJob name should be no more than %d characters", MaxRayJobNameLength)
144+
return fmt.Errorf("The RayJob metadata is invalid: RayJob name should be no more than %d characters", MaxRayJobNameLength)
145145
}
146146
if errs := validation.IsDNS1035Label(metadata.Name); len(errs) > 0 {
147-
return fmt.Errorf("RayJob name should be a valid DNS1035 label: %v", errs)
147+
return fmt.Errorf("The RayJob metadata is invalid: RayJob name should be a valid DNS1035 label: %v", errs)
148148
}
149149
return nil
150150
}
@@ -154,23 +154,23 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
154154
// Kueue (https://kueue.sigs.k8s.io/docs/tasks/run_rayjobs/#c-limitations). For example, KubeRay allows users
155155
// to suspend a RayJob with autoscaling enabled, but Kueue doesn't.
156156
if rayJob.Spec.Suspend && !rayJob.Spec.ShutdownAfterJobFinishes {
157-
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")
157+
return fmt.Errorf("The RayJob spec is invalid: a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")
158158
}
159159

160160
if rayJob.Spec.TTLSecondsAfterFinished < 0 {
161-
return fmt.Errorf("TTLSecondsAfterFinished must be a non-negative integer")
161+
return fmt.Errorf("The RayJob spec is invalid: TTLSecondsAfterFinished must be a non-negative integer")
162162
}
163163

164164
if !rayJob.Spec.ShutdownAfterJobFinishes && rayJob.Spec.TTLSecondsAfterFinished > 0 {
165-
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false cannot have TTLSecondsAfterFinished")
165+
return fmt.Errorf("The RayJob spec is invalid: a RayJob with shutdownAfterJobFinishes set to false cannot have TTLSecondsAfterFinished")
166166
}
167167

168168
isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0
169169
if rayJob.Spec.Suspend && isClusterSelectorMode {
170-
return fmt.Errorf("the ClusterSelector mode doesn't support the suspend operation")
170+
return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support the suspend operation")
171171
}
172172
if rayJob.Spec.RayClusterSpec == nil && !isClusterSelectorMode {
173-
return fmt.Errorf("one of RayClusterSpec or ClusterSelector must be set")
173+
return fmt.Errorf("The RayJob spec is invalid: one of RayClusterSpec or ClusterSelector must be set")
174174
}
175175
if isClusterSelectorMode {
176176
clusterName := rayJob.Spec.ClusterSelector[RayJobClusterSelectorKey]
@@ -187,7 +187,7 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
187187
// to avoid ambiguous state handling and unintended behavior.
188188
// https://github.com/ray-project/kuberay/issues/3525
189189
if rayJob.Spec.SubmissionMode == rayv1.InteractiveMode && rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit > 0 {
190-
return fmt.Errorf("BackoffLimit is incompatible with InteractiveMode")
190+
return fmt.Errorf("The RayJob spec is invalid: BackoffLimit is incompatible with InteractiveMode")
191191
}
192192

193193
if rayJob.Spec.SubmissionMode == rayv1.SidecarMode {
@@ -206,7 +206,7 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
206206

207207
if rayJob.Spec.RayClusterSpec != nil {
208208
if err := ValidateRayClusterSpec(rayJob.Spec.RayClusterSpec, rayJob.Annotations); err != nil {
209-
return err
209+
return fmt.Errorf("The RayJob spec is invalid: %w", err)
210210
}
211211
}
212212

@@ -216,49 +216,49 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
216216
return err
217217
}
218218
if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 {
219-
return fmt.Errorf("activeDeadlineSeconds must be a positive integer")
219+
return fmt.Errorf("The RayJob spec is invalid: activeDeadlineSeconds must be a positive integer")
220220
}
221221
if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
222-
return fmt.Errorf("backoffLimit must be a positive integer")
222+
return fmt.Errorf("The RayJob spec is invalid: backoffLimit must be a positive integer")
223223
}
224224
if !features.Enabled(features.RayJobDeletionPolicy) && rayJob.Spec.DeletionStrategy != nil {
225-
return fmt.Errorf("RayJobDeletionPolicy feature gate must be enabled to use the DeletionStrategy feature")
225+
return fmt.Errorf("The RayJob spec is invalid: RayJobDeletionPolicy feature gate must be enabled to use the DeletionStrategy feature")
226226
}
227227

228228
if rayJob.Spec.DeletionStrategy != nil {
229229
onSuccessPolicy := rayJob.Spec.DeletionStrategy.OnSuccess
230230
onFailurePolicy := rayJob.Spec.DeletionStrategy.OnFailure
231231

232232
if onSuccessPolicy.Policy == nil {
233-
return fmt.Errorf("the DeletionPolicyType field of DeletionStrategy.OnSuccess cannot be unset when DeletionStrategy is enabled")
233+
return fmt.Errorf("The RayJob spec is invalid: the DeletionPolicyType field of DeletionStrategy.OnSuccess cannot be unset when DeletionStrategy is enabled")
234234
}
235235
if onFailurePolicy.Policy == nil {
236-
return fmt.Errorf("the DeletionPolicyType field of DeletionStrategy.OnFailure cannot be unset when DeletionStrategy is enabled")
236+
return fmt.Errorf("The RayJob spec is invalid: the DeletionPolicyType field of DeletionStrategy.OnFailure cannot be unset when DeletionStrategy is enabled")
237237
}
238238

239239
if isClusterSelectorMode {
240240
switch *onSuccessPolicy.Policy {
241241
case rayv1.DeleteCluster:
242-
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionStrategy=DeleteCluster on success")
242+
return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteCluster on success")
243243
case rayv1.DeleteWorkers:
244-
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionStrategy=DeleteWorkers on success")
244+
return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteWorkers on success")
245245
}
246246

247247
switch *onFailurePolicy.Policy {
248248
case rayv1.DeleteCluster:
249-
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionStrategy=DeleteCluster on failure")
249+
return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteCluster on failure")
250250
case rayv1.DeleteWorkers:
251-
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionStrategy=DeleteWorkers on failure")
251+
return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteWorkers on failure")
252252
}
253253
}
254254

255255
if (*onSuccessPolicy.Policy == rayv1.DeleteWorkers || *onFailurePolicy.Policy == rayv1.DeleteWorkers) && IsAutoscalingEnabled(rayJob.Spec.RayClusterSpec) {
256256
// TODO (rueian): This can be supported in a future Ray version. We should check the RayVersion once we know it.
257-
return fmt.Errorf("DeletionStrategy=DeleteWorkers currently does not support RayCluster with autoscaling enabled")
257+
return fmt.Errorf("The RayJob spec is invalid: DeletionStrategy=DeleteWorkers currently does not support RayCluster with autoscaling enabled")
258258
}
259259

260260
if rayJob.Spec.ShutdownAfterJobFinishes && (*onSuccessPolicy.Policy == rayv1.DeleteNone || *onFailurePolicy.Policy == rayv1.DeleteNone) {
261-
return fmt.Errorf("shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'")
261+
return fmt.Errorf("The RayJob spec is invalid: shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'")
262262
}
263263
}
264264
return nil

ray-operator/test/e2erayjob/rayjob_test.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package e2erayjob
22

33
import (
4+
"strings"
45
"testing"
5-
"time"
66

77
. "github.com/onsi/gomega"
88
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -232,9 +232,24 @@ env_vars:
232232
g.Expect(err).NotTo(HaveOccurred())
233233
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
234234

235-
// `RuntimeEnvYAML` is not a valid YAML string, so the RayJob controller will not do anything with the CR.
236-
g.Consistently(RayJob(test, rayJob.Namespace, rayJob.Name), 5*time.Second).
237-
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusNew)))
235+
// `RuntimeEnvYAML` is not a valid YAML string, so the RayJob controller should set status to ValidationFailed.
236+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort).
237+
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusValidationFailed)))
238+
})
239+
240+
test.T().Run("RayJob name too long with 48 characters", func(_ *testing.T) {
241+
rayJobAC := rayv1ac.RayJob(strings.Repeat("a", 48), namespace.Name).
242+
WithSpec(rayv1ac.RayJobSpec().
243+
WithEntrypoint("python /home/ray/jobs/counter.py").
244+
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
245+
246+
rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
247+
g.Expect(err).NotTo(HaveOccurred())
248+
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
249+
250+
// RayJob name is too long, so the RayJob controller should set status to ValidationFailed.
251+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort).
252+
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusValidationFailed)))
238253
})
239254

240255
test.T().Run("RayJob has passed ActiveDeadlineSeconds", func(_ *testing.T) {

0 commit comments

Comments
 (0)