
Commit 9bc5d85

[Feature] Support suspend in RayJob (#926)
Native Kubernetes Jobs have a suspend flag that allows a Job's execution to be temporarily suspended and resumed later, or lets Jobs start in a suspended state so that a custom controller, such as Kueue, can decide later when to start them. This change adds the same flag to the RayJob spec for consistency. Other frameworks, such as Kubeflow, are adding it as well, so it is becoming standard functionality; see the MPIJob implementation in kubeflow/mpi-operator#511 for an example.

Implementation details:

- If a RayJob is created with spec.suspend == true, the RayCluster instance (with its corresponding Kubernetes resources) is not created and the Ray job is not submitted to the cluster. The JobDeploymentStatus is set to Suspended and a corresponding event is emitted. The RayJob remains in this state until somebody unsuspends it.
- If suspend flips from true to false, the RayJob controller immediately creates a RayCluster instance and submits the job.
- If suspend flips from false to true while the job is running, the RayJob controller tries to gracefully stop the job and deletes the RayCluster instance (with its underlying Kubernetes resources). The JobDeploymentStatus is set to Suspended, the JobStatus is set to STOPPED, and a corresponding event is emitted.
- Edge case: the suspend flag is ignored if a RayJob is submitted against an existing RayCluster instance (matched with ClusterSelector), since we can't delete a RayCluster created by somebody else.

No Kueue-specific code leaks into the KubeRay implementation.

Contributors from Kueue/Kubernetes cc'ed: @alculquicondor @mwielgus
1 parent 28d07c9 commit 9bc5d85
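The workflow described in the commit message (create a RayJob in a suspended state, then let a queueing controller decide when to run it) can be driven entirely through the new spec.suspend field. Below is a minimal sketch of what that might look like with a controller-runtime client; the package and helper names are hypothetical and not part of this commit:

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

// createSuspended creates a RayJob with spec.suspend=true, so the controller
// records JobDeploymentStatusSuspended instead of creating a RayCluster.
func createSuspended(ctx context.Context, c client.Client, ns, name, entrypoint string) error {
	rj := &rayv1alpha1.RayJob{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns},
		Spec: rayv1alpha1.RayJobSpec{
			Entrypoint: entrypoint,
			Suspend:    true, // start suspended; a queueing controller decides when to run it
		},
	}
	return c.Create(ctx, rj)
}

// unsuspend flips suspend to false; on the next reconcile the controller
// creates the RayCluster and submits the Ray job.
func unsuspend(ctx context.Context, c client.Client, ns, name string) error {
	rj := &rayv1alpha1.RayJob{}
	if err := c.Get(ctx, client.ObjectKey{Namespace: ns, Name: name}, rj); err != nil {
		return err
	}
	patch := client.MergeFrom(rj.DeepCopy())
	rj.Spec.Suspend = false
	return c.Patch(ctx, rj, patch)
}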

File tree

10 files changed: +466 -68 lines changed


helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Lines changed: 4 additions & 0 deletions
@@ -12060,6 +12060,10 @@ spec:
               description: ShutdownAfterJobFinishes will determine whether to delete
                 the ray cluster once rayJob succeed or fai
               type: boolean
+            suspend:
+              description: suspend specifies whether the RayJob controller should
+                create a RayCluster instance If a job is appl
+              type: boolean
             ttlSecondsAfterFinished:
               description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
               format: int32

ray-operator/apis/ray/v1alpha1/rayjob_types.go

Lines changed: 7 additions & 0 deletions
@@ -39,6 +39,7 @@ const (
     JobDeploymentStatusRunning              JobDeploymentStatus = "Running"
     JobDeploymentStatusFailedToGetJobStatus JobDeploymentStatus = "FailedToGetJobStatus"
     JobDeploymentStatusComplete             JobDeploymentStatus = "Complete"
+    JobDeploymentStatusSuspended            JobDeploymentStatus = "Suspended"
 )

 // RayJobSpec defines the desired state of RayJob
@@ -61,6 +62,12 @@ type RayJobSpec struct {
     RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
     // clusterSelector is used to select running rayclusters by labels
     ClusterSelector map[string]string `json:"clusterSelector,omitempty"`
+    // suspend specifies whether the RayJob controller should create a RayCluster instance
+    // If a job is applied with the suspend field set to true,
+    // the RayCluster will not be created and will wait for the transition to false.
+    // If the RayCluster is already created, it will be deleted.
+    // In case of transition to false a new RayCluster will be created.
+    Suspend bool `json:"suspend,omitempty"`
 }

 // RayJobStatus defines the observed state of RayJob
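As a rough illustration of the new API surface (not code from this commit), a caller that flips Suspend to true on a running RayJob could poll for the new JobDeploymentStatusSuspended value to learn when the controller has stopped the Ray job and torn down the cluster; the helper below is hypothetical:

package example

import (
	"context"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/client"

	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

// waitForSuspended polls a RayJob until the controller reports that the
// suspend sequence has finished, i.e. JobDeploymentStatus == Suspended.
func waitForSuspended(ctx context.Context, c client.Client, key client.ObjectKey, interval time.Duration) error {
	for {
		rj := &rayv1alpha1.RayJob{}
		if err := c.Get(ctx, key, rj); err != nil {
			return err
		}
		if rj.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusSuspended {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}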

ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Lines changed: 4 additions & 0 deletions
@@ -12060,6 +12060,10 @@ spec:
               description: ShutdownAfterJobFinishes will determine whether to delete
                 the ray cluster once rayJob succeed or fai
               type: boolean
+            suspend:
+              description: suspend specifies whether the RayJob controller should
+                create a RayCluster instance If a job is appl
+              type: boolean
             ttlSecondsAfterFinished:
               description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
               format: int32

ray-operator/config/samples/ray_v1alpha1_rayjob.yaml

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@ kind: RayJob
 metadata:
   name: rayjob-sample
 spec:
+  suspend: false
   entrypoint: python /home/ray/samples/sample_code.py
   # runtimeEnv decoded to '{
   # "pip": [

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 97 additions & 31 deletions
@@ -15,6 +15,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
     "sigs.k8s.io/controller-runtime/pkg/manager"
+    "sigs.k8s.io/controller-runtime/pkg/reconcile"

     "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
@@ -92,7 +93,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     if isJobPendingOrRunning(rayJobInstance.Status.JobStatus) {
         rayDashboardClient := utils.GetRayDashboardClientFunc()
         rayDashboardClient.InitClient(rayJobInstance.Status.DashboardURL)
-        err := rayDashboardClient.StopJob(rayJobInstance.Status.JobId, &r.Log)
+        err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
         if err != nil {
             r.Log.Info("Failed to stop job", "error", err)
         }
@@ -150,6 +151,20 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
         err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, err)
         return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
     }
+    // If there is no cluster instance and no error suspend the job deployment
+    if rayClusterInstance == nil {
+        // Already suspended?
+        if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusSuspended {
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+        }
+        err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusSuspended, err)
+        if err != nil {
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+        }
+        r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
+        r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
+        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+    }

     // Always update RayClusterStatus along with jobStatus and jobDeploymentStatus updates.
     rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
@@ -178,7 +193,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     }

     // Check the current status of ray jobs before submitting.
-    jobInfo, err := rayDashboardClient.GetJobInfo(rayJobInstance.Status.JobId)
+    jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
     if err != nil {
         err = r.updateState(ctx, rayJobInstance, jobInfo, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus, err)
         // Dashboard service in head pod takes time to start, it's possible we get connection refused error.
@@ -189,7 +204,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     r.Log.V(1).Info("RayJob information", "RayJob", rayJobInstance.Name, "jobInfo", jobInfo, "rayJobInstance", rayJobInstance.Status.JobStatus)
     if jobInfo == nil {
         // Submit the job if no id set
-        jobId, err := rayDashboardClient.SubmitJob(rayJobInstance, &r.Log)
+        jobId, err := rayDashboardClient.SubmitJob(ctx, rayJobInstance, &r.Log)
         if err != nil {
             r.Log.Error(err, "failed to submit job")
             err = r.updateState(ctx, rayJobInstance, jobInfo, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedJobDeploy, err)
@@ -213,9 +228,48 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
         return ctrl.Result{}, err
     }

-    // Job may takes long time to start and finish, let's just periodically requeue the job and check status.
-    if isJobPendingOrRunning(jobInfo.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
-        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+    if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
+        // If suspend flag is set AND
+        // the RayJob is submitted against the RayCluster created by THIS job, then
+        // try to gracefully stop the Ray job and delete (suspend) the cluster
+        if rayJobInstance.Spec.Suspend && len(rayJobInstance.Spec.ClusterSelector) == 0 {
+            info, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
+            if err != nil {
+                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+            }
+            if !rayv1alpha1.IsJobTerminal(info.JobStatus) {
+                err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
+                if err != nil {
+                    return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+                }
+            }
+            if info.JobStatus != rayv1alpha1.JobStatusStopped {
+                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+            }
+
+            _, err = r.deleteCluster(ctx, rayJobInstance)
+            if err != nil && !errors.IsNotFound(err) {
+                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+            }
+            // Since RayCluster instance is gone, remove it status also
+            // on RayJob resource
+            rayJobInstance.Status.RayClusterStatus = rayv1alpha1.RayClusterStatus{}
+            rayJobInstance.Status.RayClusterName = ""
+            rayJobInstance.Status.DashboardURL = ""
+            rayJobInstance.Status.JobId = ""
+            rayJobInstance.Status.Message = ""
+            err = r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusSuspended, nil)
+            if err != nil {
+                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+            }
+            r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
+            r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+            // Job may takes long time to start and finish, let's just periodically requeue the job and check status.
+        }
+        if isJobPendingOrRunning(jobInfo.JobStatus) {
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+        }
     }

     // Let's use rayJobInstance.Status.JobStatus to make sure we only delete cluster after the CR is updated.
@@ -231,34 +285,38 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
             return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
         }
     }
-
     r.Log.Info("shutdownAfterJobFinishes set to true, we will delete cluster",
         "RayJob", rayJobInstance.Name, "clusterName", fmt.Sprintf("%s/%s", rayJobInstance.Namespace, rayJobInstance.Status.RayClusterName))
-    clusterIdentifier := types.NamespacedName{
-        Name:      rayJobInstance.Status.RayClusterName,
-        Namespace: rayJobInstance.Namespace,
-    }
-    cluster := rayv1alpha1.RayCluster{}
-    if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
-        if !errors.IsNotFound(err) {
-            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
-        }
-        r.Log.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
-    } else {
-        if cluster.DeletionTimestamp != nil {
-            r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
-        } else {
-            if err := r.Delete(ctx, &cluster); err != nil {
-                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
-            }
-            r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
-            r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
-            return ctrl.Result{Requeue: true}, nil
-        }
-    }
+    return r.deleteCluster(ctx, rayJobInstance)
     }
 }
+    return ctrl.Result{}, nil
+}

+func (r *RayJobReconciler) deleteCluster(ctx context.Context, rayJobInstance *rayv1alpha1.RayJob) (reconcile.Result, error) {
+    clusterIdentifier := types.NamespacedName{
+        Name:      rayJobInstance.Status.RayClusterName,
+        Namespace: rayJobInstance.Namespace,
+    }
+    cluster := rayv1alpha1.RayCluster{}
+    if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
+        if !errors.IsNotFound(err) {
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+        }
+        r.Log.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
+        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+    } else {
+        if cluster.DeletionTimestamp != nil {
+            r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
+        } else {
+            if err := r.Delete(ctx, &cluster); err != nil {
+                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+            }
+            r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
+            r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+        }
+    }
     return ctrl.Result{}, nil
 }
@@ -343,7 +401,11 @@ func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1alpha1.
     if jobInfo != nil {
         rayJob.Status.Message = jobInfo.Message
         rayJob.Status.StartTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.StartTime)
-        rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
+        if jobInfo.StartTime >= jobInfo.EndTime {
+            rayJob.Status.EndTime = nil
+        } else {
+            rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
+        }
     }

     // TODO (kevin85421): ObservedGeneration should be used to determine whether update this CR or not.
@@ -391,11 +453,15 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
         return nil, err
     }

-    // one special case is the job is complete status and cluster has been recycled.
+    // special case: is the job is complete status and cluster has been recycled.
     if isJobSucceedOrFailed(rayJobInstance.Status.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusComplete {
         r.Log.Info("The cluster has been recycled for the job, skip duplicate creation", "rayjob", rayJobInstance.Name)
         return nil, err
     }
+    // special case: don't create a cluster instance and don't return an error if the suspend flag of the job is true
+    if rayJobInstance.Spec.Suspend {
+        return nil, nil
+    }

     r.Log.Info("RayCluster not found, creating rayCluster!", "raycluster", rayClusterNamespacedName)
     rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterInstanceName)
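Taken together, the suspend handling added to Reconcile above reduces to two branches: a RayJob whose cluster has not been created yet is simply marked Suspended, while a running RayJob is first stopped through the dashboard client and its RayCluster deleted before the status is reset. The following is a condensed, hypothetical restatement of that flow using stand-in types, not the operator's actual code:

package sketch

// Stand-in types; the real controller uses the rayv1alpha1 API and the
// dashboard HTTP client instead.
type jobStatus string
type deploymentStatus string

const (
	statusStopped   jobStatus        = "STOPPED"
	deploySuspended deploymentStatus = "Suspended"
)

type rayJob struct {
	Suspend          bool
	ClusterSelector  map[string]string
	JobID            string
	DeploymentStatus deploymentStatus
}

type dashboard interface {
	GetJobStatus(id string) (jobStatus, error)
	StopJob(id string) error
}

type clusterOps interface {
	DeleteCluster(job *rayJob) error
}

// reconcileSuspend mirrors the two branches added to Reconcile:
//  1. no cluster yet (spec.suspend=true at creation): mark the deployment Suspended;
//  2. cluster running and suspend flipped to true (and the cluster is owned by this
//     RayJob, i.e. no clusterSelector): stop the Ray job, delete the cluster, then
//     mark the deployment Suspended.
func reconcileSuspend(job *rayJob, dash dashboard, clusters clusterOps, clusterExists bool) (requeue bool, err error) {
	if !job.Suspend || len(job.ClusterSelector) != 0 {
		return false, nil // suspend is ignored for clusters matched by clusterSelector
	}
	if !clusterExists {
		job.DeploymentStatus = deploySuspended
		return true, nil
	}
	st, err := dash.GetJobStatus(job.JobID)
	if err != nil {
		return true, err
	}
	if st != statusStopped {
		if err := dash.StopJob(job.JobID); err != nil {
			return true, err
		}
		return true, nil // requeue until the Ray job reports STOPPED
	}
	if err := clusters.DeleteCluster(job); err != nil {
		return true, err
	}
	job.JobID = "" // cluster-related status fields are cleared on the real RayJob
	job.DeploymentStatus = deploySuspended
	return true, nil
}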
