diff --git a/go.mod b/go.mod
index 3eb79665e4..c225dea3bc 100644
--- a/go.mod
+++ b/go.mod
@@ -130,4 +130,5 @@ replace (
     k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.25.0
     k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.25.0
     k8s.io/sample-controller => k8s.io/sample-controller v0.25.0
+    volcano.sh/apis => github.com/predibase/volcano-apis v0.0.0-20230208222101-1946093f2249
 )
diff --git a/go.sum b/go.sum
index 53bfa48286..6220210e7c 100644
--- a/go.sum
+++ b/go.sum
@@ -317,6 +317,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/predibase/volcano-apis v0.0.0-20230208222101-1946093f2249 h1:IKdy8kADUHn8OPAjatWMxCZXfHWiEWk65QXPlJ4SsQs=
+github.com/predibase/volcano-apis v0.0.0-20230208222101-1946093f2249/go.mod h1:xe38GChdXXam/g/FkQXIsR0vhqp4twoZdY2gaGkEP24=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
@@ -831,5 +833,3 @@ sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
 sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
 stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
 stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
-volcano.sh/apis v1.6.0-alpha.0.0.20230214095022-ad92502b1a57 h1:aQhXCHqcaOtkCtFn3XSniKWlIb1xxwJ8G7SQeHkZ6vM=
-volcano.sh/apis v1.6.0-alpha.0.0.20230214095022-ad92502b1a57/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
diff --git a/installer/helm/chart/volcano/config/volcano-scheduler.conf b/installer/helm/chart/volcano/config/volcano-scheduler.conf
index d8e26a26d8..43e8385d12 100644
--- a/installer/helm/chart/volcano/config/volcano-scheduler.conf
+++ b/installer/helm/chart/volcano/config/volcano-scheduler.conf
@@ -2,14 +2,13 @@ actions: "enqueue, allocate, backfill"
 tiers:
 - plugins:
   - name: priority
-  - name: gang
-    enablePreemptable: false
+#  - name: gang
   - name: conformance
 - plugins:
-  - name: overcommit
-  - name: drf
-    enablePreemptable: false
+  - name: fit
+#  - name: proportion
+#  - name: overcommit
+#  - name: drf
   - name: predicates
-  - name: proportion
-  - name: nodeorder
-  - name: binpack
+#  - name: nodeorder
+#  - name: binpack
diff --git a/installer/helm/chart/volcano/templates/admission.yaml b/installer/helm/chart/volcano/templates/admission.yaml
index 892a0659fa..7609ba777d 100644
--- a/installer/helm/chart/volcano/templates/admission.yaml
+++ b/installer/helm/chart/volcano/templates/admission.yaml
@@ -12,11 +12,15 @@ kind: ServiceAccount
 metadata:
   name: {{ .Release.Name }}-admission
   namespace: {{ .Release.Namespace }}
+  annotations:
+    "helm.sh/hook": pre-install,pre-upgrade
 ---
 kind: ClusterRole
 apiVersion: rbac.authorization.k8s.io/v1
 metadata:
   name: {{ .Release.Name }}-admission
+  annotations:
+    "helm.sh/hook": pre-install,pre-upgrade
 rules:
   - apiGroups: [""]
     resources: ["configmaps"]
@@ -49,6 +53,8 @@ kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
 metadata:
   name: {{ .Release.Name }}-admission-role
+  annotations:
+    "helm.sh/hook": pre-install,pre-upgrade
 subjects:
   - kind: ServiceAccount
     name: {{ .Release.Name }}-admission
@@ -139,6 +145,10 @@ metadata:
   namespace: {{ .Release.Namespace }}
   labels:
     app: volcano-admission-init
+  annotations:
+    "helm.sh/hook": pre-install
+    "helm.sh/hook-weight": "0"
+    "helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded
 spec:
   backoffLimit: 3
   template:
diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml
index 9e0b369503..d97a589018 100644
--- a/installer/helm/chart/volcano/templates/scheduler.yaml
+++ b/installer/helm/chart/volcano/templates/scheduler.yaml
@@ -125,7 +125,7 @@ spec:
             - --scheduler-conf=/volcano.scheduler/{{base .Values.basic.scheduler_config_file}}
             - --enable-healthz=true
            - --enable-metrics=true
-            - -v=3
+            - -v=5
             - 2>&1
           imagePullPolicy: {{ .Values.basic.image_pull_policy }}
           volumeMounts:
diff --git a/installer/helm/chart/volcano/values.yaml b/installer/helm/chart/volcano/values.yaml
index d8bfc58b20..bd1b535429 100644
--- a/installer/helm/chart/volcano/values.yaml
+++ b/installer/helm/chart/volcano/values.yaml
@@ -1,13 +1,13 @@
 basic:
-  controller_image_name: "volcanosh/vc-controller-manager"
-  scheduler_image_name: "volcanosh/vc-scheduler"
-  admission_image_name: "volcanosh/vc-webhook-manager"
+  image_tag_version: "sha-7e1b436"
+  controller_image_name: "public.ecr.aws/n9u9x7z1/volcano-controller-manager"
+  scheduler_image_name: "public.ecr.aws/n9u9x7z1/volcano-scheduler"
+  admission_image_name: "public.ecr.aws/n9u9x7z1/volcano-webhook-manager"
   admission_secret_name: "volcano-admission-secret"
   admission_config_file: "config/volcano-admission.conf"
   scheduler_config_file: "config/volcano-scheduler.conf"
   image_pull_secret: ""
   image_pull_policy: "IfNotPresent"
-  image_tag_version: "latest"
   admission_port: 8443
 custom:
   metrics_enable: false
diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go
index 0b3458bec3..76a944f400 100644
--- a/pkg/scheduler/actions/allocate/allocate.go
+++ b/pkg/scheduler/actions/allocate/allocate.go
@@ -177,10 +177,13 @@ func (alloc *Action) Execute(ssn *framework.Session) {
                 break
             }
 
-            predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, predicateFn, true)
+            predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, predicateFn, false)
             if len(predicateNodes) == 0 {
+                klog.V(3).Infof("PredicateNodes for task %s/%s found: %v", task.Namespace, task.Name, fitErrors.Error())
                 job.NodesFitErrors[task.UID] = fitErrors
-                break
+                // Don't break the loop here. We need to perform this check for all tasks to ensure they have proper NodesFitErrors set (if applicable)
+                // so that the right pod condition is set for the cluster autoscaler.
+                continue
             }
 
             var candidateNodes []*api.NodeInfo
diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go
index b0b04b980b..21729e0867 100644
--- a/pkg/scheduler/api/job_info.go
+++ b/pkg/scheduler/api/job_info.go
@@ -651,7 +651,6 @@ func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string)
         ctx = *taskInfo.LastTransaction
     }
 
-    msg = ji.JobFitErrors
     switch status := ctx.Status; status {
     case Allocated, Pipelined:
         // Pod is schedulable
@@ -662,13 +661,13 @@ func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string)
         return PodReasonSchedulable, msg
     case Pending:
         if fe := ji.NodesFitErrors[tid]; fe != nil {
-            // Pod is not schedulable
-            return PodReasonUnschedulable, fe.Error()
+            // Pod is not schedulable on currently available nodes. We want to set 'Unschedulable' as the reason to trigger the cluster autoscaler.
+            return PodReasonUnschedulable, fmt.Sprintf("fiterr: %s", fe.Error())
         }
-        // Pod is not scheduled yet, keep UNSCHEDULABLE as the reason to support cluster autoscaler
-        return PodReasonUnschedulable, msg
+        // Pod hasn't cleared the enqueue phase yet. Use the 'Ineligible' status to bypass the cluster autoscaler.
+        return PodReasonIneligible, "pod is not yet eligible to schedule"
     default:
-        return status.String(), msg
+        return status.String(), ji.JobFitErrors
     }
 }
 
