diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 6c5260390..43293b940 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -23,6 +23,8 @@ spec: properties: spec: properties: + priorityClassName: + type: string pytorchReplicaSpecs: properties: Master: diff --git a/manifests/podgroup.yaml b/manifests/podgroup.yaml index 1432fc160..cc9e7ec78 100644 --- a/manifests/podgroup.yaml +++ b/manifests/podgroup.yaml @@ -22,6 +22,8 @@ spec: minMember: format: int32 type: integer + priorityClassName: + type: string type: object status: properties: diff --git a/pkg/apis/pytorch/v1/types.go b/pkg/apis/pytorch/v1/types.go index d90cbeb4c..1ed00bf6e 100644 --- a/pkg/apis/pytorch/v1/types.go +++ b/pkg/apis/pytorch/v1/types.go @@ -69,6 +69,10 @@ type PyTorchJobSpec struct { // "Worker": PyTorchReplicaSpec, // } PyTorchReplicaSpecs map[PyTorchReplicaType]*common.ReplicaSpec `json:"pytorchReplicaSpecs"` + + // PriorityClassName, if specified, indicates the job's priority. It must be the name + // of an existing PriorityClass in the cluster (kubectl get priorityclass). Optional. + PriorityClassName *string `json:"priorityClassName,omitempty"` } // PyTorchReplicaType is the type for PyTorchReplica. Can be one of "Master" or "Worker". 
diff --git a/pkg/apis/pytorch/v1/zz_generated.deepcopy.go b/pkg/apis/pytorch/v1/zz_generated.deepcopy.go index 63f3fc770..20209d50a 100644 --- a/pkg/apis/pytorch/v1/zz_generated.deepcopy.go +++ b/pkg/apis/pytorch/v1/zz_generated.deepcopy.go @@ -122,6 +122,11 @@ func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) { (*out)[key] = outVal } } + if in.PriorityClassName != nil { + in, out := &in.PriorityClassName, &out.PriorityClassName + *out = new(string) + **out = **in + } return } diff --git a/pkg/apis/pytorch/v1beta2/types.go b/pkg/apis/pytorch/v1beta2/types.go index 3287c945f..452e3e906 100644 --- a/pkg/apis/pytorch/v1beta2/types.go +++ b/pkg/apis/pytorch/v1beta2/types.go @@ -72,6 +72,10 @@ type PyTorchJobSpec struct { // "Worker": PyTorchReplicaSpec, // } PyTorchReplicaSpecs map[PyTorchReplicaType]*common.ReplicaSpec `json:"pytorchReplicaSpecs"` + + // PriorityClassName, if specified, indicates the job's priority. It must be the name + // of an existing PriorityClass in the cluster (kubectl get priorityclass). Optional. + PriorityClassName *string `json:"priorityClassName,omitempty"` } // PyTorchReplicaType is the type for PyTorchReplica. 
diff --git a/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go b/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go index e24ddebec..c633b3a9a 100644 --- a/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go +++ b/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go @@ -122,6 +122,11 @@ func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) { (*out)[key] = outVal } } + if in.PriorityClassName != nil { + in, out := &in.PriorityClassName, &out.PriorityClassName + *out = new(string) + **out = **in + } return } diff --git a/pkg/controller.v1/pytorch/controller.go b/pkg/controller.v1/pytorch/controller.go index a5e6f021b..612b5e603 100644 --- a/pkg/controller.v1/pytorch/controller.go +++ b/pkg/controller.v1/pytorch/controller.go @@ -23,7 +23,7 @@ import ( kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" log "github.com/sirupsen/logrus" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -437,7 +437,8 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error { if pc.Config.EnableGangScheduling { minAvailableReplicas := getTotalReplicas(job) - _, err := pc.SyncPodGroup(job, minAvailableReplicas) + priorityClassName := getPriorityClassName(job) + _, err := pc.SyncPodGroup(job, minAvailableReplicas, priorityClassName) if err != nil { logger.Warnf("Sync PodGroup %v: %v", job.Name, err) } diff --git a/pkg/controller.v1/pytorch/job.go b/pkg/controller.v1/pytorch/job.go index 5ddfd7708..a8abef626 100644 --- a/pkg/controller.v1/pytorch/job.go +++ b/pkg/controller.v1/pytorch/job.go @@ -216,3 +216,12 @@ func getTotalFailedReplicas(job *pyv1.PyTorchJob) int32 { } return totalFailedReplicas } + +// getPriorityClassName returns the priority class configured on the job spec, +// or the empty string when the optional PriorityClassName field is unset. +func getPriorityClassName(job *pyv1.PyTorchJob) string { + if job.Spec.PriorityClassName == nil { + return "" + } + return *job.Spec.PriorityClassName +}
diff --git a/pkg/controller.v1/pytorch/status.go b/pkg/controller.v1/pytorch/status.go index c083d9f80..989a35f66 100644 --- a/pkg/controller.v1/pytorch/status.go +++ b/pkg/controller.v1/pytorch/status.go @@ -69,6 +69,7 @@ func (pc *PyTorchController) updateStatusSingle(job *pyv1.PyTorchJob, rtype pyv1 // Expect to have `replicas - succeeded` pods alive. commonType := common.ReplicaType(rtype) + // expected counts the pods of this replica type still expected to be alive; when it reaches 0, every replica has succeeded. expected := replicas - int(job.Status.ReplicaStatuses[commonType].Succeeded) running := int(job.Status.ReplicaStatuses[commonType].Active) failed := int(job.Status.ReplicaStatuses[commonType].Failed) diff --git a/pkg/controller.v1beta2/pytorch/controller.go b/pkg/controller.v1beta2/pytorch/controller.go index 94dd4f248..2abd310aa 100644 --- a/pkg/controller.v1beta2/pytorch/controller.go +++ b/pkg/controller.v1beta2/pytorch/controller.go @@ -23,7 +23,7 @@ import ( kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" log "github.com/sirupsen/logrus" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -428,7 +428,8 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error { if pc.Config.EnableGangScheduling { minAvailableReplicas := getTotalReplicas(job) - _, err := pc.SyncPodGroup(job, minAvailableReplicas) + priorityClassName := getPriorityClassName(job) + _, err := pc.SyncPodGroup(job, minAvailableReplicas, priorityClassName) if err != nil { logger.Warnf("Sync PodGroup %v: %v", job.Name, err) } diff --git a/pkg/controller.v1beta2/pytorch/job.go b/pkg/controller.v1beta2/pytorch/job.go index 91b26c842..eea493808 100644 --- a/pkg/controller.v1beta2/pytorch/job.go +++ b/pkg/controller.v1beta2/pytorch/job.go @@ -206,3 +206,12 @@ func getTotalFailedReplicas(job *v1beta2.PyTorchJob) int32 { } return totalFailedReplicas } + +// getPriorityClassName returns the priority class configured on the job spec, +// or the empty string when the optional PriorityClassName field is unset. +func getPriorityClassName(job *v1beta2.PyTorchJob) string { + if job.Spec.PriorityClassName == nil { + return "" + } + return *job.Spec.PriorityClassName +} diff --git a/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go b/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go index 5fa59d97e..c8d45d06b 100644 --- a/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go +++ b/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go @@ -215,7 +215,7 @@ func (jc *JobController) GenLabels(jobName string) map[string]string { } } -func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1alpha1.PodGroup, error) { +func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32, priorityClassName string) (*v1alpha1.PodGroup, error) { kubeBatchClientInterface := jc.KubeBatchClientSet // Check whether podGroup exists or not @@ -236,6 +236,7 @@ func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas in }, Spec: v1alpha1.PodGroupSpec{ MinMember: minAvailable.IntVal, + PriorityClassName: priorityClassName, }, } return kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Create(createPodGroup) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go index d439edb4c..1a7fe7665 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go @@ -52,6 +52,15 @@ type PodGroupSpec struct { // Queue defines the queue to allocate resource for PodGroup; if queue does not exist, // the PodGroup will not be scheduled. 
Queue string `json:"queue,omitempty" protobuf:"bytes,2,opt,name=queue"` + + // If specified, indicates the PodGroup's priority. "system-node-critical" and + // "system-cluster-critical" are two special keywords which indicate the + // highest priorities with the former being the highest priority. Any other + // name must be defined by creating a PriorityClass object with that name. + // If not specified, the PodGroup priority will be default or zero if there is no + // default. + // +optional + PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"` } // PodGroupStatus represents the current state of a pod group.