Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions pkg/cache/scheduler/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ func (c *clusterQueue) deleteWorkload(log logr.Logger, wlKey workload.Reference)
}

// reportActiveWorkloads publishes the active-workload gauges for this
// ClusterQueue: the admitted count and the reserving count (the size of
// c.Workloads). When the ClusterQueue has a parent, it also refreshes the
// subtree admitted-active gauge of every Cohort on the path from the parent
// up to the root.
func (c *clusterQueue) reportActiveWorkloads() {
	if c.HasParent() {
		for ancestor := range c.Parent().PathSelfToRoot() {
			// Report the Cohort's own aggregated counter rather than this
			// ClusterQueue's count; otherwise each ClusterQueue would overwrite
			// the Cohort gauge with its individual value.
			// NOTE(review): the label values and role tracker still come from
			// this ClusterQueue — confirm that is the intended labelling for
			// Cohort-level series.
			metrics.ReportCohortSubtreeAdmittedActiveWorkloads(ancestor.Name, ancestor.admittedWorkloadsCount, c.customMetricLabelValues, c.roleTracker)
		}
	}
	metrics.ReportAdmittedActiveWorkloads(c.Name, c.admittedWorkloadsCount, c.customMetricLabelValues, c.roleTracker)
	metrics.ReportReservingActiveWorkloads(c.Name, len(c.Workloads), c.customMetricLabelValues, c.roleTracker)
}
Expand Down Expand Up @@ -544,17 +549,23 @@ func (c *clusterQueue) updateWorkloadUsage(log logr.Logger, wi *workload.Info, o
}
}
c.updateWorkloadTASUsage(log, wi, op)
signedOne := op.asSignedOne()
if admitted {
updateFlavorUsage(frUsage, c.AdmittedUsage, op)
c.admittedWorkloadsCount += op.asSignedOne()
if c.HasParent() {
for ancestor := range c.Parent().PathSelfToRoot() {
ancestor.admittedWorkloadsCount += signedOne
}
}
c.admittedWorkloadsCount += signedOne
}
qKey := queue.KeyFromWorkload(wi.Obj)
if lq, ok := c.localQueues[qKey]; ok {
updateFlavorUsage(frUsage, lq.totalReserved, op)
lq.reservingWorkloads += op.asSignedOne()
lq.reservingWorkloads += signedOne
if admitted {
lq.updateAdmittedUsage(frUsage, op)
lq.admittedWorkloads += op.asSignedOne()
lq.admittedWorkloads += signedOne
}
if features.Enabled(features.LocalQueueMetrics) {
lq.reportActiveWorkloads(c.roleTracker)
Expand Down
2 changes: 2 additions & 0 deletions pkg/cache/scheduler/cohort.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type cohort struct {
resourceNode resourceNode

FairWeight float64

admittedWorkloadsCount int
}

func newCohort(name kueue.CohortReference) *cohort {
Expand Down
18 changes: 18 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ var (
// +metricsdoc:group=cohort
// +metricsdoc:labels=cohort="the name of the Cohort",priority_class="the priority class name",replica_role="one of `leader`, `follower`, or `standalone`"
CohortSubtreeAdmittedWorkloadsTotal *prometheus.CounterVec

// +metricsdoc:group=cohort
// +metricsdoc:labels=cohort="the name of the Cohort",replica_role="one of `leader`, `follower`, or `standalone`"
CohortSubtreeAdmittedActiveWorkloads *prometheus.GaugeVec
)

func trackGaugeVec(g *prometheus.GaugeVec) *prometheus.GaugeVec {
Expand Down Expand Up @@ -740,6 +744,13 @@ If the Cohort has a weight of zero and is borrowing, this will return NaN.`,
Help: "The total number of admitted workloads per cohort's subtree",
}, append([]string{"cohort", "priority_class", "replica_role"}, extraLabels...),
)
CohortSubtreeAdmittedActiveWorkloads = trackGaugeVec(prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: constants.KueueName,
Name: "cohort_subtree_admitted_active_workloads",
Help: "The number of admitted Workloads that are active (unsuspended and not finished), per cohort's subtree",
}, append([]string{"cohort", "replica_role"}, extraLabels...),
))
}

func init() {
Expand Down Expand Up @@ -929,6 +940,7 @@ func ClearCohortMetrics(cohortName string) {
CohortSubtreeQuota.DeletePartialMatch(prometheus.Labels{"cohort": cohortName})
CohortWeightedShare.DeletePartialMatch(prometheus.Labels{"cohort": cohortName})
CohortSubtreeAdmittedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"cohort": cohortName})
CohortSubtreeAdmittedActiveWorkloads.DeletePartialMatch(prometheus.Labels{"cohort": cohortName})
}

func ReportClusterQueueStatus(cqName kueue.ClusterQueueReference, cqStatus ClusterQueueStatus, customLabelValues []string, tracker *roletracker.RoleTracker) {
Expand Down Expand Up @@ -1030,6 +1042,11 @@ func ReportCohortWeightedShare(cohort kueue.CohortReference, weightedShare float
CohortWeightedShare.WithLabelValues(labels...).Set(weightedShare)
}

// ReportCohortSubtreeAdmittedActiveWorkloads sets the
// CohortSubtreeAdmittedActiveWorkloads gauge for the given cohort to count.
// Label values are, in order: the cohort name, the role returned by
// roletracker.GetRole(tracker), and then customLabelValues.
func ReportCohortSubtreeAdmittedActiveWorkloads(cohort kueue.CohortReference, count int, customLabelValues []string, tracker *roletracker.RoleTracker) {
	labelValues := make([]string, 0, 2+len(customLabelValues))
	labelValues = append(labelValues, string(cohort), roletracker.GetRole(tracker))
	labelValues = append(labelValues, customLabelValues...)

	gauge := CohortSubtreeAdmittedActiveWorkloads.WithLabelValues(labelValues...)
	gauge.Set(float64(count))
}

func ReportAdmittedActiveWorkloads(cqName kueue.ClusterQueueReference, count int, customLabelValues []string, tracker *roletracker.RoleTracker) {
labels := append([]string{string(cqName), roletracker.GetRole(tracker)}, customLabelValues...)
AdmittedActiveWorkloads.WithLabelValues(labels...).Set(float64(count))
Expand Down Expand Up @@ -1165,6 +1182,7 @@ func Register() {
CohortWeightedShare,
CohortSubtreeQuota,
CohortSubtreeAdmittedWorkloadsTotal,
CohortSubtreeAdmittedActiveWorkloads,
)
if features.Enabled(features.LocalQueueMetrics) {
RegisterLQMetrics()
Expand Down
1 change: 1 addition & 0 deletions site/content/en/docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The following metrics are available only if `LocalQueueMetrics` feature gate is
<!-- BEGIN GENERATED TABLE: cohort -->
| Metric name | Type | Description | Labels |
| --- | --- | --- | --- |
| `kueue_cohort_subtree_admitted_active_workloads` | Gauge | The number of admitted Workloads that are active (unsuspended and not finished), per cohort's subtree | `cohort`: the name of the Cohort<br> `replica_role`: one of `leader`, `follower`, or `standalone` |
| `kueue_cohort_subtree_admitted_workloads_total` | Counter | The total number of admitted workloads per cohort's subtree | `cohort`: the name of the Cohort<br> `priority_class`: the priority class name<br> `replica_role`: one of `leader`, `follower`, or `standalone` |
| `kueue_cohort_subtree_quota` | Gauge | Reports the cohort's nominal quota aggregated within the cohort's subtree. The values are reported per resource and flavor | `cohort`: the name of the Cohort<br> `flavor`: the resource flavor name<br> `resource`: the resource name<br> `replica_role`: one of `leader`, `follower`, or `standalone` |
| `kueue_cohort_weighted_share` | Gauge | Reports a value that representing the maximum of the ratios of usage above nominal<br>quota to the lendable resources in the Cohort, among all the resources provided by<br>the Cohort, and divided by the weight.<br>If zero, it means that the usage of the Cohort is below the nominal quota.<br>If the Cohort has a weight of zero and is borrowing, this will return NaN. | `cohort`: the name of the Cohort<br> `replica_role`: one of `leader`, `follower`, or `standalone` |
Expand Down