diff --git a/pkg/scheduler/api/unschedule_info.go b/pkg/scheduler/api/unschedule_info.go
index e1c6310ac3..baec7d98bb 100644
--- a/pkg/scheduler/api/unschedule_info.go
+++ b/pkg/scheduler/api/unschedule_info.go
@@ -18,6 +18,9 @@ const (
 
 // These are reasons for a pod's transition to a condition.
 const (
+    // PodReasonIneligible reason means that the pod is not currently eligible for scheduling on a node,
+    // for example due to queue constraints.
+    PodReasonIneligible = "Ineligible"
     // PodReasonUnschedulable reason in PodScheduled PodCondition means that the scheduler
     // can't schedule the pod right now, for example due to insufficient resources in the cluster.
     // It can also mean that the scheduler skips scheduling the pod which left the pod `Undetermined`,
diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go
index 4db20196c5..de34e46412 100644
--- a/pkg/scheduler/cache/cache.go
+++ b/pkg/scheduler/cache/cache.go
@@ -257,7 +257,7 @@ func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bo
 
 // UpdatePodCondition will Update pod with podCondition
 func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) {
-    klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
+    klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s), reason %s, msg %s", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason, condition.Message)
     if podutil.UpdatePodCondition(&pod.Status, condition) {
         return su.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
     }
@@ -869,6 +869,8 @@ func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason
         Message: message,
     }
 
+    klog.V(4).Infof("task unschedulable %s/%s, reason: %s message: %s", pod.Namespace, pod.Name, reason, message)
+
     if podConditionHaveUpdate(&pod.Status, condition) {
         pod = pod.DeepCopy()
 
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index bf4bdcc191..86e1be9ed9 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -265,13 +265,14 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
             }
         }
 
