From bcf1e588b47dcc57f34a6e80dd29e2a5a43c36bd Mon Sep 17 00:00:00 2001 From: Rishab87 Date: Fri, 1 Aug 2025 21:21:04 +0530 Subject: [PATCH 1/3] crsm tests --- .../common/kube_state_metrics_measurement.go | 237 +++++++++++++----- .../0namespace.yaml | 4 + .../clusterRole.yaml | 12 + .../clusterRoleBinding.yaml | 12 + .../configMap.yaml | 17 ++ .../deployment.yaml | 38 +++ .../serviceAccount.yaml | 5 + .../testing/load/namespace-labels.yaml | 10 + .../overrides/ksm-namespaces-override.yaml | 11 + .../custom-resource-state-metrics.yaml | 48 ++++ .../prometheus/ksm-namespaces-test.yaml | 51 ++++ 11 files changed, 388 insertions(+), 57 deletions(-) create mode 100644 clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/0namespace.yaml create mode 100644 clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRole.yaml create mode 100644 clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRoleBinding.yaml create mode 100644 clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/configMap.yaml create mode 100644 clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/deployment.yaml create mode 100644 clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceAccount.yaml create mode 100644 clusterloader2/testing/load/namespace-labels.yaml create mode 100644 clusterloader2/testing/overrides/ksm-namespaces-override.yaml create mode 100644 clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml create mode 100644 clusterloader2/testing/prometheus/ksm-namespaces-test.yaml diff --git a/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go b/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go index c9ce23f8c8..b7fd4626a0 100644 --- a/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go +++ 
b/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/common/model" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + ) const ( @@ -37,6 +38,9 @@ const ( probeIntervalDefault = 30 * time.Second ksmNamespace = "kube-state-metrics-perf-test" ksmServiceName = "kube-state-metrics" + crsmLatencyName = "CustomResourceStateMetricsLatency" + crsmNamespace = "custom-resource-state-metrics-perf-test" + crsmServiceName = "custom-resource-state-metrics" ksmSelfPort = 8081 ksmMetricsPort = 8080 ) @@ -53,10 +57,25 @@ type ksmLatencyMeasurement struct { wg sync.WaitGroup } +type crsmLatencyMeasurement struct { + ctx context.Context + cancel func() + isRunning bool + namespace string + serviceName string + metricsPort int + selfPort int + initialLatency *measurementutil.Histogram + wg sync.WaitGroup +} + func init() { if err := measurement.Register(ksmLatencyName, CreateKSMLatencyMeasurement); err != nil { klog.Fatalf("Cannot register %s: %v", ksmLatencyName, err) } + if err := measurement.Register(crsmLatencyName, CreateCRSMLatencyMeasurement); err != nil { + klog.Fatalf("Cannot register %s: %v", crsmLatencyName, err) + } } // CreateKSMLatencyMeasurement creates a new Kube State @@ -73,6 +92,19 @@ func CreateKSMLatencyMeasurement() measurement.Measurement { } } +// CreateCRSMLatencyMeasurement creates a new Custom Resource State Metrics Measurement. +func CreateCRSMLatencyMeasurement() measurement.Measurement { + ctx, cancel := context.WithCancel(context.Background()) + return &crsmLatencyMeasurement{ + namespace: crsmNamespace, + serviceName: crsmServiceName, + selfPort: ksmSelfPort, + metricsPort: ksmMetricsPort, + ctx: ctx, + cancel: cancel, + } +} + // Execute supports two actions: // - start - starts goroutine and queries /metrics every probeIntervalDefault interval, // it also collects initial latency metrics. 
@@ -100,35 +132,73 @@ func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measureme // the scrape interval so we should cancel. m.startQuerying(m.ctx, client, probeIntervalDefault) // Retrieve initial latency when first call is done. - m.initialLatency, err = m.retrieveKSMLatencyMetrics(m.ctx, client) + m.initialLatency, err = m.retrieveLatencyMetrics(m.ctx, client) return nil, err case "gather": defer m.cancel() - return m.createKSMLatencySummary(m.ctx, client) + return m.createLatencySummary(m.ctx, client) default: return nil, fmt.Errorf("unknown action %v", action) } } -func (m *ksmLatencyMeasurement) stop() error { - if !m.isRunning { - return fmt.Errorf("%s: measurement was not running", m) +// Execute for crsmLatencyMeasurement +func (m *crsmLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) { + if !config.CloudProvider.Features().SupportKubeStateMetrics { + klog.Infof("not executing CRSMLatencyMeasurement: unsupported for provider, %s", config.ClusterFramework.GetClusterConfig().Provider.Name()) + return nil, nil } - m.cancel() - m.wg.Wait() - return nil -} - -// createKSMLatencyReport gathers the latency one last time and creates the summary based on the Quantile from the sub histograms. -// Afterwards it creates the Summary Report. 
-func (m *ksmLatencyMeasurement) createKSMLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) { - latestLatency, err := m.retrieveKSMLatencyMetrics(ctx, client) + action, err := util.GetString(config.Params, "action") if err != nil { return nil, err } - if err = m.stop(); err != nil { + client := config.ClusterFramework.GetClientSets().GetClient() + switch action { + case "start": + if m.isRunning { + klog.V(2).Infof("%s: measurement already running", m) + return nil, nil + } + m.startQuerying(m.ctx, client, probeIntervalDefault) + m.initialLatency, err = m.retrieveLatencyMetrics(m.ctx, client) + return nil, err + case "gather": + defer m.cancel() + return m.createLatencySummary(m.ctx, client) + default: + return nil, fmt.Errorf("unknown action %v", action) + } +} + +func getMetricsFromService(ctx context.Context, client clientset.Interface, namespace, serviceName string, port int) (string, error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + out, err := client.CoreV1().RESTClient().Get(). + Resource("services"). + SubResource("proxy"). + Namespace(namespace). + Name(fmt.Sprintf("%v:%v", serviceName, port)). + Suffix("metrics"). + Do(ctx).Raw() + return string(out), err +} + +func (m *ksmLatencyMeasurement) stop() error { + if !m.isRunning { + return fmt.Errorf("%s: measurement was not running", m) + } + m.isRunning = false + m.cancel() + m.wg.Wait() + return nil +} + +func (m *ksmLatencyMeasurement) createLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) { + latestLatency, err := m.retrieveLatencyMetrics(ctx, client) + if err != nil { return nil, err } + m.stop() // We want to subtract the latest histogram from the first one we collect. finalLatency := HistogramSub(latestLatency, m.initialLatency) // Pretty Print the report. 
@@ -149,61 +219,39 @@ func (m *ksmLatencyMeasurement) createKSMLatencySummary(ctx context.Context, cli func (m *ksmLatencyMeasurement) startQuerying(ctx context.Context, client clientset.Interface, interval time.Duration) { m.isRunning = true m.wg.Add(1) - go m.queryLoop(ctx, client, interval) -} - -func (m *ksmLatencyMeasurement) queryLoop(ctx context.Context, client clientset.Interface, interval time.Duration) { - defer m.wg.Done() - for { - select { - case <-ctx.Done(): - return - case <-time.After(interval): - var output string - output, err := m.getMetricsFromService(ctx, client, m.metricsPort) - if err != nil { - klog.V(2).Infof("error during fetching metrics from service: %v", err) - } - if output == "" { - klog.V(2).Infof("/metrics endpoint of kube-state-metrics returned no data in namespace: %s from service: %s port: %d", m.namespace, m.serviceName, m.metricsPort) + go func() { + defer m.wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + _, err := getMetricsFromService(ctx, client, m.namespace, m.serviceName, m.metricsPort) + if err != nil { + klog.V(2).Infof("error during fetching metrics from service: %v", err) + } } - } - } + }() } -func (m *ksmLatencyMeasurement) retrieveKSMLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) { - ksmHist := measurementutil.NewHistogram(nil) - output, err := m.getMetricsFromService(ctx, c, m.selfPort) +func (m *ksmLatencyMeasurement) retrieveLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) { + hist := measurementutil.NewHistogram(nil) + output, err := getMetricsFromService(ctx, c, m.namespace, m.serviceName, m.selfPort) if err != nil { - return ksmHist, err + return hist, err } samples, err := measurementutil.ExtractMetricSamples(output) if err != nil { - return ksmHist, err + return hist, err } for _, sample := range samples { - switch sample.Metric[model.MetricNameLabel] { - case 
ksmRequestDurationMetricName: - measurementutil.ConvertSampleToHistogram(sample, ksmHist) + if sample.Metric[model.MetricNameLabel] == ksmRequestDurationMetricName { + measurementutil.ConvertSampleToHistogram(sample, hist) } } - return ksmHist, nil + return hist, nil } - -func (m *ksmLatencyMeasurement) getMetricsFromService(ctx context.Context, client clientset.Interface, port int) (string, error) { - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() - out, err := client.CoreV1().RESTClient().Get(). - Resource("services"). - SubResource("proxy"). - Namespace(m.namespace). - Name(fmt.Sprintf("%v:%v", m.serviceName, port)). - Suffix("metrics"). - Do(ctx).Raw() - return string(out), err -} - // Dispose cleans up after the measurement. func (m *ksmLatencyMeasurement) Dispose() { if err := m.stop(); err != nil { @@ -215,3 +263,78 @@ func (m *ksmLatencyMeasurement) Dispose() { func (m *ksmLatencyMeasurement) String() string { return ksmLatencyName } + +func (m *crsmLatencyMeasurement) stop() error { + if !m.isRunning { + return fmt.Errorf("%s: measurement was not running", m) + } + m.isRunning = false + m.cancel() + m.wg.Wait() + return nil +} + +func (m *crsmLatencyMeasurement) createLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) { + latestLatency, err := m.retrieveLatencyMetrics(ctx, client) + if err != nil { + return nil, err + } + m.stop() + finalLatency := HistogramSub(latestLatency, m.initialLatency) + result := &measurementutil.LatencyMetric{} + if err = SetQuantileFromHistogram(result, finalLatency); err != nil { + return nil, err + } + content, err := util.PrettyPrintJSON(result) + if err != nil { + return nil, err + } + return []measurement.Summary{measurement.CreateSummary(crsmLatencyName, "json", content)}, nil +} + +func (m *crsmLatencyMeasurement) startQuerying(ctx context.Context, client clientset.Interface, interval time.Duration) { + m.isRunning = true + m.wg.Add(1) + go func() { + 
defer m.wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + _, err := getMetricsFromService(ctx, client, m.namespace, m.serviceName, m.metricsPort) + if err != nil { + klog.V(2).Infof("error during fetching metrics from service: %v", err) + } + } + } + }() +} + +func (m *crsmLatencyMeasurement) retrieveLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) { + hist := measurementutil.NewHistogram(nil) + output, err := getMetricsFromService(ctx, c, m.namespace, m.serviceName, m.selfPort) + if err != nil { + return hist, err + } + samples, err := measurementutil.ExtractMetricSamples(output) + if err != nil { + return hist, err + } + for _, sample := range samples { + if sample.Metric[model.MetricNameLabel] == ksmRequestDurationMetricName { + measurementutil.ConvertSampleToHistogram(sample, hist) + } + } + return hist, nil +} + +func (m *crsmLatencyMeasurement) Dispose() { + if err := m.stop(); err != nil { + klog.V(2).Infof("error during dispose call: %v", err) + } +} + +func (m *crsmLatencyMeasurement) String() string { + return crsmLatencyName +} \ No newline at end of file diff --git a/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/0namespace.yaml b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/0namespace.yaml new file mode 100644 index 0000000000..dd2568bda7 --- /dev/null +++ b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/0namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: custom-resource-state-metrics-perf-test \ No newline at end of file diff --git a/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRole.yaml b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRole.yaml new file mode 100644 index 0000000000..3e526b3ac9 --- /dev/null +++ 
b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRole.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: custom-resource-state-metrics +rules: +- apiGroups: + - "" + resources: + - namespaces + verbs: + - list + - watch \ No newline at end of file diff --git a/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRoleBinding.yaml b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRoleBinding.yaml new file mode 100644 index 0000000000..faf52cc1ff --- /dev/null +++ b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/clusterRoleBinding.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: custom-resource-state-metrics +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: custom-resource-state-metrics +subjects: +- kind: ServiceAccount + name: custom-resource-state-metrics + namespace: custom-resource-state-metrics-perf-test \ No newline at end of file diff --git a/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/configMap.yaml b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/configMap.yaml new file mode 100644 index 0000000000..3fa9812ce2 --- /dev/null +++ b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/configMap.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: custom-resource-state-metrics-config + namespace: custom-resource-state-metrics-perf-test +data: + config.yaml: | + spec: + resources: + - groupVersion: v1 + kind: Namespace + metricName: kube_namespace_labels + metricType: Gauge + valueFrom: + - "1" + labelsFrom: + - key: "*" \ No newline at end of file diff --git a/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/deployment.yaml 
b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/deployment.yaml new file mode 100644 index 0000000000..fb33145592 --- /dev/null +++ b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/deployment.yaml @@ -0,0 +1,38 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: custom-resource-state-metrics + namespace: custom-resource-state-metrics-perf-test + labels: + app.kubernetes.io/name: custom-resource-state-metrics +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: custom-resource-state-metrics + template: + metadata: + labels: + app.kubernetes.io/name: custom-resource-state-metrics + spec: + serviceAccountName: custom-resource-state-metrics + containers: + - name: kube-state-metrics + image: registry.k8s.io/kube-state-metrics/kube-state-metrics:v2.8.0 + args: + - --custom-resource-state-only + - --custom-resource-state-config=/etc/config/config.yaml + - --port=8080 + - --telemetry-port=8081 + ports: + - name: http-metrics + containerPort: 8080 + - name: telemetry + containerPort: 8081 + volumeMounts: + - name: config-volume + mountPath: /etc/config + volumes: + - name: config-volume + configMap: + name: custom-resource-state-metrics-config \ No newline at end of file diff --git a/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceAccount.yaml b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceAccount.yaml new file mode 100644 index 0000000000..bc9375da07 --- /dev/null +++ b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceAccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: custom-resource-state-metrics + namespace: custom-resource-state-metrics-perf-test \ No newline at end of file diff --git a/clusterloader2/testing/load/namespace-labels.yaml b/clusterloader2/testing/load/namespace-labels.yaml new file mode 100644 index 
0000000000..395ddacade --- /dev/null +++ b/clusterloader2/testing/load/namespace-labels.yaml @@ -0,0 +1,10 @@ +kind: Namespace +apiVersion: v1 +metadata: + name: {{.Name}} + labels: + "label-1": "value-1" + "label-2": "value-2" + "label-3": "value-3" + "label-4": "value-4" + "label-5": "value-5" \ No newline at end of file diff --git a/clusterloader2/testing/overrides/ksm-namespaces-override.yaml b/clusterloader2/testing/overrides/ksm-namespaces-override.yaml new file mode 100644 index 0000000000..d5a8f3c161 --- /dev/null +++ b/clusterloader2/testing/overrides/ksm-namespaces-override.yaml @@ -0,0 +1,11 @@ +- gvk: + kind: Deployment + version: apps/v1 + name: kube-state-metrics + namespace: kube-system + object_overrides: + - path: spec.template.spec.containers.0.args + value: + - --port=8080 + - --telemetry-port=8081 + - --resources=namespaces \ No newline at end of file diff --git a/clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml b/clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml new file mode 100644 index 0000000000..3f7854a6fa --- /dev/null +++ b/clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml @@ -0,0 +1,48 @@ +name: custom-resource-state-metrics +tuningSets: +- name: load + qpsLoad: + qps: 10 +steps: +- name: crsm-test + phases: + - phaseName: crsm-deployment + tuningSet: load + objectTemplates: + - path: "pkg/prometheus/manifests/exporters/custom-resource-state-metrics/" + - phaseName: wait-for-crsm-deployment + tuningSet: load + deploymentRolloutTimeout: 5m + wait_for_controlled_pods: + - labelSelector: app.kubernetes.io/name=custom-resource-state-metrics + namespace: custom-resource-state-metrics-perf-test + - phaseName: start-crsm-latency-measurement + tuningSet: load + measurement: + method: CustomResourceStateMetricsLatency + params: + action: "start" + - phaseName: create-load-namespaces + tuningSet: load + objectTemplates: + - path: "testing/load/namespace-labels.yaml" + namespaceRange: + 
min: 1 + max: 500 + - phaseName: wait-for-measurement-to-run + tuningSet: load + sleepDuration: 5m + - phaseName: gather-crsm-latency-measurement + tuningSet: load + measurement: + method: CustomResourceStateMetricsLatency + params: + action: "gather" + - phaseName: delete-load-namespaces + tuningSet: load + objectTemplates: + - path: "testing/load/namespace-labels.yaml" + namespaceRange: + min: 1 + max: 500 + action: "delete" \ No newline at end of file diff --git a/clusterloader2/testing/prometheus/ksm-namespaces-test.yaml b/clusterloader2/testing/prometheus/ksm-namespaces-test.yaml new file mode 100644 index 0000000000..2dec503c8f --- /dev/null +++ b/clusterloader2/testing/prometheus/ksm-namespaces-test.yaml @@ -0,0 +1,51 @@ +name: native-ksm-namespaces-test +objectTemplates: +- path: "pkg/prometheus/manifests/default/" +testoverrides: +- path: "testing/overrides/ksm-namespaces-override.yaml" +tuningSets: +- name: load + qpsLoad: + qps: 10 +steps: +- name: load-test + phases: + - phaseName: wait-for-ksm-deployment + tuningSet: load + wait_for_controlled_pods: + - labelSelector: app.kubernetes.io/name=kube-state-metrics + namespace: kube-system + - phaseName: start-latency-measurement + tuningSet: load + measurement: + method: KubeStateMetricsLatency + params: + action: "start" + namespace: "kube-system" + serviceName: "kube-state-metrics" + - phaseName: create-load-namespaces + tuningSet: load + objectTemplates: + - path: "testing/load/namespace-labels.yaml" + namespaceRange: + min: 1 + max: 500 + - phaseName: wait-for-measurement-to-run + tuningSet: load + sleepDuration: 5m + - phaseName: gather-latency-measurement + tuningSet: load + measurement: + method: KubeStateMetricsLatency + params: + action: "gather" + namespace: "kube-system" + serviceName: "kube-state-metrics" + - phaseName: delete-load-namespaces + tuningSet: load + objectTemplates: + - path: "testing/load/namespace-labels.yaml" + namespaceRange: + min: 1 + max: 500 + action: "delete" \ No newline 
at end of file From 20c53412b2805d7d684f9fd3ded8b6863b8783ee Mon Sep 17 00:00:00 2001 From: Rishab87 Date: Fri, 1 Aug 2025 22:51:58 +0530 Subject: [PATCH 2/3] minor changes --- .../measurement/common/kube_state_metrics_measurement.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go b/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go index b7fd4626a0..2d8d2fafb0 100644 --- a/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go +++ b/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go @@ -110,10 +110,6 @@ func CreateCRSMLatencyMeasurement() measurement.Measurement { // it also collects initial latency metrics. // - gather - gathers latency metrics and creates a latency summary. func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) { - if !config.CloudProvider.Features().SupportKubeStateMetrics { - klog.Infof("not executing KSMLatencyMeasurement: unsupported for provider, %s", config.ClusterFramework.GetClusterConfig().Provider.Name()) - return nil, nil - } action, err := util.GetString(config.Params, "action") if err != nil { return nil, err @@ -144,10 +140,6 @@ func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measureme // Execute for crsmLatencyMeasurement func (m *crsmLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) { - if !config.CloudProvider.Features().SupportKubeStateMetrics { - klog.Infof("not executing CRSMLatencyMeasurement: unsupported for provider, %s", config.ClusterFramework.GetClusterConfig().Provider.Name()) - return nil, nil - } action, err := util.GetString(config.Params, "action") if err != nil { return nil, err From 7074194e6da70dd4df6fda9d73d6acd2df210fa3 Mon Sep 17 00:00:00 2001 From: Rishab87 Date: Fri, 1 Aug 2025 23:38:24 +0530 Subject: [PATCH 3/3] service monitor and fixes --- 
.../common/kube_state_metrics_measurement.go | 245 ++++++++++++------ .../serviceMonitor.yaml | 22 ++ .../custom-resource-state-metrics.yaml | 14 +- .../prometheus/ksm-namespaces-test.yaml | 14 +- 4 files changed, 201 insertions(+), 94 deletions(-) create mode 100644 clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceMonitor.yaml diff --git a/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go b/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go index 2d8d2fafb0..4fe2b2832a 100644 --- a/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go +++ b/clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go @@ -29,7 +29,6 @@ import ( "github.com/prometheus/common/model" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - ) const ( @@ -128,11 +127,11 @@ func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measureme // the scrape interval so we should cancel. m.startQuerying(m.ctx, client, probeIntervalDefault) // Retrieve initial latency when first call is done. 
- m.initialLatency, err = m.retrieveLatencyMetrics(m.ctx, client) + m.initialLatency, err = m.retrieveKSMLatencyMetrics(m.ctx, client) return nil, err case "gather": defer m.cancel() - return m.createLatencySummary(m.ctx, client) + return m.createKSMLatencySummary(m.ctx, client) default: return nil, fmt.Errorf("unknown action %v", action) } @@ -152,45 +151,33 @@ func (m *crsmLatencyMeasurement) Execute(config *measurement.Config) ([]measurem return nil, nil } m.startQuerying(m.ctx, client, probeIntervalDefault) - m.initialLatency, err = m.retrieveLatencyMetrics(m.ctx, client) + m.initialLatency, err = m.retrieveCRSMLatencyMetrics(m.ctx, client) return nil, err case "gather": defer m.cancel() - return m.createLatencySummary(m.ctx, client) + return m.createCRSMLatencySummary(m.ctx, client) default: return nil, fmt.Errorf("unknown action %v", action) } } -func getMetricsFromService(ctx context.Context, client clientset.Interface, namespace, serviceName string, port int) (string, error) { - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() - out, err := client.CoreV1().RESTClient().Get(). - Resource("services"). - SubResource("proxy"). - Namespace(namespace). - Name(fmt.Sprintf("%v:%v", serviceName, port)). - Suffix("metrics"). 
- Do(ctx).Raw() - return string(out), err -} - func (m *ksmLatencyMeasurement) stop() error { - if !m.isRunning { - return fmt.Errorf("%s: measurement was not running", m) - } - m.isRunning = false - m.cancel() - m.wg.Wait() - return nil + if !m.isRunning { + return fmt.Errorf("%s: measurement was not running", m) + } + m.cancel() + m.wg.Wait() + return nil } -func (m *ksmLatencyMeasurement) createLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) { - latestLatency, err := m.retrieveLatencyMetrics(ctx, client) +func (m *ksmLatencyMeasurement) createKSMLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) { + latestLatency, err := m.retrieveKSMLatencyMetrics(ctx, client) if err != nil { return nil, err } - m.stop() + if err = m.stop(); err != nil { + return nil, err + } // We want to subtract the latest histogram from the first one we collect. finalLatency := HistogramSub(latestLatency, m.initialLatency) // Pretty Print the report. @@ -202,6 +189,12 @@ func (m *ksmLatencyMeasurement) createLatencySummary(ctx context.Context, client if err != nil { return nil, err } + + // Log the latency results for visibility + klog.Infof("%s: Latency Results - P50: %v, P90: %v, P99: %v", + m, result.Perc50, result.Perc90, result.Perc99) + klog.Infof("%s: Final latency summary: %s", m, content) + // Create Summary. 
return []measurement.Summary{measurement.CreateSummary(ksmLatencyName, "json", content)}, nil } @@ -211,43 +204,86 @@ func (m *ksmLatencyMeasurement) createLatencySummary(ctx context.Context, client func (m *ksmLatencyMeasurement) startQuerying(ctx context.Context, client clientset.Interface, interval time.Duration) { m.isRunning = true m.wg.Add(1) - go func() { - defer m.wg.Done() - for { - select { - case <-ctx.Done(): - return - case <-time.After(interval): - _, err := getMetricsFromService(ctx, client, m.namespace, m.serviceName, m.metricsPort) - if err != nil { - klog.V(2).Infof("error during fetching metrics from service: %v", err) - } + go m.queryLoop(ctx, client, interval) +} + +func (m *ksmLatencyMeasurement) queryLoop(ctx context.Context, client clientset.Interface, interval time.Duration) { + defer m.wg.Done() + queryCount := 0 + for { + select { + case <-ctx.Done(): + klog.V(2).Infof("%s: stopping query loop after %d queries", m, queryCount) + return + case <-time.After(interval): + queryCount++ + klog.V(4).Infof("%s: executing query #%d", m, queryCount) + output, err := m.getMetricsFromService(ctx, client, m.metricsPort) + if err != nil { + klog.V(2).Infof("%s: error during fetching metrics from service (query #%d): %v", m, queryCount, err) + } + if output == "" { + klog.V(2).Infof("%s: /metrics endpoint returned no data in namespace: %s from service: %s port: %d", + m, m.namespace, m.serviceName, m.metricsPort) + } else { + klog.V(4).Infof("%s: successfully fetched %d bytes from metrics endpoint (query #%d)", + m, len(output), queryCount) } } - }() + } } -func (m *ksmLatencyMeasurement) retrieveLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) { - hist := measurementutil.NewHistogram(nil) - output, err := getMetricsFromService(ctx, c, m.namespace, m.serviceName, m.selfPort) +func (m *ksmLatencyMeasurement) retrieveKSMLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, 
error) { + klog.V(4).Infof("%s: retrieving KSM latency metrics", m) + ksmHist := measurementutil.NewHistogram(nil) + output, err := m.getMetricsFromService(ctx, c, m.selfPort) if err != nil { - return hist, err + klog.Errorf("%s: failed to get metrics from service: %v", m, err) + return ksmHist, err } samples, err := measurementutil.ExtractMetricSamples(output) if err != nil { - return hist, err + klog.Errorf("%s: failed to extract metric samples: %v", m, err) + return ksmHist, err } + + sampleCount := 0 for _, sample := range samples { - if sample.Metric[model.MetricNameLabel] == ksmRequestDurationMetricName { - measurementutil.ConvertSampleToHistogram(sample, hist) + switch sample.Metric[model.MetricNameLabel] { + case ksmRequestDurationMetricName: + measurementutil.ConvertSampleToHistogram(sample, ksmHist) + sampleCount++ } } - return hist, nil + klog.V(4).Infof("%s: processed %d histogram samples", m, sampleCount) + return ksmHist, nil +} + +func (m *ksmLatencyMeasurement) getMetricsFromService(ctx context.Context, client clientset.Interface, port int) (string, error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + klog.V(4).Infof("%s: fetching metrics from %s/%s:%d", m, m.namespace, m.serviceName, port) + + out, err := client.CoreV1().RESTClient().Get(). + Resource("services"). + SubResource("proxy"). + Namespace(m.namespace). + Name(fmt.Sprintf("%v:%v", m.serviceName, port)). + Suffix("metrics"). + Do(ctx).Raw() + + if err != nil { + klog.V(2).Infof("%s: error fetching metrics from %s/%s:%d: %v", m, m.namespace, m.serviceName, port, err) + } + + return string(out), err } + // Dispose cleans up after the measurement. 
func (m *ksmLatencyMeasurement) Dispose() { if err := m.stop(); err != nil { - klog.V(2).Infof("error during dispose call: %v", err) + klog.V(2).Infof("%s: error during dispose call: %v", m, err) } } @@ -257,21 +293,23 @@ func (m *ksmLatencyMeasurement) String() string { } func (m *crsmLatencyMeasurement) stop() error { - if !m.isRunning { - return fmt.Errorf("%s: measurement was not running", m) - } - m.isRunning = false - m.cancel() - m.wg.Wait() - return nil + if !m.isRunning { + return fmt.Errorf("%s: measurement was not running", m) + } + m.isRunning = false + m.cancel() + m.wg.Wait() + return nil } -func (m *crsmLatencyMeasurement) createLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) { - latestLatency, err := m.retrieveLatencyMetrics(ctx, client) +func (m *crsmLatencyMeasurement) createCRSMLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) { + latestLatency, err := m.retrieveCRSMLatencyMetrics(ctx, client) if err != nil { return nil, err } - m.stop() + if err = m.stop(); err != nil { + return nil, err + } finalLatency := HistogramSub(latestLatency, m.initialLatency) result := &measurementutil.LatencyMetric{} if err = SetQuantileFromHistogram(result, finalLatency); err != nil { @@ -281,49 +319,97 @@ func (m *crsmLatencyMeasurement) createLatencySummary(ctx context.Context, clien if err != nil { return nil, err } + + // Log the latency results for visibility + klog.Infof("%s: Latency Results - P50: %v, P90: %v, P99: %v", + m, result.Perc50, result.Perc90, result.Perc99) + klog.Infof("%s: Final latency summary: %s", m, content) + return []measurement.Summary{measurement.CreateSummary(crsmLatencyName, "json", content)}, nil } func (m *crsmLatencyMeasurement) startQuerying(ctx context.Context, client clientset.Interface, interval time.Duration) { m.isRunning = true m.wg.Add(1) - go func() { - defer m.wg.Done() - for { - select { - case <-ctx.Done(): - return - case <-time.After(interval): - _, 
err := getMetricsFromService(ctx, client, m.namespace, m.serviceName, m.metricsPort) - if err != nil { - klog.V(2).Infof("error during fetching metrics from service: %v", err) - } + go m.queryLoop(ctx, client, interval) +} + +func (m *crsmLatencyMeasurement) queryLoop(ctx context.Context, client clientset.Interface, interval time.Duration) { + defer m.wg.Done() + queryCount := 0 + for { + select { + case <-ctx.Done(): + klog.V(2).Infof("%s: stopping query loop after %d queries", m, queryCount) + return + case <-time.After(interval): + queryCount++ + klog.V(4).Infof("%s: executing query #%d", m, queryCount) + output, err := m.getMetricsFromService(ctx, client, m.metricsPort) + if err != nil { + klog.V(2).Infof("%s: error during fetching metrics from service (query #%d): %v", m, queryCount, err) + } + if output == "" { + klog.V(2).Infof("%s: /metrics endpoint returned no data in namespace: %s from service: %s port: %d", + m, m.namespace, m.serviceName, m.metricsPort) + } else { + klog.V(4).Infof("%s: successfully fetched %d bytes from metrics endpoint (query #%d)", + m, len(output), queryCount) } } - }() + } } -func (m *crsmLatencyMeasurement) retrieveLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) { - hist := measurementutil.NewHistogram(nil) - output, err := getMetricsFromService(ctx, c, m.namespace, m.serviceName, m.selfPort) +func (m *crsmLatencyMeasurement) retrieveCRSMLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) { + klog.V(4).Infof("%s: retrieving CRSM latency metrics", m) + crsmHist := measurementutil.NewHistogram(nil) + output, err := m.getMetricsFromService(ctx, c, m.selfPort) if err != nil { - return hist, err + klog.Errorf("%s: failed to get metrics from service: %v", m, err) + return crsmHist, err } samples, err := measurementutil.ExtractMetricSamples(output) if err != nil { - return hist, err + klog.Errorf("%s: failed to extract metric samples: %v", m, err) + 
return crsmHist, err } + + sampleCount := 0 for _, sample := range samples { - if sample.Metric[model.MetricNameLabel] == ksmRequestDurationMetricName { - measurementutil.ConvertSampleToHistogram(sample, hist) + switch sample.Metric[model.MetricNameLabel] { + case ksmRequestDurationMetricName: + measurementutil.ConvertSampleToHistogram(sample, crsmHist) + sampleCount++ } } - return hist, nil + klog.V(4).Infof("%s: processed %d histogram samples", m, sampleCount) + return crsmHist, nil +} + +func (m *crsmLatencyMeasurement) getMetricsFromService(ctx context.Context, client clientset.Interface, port int) (string, error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + klog.V(4).Infof("%s: fetching metrics from %s/%s:%d", m, m.namespace, m.serviceName, port) + + out, err := client.CoreV1().RESTClient().Get(). + Resource("services"). + SubResource("proxy"). + Namespace(m.namespace). + Name(fmt.Sprintf("%v:%v", m.serviceName, port)). + Suffix("metrics"). + Do(ctx).Raw() + + if err != nil { + klog.V(2).Infof("%s: error fetching metrics from %s/%s:%d: %v", m, m.namespace, m.serviceName, port, err) + } + + return string(out), err } func (m *crsmLatencyMeasurement) Dispose() { if err := m.stop(); err != nil { - klog.V(2).Infof("error during dispose call: %v", err) + klog.V(2).Infof("%s: error during dispose call: %v", m, err) } } diff --git a/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceMonitor.yaml b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceMonitor.yaml new file mode 100644 index 0000000000..276122703f --- /dev/null +++ b/clusterloader2/pkg/prometheus/manifests/exporters/custom-resource-state-metrics/serviceMonitor.yaml @@ -0,0 +1,22 @@ +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + labels: + app.kubernetes.io/name: kube-state-metrics + app.kubernetes.io/version: 2.13.0 + name: custom-resource-state-metrics-sm + namespace: 
custom-resource-state-metrics-perf-test +spec: + selector: + matchLabels: + app.kubernetes.io/name: custom-resource-state-metrics + namespaceSelector: + matchNames: + - custom-resource-state-metrics-perf-test + endpoints: + - port: telemetry + path: /metrics + interval: 30s + - port: http-metrics + path: /metrics + interval: 30s diff --git a/clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml b/clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml index 3f7854a6fa..22be19552d 100644 --- a/clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml +++ b/clusterloader2/testing/prometheus/custom-resource-state-metrics.yaml @@ -7,39 +7,39 @@ steps: - name: crsm-test phases: - phaseName: crsm-deployment - tuningSet: load + tuningSet: load objectTemplates: - path: "pkg/prometheus/manifests/exporters/custom-resource-state-metrics/" - phaseName: wait-for-crsm-deployment - tuningSet: load + tuningSet: load deploymentRolloutTimeout: 5m wait_for_controlled_pods: - labelSelector: app.kubernetes.io/name=custom-resource-state-metrics namespace: custom-resource-state-metrics-perf-test - phaseName: start-crsm-latency-measurement - tuningSet: load + tuningSet: load measurement: method: CustomResourceStateMetricsLatency params: action: "start" - phaseName: create-load-namespaces - tuningSet: load + tuningSet: load objectTemplates: - path: "testing/load/namespace-labels.yaml" namespaceRange: min: 1 max: 500 - phaseName: wait-for-measurement-to-run - tuningSet: load + tuningSet: load sleepDuration: 5m - phaseName: gather-crsm-latency-measurement - tuningSet: load + tuningSet: load measurement: method: CustomResourceStateMetricsLatency params: action: "gather" - phaseName: delete-load-namespaces - tuningSet: load + tuningSet: load objectTemplates: - path: "testing/load/namespace-labels.yaml" namespaceRange: diff --git a/clusterloader2/testing/prometheus/ksm-namespaces-test.yaml b/clusterloader2/testing/prometheus/ksm-namespaces-test.yaml index 
2dec503c8f..f82f977163 100644 --- a/clusterloader2/testing/prometheus/ksm-namespaces-test.yaml +++ b/clusterloader2/testing/prometheus/ksm-namespaces-test.yaml @@ -1,6 +1,6 @@ name: native-ksm-namespaces-test objectTemplates: -- path: "pkg/prometheus/manifests/default/" +- path: "pkg/prometheus/manifests/exporters/kube-state-metrics/" testoverrides: - path: "testing/overrides/ksm-namespaces-override.yaml" tuningSets: @@ -11,12 +11,12 @@ steps: - name: load-test phases: - phaseName: wait-for-ksm-deployment - tuningSet: load + tuningSet: load wait_for_controlled_pods: - labelSelector: app.kubernetes.io/name=kube-state-metrics namespace: kube-system - phaseName: start-latency-measurement - tuningSet: load + tuningSet: load measurement: method: KubeStateMetricsLatency params: @@ -24,17 +24,17 @@ steps: namespace: "kube-system" serviceName: "kube-state-metrics" - phaseName: create-load-namespaces - tuningSet: load + tuningSet: load objectTemplates: - path: "testing/load/namespace-labels.yaml" namespaceRange: min: 1 max: 500 - phaseName: wait-for-measurement-to-run - tuningSet: load + tuningSet: load sleepDuration: 5m - phaseName: gather-latency-measurement - tuningSet: load + tuningSet: load measurement: method: KubeStateMetricsLatency params: @@ -42,7 +42,7 @@ steps: namespace: "kube-system" serviceName: "kube-state-metrics" - phaseName: delete-load-namespaces - tuningSet: load + tuningSet: load objectTemplates: - path: "testing/load/namespace-labels.yaml" namespaceRange: