@@ -51,6 +51,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
+	"k8s.io/utils/clock"
 	"k8s.io/utils/pointer"
 	podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
@@ -245,6 +246,9 @@ type MPIJobController struct {
 
 	// To allow injection of updateStatus for testing.
 	updateStatusHandler func(mpijob *kubeflow.MPIJob) error
+
+	// Clock for internal use, primarily to allow time injection in unit tests.
+	clock clock.WithTicker
 }
 
 // NewMPIJobController returns a new MPIJob controller.
@@ -260,6 +264,26 @@ func NewMPIJobController(
 	podgroupsInformer podgroupsinformer.PodGroupInformer,
 	mpiJobInformer informers.MPIJobInformer,
 	gangSchedulerName string) *MPIJobController {
+	return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet,
+		configMapInformer, secretInformer, serviceInformer, jobInformer,
+		podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName,
+		&clock.RealClock{})
+}
+
+// NewMPIJobControllerWithClock returns a new MPIJob controller using the given clock.
+func NewMPIJobControllerWithClock(
+	kubeClient kubernetes.Interface,
+	kubeflowClient clientset.Interface,
+	volcanoClientSet volcanoclient.Interface,
+	configMapInformer coreinformers.ConfigMapInformer,
+	secretInformer coreinformers.SecretInformer,
+	serviceInformer coreinformers.ServiceInformer,
+	jobInformer batchinformers.JobInformer,
+	podInformer coreinformers.PodInformer,
+	podgroupsInformer podgroupsinformer.PodGroupInformer,
+	mpiJobInformer informers.MPIJobInformer,
+	gangSchedulerName string,
+	clock clock.WithTicker) *MPIJobController {
 
 	// Create event broadcaster.
 	klog.V(4).Info("Creating event broadcaster")
@@ -296,6 +320,7 @@ func NewMPIJobController(
 		queue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
 		recorder:          recorder,
 		gangSchedulerName: gangSchedulerName,
+		clock:             clock,
 	}
 
 	controller.updateStatusHandler = controller.doUpdateJobStatus
@@ -451,9 +476,9 @@ func (c *MPIJobController) processNextWorkItem() bool {
 // converge the two. It then updates the Status block of the MPIJob resource
 // with the current status of the resource.
 func (c *MPIJobController) syncHandler(key string) error {
-	startTime := time.Now()
+	startTime := c.clock.Now()
 	defer func() {
-		klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
+		klog.Infof("Finished syncing job %q (%v)", key, c.clock.Since(startTime))
 	}()
 
 	// Convert the namespace/name string into a distinct namespace and name.
@@ -505,7 +530,10 @@ func (c *MPIJobController) syncHandler(key string) error {
 	// cleanup and stop retrying the MPIJob.
 	if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
 		if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
-			return cleanUpWorkerPods(mpiJob, c)
+			if err := cleanUpWorkerPods(mpiJob, c); err != nil {
+				return err
+			}
+			return c.updateStatusHandler(mpiJob)
 		}
 		return nil
 	}
@@ -570,13 +598,6 @@ func (c *MPIJobController) syncHandler(key string) error {
 		}
 	}
 
-	// Finally, we update the status block of the MPIJob resource to reflect the
-	// current state of the world.
-	err = c.updateMPIJobStatus(mpiJob, launcher, worker)
-	if err != nil {
-		return err
-	}
-
 	if launcher != nil {
 		if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
 			// align the suspension state of launcher with the MPIJob
@@ -593,6 +614,14 @@ func (c *MPIJobController) syncHandler(key string) error {
 			return err
 		}
 	}
+
+	// Finally, we update the status block of the MPIJob resource to reflect the
+	// current state of the world.
+	err = c.updateMPIJobStatus(mpiJob, launcher, worker)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -608,7 +637,7 @@ func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
 		}
 	}
 	mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
-	return c.updateStatusHandler(mpiJob)
+	return nil
 }
 
 // getLauncherJob gets the launcher Job controlled by this MPIJob.
@@ -932,10 +961,6 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
 
 func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
 	oldStatus := mpiJob.Status.DeepCopy()
-	launcherPods, err := c.jobPods(launcher)
-	if err != nil {
-		return fmt.Errorf("checking launcher pods running: %w", err)
-	}
 	if isMPIJobSuspended(mpiJob) {
 		// it is suspended now
 		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
@@ -945,10 +970,14 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 		// it is not suspended now, consider resumed if the condition was set before
 		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed") {
 			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobResumed", "MPIJob resumed")
-			now := metav1.NewTime(time.Now())
+			now := metav1.NewTime(c.clock.Now())
 			mpiJob.Status.StartTime = &now
 		}
 	}
+	launcherPods, err := c.jobPods(launcher)
+	if err != nil {
+		return fmt.Errorf("checking launcher pods running: %w", err)
+	}
 	// Job.status.Active accounts for Pending and Running pods. Count running pods
 	// from the lister instead.
 	launcherPodsCnt := countRunningPods(launcherPods)
@@ -1001,13 +1030,13 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 		c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
 	}
 
-	if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
+	if isMPIJobSuspended(mpiJob) {
+		msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
+		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
+	} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
 		msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
 		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
 		c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
-	} else if isMPIJobSuspended(mpiJob) {
-		msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
 	}
 
 	// no need to update the mpijob if the status hasn't changed since last time.
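
This last hunk swaps the branch order so the suspended check runs before the running check: a suspended MPIJob reports JobRunning=False even if launcher and worker pods still look active. A standalone sketch of that precedence (`jobState` and `runningCondition` are hypothetical illustrations, not types from this codebase):

```go
package main

import "fmt"

// jobState is a hypothetical stand-in for the fields the controller consults.
type jobState struct {
	suspended       bool
	launcherPresent bool
	runningWorkers  int
	totalWorkers    int
}

// runningCondition mirrors the if/else chain above: the suspended branch is
// evaluated first, so a suspended job is never reported as running.
func runningCondition(s jobState) string {
	switch {
	case s.suspended:
		return "JobRunning=False (suspended)"
	case s.launcherPresent && s.runningWorkers == s.totalWorkers:
		return "JobRunning=True"
	default:
		return "no condition update"
	}
}

func main() {
	s := jobState{suspended: true, launcherPresent: true, runningWorkers: 2, totalWorkers: 2}
	fmt.Println(runningCondition(s)) // JobRunning=False (suspended)
}
```
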