-        // If there're enough allocated resource, it's running
-        if int32(allocated) >= jobInfo.PodGroup.Spec.MinMember {
+        if int32(len(jobInfo.TaskStatusIndex[api.Running])) >= jobInfo.PodGroup.Spec.MinMember {
             status.Phase = scheduling.PodGroupRunning
             // If all allocated tasks is succeeded, it's completed
             if len(jobInfo.TaskStatusIndex[api.Succeeded]) == allocated {
                 status.Phase = scheduling.PodGroupCompleted
             }
+        } else if int32(allocated) >= jobInfo.PodGroup.Spec.MinMember {
+            status.Phase = scheduling.PodGroupAllocated
         } else if jobInfo.PodGroup.Status.Phase != scheduling.PodGroupInqueue {
             status.Phase = scheduling.PodGroupPending
         }
diff --git a/pkg/scheduler/plugins/factory.go b/pkg/scheduler/plugins/factory.go
index 518f1ae038..83dba10ee2 100644
--- a/pkg/scheduler/plugins/factory.go
+++ b/pkg/scheduler/plugins/factory.go
@@ -23,6 +23,7 @@ import (
     "volcano.sh/volcano/pkg/scheduler/plugins/conformance"
     "volcano.sh/volcano/pkg/scheduler/plugins/drf"
     "volcano.sh/volcano/pkg/scheduler/plugins/extender"
+    "volcano.sh/volcano/pkg/scheduler/plugins/fit"
     "volcano.sh/volcano/pkg/scheduler/plugins/gang"
     "volcano.sh/volcano/pkg/scheduler/plugins/nodeorder"
     "volcano.sh/volcano/pkg/scheduler/plugins/numaaware"
@@ -55,6 +56,7 @@ func init() {
     framework.RegisterPluginBuilder(cdp.PluginName, cdp.New)
     framework.RegisterPluginBuilder(rescheduling.PluginName, rescheduling.New)
     framework.RegisterPluginBuilder(usage.PluginName, usage.New)
+    framework.RegisterPluginBuilder(fit.PluginName, fit.New)
 
     // Plugins for Queues
     framework.RegisterPluginBuilder(proportion.PluginName, proportion.New)
diff --git a/pkg/scheduler/plugins/fit/fit.go b/pkg/scheduler/plugins/fit/fit.go
new file mode 100644
index 0000000000..431790e683
--- /dev/null
+++ b/pkg/scheduler/plugins/fit/fit.go
@@ -0,0 +1,205 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package fit
+
+import (
+    v1 "k8s.io/api/core/v1"
+    "k8s.io/klog"
+    "math"
+
+    "volcano.sh/apis/pkg/apis/scheduling"
+    "volcano.sh/volcano/pkg/scheduler/api"
+    "volcano.sh/volcano/pkg/scheduler/framework"
+    "volcano.sh/volcano/pkg/scheduler/metrics"
+    "volcano.sh/volcano/pkg/scheduler/plugins/util"
+)
+
+// PluginName indicates name of volcano scheduler plugin.
+const PluginName = "fit"
+
+type fitPlugin struct {
+    //totalResource *api.Resource
+    //totalGuarantee *api.Resource
+    queueOpts map[api.QueueID]*queueAttr
+    // Arguments given for the plugin
+    pluginArguments framework.Arguments
+}
+
+type queueAttr struct {
+    queueID api.QueueID
+    name    string
+    weight  int32
+    share   float64
+
+    allocated *api.Resource
+    request   *api.Resource
+    // elastic represents the sum of job's elastic resource, job's elastic = job.allocated - job.minAvailable
+    elastic *api.Resource
+    // inqueue represents the resource request of the inqueue job
+    inqueue    *api.Resource
+    capability *api.Resource
+}
+
+// New returns the fit plugin.
+func New(arguments framework.Arguments) framework.Plugin {
+    return &fitPlugin{
+        queueOpts:       map[api.QueueID]*queueAttr{},
+        pluginArguments: arguments,
+    }
+}
+
+func (pp *fitPlugin) Name() string {
+    return PluginName
+}
+
+func (pp *fitPlugin) OnSessionOpen(ssn *framework.Session) {
+    // Build attributes for Queues.
+    for _, job := range ssn.Jobs {
+        klog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name)
+        if _, found := pp.queueOpts[job.Queue]; !found {
+            queue := ssn.Queues[job.Queue]
+            attr := &queueAttr{
+                queueID: queue.UID,
+                name:    queue.Name,
+                weight:  queue.Weight,
+
+                allocated: api.EmptyResource(),
+                request:   api.EmptyResource(),
+                elastic:   api.EmptyResource(),
+                inqueue:   api.EmptyResource(),
+            }
+            if len(queue.Queue.Spec.Capability) != 0 {
+                attr.capability = api.NewResource(queue.Queue.Spec.Capability)
+                if attr.capability.MilliCPU <= 0 {
+                    attr.capability.MilliCPU = math.MaxFloat64
+                }
+                if attr.capability.Memory <= 0 {
+                    attr.capability.Memory = math.MaxFloat64
+                }
+            }
+            pp.queueOpts[job.Queue] = attr
+            klog.V(4).Infof("Added Queue <%s> attributes.", job.Queue)
+        }
+
+        attr := pp.queueOpts[job.Queue]
+        for status, tasks := range job.TaskStatusIndex {
+            if api.AllocatedStatus(status) {
+                for _, t := range tasks {
+                    attr.allocated.Add(t.Resreq)
+                    attr.request.Add(t.Resreq)
+                }
+            } else if status == api.Pending {
+                for _, t := range tasks {
+                    attr.request.Add(t.Resreq)
+                }
+            }
+        }
+
+        if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
+            attr.inqueue.Add(job.GetMinResources())
+        }
+
+        // calculate inqueue resource for running jobs
+        // the judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' will work on cases such as the following condition:
+        // Considering a Spark job is completed(driver pod is completed) while the podgroup keeps running, the allocated resource will be reserved again if without the judgement.
+        if (job.PodGroup.Status.Phase == scheduling.PodGroupRunning || job.PodGroup.Status.Phase == scheduling.PodGroupAllocated) &&
+            job.PodGroup.Spec.MinResources != nil &&
+            int32(util.CalculateAllocatedTaskNum(job)) >= job.PodGroup.Spec.MinMember {
+            allocated := util.GetAllocatedResource(job)
+            inqueued := util.GetInqueueResource(job, allocated)
+            attr.inqueue.Add(inqueued)
+        }
+        attr.elastic.Add(job.GetElasticResources())
+        klog.V(5).Infof("Queue %s allocated <%s> request <%s> inqueue <%s> elastic <%s>",
+            attr.name, attr.allocated.String(), attr.request.String(), attr.inqueue.String(), attr.elastic.String())
+    }
+
+    for queueID, queueInfo := range ssn.Queues {
+        if _, ok := pp.queueOpts[queueID]; !ok {
+            metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0)
+        }
+    }
+
+    // Record metrics
+    for _, attr := range pp.queueOpts {
+        metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory)
+        metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory)
+        metrics.UpdateQueueWeight(attr.name, attr.weight)
+        queue := ssn.Queues[attr.queueID]
+        metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue)
+        metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending)
+        metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running)
+        metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown)
+    }
+
+    ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) int {
+        job := obj.(*api.JobInfo)
+        queueID := job.Queue
+        attr := pp.queueOpts[queueID]
+        queue := ssn.Queues[queueID]
+        // If no capability is set, always enqueue the job.
+        if attr.capability == nil {
+            klog.V(4).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.",
+                queue.Name, job.Namespace, job.Name)
+            return util.Permit
+        }
+
+        if job.PodGroup.Spec.MinResources == nil {
+            klog.V(4).Infof("job %s MinResources is null.", job.Name)
+            return util.Permit
+        }
+        minReq := job.GetMinResources()
+
+        klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>",
+            job.Name, minReq.String(), queue.Name, attr.capability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String())
+        // The queue resource quota limit has not been reached
+        inqueue := minReq.Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic).LessEqual(attr.capability, api.Infinity)
+        klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue)
+        if inqueue {
+            attr.inqueue.Add(job.GetMinResources())
+            return util.Permit
+        }
+        ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "resource requirements cannot currently fit in queue")
+        return util.Reject
+    })
+
+    // Register event handlers.
+    ssn.AddEventHandler(&framework.EventHandler{
+        AllocateFunc: func(event *framework.Event) {
+            job := ssn.Jobs[event.Task.Job]
+            attr := pp.queueOpts[job.Queue]
+            attr.allocated.Add(event.Task.Resreq)
+            metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory)
+
+            klog.V(4).Infof("Fit AllocateFunc: task <%v/%v>, resreq <%v>, share <%v>",
+                event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share)
+        },
+        DeallocateFunc: func(event *framework.Event) {
+            job := ssn.Jobs[event.Task.Job]
+            attr := pp.queueOpts[job.Queue]
+            attr.allocated.Sub(event.Task.Resreq)
+            metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory)
+
+            klog.V(4).Infof("Fit EvictFunc: task <%v/%v>, resreq <%v>, share <%v>",
+                event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share)
+        },
+    })
+}
+
+func (pp *fitPlugin) OnSessionClose(ssn *framework.Session) {
+    pp.queueOpts = nil
+}
diff --git a/pkg/scheduler/plugins/overcommit/overcommit.go b/pkg/scheduler/plugins/overcommit/overcommit.go
index a8ca5b2a63..ee5a34310c 100644
--- a/pkg/scheduler/plugins/overcommit/overcommit.go
+++ b/pkg/scheduler/plugins/overcommit/overcommit.go
@@ -98,7 +98,7 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
         // calculate inqueue resource for running jobs
         // the judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' will work on cases such as the following condition:
         // Considering a Spark job is completed(driver pod is completed) while the podgroup keeps running, the allocated resource will be reserved again if without the judgement.
