
Commit 13c54ea

Implement support for suspend semantics for MPIJob
1 parent c5a0c32 commit 13c54ea

File tree

12 files changed: +169, -42 lines

crd/kubeflow.org_mpijobs.yaml

Lines changed: 12 additions & 0 deletions
@@ -7866,6 +7866,18 @@ spec:
                     format: int32
                   type: integer
               type: object
+            suspend:
+              default: false
+              description: "suspend specifies whether the MPIJob controller
+                should create Pods or not. If a MPIJob is created with suspend
+                set to true, no Pods are created by the MPIJob controller. If
+                a MPIJob is suspended after creation (i.e. the flag goes from
+                false to true), the MPIJob controller will delete all active
+                Pods associated with this MPIJob. Also, it will suspend the
+                Launcher Job. Users must design their workload to gracefully
+                handle this. Suspending a Job will reset the StartTime field
+                of the MPIJob. \n Defaults to false."
+              type: boolean
             ttlSecondsAfterFinished:
               description: TTLSecondsAfterFinished is the TTL to clean up jobs.
                 It may take extra ReconcilePeriod seconds for the cleanup, since

go.mod

Lines changed: 1 addition & 1 deletion
@@ -17,6 +17,7 @@ require (
 	k8s.io/klog v1.0.0
 	k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596
 	k8s.io/sample-controller v0.25.6
+	k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
 	sigs.k8s.io/controller-runtime v0.13.1
 	volcano.sh/apis v1.7.0
 )
@@ -73,7 +74,6 @@ require (
 	k8s.io/component-base v0.25.6 // indirect
 	k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect
 	k8s.io/klog/v2 v2.70.1 // indirect
-	k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
 	sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
 	sigs.k8s.io/yaml v1.3.0 // indirect

manifests/base/crd.yaml

Lines changed: 3 additions & 1 deletion
@@ -49,6 +49,8 @@ spec:
                   type: integer
                   minimum: 0
                   description: "Specifies the number of retries before marking the launcher Job as failed. Defaults to 6."
+                suspend:
+                  type: boolean
                 sshAuthMountPath:
                   type: string
                 mpiImplementation:
@@ -94,7 +96,7 @@ spec:
               properties:
                 type:
                   type: string
-                  enum: ["Created", "Running", "Restarting", "Succeeded", "Failed"]
+                  enum: ["Created", "Running", "Restarting", "Succeeded", "Suspended", "Failed"]
                 status:
                   type: string
                   enum: ["True", "False", "Unknown"]

pkg/apis/kubeflow/v2beta1/openapi_generated.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default.

pkg/apis/kubeflow/v2beta1/swagger.json

Lines changed: 4 additions & 0 deletions
@@ -295,6 +295,10 @@
         "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
         "$ref": "#/definitions/v2beta1.SchedulingPolicy"
       },
+      "suspend": {
+        "description": "suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob.\n\nDefaults to false.",
+        "type": "boolean"
+      },
       "ttlSecondsAfterFinished": {
         "description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.",
         "type": "integer",

pkg/apis/kubeflow/v2beta1/types.go

Lines changed: 17 additions & 0 deletions
@@ -84,6 +84,18 @@ type RunPolicy struct {
 	// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
 	// +optional
 	SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`
+
+	// suspend specifies whether the MPIJob controller should create Pods or not.
+	// If a MPIJob is created with suspend set to true, no Pods are created by
+	// the MPIJob controller. If a MPIJob is suspended after creation (i.e. the
+	// flag goes from false to true), the MPIJob controller will delete all
+	// active Pods associated with this MPIJob. Also, it will suspend the
+	// Launcher Job. Users must design their workload to gracefully handle this.
+	// Suspending a Job will reset the StartTime field of the MPIJob.
+	//
+	// Defaults to false.
+	// +kubebuilder:default:=false
+	Suspend *bool `json:"suspend,omitempty"`
 }

 type MPIJobSpec struct {
@@ -130,3 +142,8 @@ const (
 	MPIImplementationOpenMPI MPIImplementation = "OpenMPI"
 	MPIImplementationIntel   MPIImplementation = "Intel"
 )
+
+const (
+	// JobSuspended means the job has been suspended.
+	JobSuspended common.JobConditionType = "Suspended"
+)
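
For reference, a minimal sketch (not part of this commit) of creating an MPIJob in the suspended state through the new field. Import paths and the clientset accessor are assumed from the repo layout shown in the file tree; the job name, namespace, and kubeconfig path are illustrative.

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/utils/pointer"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
	clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
)

func main() {
	// Illustrative kubeconfig path.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := clientset.NewForConfigOrDie(cfg)

	mpiJob := &kubeflow.MPIJob{
		ObjectMeta: metav1.ObjectMeta{Name: "pi", Namespace: "default"},
		Spec: kubeflow.MPIJobSpec{
			RunPolicy: kubeflow.RunPolicy{
				// Created suspended: no Pods run, the launcher Job is created
				// with .spec.suspend=true, and StartTime stays unset until
				// this flag is flipped back to false.
				Suspend: pointer.Bool(true),
			},
			// MPIReplicaSpecs for Launcher and Worker omitted for brevity;
			// a real MPIJob requires them.
		},
	}
	if _, err := client.KubeflowV2beta1().MPIJobs("default").Create(context.TODO(), mpiJob, metav1.CreateOptions{}); err != nil {
		panic(err)
	}
}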

pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default.

pkg/controller/mpi_job_controller.go

Lines changed: 70 additions & 22 deletions
@@ -33,6 +33,7 @@ import (
 	"golang.org/x/crypto/ssh"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,6 +51,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
+	"k8s.io/utils/pointer"
 	podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
 	podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
@@ -493,7 +495,7 @@ func (c *MPIJobController) syncHandler(key string) error {

 	if len(mpiJob.Status.Conditions) == 0 {
 		msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, common.JobCreated, mpiJobCreatedReason, msg)
+		updateMPIJobConditions(mpiJob, common.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
 		c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg)
 		mpiJobsCreatedCount.Inc()
 	}
@@ -503,24 +505,13 @@ func (c *MPIJobController) syncHandler(key string) error {
 	// cleanup and stop retrying the MPIJob.
 	if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
 		if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
-			// set worker StatefulSet Replicas to 0.
-			if err := c.deleteWorkerPods(mpiJob); err != nil {
-				return err
-			}
-			initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
-			if c.gangSchedulerName != "" {
-				if err := c.deletePodGroups(mpiJob); err != nil {
-					return err
-				}
-			}
-			mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
-			return c.updateStatusHandler(mpiJob)
+			return cleanUpWorkerPods(mpiJob, c)
 		}
 		return nil
 	}

 	// first set StartTime.
-	if mpiJob.Status.StartTime == nil {
+	if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
 		now := metav1.Now()
 		mpiJob.Status.StartTime = &now
 	}
@@ -532,9 +523,10 @@ func (c *MPIJobController) syncHandler(key string) error {
 	}

 	var worker []*corev1.Pod
-	// We're done if the launcher either succeeded or failed.
-	done := launcher != nil && isJobFinished(launcher)
-	if !done {
+	// We're done if the launcher either succeeded or failed. We also skip
+	// creation of the auxiliary objects if the MPIJob is suspended.
+	running := !(launcher != nil && isJobFinished(launcher)) && !isMPIJobSuspended(mpiJob)
+	if running {
 		_, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob))
 		if err != nil {
 			return fmt.Errorf("getting or creating Service to front workers: %w", err)
@@ -585,9 +577,40 @@ func (c *MPIJobController) syncHandler(key string) error {
 		return err
 	}

+	if launcher != nil {
+		if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
+			// align the suspension state of launcher with the MPIJob
+			launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob))
+			if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
+				return err
+			}
+		}
+	}
+
+	// cleanup the running worker pods if the MPI job is suspended
+	if isMPIJobSuspended(mpiJob) {
+		if err := cleanUpWorkerPods(mpiJob, c); err != nil {
+			return err
+		}
+	}
 	return nil
 }

+func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
+	// set worker StatefulSet Replicas to 0.
+	if err := c.deleteWorkerPods(mpiJob); err != nil {
+		return err
+	}
+	initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
+	if c.gangSchedulerName != "" {
+		if err := c.deletePodGroups(mpiJob); err != nil {
+			return err
+		}
+	}
+	mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
+	return c.updateStatusHandler(mpiJob)
+}
+
 // getLauncherJob gets the launcher Job controlled by this MPIJob.
 func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
 	launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix)
@@ -857,6 +880,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
 	return workerPods, nil
 }

+func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
+	return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false)
+}
+
+func isJobSuspended(job *batchv1.Job) bool {
+	return pointer.BoolDeref(job.Spec.Suspend, false)
+}
+
 func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
 	var (
 		workerPrefix = mpiJob.Name + workerSuffix
@@ -905,6 +936,19 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 	if err != nil {
 		return fmt.Errorf("checking launcher pods running: %w", err)
 	}
+	if isMPIJobSuspended(mpiJob) {
+		// it is suspended now
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
+			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "Suspended", "MPIJob suspended")
+		}
+	} else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil {
+		// it is not suspended now, consider resumed if the condition was set before
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed") {
+			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "Resumed", "MPIJob resumed")
+			now := metav1.NewTime(time.Now())
+			mpiJob.Status.StartTime = &now
+		}
+	}
 	// Job.status.Active accounts for Pending and Running pods. Count running pods
 	// from the lister instead.
 	launcherPodsCnt := countRunningPods(launcherPods)
@@ -919,7 +963,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 		if mpiJob.Status.CompletionTime == nil {
 			mpiJob.Status.CompletionTime = launcher.Status.CompletionTime
 		}
-		updateMPIJobConditions(mpiJob, common.JobSucceeded, mpiJobSucceededReason, msg)
+		updateMPIJobConditions(mpiJob, common.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg)
 		mpiJobsSuccessCount.Inc()
 	} else if isJobFailed(launcher) {
 		c.updateMPIJobFailedStatus(mpiJob, launcher, launcherPods)
@@ -953,13 +997,13 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 	if evict > 0 {
 		msg := fmt.Sprintf("%d/%d workers are evicted", evict, len(worker))
 		klog.Infof("MPIJob <%s/%s>: %v", mpiJob.Namespace, mpiJob.Name, msg)
-		updateMPIJobConditions(mpiJob, common.JobFailed, mpiJobEvict, msg)
+		updateMPIJobConditions(mpiJob, common.JobFailed, v1.ConditionTrue, mpiJobEvict, msg)
 		c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
 	}

 	if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
 		msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, common.JobRunning, mpiJobRunningReason, msg)
+		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
 		c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
 	}

@@ -999,7 +1043,7 @@ func (c *MPIJobController) updateMPIJobFailedStatus(mpiJob *kubeflow.MPIJob, lau
 		now := metav1.Now()
 		mpiJob.Status.CompletionTime = &now
 	}
-	updateMPIJobConditions(mpiJob, common.JobFailed, reason, msg)
+	updateMPIJobConditions(mpiJob, common.JobFailed, v1.ConditionTrue, reason, msg)
 	mpiJobsFailureCount.Inc()
 }

@@ -1304,7 +1348,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
 }

 func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
-	return &batchv1.Job{
+	job := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      mpiJob.Name + launcherSuffix,
 			Namespace: mpiJob.Namespace,
@@ -1322,6 +1366,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job
 			Template: c.newLauncherPodTemplate(mpiJob),
 		},
 	}
+	if isMPIJobSuspended(mpiJob) {
+		job.Spec.Suspend = pointer.Bool(true)
+	}
+	return job
 }

 // newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. It also sets
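
With the sync logic above, suspending or resuming a running MPIJob is just a toggle of .spec.runPolicy.suspend: on the next sync the controller aligns the launcher Job's .spec.suspend, deletes active worker Pods when suspending, and resets StartTime on resume. A hedged sketch of such a toggle, not part of this commit; suspendMPIJob is a hypothetical helper, and the clientset calls are assumed from the repo's generated client.

package mpijobutil

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"

	clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
)

// suspendMPIJob flips .spec.runPolicy.suspend on an existing MPIJob. The
// controller reacts on its next sync: it suspends or resumes the launcher
// Job, cleans up worker Pods when suspending, and records a Suspended
// condition plus a "Suspended" or "Resumed" event.
func suspendMPIJob(ctx context.Context, client clientset.Interface, namespace, name string, suspend bool) error {
	mpiJob, err := client.KubeflowV2beta1().MPIJobs(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(suspend)
	_, err = client.KubeflowV2beta1().MPIJobs(namespace).Update(ctx, mpiJob, metav1.UpdateOptions{})
	return err
}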

pkg/controller/mpi_job_controller_status.go

Lines changed: 9 additions & 7 deletions
@@ -46,16 +46,16 @@ func initializeMPIJobStatuses(mpiJob *kubeflow.MPIJob, mtype kubeflow.MPIReplica
 }

 // updateMPIJobConditions updates the conditions of the given mpiJob.
-func updateMPIJobConditions(mpiJob *kubeflow.MPIJob, conditionType common.JobConditionType, reason, message string) {
-	condition := newCondition(conditionType, reason, message)
-	setCondition(&mpiJob.Status, condition)
+func updateMPIJobConditions(mpiJob *kubeflow.MPIJob, conditionType common.JobConditionType, status v1.ConditionStatus, reason, message string) bool {
+	condition := newCondition(conditionType, status, reason, message)
+	return setCondition(&mpiJob.Status, condition)
 }

 // newCondition creates a new mpiJob condition.
-func newCondition(conditionType common.JobConditionType, reason, message string) common.JobCondition {
+func newCondition(conditionType common.JobConditionType, status v1.ConditionStatus, reason, message string) common.JobCondition {
 	return common.JobCondition{
 		Type:               conditionType,
-		Status:             v1.ConditionTrue,
+		Status:             status,
 		LastUpdateTime:     metav1.Now(),
 		LastTransitionTime: metav1.Now(),
 		Reason:             reason,
@@ -97,13 +97,14 @@ func isFailed(status common.JobStatus) bool {
 // setCondition updates the mpiJob to include the provided condition.
 // If the condition that we are about to add already exists
 // and has the same status and reason then we are not going to update.
-func setCondition(status *common.JobStatus, condition common.JobCondition) {
+// Returns true if the condition is actually updated. False otherwise.
+func setCondition(status *common.JobStatus, condition common.JobCondition) bool {

 	currentCond := getCondition(*status, condition.Type)

 	// Do nothing if condition doesn't change
 	if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
-		return
+		return false
 	}

 	// Do not update lastTransitionTime if the status of the condition doesn't change.
@@ -114,6 +115,7 @@ func setCondition(status *common.JobStatus, condition common.JobCondition) {
 	// Append the updated condition
 	newConditions := filterOutCondition(status.Conditions, condition.Type)
 	status.Conditions = append(newConditions, condition)
+	return true
 }

 // filterOutCondition returns a new slice of mpiJob conditions without conditions with the provided type.
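
The new bool return from setCondition, threaded through updateMPIJobConditions, lets callers react only to real transitions; this is how the controller avoids emitting duplicate Suspended/Resumed events on every sync. A sketch of a unit test for that contract, not part of this commit: it would live in package controller next to this file, and the import paths for the kubeflow/common types are assumptions based on the repo layout.

package controller

import (
	"testing"

	common "github.com/kubeflow/common/pkg/apis/common/v1"
	v1 "k8s.io/api/core/v1"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
)

// The first call records the condition and reports a change; an identical
// second call (same type, status, and reason) is a no-op and reports false.
func TestSetConditionReportsChange(t *testing.T) {
	status := &common.JobStatus{}
	cond := newCondition(kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended")
	if !setCondition(status, cond) {
		t.Error("first setCondition call should report an update")
	}
	if setCondition(status, cond) {
		t.Error("repeated identical setCondition call should report no update")
	}
}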
