Skip to content
This repository was archived by the owner on Sep 12, 2023. It is now read-only.

Commit 66db21b

Browse files
committed
add job suspend run Policy
add job partial success status
1 parent 2b40c8f commit 66db21b

File tree

11 files changed

+87
-12
lines changed

11 files changed

+87
-12
lines changed

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG
564564
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
565565
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
566566
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
567+
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
567568
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
568569
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
569570
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -804,6 +805,7 @@ k8s.io/component-base v0.19.2/go.mod h1:g5LrsiTiabMLZ40AR6Hl45f088DevyGY+cCE2agE
804805
k8s.io/component-base v0.19.6 h1:V76d3rIEWvP95peWgRycKslQnEwlaPy4UORvh3+YBbU=
805806
k8s.io/component-base v0.19.6/go.mod h1:8Btsf8J00/fVDa/YFmXjei7gVkcFrlKZXjSeP4SZNJg=
806807
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
808+
k8s.io/gengo v0.0.0-20200428234225-8167cfdcfc14 h1:t4L10Qfx/p7ASH3gXCdIUtPbbIuegCoUJf3TMSFekjw=
807809
k8s.io/gengo v0.0.0-20200428234225-8167cfdcfc14/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
808810
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
809811
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=

pkg/apis/common/v1/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type ControllerInterface interface {
6464
// It will requeue the job in case of an error while creating/deleting pods.
6565
// Common implementation will be provided and User can still override this to implement their own reconcile logic
6666
ReconcilePods(job interface{}, jobStatus *JobStatus, pods []*v1.Pod, rtype ReplicaType, spec *ReplicaSpec,
67-
replicas map[ReplicaType]*ReplicaSpec) error
67+
replicas map[ReplicaType]*ReplicaSpec, runPolicy *RunPolicy) error
6868

6969
// ReconcileServices checks and updates services for each given ReplicaSpec.
7070
// It will requeue the job in case of an error while creating/deleting services.

pkg/apis/common/v1/openapi_generated.go

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/common/v1/types.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,17 @@ const (
135135
// reached phase failed with no restarting.
136136
// The training has failed its execution.
137137
JobFailed JobConditionType = "Failed"
138+
139+
// JobSuspended means sub-resources (e.g. services/pods) of this job
140+
// has been terminated.
141+
JobSuspended JobConditionType = "Suspended"
142+
143+
// JobResumed means job Resumed from suspended
144+
JobResumed JobConditionType = "Resumed"
145+
146+
// JobPartialSucceed means all sub-resources (e.g. services/pods) of this job's one worker
147+
// reached phase have terminated in success.
148+
JobPartialSucceeded JobConditionType = "PartialSucceeded"
138149
)
139150

140151
// +k8s:openapi-gen=true
@@ -196,6 +207,17 @@ type RunPolicy struct {
196207
// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
197208
// +optional
198209
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`
210+
211+
// Suspend specifies whether the Job controller should create Pods or not. If
212+
// a Job is created with suspend set to true, no Pods are created by the Job
213+
// controller. If a Job is suspended after creation (i.e. the flag goes from
214+
// false to true), the Job controller will delete all active Pods associated
215+
// with this Job. Users must design their workload to gracefully handle this.
216+
// Suspending a Job will reset the StartTime field of the Job, effectively
217+
// resetting the ActiveDeadlineSeconds timer too.
218+
// Defaults to false.
219+
// +optional
220+
Suspend *bool `json:"suspend,omitempty"`
199221
}
200222

201223
// +k8s:openapi-gen=true

pkg/apis/common/v1/zz_generated.deepcopy.go

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/common/v1/zz_generated.defaults.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/controller.v1/common/job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (jc *JobController) ReconcileJobs(
270270

271271
// Diff current active pods/services with replicas.
272272
for rtype, spec := range replicas {
273-
err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas)
273+
err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas, runPolicy)
274274
if err != nil {
275275
log.Warnf("ReconcilePods error %v", err)
276276
return err

pkg/controller.v1/common/pod.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ func (jc *JobController) ReconcilePods(
274274
pods []*v1.Pod,
275275
rType apiv1.ReplicaType,
276276
spec *apiv1.ReplicaSpec,
277-
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
277+
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec,
278+
runPolicy *apiv1.RunPolicy) error {
278279

279280
rt := strings.ToLower(string(rType))
280281
metaObject, ok := job.(metav1.Object)
@@ -317,6 +318,10 @@ func (jc *JobController) ReconcilePods(
317318
} else if len(podSlice) == 0 {
318319
logger.Infof("Need to create new pod: %s-%d", rt, index)
319320

321+
if JobSuspended(runPolicy) || commonutil.IsSuspended(*jobStatus) {
322+
logger.Warningf("job is Suspended %s/%s", metaObject.GetNamespace(), metaObject.GetName())
323+
continue
324+
}
320325
// check if this replica is the master role
321326
masterRole = jc.Controller.IsMasterRole(replicas, rType, index)
322327
err = jc.createNewPod(job, rt, index, spec, masterRole, replicas)
@@ -328,7 +333,7 @@ func (jc *JobController) ReconcilePods(
328333
pod := podSlice[0]
329334

330335
// check if the index is in the valid range, if not, we should kill the pod
331-
if index < 0 || index >= numReplicas {
336+
if index < 0 || index >= numReplicas || JobSuspended(runPolicy) {
332337
err = jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject)
333338
if err != nil {
334339
return err

pkg/controller.v1/common/util.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,9 @@ func CalcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.R
143143

144144
return &minAvailableTasksRes
145145
}
146+
147+
// JobSuspended returns whether a Job is suspended while taking the feature
148+
// gate into account.
149+
func JobSuspended(runPolicy *apiv1.RunPolicy) bool {
150+
return runPolicy.Suspend != nil && *runPolicy.Suspend
151+
}

pkg/core/job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func RecordAbnormalPods(activePods []*v1.Pod, object runtime.Object, recorder re
6464

6565
// PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
6666
func PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool {
67-
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil {
67+
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil || (runPolicy.Suspend != nil && *runPolicy.Suspend) {
6868
return false
6969
}
7070
now := metav1.Now()

0 commit comments

Comments
 (0)