-        if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
+        if (job.PodGroup.Status.Phase == scheduling.PodGroupRunning || job.PodGroup.Status.Phase == scheduling.PodGroupAllocated) &&
             job.PodGroup.Spec.MinResources != nil &&
             int32(util.CalculateAllocatedTaskNum(job)) >= job.PodGroup.Spec.MinMember {
             allocated := util.GetAllocatedResource(job)
diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go
index 86b85794c4..939c3306ae 100644
--- a/pkg/scheduler/plugins/proportion/proportion.go
+++ b/pkg/scheduler/plugins/proportion/proportion.go
@@ -148,7 +148,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
         // calculate inqueue resource for running jobs
         // the judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' will work on cases such as the following condition:
         // Considering a Spark job is completed(driver pod is completed) while the podgroup keeps running, the allocated resource will be reserved again if without the judgement.
-        if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
+        if (job.PodGroup.Status.Phase == scheduling.PodGroupRunning || job.PodGroup.Status.Phase == scheduling.PodGroupAllocated) &&
             job.PodGroup.Spec.MinResources != nil &&
             int32(util.CalculateAllocatedTaskNum(job)) >= job.PodGroup.Spec.MinMember {
             allocated := util.GetAllocatedResource(job)
diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go
index 114851b41d..971cf8792d 100644
--- a/pkg/scheduler/util/predicate_helper.go
+++ b/pkg/scheduler/util/predicate_helper.go
@@ -54,6 +54,10 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
         klog.V(4).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
             task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
 
+        // Naive caching of predicate failures doesn't work in the presence of cluster autoscaling
+        // for GPU nodes. Volcano would see the nodes before the nvidia plugin is installed (i.e., before
+        // the GPU(s) are visible), fail the matching for GPU pods, and then never properly check again.
+        //
         // Check if the task had "predicate" failure before.
         // And then check if the task failed to predict on this node before.
         if enableErrorCache && taskFailedBefore {
diff --git a/skaffold.yaml b/skaffold.yaml
new file mode 100644
index 0000000000..1a12171076
--- /dev/null
+++ b/skaffold.yaml
@@ -0,0 +1,25 @@
+apiVersion: skaffold/v2beta28
+kind: Config
+
+metadata:
+  name: docker
+
+profiles:
+  # skaffold build -p volcano --default-repo=public.ecr.aws/n9u9x7z1
+  - name: volcano
+    build:
+      artifacts:
+        - image: volcano-scheduler
+          docker:
+            dockerfile: installer/dockerfile/scheduler/Dockerfile
+        - image: volcano-webhook-manager
+          docker:
+            dockerfile: installer/dockerfile/webhook-manager/Dockerfile
+        - image: volcano-controller-manager
+          docker:
+            dockerfile: installer/dockerfile/controller-manager/Dockerfile
+      tagPolicy:
+        gitCommit:
+          variant: AbbrevCommitSha
+          ignoreChanges: true
+      platforms: ["linux/amd64"]
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 15c9fe49eb..b35051c9ef 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -992,7 +992,7 @@ sigs.k8s.io/yaml
 # stathat.com/c/consistent v1.0.0
 ## explicit
 stathat.com/c/consistent
-# volcano.sh/apis v1.6.0-alpha.0.0.20230214095022-ad92502b1a57
+# volcano.sh/apis v1.6.0-alpha.0.0.20221021034835-d3a04f5cfc7c => github.com/predibase/volcano-apis v0.0.0-20230208222101-1946093f2249
 ## explicit; go 1.19
 volcano.sh/apis/pkg/apis/batch/v1alpha1
 volcano.sh/apis/pkg/apis/bus/v1alpha1
@@ -1063,3 +1063,4 @@ volcano.sh/apis/pkg/client/listers/scheduling/v1beta1
 # k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.25.0
 # k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.25.0
 # k8s.io/sample-controller => k8s.io/sample-controller v0.25.0
+# volcano.sh/apis => github.com/predibase/volcano-apis v0.0.0-20230208222101-1946093f2249
diff --git a/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go b/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go
index a7435e260b..ff0370f065 100644
--- a/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go
+++ b/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go
@@ -41,6 +41,9 @@ const (
     // enough resources to it.
     PodGroupPending PodGroupPhase = "Pending"
 
+    // PodGroupAllocated means `spec.minMember` underlying tasks have successfully been allocated resources.
+    PodGroupAllocated PodGroupPhase = "Allocated"
+
     // PodGroupRunning means `spec.minMember` pods of PodGroup has been in running phase.
     PodGroupRunning PodGroupPhase = "Running"