@@ -33,6 +33,7 @@ import (
 	"golang.org/x/crypto/ssh"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,6 +51,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
+	"k8s.io/utils/pointer"
 	podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
 	podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
@@ -493,7 +495,7 @@ func (c *MPIJobController) syncHandler(key string) error {
 
 	if len(mpiJob.Status.Conditions) == 0 {
 		msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, common.JobCreated, mpiJobCreatedReason, msg)
+		updateMPIJobConditions(mpiJob, common.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
 		c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg)
 		mpiJobsCreatedCount.Inc()
 	}
@@ -503,24 +505,13 @@ func (c *MPIJobController) syncHandler(key string) error {
 	// cleanup and stop retrying the MPIJob.
 	if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
 		if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
-			// set worker StatefulSet Replicas to 0.
-			if err := c.deleteWorkerPods(mpiJob); err != nil {
-				return err
-			}
-			initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
-			if c.gangSchedulerName != "" {
-				if err := c.deletePodGroups(mpiJob); err != nil {
-					return err
-				}
-			}
-			mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
-			return c.updateStatusHandler(mpiJob)
+			return cleanUpWorkerPods(mpiJob, c)
 		}
 		return nil
 	}
 
 	// first set StartTime.
-	if mpiJob.Status.StartTime == nil {
+	if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
 		now := metav1.Now()
 		mpiJob.Status.StartTime = &now
 	}
@@ -535,38 +526,40 @@ func (c *MPIJobController) syncHandler(key string) error {
 	// We're done if the launcher either succeeded or failed.
 	done := launcher != nil && isJobFinished(launcher)
 	if !done {
-		_, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob))
-		if err != nil {
-			return fmt.Errorf("getting or creating Service to front workers: %w", err)
-		}
+		if !isMPIJobSuspended(mpiJob) {
+			_, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob))
+			if err != nil {
+				return fmt.Errorf("getting or creating Service to front workers: %w", err)
+			}
 
-		if config, err := c.getOrCreateConfigMap(mpiJob); config == nil || err != nil {
-			return fmt.Errorf("getting or creating ConfigMap: %w", err)
-		}
+			if config, err := c.getOrCreateConfigMap(mpiJob); config == nil || err != nil {
+				return fmt.Errorf("getting or creating ConfigMap: %w", err)
+			}
 
-		_, err = c.getOrCreateSSHAuthSecret(mpiJob)
-		if err != nil {
-			return fmt.Errorf("creating SSH auth secret: %w", err)
-		}
+			_, err = c.getOrCreateSSHAuthSecret(mpiJob)
+			if err != nil {
+				return fmt.Errorf("creating SSH auth secret: %w", err)
+			}
 
-		// Get the PodGroup for this MPIJob
-		if c.gangSchedulerName != "" {
-			if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
-				return err
+			// Get the PodGroup for this MPIJob
+			if c.gangSchedulerName != "" {
+				if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
+					return err
+				}
 			}
-		}
 
-		worker, err = c.getOrCreateWorker(mpiJob)
-		if err != nil {
-			return err
-		}
-		if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel {
-			// The Intel implementation requires workers to communicate with the
-			// launcher through its hostname. For that, we create a Service which
-			// has the same name as the launcher's hostname.
-			_, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
+			worker, err = c.getOrCreateWorker(mpiJob)
 			if err != nil {
-				return fmt.Errorf("getting or creating Service to front launcher: %w", err)
+				return err
+			}
+			if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel {
+				// The Intel implementation requires workers to communicate with the
+				// launcher through its hostname. For that, we create a Service which
+				// has the same name as the launcher's hostname.
+				_, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
+				if err != nil {
+					return fmt.Errorf("getting or creating Service to front launcher: %w", err)
+				}
 			}
 		}
 		if launcher == nil {
@@ -585,9 +578,40 @@ func (c *MPIJobController) syncHandler(key string) error {
 		return err
 	}
 
+	if launcher != nil {
+		if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
+			// align the suspension state of launcher with the MPIJob
+			launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob))
+			if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
+				return err
+			}
+		}
+	}
+
+	// cleanup the running worker pods if the MPI job is suspended
+	if isMPIJobSuspended(mpiJob) {
+		if err := cleanUpWorkerPods(mpiJob, c); err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
+func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
+	// set worker StatefulSet Replicas to 0.
+	if err := c.deleteWorkerPods(mpiJob); err != nil {
+		return err
+	}
+	initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
+	if c.gangSchedulerName != "" {
+		if err := c.deletePodGroups(mpiJob); err != nil {
+			return err
+		}
+	}
+	mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
+	return c.updateStatusHandler(mpiJob)
+}
+
 // getLauncherJob gets the launcher Job controlled by this MPIJob.
 func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
 	launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix)
@@ -857,6 +881,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
 	return workerPods, nil
 }
 
+func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
+	return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false)
+}
+
+func isJobSuspended(job *batchv1.Job) bool {
+	return pointer.BoolDeref(job.Spec.Suspend, false)
+}
+
 func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
 	var (
 		workerPrefix = mpiJob.Name + workerSuffix
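Note: suspension is driven entirely by the MPIJob's `spec.runPolicy.suspend` field, which the helpers above read with a nil-safe default of `false` via `pointer.BoolDeref`. For illustration only, a minimal sketch of toggling that field from client code follows; the clientset import path and the `KubeflowV2beta1()` accessor are assumptions based on the generated client layout, not part of this diff.

```go
package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"

	// Assumed import path for the generated MPIJob clientset.
	clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
)

// setMPIJobSuspend flips spec.runPolicy.suspend: true makes the controller
// suspend the launcher Job and delete the worker pods, false resumes the job.
func setMPIJobSuspend(ctx context.Context, c clientset.Interface, ns, name string, suspend bool) error {
	mpiJob, err := c.KubeflowV2beta1().MPIJobs(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(suspend)
	_, err = c.KubeflowV2beta1().MPIJobs(ns).Update(ctx, mpiJob, metav1.UpdateOptions{})
	return err
}
```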
@@ -905,6 +937,19 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 	if err != nil {
 		return fmt.Errorf("checking launcher pods running: %w", err)
 	}
+	if isMPIJobSuspended(mpiJob) {
+		// it is suspended now
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
+			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "Suspended", "MPIJob suspended")
+		}
+	} else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil {
+		// it is not suspended now, consider resumed if the condition was set before
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed") {
+			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "Resumed", "MPIJob resumed")
+			now := metav1.NewTime(time.Now())
+			mpiJob.Status.StartTime = &now
+		}
+	}
 	// Job.status.Active accounts for Pending and Running pods. Count running pods
 	// from the lister instead.
 	launcherPodsCnt := countRunningPods(launcherPods)
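Note: the calls above rely on two properties of `updateMPIJobConditions` after this change: it takes an explicit `v1.ConditionStatus`, and it reports whether the condition actually changed, so events fire only on transitions. The sketch below shows a helper with that contract; it is not the repository's implementation, the field names follow the kubeflow `common.JobCondition` type, and it leans on the imports already present in this file.

```go
// setCondition updates or appends a condition on the job status and returns
// true only when something changed, so the caller can emit an event on the
// transition only. Illustrative sketch, not the controller's real helper.
func setCondition(status *common.JobStatus, condType common.JobConditionType,
	condStatus v1.ConditionStatus, reason, message string) bool {
	now := metav1.Now()
	for i := range status.Conditions {
		cond := &status.Conditions[i]
		if cond.Type != condType {
			continue
		}
		if cond.Status == condStatus && cond.Reason == reason && cond.Message == message {
			return false // nothing changed, no event needed
		}
		if cond.Status != condStatus {
			cond.LastTransitionTime = now
		}
		cond.Status, cond.Reason, cond.Message, cond.LastUpdateTime = condStatus, reason, message, now
		return true
	}
	status.Conditions = append(status.Conditions, common.JobCondition{
		Type:               condType,
		Status:             condStatus,
		Reason:             reason,
		Message:            message,
		LastUpdateTime:     now,
		LastTransitionTime: now,
	})
	return true
}
```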
@@ -919,7 +964,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 		if mpiJob.Status.CompletionTime == nil {
 			mpiJob.Status.CompletionTime = launcher.Status.CompletionTime
 		}
-		updateMPIJobConditions(mpiJob, common.JobSucceeded, mpiJobSucceededReason, msg)
+		updateMPIJobConditions(mpiJob, common.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg)
 		mpiJobsSuccessCount.Inc()
 	} else if isJobFailed(launcher) {
 		c.updateMPIJobFailedStatus(mpiJob, launcher, launcherPods)
@@ -953,13 +998,13 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 	if evict > 0 {
 		msg := fmt.Sprintf("%d/%d workers are evicted", evict, len(worker))
 		klog.Infof("MPIJob <%s/%s>: %v", mpiJob.Namespace, mpiJob.Name, msg)
-		updateMPIJobConditions(mpiJob, common.JobFailed, mpiJobEvict, msg)
+		updateMPIJobConditions(mpiJob, common.JobFailed, v1.ConditionTrue, mpiJobEvict, msg)
 		c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
 	}
 
 	if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
 		msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, common.JobRunning, mpiJobRunningReason, msg)
+		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
 		c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
 	}
 
@@ -999,7 +1044,7 @@ func (c *MPIJobController) updateMPIJobFailedStatus(mpiJob *kubeflow.MPIJob, lau
 		now := metav1.Now()
 		mpiJob.Status.CompletionTime = &now
 	}
-	updateMPIJobConditions(mpiJob, common.JobFailed, reason, msg)
+	updateMPIJobConditions(mpiJob, common.JobFailed, v1.ConditionTrue, reason, msg)
 	mpiJobsFailureCount.Inc()
 }
 
@@ -1304,7 +1349,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
 }
 
 func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
-	return &batchv1.Job{
+	job := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      mpiJob.Name + launcherSuffix,
 			Namespace: mpiJob.Namespace,
@@ -1322,6 +1367,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job
 			Template: c.newLauncherPodTemplate(mpiJob),
 		},
 	}
+	if isMPIJobSuspended(mpiJob) {
+		job.Spec.Suspend = pointer.Bool(true)
+	}
+	return job
 }
 
 // newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. It also sets
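Note: the launcher-side half of suspension builds on the batch/v1 Job `spec.suspend` field (available since Kubernetes 1.21): a Job created suspended is accepted by the API server, but its pods are not created until the field is set back to false. A minimal standalone illustration of that mechanism follows; the pod template and image below are placeholders, not the launcher template from this file.

```go
package example

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

// newSuspendedJob returns a Job the API server accepts but does not run until
// .spec.suspend is flipped to false, which is the state newLauncherJob now
// produces for a suspended MPIJob.
func newSuspendedJob(name, namespace string) *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Spec: batchv1.JobSpec{
			Suspend: pointer.Bool(true),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{Name: "launcher", Image: "example/launcher:latest"}, // placeholder image
					},
				},
			},
		},
	}
}
```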