@@ -33,6 +33,7 @@ import (
	"golang.org/x/crypto/ssh"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,6 +51,7 @@ import (
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
+	"k8s.io/utils/pointer"
	podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
	podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
@@ -493,7 +495,7 @@ func (c *MPIJobController) syncHandler(key string) error {

	if len(mpiJob.Status.Conditions) == 0 {
		msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, kubeflow.JobCreated, mpiJobCreatedReason, msg)
+		updateMPIJobConditions(mpiJob, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
		c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg)
		mpiJobsCreatedCount.Inc()
	}
@@ -503,24 +505,13 @@ func (c *MPIJobController) syncHandler(key string) error {
	// cleanup and stop retrying the MPIJob.
	if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
		if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
-			// set worker StatefulSet Replicas to 0.
-			if err := c.deleteWorkerPods(mpiJob); err != nil {
-				return err
-			}
-			initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
-			if c.gangSchedulerName != "" {
-				if err := c.deletePodGroups(mpiJob); err != nil {
-					return err
-				}
-			}
-			mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0
-			return c.updateStatusHandler(mpiJob)
+			return cleanUpWorkerPods(mpiJob, c)
		}
		return nil
	}

	// first set StartTime.
-	if mpiJob.Status.StartTime == nil {
+	if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
		now := metav1.Now()
		mpiJob.Status.StartTime = &now
	}
@@ -549,17 +540,18 @@ func (c *MPIJobController) syncHandler(key string) error {
		return fmt.Errorf("creating SSH auth secret: %w", err)
	}

-	// Get the PodGroup for this MPIJob
-	if c.gangSchedulerName != "" {
-		if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
+	if !isMPIJobSuspended(mpiJob) {
+		// Get the PodGroup for this MPIJob
+		if c.gangSchedulerName != "" {
+			if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
+				return err
+			}
+		}
+		worker, err = c.getOrCreateWorker(mpiJob)
+		if err != nil {
			return err
		}
	}
-
-	worker, err = c.getOrCreateWorker(mpiJob)
-	if err != nil {
-		return err
-	}
	if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel {
		// The Intel implementation requires workers to communicate with the
		// launcher through its hostname. For that, we create a Service which
@@ -585,9 +577,40 @@ func (c *MPIJobController) syncHandler(key string) error {
		return err
	}

+	if launcher != nil {
+		if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
+			// align the suspension state of launcher with the MPIJob
+			launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob))
+			if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
+				return err
+			}
+		}
+	}
+
+	// cleanup the running worker pods if the MPI job is suspended
+	if isMPIJobSuspended(mpiJob) {
+		if err := cleanUpWorkerPods(mpiJob, c); err != nil {
+			return err
+		}
+	}
	return nil
}

+func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
+	// set worker StatefulSet Replicas to 0.
+	if err := c.deleteWorkerPods(mpiJob); err != nil {
+		return err
+	}
+	initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
+	if c.gangSchedulerName != "" {
+		if err := c.deletePodGroups(mpiJob); err != nil {
+			return err
+		}
+	}
+	mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0
+	return c.updateStatusHandler(mpiJob)
+}
+
// getLauncherJob gets the launcher Job controlled by this MPIJob.
func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
	launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix)
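
The hunk above keeps the launcher Job's .spec.suspend aligned with the MPIJob's runPolicy.suspend and tears the worker pods down while the job is suspended. As a rough standalone sketch of that alignment pattern (the package and helper name syncSuspend and the bare clientset usage are illustrative assumptions, not part of this change):

package suspendsketch

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/utils/pointer"
)

// syncSuspend updates the Job's suspend flag only when it disagrees with the
// desired state, mirroring how the controller reconciles the launcher above.
func syncSuspend(ctx context.Context, kc kubernetes.Interface, job *batchv1.Job, want bool) error {
	if pointer.BoolDeref(job.Spec.Suspend, false) == want {
		// Already in the desired state; avoid a needless API call.
		return nil
	}
	job.Spec.Suspend = pointer.Bool(want)
	_, err := kc.BatchV1().Jobs(job.Namespace).Update(ctx, job, metav1.UpdateOptions{})
	return err
}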
@@ -857,6 +880,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
	return workerPods, nil
}

+func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
+	return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false)
+}
+
+func isJobSuspended(job *batchv1.Job) bool {
+	return pointer.BoolDeref(job.Spec.Suspend, false)
+}
+
func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
	var (
		workerPrefix = mpiJob.Name + workerSuffix
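
Both new helpers rely on pointer.BoolDeref from k8s.io/utils/pointer, so a nil suspend field reads as "not suspended"; only an explicit true suspends anything. A minimal sketch of that defaulting behavior (the surrounding function is illustrative, not part of this change):

package suspendsketch

import (
	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/utils/pointer"
)

// suspendStates shows how nil, false, and true values of .spec.suspend are
// read: nil and explicit false both mean "run", only explicit true suspends.
func suspendStates() [3]bool {
	var unset batchv1.Job // Suspend == nil
	explicitFalse := batchv1.Job{Spec: batchv1.JobSpec{Suspend: pointer.Bool(false)}}
	explicitTrue := batchv1.Job{Spec: batchv1.JobSpec{Suspend: pointer.Bool(true)}}
	return [3]bool{
		pointer.BoolDeref(unset.Spec.Suspend, false),         // false
		pointer.BoolDeref(explicitFalse.Spec.Suspend, false), // false
		pointer.BoolDeref(explicitTrue.Spec.Suspend, false),  // true
	}
}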
@@ -905,6 +936,19 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
	if err != nil {
		return fmt.Errorf("checking launcher pods running: %w", err)
	}
+	if isMPIJobSuspended(mpiJob) {
+		// it is suspended now
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
+			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobSuspended", "MPIJob suspended")
+		}
+	} else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil {
+		// it is not suspended now, consider resumed if the condition was set before
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed") {
+			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobResumed", "MPIJob resumed")
+			now := metav1.NewTime(time.Now())
+			mpiJob.Status.StartTime = &now
+		}
+	}
	// Job.status.Active accounts for Pending and Running pods. Count running pods
	// from the lister instead.
	launcherPodsCnt := countRunningPods(launcherPods)
@@ -919,7 +963,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
		if mpiJob.Status.CompletionTime == nil {
			mpiJob.Status.CompletionTime = launcher.Status.CompletionTime
		}
-		updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, mpiJobSucceededReason, msg)
+		updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg)
		mpiJobsSuccessCount.Inc()
	} else if isJobFailed(launcher) {
		c.updateMPIJobFailedStatus(mpiJob, launcher, launcherPods)
@@ -953,14 +997,17 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
	if evict > 0 {
		msg := fmt.Sprintf("%d/%d workers are evicted", evict, len(worker))
		klog.Infof("MPIJob <%s/%s>: %v", mpiJob.Namespace, mpiJob.Name, msg)
-		updateMPIJobConditions(mpiJob, kubeflow.JobFailed, mpiJobEvict, msg)
+		updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, mpiJobEvict, msg)
		c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
	}

	if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
		msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, kubeflow.JobRunning, mpiJobRunningReason, msg)
+		updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
		c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
+	} else if isMPIJobSuspended(mpiJob) {
+		msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
+		updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
	}

	// no need to update the mpijob if the status hasn't changed since last time.
@@ -999,7 +1046,7 @@ func (c *MPIJobController) updateMPIJobFailedStatus(mpiJob *kubeflow.MPIJob, lau
		now := metav1.Now()
		mpiJob.Status.CompletionTime = &now
	}
-	updateMPIJobConditions(mpiJob, kubeflow.JobFailed, reason, msg)
+	updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, reason, msg)
	mpiJobsFailureCount.Inc()
}

@@ -1304,7 +1351,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
}

func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
-	return &batchv1.Job{
+	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      mpiJob.Name + launcherSuffix,
			Namespace: mpiJob.Namespace,
@@ -1322,6 +1369,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job
			Template: c.newLauncherPodTemplate(mpiJob),
		},
	}
+	if isMPIJobSuspended(mpiJob) {
+		job.Spec.Suspend = pointer.Bool(true)
+	}
+	return job
}

// newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. It also sets
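
Because the launcher is a batch/v1 Job, creating it with .spec.suspend already set to true means the Job controller will not start its pod until the flag is flipped back to false; resuming the MPIJob then comes down to the Update pattern sketched earlier. A minimal sketch of constructing such a Job (the function name and bare spec below are illustrative, not the controller's actual launcher template):

package suspendsketch

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

// newSuspendedJob builds a Job that starts suspended: no pods are created
// for it until .spec.suspend is set back to false.
func newSuspendedJob(name, namespace string, template corev1.PodTemplateSpec) *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Spec: batchv1.JobSpec{
			Suspend:  pointer.Bool(true),
			Template: template,
		},
	}
}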