222 changes: 211 additions & 11 deletions clusterloader2/pkg/measurement/common/kube_state_metrics_measurement.go
@@ -37,6 +37,9 @@ const (
probeIntervalDefault = 30 * time.Second
ksmNamespace = "kube-state-metrics-perf-test"
ksmServiceName = "kube-state-metrics"
crsmLatencyName = "CustomResourceStateMetricsLatency"
crsmNamespace = "custom-resource-state-metrics-perf-test"
crsmServiceName = "custom-resource-state-metrics"
ksmSelfPort = 8081
ksmMetricsPort = 8080
)
@@ -53,10 +56,25 @@ type ksmLatencyMeasurement struct {
wg sync.WaitGroup
}

type crsmLatencyMeasurement struct {
ctx context.Context
cancel func()
isRunning bool
namespace string
serviceName string
metricsPort int
selfPort int
initialLatency *measurementutil.Histogram
wg sync.WaitGroup
}

func init() {
if err := measurement.Register(ksmLatencyName, CreateKSMLatencyMeasurement); err != nil {
klog.Fatalf("Cannot register %s: %v", ksmLatencyName, err)
}
if err := measurement.Register(crsmLatencyName, CreateCRSMLatencyMeasurement); err != nil {
klog.Fatalf("Cannot register %s: %v", crsmLatencyName, err)
}
}

// CreateKSMLatencyMeasurement creates a new Kube State
@@ -73,15 +91,24 @@ func CreateKSMLatencyMeasurement() measurement.Measurement {
}
}

// CreateCRSMLatencyMeasurement creates a new Custom Resource State Metrics Measurement.
func CreateCRSMLatencyMeasurement() measurement.Measurement {
ctx, cancel := context.WithCancel(context.Background())
return &crsmLatencyMeasurement{
namespace: crsmNamespace,
serviceName: crsmServiceName,
selfPort: ksmSelfPort,
metricsPort: ksmMetricsPort,
ctx: ctx,
cancel: cancel,
}
}

// Execute supports two actions:
// - start - starts goroutine and queries /metrics every probeIntervalDefault interval,
// it also collects initial latency metrics.
// - gather - gathers latency metrics and creates a latency summary.
func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
if !config.CloudProvider.Features().SupportKubeStateMetrics {
klog.Infof("not executing KSMLatencyMeasurement: unsupported for provider, %s", config.ClusterFramework.GetClusterConfig().Provider.Name())
return nil, nil
}
action, err := util.GetString(config.Params, "action")
if err != nil {
return nil, err
@@ -110,6 +137,30 @@ func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
}
}

// Execute supports the same start and gather actions as ksmLatencyMeasurement:
// start begins periodic /metrics queries and records the initial latency histogram,
// gather computes the latency delta and produces the summary.
func (m *crsmLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
action, err := util.GetString(config.Params, "action")
if err != nil {
return nil, err
}
client := config.ClusterFramework.GetClientSets().GetClient()
switch action {
case "start":
if m.isRunning {
klog.V(2).Infof("%s: measurement already running", m)
return nil, nil
}
m.startQuerying(m.ctx, client, probeIntervalDefault)
m.initialLatency, err = m.retrieveCRSMLatencyMetrics(m.ctx, client)
return nil, err
case "gather":
defer m.cancel()
return m.createCRSMLatencySummary(m.ctx, client)
default:
return nil, fmt.Errorf("unknown action %v", action)
}
}

func (m *ksmLatencyMeasurement) stop() error {
if !m.isRunning {
return fmt.Errorf("%s: measurement was not running", m)
@@ -119,8 +170,6 @@ func (m *ksmLatencyMeasurement) stop() error {
return nil
}

// createKSMLatencyReport gathers the latency one last time and creates the summary based on the Quantile from the sub histograms.
// Afterwards it creates the Summary Report.
func (m *ksmLatencyMeasurement) createKSMLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) {
latestLatency, err := m.retrieveKSMLatencyMetrics(ctx, client)
if err != nil {
@@ -140,6 +189,12 @@ func (m *ksmLatencyMeasurement) createKSMLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) {
if err != nil {
return nil, err
}

// Log the latency results for visibility
klog.Infof("%s: Latency Results - P50: %v, P90: %v, P99: %v",
m, result.Perc50, result.Perc90, result.Perc99)
klog.Infof("%s: Final latency summary: %s", m, content)

// Create Summary.
return []measurement.Summary{measurement.CreateSummary(ksmLatencyName, "json", content)}, nil
}
@@ -154,64 +209,209 @@ func (m *ksmLatencyMeasurement) startQuerying(ctx context.Context, client clientset.Interface, interval time.Duration) {

func (m *ksmLatencyMeasurement) queryLoop(ctx context.Context, client clientset.Interface, interval time.Duration) {
defer m.wg.Done()
queryCount := 0
for {
select {
case <-ctx.Done():
klog.V(2).Infof("%s: stopping query loop after %d queries", m, queryCount)
return
case <-time.After(interval):
var output string
queryCount++
klog.V(4).Infof("%s: executing query #%d", m, queryCount)
output, err := m.getMetricsFromService(ctx, client, m.metricsPort)
if err != nil {
klog.V(2).Infof("error during fetching metrics from service: %v", err)
klog.V(2).Infof("%s: error during fetching metrics from service (query #%d): %v", m, queryCount, err)
}
if output == "" {
klog.V(2).Infof("/metrics endpoint of kube-state-metrics returned no data in namespace: %s from service: %s port: %d", m.namespace, m.serviceName, m.metricsPort)
klog.V(2).Infof("%s: /metrics endpoint returned no data in namespace: %s from service: %s port: %d",
m, m.namespace, m.serviceName, m.metricsPort)
} else {
klog.V(4).Infof("%s: successfully fetched %d bytes from metrics endpoint (query #%d)",
m, len(output), queryCount)
}

}
}
}

func (m *ksmLatencyMeasurement) retrieveKSMLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) {
klog.V(4).Infof("%s: retrieving KSM latency metrics", m)
ksmHist := measurementutil.NewHistogram(nil)
output, err := m.getMetricsFromService(ctx, c, m.selfPort)
if err != nil {
klog.Errorf("%s: failed to get metrics from service: %v", m, err)
return ksmHist, err
}
samples, err := measurementutil.ExtractMetricSamples(output)
if err != nil {
klog.Errorf("%s: failed to extract metric samples: %v", m, err)
return ksmHist, err
}

sampleCount := 0
for _, sample := range samples {
switch sample.Metric[model.MetricNameLabel] {
case ksmRequestDurationMetricName:
measurementutil.ConvertSampleToHistogram(sample, ksmHist)
sampleCount++
}
}
klog.V(4).Infof("%s: processed %d histogram samples", m, sampleCount)
return ksmHist, nil
}

func (m *ksmLatencyMeasurement) getMetricsFromService(ctx context.Context, client clientset.Interface, port int) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

klog.V(4).Infof("%s: fetching metrics from %s/%s:%d", m, m.namespace, m.serviceName, port)

out, err := client.CoreV1().RESTClient().Get().
Resource("services").
SubResource("proxy").
Namespace(m.namespace).
Name(fmt.Sprintf("%v:%v", m.serviceName, port)).
Suffix("metrics").
Do(ctx).Raw()

if err != nil {
klog.V(2).Infof("%s: error fetching metrics from %s/%s:%d: %v", m, m.namespace, m.serviceName, port, err)
}

return string(out), err
}

// Dispose cleans up after the measurement.
func (m *ksmLatencyMeasurement) Dispose() {
if err := m.stop(); err != nil {
klog.V(2).Infof("error during dispose call: %v", err)
klog.V(2).Infof("%s: error during dispose call: %v", m, err)
}
}

// String returns string representation of this measurement.
func (m *ksmLatencyMeasurement) String() string {
return ksmLatencyName
}

func (m *crsmLatencyMeasurement) stop() error {
if !m.isRunning {
return fmt.Errorf("%s: measurement was not running", m)
}
m.cancel()
m.wg.Wait()
return nil
}

func (m *crsmLatencyMeasurement) createCRSMLatencySummary(ctx context.Context, client clientset.Interface) ([]measurement.Summary, error) {
latestLatency, err := m.retrieveCRSMLatencyMetrics(ctx, client)
if err != nil {
return nil, err
}
if err = m.stop(); err != nil {
return nil, err
}
finalLatency := HistogramSub(latestLatency, m.initialLatency)
result := &measurementutil.LatencyMetric{}
if err = SetQuantileFromHistogram(result, finalLatency); err != nil {
return nil, err
}
content, err := util.PrettyPrintJSON(result)
if err != nil {
return nil, err
}

// Log the latency results for visibility
klog.Infof("%s: Latency Results - P50: %v, P90: %v, P99: %v",
m, result.Perc50, result.Perc90, result.Perc99)
klog.Infof("%s: Final latency summary: %s", m, content)

return []measurement.Summary{measurement.CreateSummary(crsmLatencyName, "json", content)}, nil
}

func (m *crsmLatencyMeasurement) startQuerying(ctx context.Context, client clientset.Interface, interval time.Duration) {
m.isRunning = true
m.wg.Add(1)
go m.queryLoop(ctx, client, interval)
}

func (m *crsmLatencyMeasurement) queryLoop(ctx context.Context, client clientset.Interface, interval time.Duration) {
defer m.wg.Done()
queryCount := 0
for {
select {
case <-ctx.Done():
klog.V(2).Infof("%s: stopping query loop after %d queries", m, queryCount)
return
case <-time.After(interval):
queryCount++
klog.V(4).Infof("%s: executing query #%d", m, queryCount)
output, err := m.getMetricsFromService(ctx, client, m.metricsPort)
if err != nil {
klog.V(2).Infof("%s: error during fetching metrics from service (query #%d): %v", m, queryCount, err)
}
if output == "" {
klog.V(2).Infof("%s: /metrics endpoint returned no data in namespace: %s from service: %s port: %d",
m, m.namespace, m.serviceName, m.metricsPort)
} else {
klog.V(4).Infof("%s: successfully fetched %d bytes from metrics endpoint (query #%d)",
m, len(output), queryCount)
}
}
}
}

func (m *crsmLatencyMeasurement) retrieveCRSMLatencyMetrics(ctx context.Context, c clientset.Interface) (*measurementutil.Histogram, error) {
klog.V(4).Infof("%s: retrieving CRSM latency metrics", m)
crsmHist := measurementutil.NewHistogram(nil)
output, err := m.getMetricsFromService(ctx, c, m.selfPort)
if err != nil {
klog.Errorf("%s: failed to get metrics from service: %v", m, err)
return crsmHist, err
}
samples, err := measurementutil.ExtractMetricSamples(output)
if err != nil {
klog.Errorf("%s: failed to extract metric samples: %v", m, err)
return crsmHist, err
}

sampleCount := 0
for _, sample := range samples {
switch sample.Metric[model.MetricNameLabel] {
case ksmRequestDurationMetricName:
measurementutil.ConvertSampleToHistogram(sample, crsmHist)
sampleCount++
}
}
klog.V(4).Infof("%s: processed %d histogram samples", m, sampleCount)
return crsmHist, nil
}

func (m *crsmLatencyMeasurement) getMetricsFromService(ctx context.Context, client clientset.Interface, port int) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

klog.V(4).Infof("%s: fetching metrics from %s/%s:%d", m, m.namespace, m.serviceName, port)

out, err := client.CoreV1().RESTClient().Get().
Resource("services").
SubResource("proxy").
Namespace(m.namespace).
Name(fmt.Sprintf("%v:%v", m.serviceName, port)).
Suffix("metrics").
Do(ctx).Raw()

if err != nil {
klog.V(2).Infof("%s: error fetching metrics from %s/%s:%d: %v", m, m.namespace, m.serviceName, port, err)
}

return string(out), err
}

// Dispose cleans up after the measurement.
func (m *crsmLatencyMeasurement) Dispose() {
if err := m.stop(); err != nil {
klog.V(2).Infof("%s: error during dispose call: %v", m, err)
}
}

// String returns string representation of this measurement.
func (m *crsmLatencyMeasurement) String() string {
return crsmLatencyName
}
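
The new measurement registers under the name CustomResourceStateMetricsLatency, so a test drives it with the same start/gather pair as the existing KSM measurement. The snippet below is a minimal sketch of how a clusterloader2 test config might invoke it; the step names and the CRSMLatency identifier are illustrative assumptions, not part of this change.

steps:
- name: Start CRSM latency measurement
  measurements:
  - Identifier: CRSMLatency                        # hypothetical identifier
    Method: CustomResourceStateMetricsLatency
    Params:
      action: start
# ... load-generating phases of the test run here ...
- name: Gather CRSM latency measurement
  measurements:
  - Identifier: CRSMLatency
    Method: CustomResourceStateMetricsLatency
    Params:
      action: gather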
@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: custom-resource-state-metrics-perf-test
@@ -0,0 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: custom-resource-state-metrics
rules:
- apiGroups:
- ""
resources:
- namespaces
verbs:
- list
- watch
@@ -0,0 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: custom-resource-state-metrics
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: custom-resource-state-metrics
subjects:
- kind: ServiceAccount
name: custom-resource-state-metrics
namespace: custom-resource-state-metrics-perf-test
@@ -0,0 +1,17 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: custom-resource-state-metrics-config
namespace: custom-resource-state-metrics-perf-test
data:
config.yaml: |
spec:
resources:
- groupVersion: v1
kind: Namespace
metricName: kube_namespace_labels
metricType: Gauge
valueFrom:
- "1"
labelsFrom:
- key: "*"
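
This ConfigMap only carries the custom-resource-state configuration; the Deployment and Service that consume it are not part of this hunk. Below is a minimal sketch of how a pod spec might mount it, assuming the custom-resource-state-metrics service runs a kube-state-metrics build with custom-resource-state support enabled (container name, image tag, and mount path are assumptions for illustration only):

      containers:
      - name: custom-resource-state-metrics                                   # assumed container name
        image: registry.k8s.io/kube-state-metrics/kube-state-metrics:v2.10.0  # assumed image/tag
        args:
        - --custom-resource-state-config-file=/etc/config/config.yaml
        ports:
        - containerPort: 8080    # metrics port expected by the measurement (ksmMetricsPort)
        - containerPort: 8081    # telemetry/self port expected by the measurement (ksmSelfPort)
        volumeMounts:
        - name: config
          mountPath: /etc/config
      volumes:
      - name: config
        configMap:
          name: custom-resource-state-metrics-config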