diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 301f12f3..7fa11052 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -27,7 +27,8 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        envtest_k8s_version: [1.23.5, 1.27.1, 1.31.0, 1.32.0]
+        # from https://github.com/kubernetes-sigs/controller-tools/blob/main/envtest-releases.yaml
+        envtest_k8s_version: [1.23.5, 1.33.0]
     steps:
       - name: Clone the code
         uses: actions/checkout@v4
diff --git a/.gitignore b/.gitignore
index 8bd7b98a..46e64ddf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,3 +38,4 @@
 tmp*
 __debug*
 vendor
+logs
diff --git a/.vscode/launch.json b/.vscode/launch.json
index ebf461d8..814deb54 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -64,6 +64,7 @@
         "ENABLE_CR_CONTROLLER": "true"
       },
       "args": [
+        "--metrics-path", "${workspaceFolder}/logs/metrics.log",
        "--gpu-info-config", "${workspaceFolder}/config/samples/gpu-info-config.yaml",
        "--dynamic-config", "${workspaceFolder}/config/samples/dynamic-config.yaml",
        "--scheduler-config", "${workspaceFolder}/config/samples/scheduler-config.yaml",
diff --git a/charts/tensor-fusion/Chart.yaml b/charts/tensor-fusion/Chart.yaml
index 1cf16079..fe556111 100644
--- a/charts/tensor-fusion/Chart.yaml
+++ b/charts/tensor-fusion/Chart.yaml
@@ -15,7 +15,7 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 1.5.3
+version: 1.5.4
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
diff --git a/charts/tensor-fusion/values-cn.yaml b/charts/tensor-fusion/values-cn.yaml
index d83dfcd4..6d771228 100644
--- a/charts/tensor-fusion/values-cn.yaml
+++ b/charts/tensor-fusion/values-cn.yaml
@@ -7,9 +7,9 @@ controller:
     repository: registry.cn-hangzhou.aliyuncs.com/tensorfusion/tensor-fusion-operator
   vectorAgentImage: docker.m.daocloud.io/timberio/vector:latest-alpine
 
-admissionWebhooks:
-  patch:
-    image: k8s.m.daocloud.io/ingress-nginx/kube-webhook-certgen:v1.5.0
+  admissionWebhooks:
+    patch:
+      image: k8s.m.daocloud.io/ingress-nginx/kube-webhook-certgen:v1.5.0
 
 agent:
   image:
diff --git a/config/samples/gpu-info-config.yaml b/config/samples/gpu-info-config.yaml
index 791af254..c6279171 100644
--- a/config/samples/gpu-info-config.yaml
+++ b/config/samples/gpu-info-config.yaml
@@ -9,3 +9,15 @@
   vendor: NVIDIA
   costPerHour: 1.89
   fp16TFlops: 312
+
+- model: NVIDIA 3090
+  fullModelName: "NVIDIA GeForce RTX 3090"
+  vendor: NVIDIA
+  costPerHour: 0.3
+  fp16TFlops: 71.2
+
+- model: NVIDIA 5060 Ti
+  fullModelName: "NVIDIA GeForce RTX 5060 Ti"
+  vendor: NVIDIA
+  costPerHour: 0.25
+  fp16TFlops: 66
\ No newline at end of file
diff --git a/internal/constants/env.go b/internal/constants/env.go
index 69bf009b..9a7679f3 100644
--- a/internal/constants/env.go
+++ b/internal/constants/env.go
@@ -31,6 +31,9 @@ const (
     KubeletDevicePluginVolumeName = "device-plugin"
     KubeletDevicePluginPath       = "/var/lib/kubelet/device-plugins"
 
+    KubeletPodResourcesVolumeName = "pod-resources"
+    KubeletPodResourcesPath       = "/var/lib/kubelet/pod-resources"
+
     TensorFusionVectorConfigName       = "tensor-fusion-sys-vector-config"
     TensorFusionVectorConfigVolumeName = "vector-config"
     TensorFusionVectorConfigMountPath  = "/etc/vector/vector.yaml"
diff --git a/internal/controller/gpunode_controller.go b/internal/controller/gpunode_controller.go
index c3c9cfe6..8e18b62b 100644
--- a/internal/controller/gpunode_controller.go
+++ b/internal/controller/gpunode_controller.go
@@ -158,6 +158,7 @@ func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity(ctx context.Cont
         if err != nil {
             return fmt.Errorf("failed to update GPU node status to pending: %w", err)
         }
+        metrics.SetNodeMetrics(node, poolObj, nil)
     }
 
     err := r.syncStatusToGPUDevices(ctx, node, tfv1.TensorFusionGPUPhasePending)
@@ -172,7 +173,7 @@ func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity(ctx context.Cont
         return err
     }
     if len(gpuModels) == 0 {
-        // when GPU created, will trigger next reconcile
+        log.FromContext(ctx).Info("GPU models not found, skip update", "node", node.Name)
         return nil
     }
 
@@ -290,6 +291,16 @@ func (r *GPUNodeReconciler) reconcileNodeDiscoveryJob(
         }
     }
 
+    if job.Status.Failed > 0 {
+        log.Info("node discovery job failed, update GPU node status to failed", "node", gpunode.Name)
+        // Update the phase to failed; an operator must investigate the failure and restart the node discovery job manually
+        gpunode.Status.Phase = tfv1.TensorFusionGPUNodePhaseFailed
+        if err := r.Status().Update(ctx, gpunode); err != nil {
+            return fmt.Errorf("failed to update GPU node status to failed: %w", err)
+        }
+        metrics.SetNodeMetrics(gpunode, pool, nil)
+    }
+
     return nil
 }
diff --git a/internal/controller/tensorfusioncluster_controller.go b/internal/controller/tensorfusioncluster_controller.go
index cd8ed2f2..b830b270 100644
--- a/internal/controller/tensorfusioncluster_controller.go
+++ b/internal/controller/tensorfusioncluster_controller.go
@@ -20,6 +20,7 @@ import (
     "context"
     "fmt"
     "strconv"
+    "strings"
     "sync"
 
     "golang.org/x/time/rate"
@@ -325,8 +326,8 @@ func (r *TensorFusionClusterReconciler) reconcileGPUPool(ctx context.Context, tf
                 errors = append(errors, fmt.Errorf("failed to update GPUPool %s: %w", key, err))
             }
             anyPoolChanged = true
-            r.updateMetricsRecorder(ctx, existingPool)
         }
+        r.updateMetricsRecorder(ctx, existingPool)
     }
 }
 
@@ -444,6 +445,7 @@ func (r *TensorFusionClusterReconciler) SetupWithManager(mgr ctrl.Manager, addLi
 
 // Update metrics recorder's raw billing map
 func (r *TensorFusionClusterReconciler) updateMetricsRecorder(ctx context.Context, pool *tfv1.GPUPool) {
+    const dollarSign = "$"
     log := log.FromContext(ctx)
     if pool.Spec.QosConfig == nil {
         log.Info("QosConfig is nil, skip updating metrics recorder", "pool", pool.Name)
@@ -456,8 +458,8 @@ func (r *TensorFusionClusterReconciler) updateMetricsRecorder(ctx context.Contex
     }
     pricingDetail := r.MetricsRecorder.WorkerUnitPriceMap[pool.Name]
     for _, pricing := range qosConfig.Pricing {
-        tflopsPerHour, _ := strconv.ParseFloat(pricing.Requests.PerFP16TFlopsPerHour, 64)
-        vramPerHour, _ := strconv.ParseFloat(pricing.Requests.PerGBOfVRAMPerHour, 64)
+        tflopsPerHour, _ := strconv.ParseFloat(strings.TrimPrefix(pricing.Requests.PerFP16TFlopsPerHour, dollarSign), 64)
+        vramPerHour, _ := strconv.ParseFloat(strings.TrimPrefix(pricing.Requests.PerGBOfVRAMPerHour, dollarSign), 64)
         limitOverRequestChargingRatio, _ := strconv.ParseFloat(pricing.LimitsOverRequestsChargingRatio, 64)
 
         pricingDetail[string(pricing.Qos)] = metrics.RawBillingPricing{
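Note on the `strings.TrimPrefix` change above: `strconv.ParseFloat` rejects a leading `$`, so pricing strings written with a currency symbol would silently parse to zero (the error is discarded). A minimal standalone sketch, using a made-up pricing value:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	raw := "$0.062" // hypothetical QosConfig pricing string

	// Without trimming, parsing fails and the discarded error leaves 0.
	v, err := strconv.ParseFloat(raw, 64)
	fmt.Println(v, err) // 0 strconv.ParseFloat: parsing "$0.062": invalid syntax

	// With the "$" trimmed, the value parses as intended.
	v, err = strconv.ParseFloat(strings.TrimPrefix(raw, "$"), 64)
	fmt.Println(v, err) // 0.062 <nil>
}
```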
diff --git a/internal/gpuallocator/gpuallocator.go b/internal/gpuallocator/gpuallocator.go
index 332d4fbc..e47fb46a 100644
--- a/internal/gpuallocator/gpuallocator.go
+++ b/internal/gpuallocator/gpuallocator.go
@@ -580,6 +580,12 @@ func (s *GpuAllocator) Score(ctx context.Context, cfg *config.GPUFitConfig, req
     for nodeName, gpus := range validNodeGPUs {
         for _, gpu := range gpus {
             res := strategy.Score(gpu)
+
+            // Give Pending GPUs a lower score so the scheduler prefers not to place workloads on them
+            if gpu.Status.Phase == tfv1.TensorFusionGPUPhasePending {
+                res = res / 4
+            }
+
             if _, exists := result[nodeName]; !exists {
                 result[nodeName] = make(map[string]int, len(gpus))
             }
diff --git a/internal/metrics/migrate.go b/internal/metrics/migrate.go
index 42fc4808..4a0bb0e8 100644
--- a/internal/metrics/migrate.go
+++ b/internal/metrics/migrate.go
@@ -8,9 +8,9 @@ var TFVersionMigrationMap = []struct {
 }{
     // init version, just run init SQL
     {"1.0", []string{
-        "CREATE TABLE IF NOT EXISTS tf_worker_resources (\n `worker` String NULL SKIPPING INDEX,\n `workload` String NULL INVERTED INDEX,\n `pool` String NULL INVERTED INDEX,\n `namespace` String NULL INVERTED INDEX,\n `qos` String NULL,\n `tflops_request` Double NULL,\n `tflops_limit` Double NULL,\n `vram_bytes_request` Double NULL,\n `vram_bytes_limit` Double NULL,\n `gpu_count` BigInt NULL,\n `raw_cost` Double NULL,\n `ts` Timestamp_ms TIME INDEX,\n PRIMARY KEY (`worker`, `workload`, `pool`, `namespace`))\n ENGINE=mito WITH( ttl='30d', merge_mode = 'last_non_null')",
+        "CREATE TABLE IF NOT EXISTS tf_worker_resources (\n `worker` String NULL SKIPPING INDEX,\n `workload` String NULL INVERTED INDEX,\n `pool` String NULL INVERTED INDEX,\n `namespace` String NULL INVERTED INDEX,\n `qos` String NULL,\n `tflops_request` Double NULL,\n `tflops_limit` Double NULL,\n `vram_bytes_request` Double NULL,\n `vram_bytes_limit` Double NULL,\n `gpu_count` BigInt NULL,\n `raw_cost` Double NULL,\n `ready` String NULL,\n `ts` Timestamp_ms TIME INDEX,\n PRIMARY KEY (`worker`, `workload`, `pool`, `namespace`))\n ENGINE=mito WITH( ttl='30d', merge_mode = 'last_non_null')",
 
-        "CREATE TABLE IF NOT EXISTS tf_node_resources (\n `node` String NULL INVERTED INDEX,\n `pool` String NULL INVERTED INDEX,\n `allocated_tflops` Double NULL,\n `allocated_tflops_percent` Double NULL,\n `allocated_vram_bytes` Double NULL,\n `allocated_vram_percent` Double NULL,\n `allocated_tflops_percent_virtual` Double NULL,\n `allocated_vram_percent_virtual` Double NULL,\n `raw_cost` Double NULL,\n `gpu_count` BigInt NULL,\n `ts` Timestamp_ms TIME INDEX,\n PRIMARY KEY (`node`, `pool`))\n ENGINE=mito WITH( ttl='30d', merge_mode = 'last_non_null')",
+        "CREATE TABLE IF NOT EXISTS tf_node_resources (\n `node` String NULL INVERTED INDEX,\n `pool` String NULL INVERTED INDEX,\n `phase` String NULL INVERTED INDEX,\n `allocated_tflops` Double NULL,\n `allocated_tflops_percent` Double NULL,\n `allocated_vram_bytes` Double NULL,\n `allocated_vram_percent` Double NULL,\n `allocated_tflops_percent_virtual` Double NULL,\n `allocated_vram_percent_virtual` Double NULL,\n `raw_cost` Double NULL,\n `gpu_count` BigInt NULL,\n `ts` Timestamp_ms TIME INDEX,\n PRIMARY KEY (`node`, `pool`, `phase`))\n ENGINE=mito WITH( ttl='30d', merge_mode = 'last_non_null')",
 
         "CREATE TABLE IF NOT EXISTS tf_system_metrics (\n `pool` String NULL INVERTED INDEX,\n `total_workers_cnt` BigInt NULL,\n `total_nodes_cnt` BigInt NULL,\n `total_allocation_fail_cnt` BigInt NULL,\n `total_allocation_success_cnt` BigInt NULL,\n `total_scale_up_cnt` BigInt NULL,\n `total_scale_down_cnt` BigInt NULL,\n `ts` Timestamp_ms TIME INDEX,\n PRIMARY KEY (`pool`))\n ENGINE=mito WITH( ttl='30d', merge_mode = 'last_non_null')",
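Note on the `Score` change in gpuallocator.go above: scores are `int`s, so the Pending penalty uses integer division and floors toward zero. A tiny sketch of the effect; `penalize` is a hypothetical stand-in for the inline code:

```go
package main

import "fmt"

// penalize mirrors the inline penalty: a Pending GPU keeps only a quarter
// of its score, with integer division flooring the result.
func penalize(score int, pending bool) int {
	if pending {
		return score / 4
	}
	return score
}

func main() {
	fmt.Println(penalize(100, false)) // 100: non-Pending GPUs are unaffected
	fmt.Println(penalize(100, true))  // 25
	fmt.Println(penalize(3, true))    // 0: low scores floor to zero
}
```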
diff --git a/internal/metrics/recorder.go b/internal/metrics/recorder.go
index 77025744..e73c5b4d 100644
--- a/internal/metrics/recorder.go
+++ b/internal/metrics/recorder.go
@@ -93,6 +93,7 @@ func SetWorkerMetricsByWorkload(pod *corev1.Pod) {
     metricsItem.TflopsLimit = gpuLimitResource.Tflops.AsApproximateFloat64()
     metricsItem.VramBytesRequest = gpuRequestResource.Vram.AsApproximateFloat64()
     metricsItem.VramBytesLimit = gpuLimitResource.Vram.AsApproximateFloat64()
+    metricsItem.Ready = utils.IsPodConditionTrue(pod.Status.Conditions, corev1.PodReady)
     if count <= 0 || count > uint64(math.MaxInt32) {
         // handle invalid or out-of-bounds data
         metricsItem.GPUCount = 1
@@ -116,6 +117,7 @@ func SetNodeMetrics(node *tfv1.GPUNode, poolObj *tfv1.GPUPool, gpuModels []strin
     // Fields that possibly change after initialization
     metricsItem := nodeMetricsMap[node.Name]
     metricsItem.PoolName = poolObj.Name
+    metricsItem.Phase = string(node.Status.Phase)
     metricsItem.SetGPUModelAndCount(gpuModels)
 
     totalTflops := node.Status.TotalTFlops.AsApproximateFloat64()
@@ -279,6 +281,7 @@ func (mr *MetricsRecorder) RecordMetrics(writer io.Writer) {
         enc.AddField("raw_cost", metrics.RawCost)
         enc.AddField("vram_bytes_limit", metrics.VramBytesLimit)
         enc.AddField("vram_bytes_request", metrics.VramBytesRequest)
+        enc.AddField("ready", metrics.Ready)
         enc.EndLine(now)
     }
 
@@ -302,6 +305,7 @@ func (mr *MetricsRecorder) RecordMetrics(writer io.Writer) {
 
         enc.AddTag("node", metrics.NodeName)
         enc.AddTag("pool", metrics.PoolName)
+        enc.AddTag("phase", metrics.Phase)
 
         enc.AddField("allocated_tflops", metrics.AllocatedTflops)
         enc.AddField("allocated_tflops_percent", metrics.AllocatedTflopsPercent)
@@ -371,7 +375,7 @@ func (mr *MetricsRecorder) getWorkerRawCost(metrics *WorkerResourceMetrics, dura
 func (mr *MetricsRecorder) getNodeRawCost(metrics *NodeResourceMetrics, duration time.Duration, hourlyUnitPriceMap map[string]float64) float64 {
     cost := 0.0
     for _, gpuModel := range metrics.gpuModels {
-        cost += metrics.AllocatedTflops * duration.Hours() * hourlyUnitPriceMap[gpuModel]
+        cost += duration.Seconds() * hourlyUnitPriceMap[gpuModel] / 3600.0
     }
     return cost
 }
diff --git a/internal/metrics/types.go b/internal/metrics/types.go
index 593df004..c37bb321 100644
--- a/internal/metrics/types.go
+++ b/internal/metrics/types.go
@@ -42,6 +42,7 @@ type WorkerResourceMetrics struct {
     VramBytesLimit float64 `json:"vramBytesLimit" gorm:"column:vram_bytes_limit"`
     GPUCount       int     `json:"gpuCount" gorm:"column:gpu_count"`
     RawCost        float64 `json:"rawCost" gorm:"column:raw_cost"`
+    Ready          bool    `json:"ready" gorm:"column:ready"`
 
     // NOTE: make sure new fields will be migrated in SetupTable function
 
@@ -60,6 +61,7 @@ func (wm WorkerResourceMetrics) TableName() string {
 type NodeResourceMetrics struct {
     NodeName string `json:"nodeName" gorm:"column:node;index:,class:INVERTED"`
     PoolName string `json:"poolName" gorm:"column:pool;index:,class:INVERTED"`
+    Phase    string `json:"phase" gorm:"column:phase;index:,class:INVERTED"`
 
     AllocatedTflops        float64 `json:"allocatedTflops" gorm:"column:allocated_tflops"`
     AllocatedTflopsPercent float64 `json:"allocatedTflopsPercent" gorm:"column:allocated_tflops_percent"`
@@ -86,6 +88,9 @@ func (nm NodeResourceMetrics) TableName() string {
 }
 
 func (nm *NodeResourceMetrics) SetGPUModelAndCount(gpuModels []string) {
+    if gpuModels == nil {
+        return
+    }
     nm.gpuModels = gpuModels
     nm.GPUCount = len(gpuModels)
 }
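Note on `getNodeRawCost` in recorder.go above: the new formula `duration.Seconds() * price / 3600.0` charges each GPU model its flat hourly unit price for the elapsed time, independent of `AllocatedTflops`; it is algebraically the same as `duration.Hours() * price`. A quick check with a made-up price and duration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	price := 1.89 // hourly unit price for one GPU model, e.g. the A100 sample above
	d := 90 * time.Minute

	// New per-model cost accumulation vs. the equivalent hourly form.
	fmt.Printf("%.4f\n", d.Seconds()*price/3600.0) // 2.8350
	fmt.Printf("%.4f\n", d.Hours()*price)          // 2.8350
}
```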
diff --git a/internal/utils/compose.go b/internal/utils/compose.go
index 96cdbad5..e161bf50 100644
--- a/internal/utils/compose.go
+++ b/internal/utils/compose.go
@@ -387,6 +387,14 @@ func AddTFHypervisorConfAfterTemplate(ctx context.Context, spec *v1.PodSpec, poo
                 Type: ptr.To(v1.HostPathDirectoryOrCreate),
             },
         },
+    }, v1.Volume{
+        Name: constants.KubeletPodResourcesVolumeName,
+        VolumeSource: v1.VolumeSource{
+            HostPath: &v1.HostPathVolumeSource{
+                Path: constants.KubeletPodResourcesPath,
+                Type: ptr.To(v1.HostPathDirectoryOrCreate),
+            },
+        },
     })
 
     composeHypervisorInitContainer(spec, pool)
@@ -430,6 +438,9 @@ func composeHypervisorContainer(spec *v1.PodSpec, pool *tfv1.GPUPool, enableVect
         }, v1.VolumeMount{
             Name:      constants.KubeletDevicePluginVolumeName,
             MountPath: constants.KubeletDevicePluginPath,
+        }, v1.VolumeMount{
+            Name:      constants.KubeletPodResourcesVolumeName,
+            MountPath: constants.KubeletPodResourcesPath,
         })
 
     if enableVector {
         spec.Containers[0].VolumeMounts = append(spec.Containers[0].VolumeMounts, v1.VolumeMount{
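Note on the new pod-resources mount: the kubelet serves a gRPC PodResources API on a unix socket under /var/lib/kubelet/pod-resources, which a hypervisor container with this hostPath mount can query to map allocated devices to pods. A minimal client sketch, not part of this PR, assuming the standard k8s.io/kubelet/pkg/apis/podresources/v1 package and the default kubelet.sock socket name:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Dial the kubelet's pod-resources unix socket (the path mounted above).
	conn, err := grpc.DialContext(ctx, "unix:///var/lib/kubelet/pod-resources/kubelet.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// List pods known to the kubelet along with their allocated devices.
	client := podresourcesv1.NewPodResourcesListerClient(conn)
	resp, err := client.List(ctx, &podresourcesv1.ListPodResourcesRequest{})
	if err != nil {
		panic(err)
	}
	for _, pod := range resp.GetPodResources() {
		fmt.Println(pod.GetNamespace(), pod.GetName())
	}
}
```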