Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions internal/xds/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,23 @@ type MetricsReporter interface {
// Each client will produce different metrics. Please see the client's
// documentation for a list of possible metrics events.
ReportMetric(metric any)

// RegisterAsyncReporter registers a reporter to produce metric values for
// only the listed descriptors. The returned function must be called when
// the metrics are no longer needed, which will remove the reporter.
RegisterAsyncReporter(reporter AsyncReporter) func()
}

// AsyncReporter is an interface for types that record metrics asynchronously.
// Implementations must be concurrent-safe.
type AsyncReporter interface {
// Report records metric values using the provided recorder.
Report(AsyncMetricsRecorder) error
}

// AsyncMetricsRecorder is a recorder for async metrics.
type AsyncMetricsRecorder interface {
// ReportMetric reports a metric. The metric will be one of the predefined
// set of types in the metrics.go file.
ReportMetric(metric any)
}
6 changes: 6 additions & 0 deletions internal/xds/clients/xdsclient/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,9 @@ func (fc *adsFlowControl) wait() bool {

return fc.stopped
}

func (fc *adsFlowControl) isStopped() bool {
fc.mu.Lock()
defer fc.mu.Unlock()
return fc.stopped
}
15 changes: 15 additions & 0 deletions internal/xds/clients/xdsclient/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,18 @@ type ResourceUpdateInvalid struct {
type ServerFailure struct {
ServerURI string
}

// XDSClientConnected reports the connectivity state of the xDS stream.
// Value is 1 if connected, 0 otherwise.
type XDSClientConnected struct {
ServerURI string
Value int64
}

// XDSClientResourceStats reports the number of resources currently cached.
type XDSClientResourceStats struct {
Authority string
ResourceType string
CacheState string
Count int64
}
43 changes: 41 additions & 2 deletions internal/xds/clients/xdsclient/test/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/internal/pretty"
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
Expand Down Expand Up @@ -279,12 +281,16 @@ func buildResourceName(typeName, auth, id string, ctxParams map[string]string) s
// have taken place.
type testMetricsReporter struct {
metricsCh *testutils.Channel

mu sync.Mutex
asyncReporters map[clients.AsyncReporter]struct{}
}

// newTestMetricsReporter returns a new testMetricsReporter.
func newTestMetricsReporter() *testMetricsReporter {
return &testMetricsReporter{
metricsCh: testutils.NewChannelWithSize(1),
metricsCh: testutils.NewChannelWithSize(50),
asyncReporters: make(map[clients.AsyncReporter]struct{}),
}
}

Expand All @@ -302,7 +308,40 @@ func (r *testMetricsReporter) waitForMetric(ctx context.Context, metricsDataWant
return nil
}

func (r *testMetricsReporter) waitForSpecificMetric(ctx context.Context, metricsDataWant any) error {
for {
got, err := r.metricsCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout waiting for metric: %v (want %T)", err, metricsDataWant)
}
if diff := cmp.Diff(got, metricsDataWant); diff == "" {
return nil
}
// Continue if mismatch.

}
}

// ReportMetric sends the metrics data to the metricsCh channel.
func (r *testMetricsReporter) ReportMetric(m any) {
r.metricsCh.Replace(m)
r.metricsCh.Send(m)
}

func (r *testMetricsReporter) RegisterAsyncReporter(reporter clients.AsyncReporter) func() {
r.mu.Lock()
defer r.mu.Unlock()
r.asyncReporters[reporter] = struct{}{}
return func() {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.asyncReporters, reporter)
}
}

func (r *testMetricsReporter) triggerAsyncMetrics() {
r.mu.Lock()
defer r.mu.Unlock()
for reporter := range r.asyncReporters {
reporter.Report(r)
}
}
Loading
Loading