diff --git a/internal/xds/clients/config.go b/internal/xds/clients/config.go index f106465f646a..2cc4751f81ef 100644 --- a/internal/xds/clients/config.go +++ b/internal/xds/clients/config.go @@ -109,4 +109,23 @@ type MetricsReporter interface { // Each client will produce different metrics. Please see the client's // documentation for a list of possible metrics events. ReportMetric(metric any) + + // RegisterAsyncReporter registers a reporter to produce metric values for + // only the listed descriptors. The returned function must be called when + // the metrics are no longer needed, which will remove the reporter. + RegisterAsyncReporter(reporter AsyncReporter) func() +} + +// AsyncReporter is an interface for types that record metrics asynchronously. +// Implementations must be concurrent-safe. +type AsyncReporter interface { + // Report records metric values using the provided recorder. + Report(AsyncMetricsRecorder) error +} + +// AsyncMetricsRecorder is a recorder for async metrics. +type AsyncMetricsRecorder interface { + // ReportMetric reports a metric. The metric will be one of the predefined + // set of types in the metrics.go file. + ReportMetric(metric any) } diff --git a/internal/xds/clients/xdsclient/ads_stream.go b/internal/xds/clients/xdsclient/ads_stream.go index 24e66b834716..1c0cdb32eb88 100644 --- a/internal/xds/clients/xdsclient/ads_stream.go +++ b/internal/xds/clients/xdsclient/ads_stream.go @@ -715,3 +715,9 @@ func (fc *adsFlowControl) wait() bool { return fc.stopped } + +func (fc *adsFlowControl) isStopped() bool { + fc.mu.Lock() + defer fc.mu.Unlock() + return fc.stopped +} diff --git a/internal/xds/clients/xdsclient/metrics/metrics.go b/internal/xds/clients/xdsclient/metrics/metrics.go index 2e14efb1ae9b..f347a556e39f 100644 --- a/internal/xds/clients/xdsclient/metrics/metrics.go +++ b/internal/xds/clients/xdsclient/metrics/metrics.go @@ -40,3 +40,18 @@ type ResourceUpdateInvalid struct { type ServerFailure struct { ServerURI string } + +// XDSClientConnected reports the connectivity state of the xDS stream. +// Value is 1 if connected, 0 otherwise. +type XDSClientConnected struct { + ServerURI string + Value int64 +} + +// XDSClientResourceStats reports the number of resources currently cached. +type XDSClientResourceStats struct { + Authority string + ResourceType string + CacheState string + Count int64 +} diff --git a/internal/xds/clients/xdsclient/test/helpers_test.go b/internal/xds/clients/xdsclient/test/helpers_test.go index 1d6ac9ca7f4a..fc76d8466fc4 100644 --- a/internal/xds/clients/xdsclient/test/helpers_test.go +++ b/internal/xds/clients/xdsclient/test/helpers_test.go @@ -25,10 +25,12 @@ import ( "fmt" "strconv" "strings" + "sync" "testing" "time" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/xds/clients" "google.golang.org/grpc/internal/xds/clients/internal/pretty" "google.golang.org/grpc/internal/xds/clients/internal/testutils" "google.golang.org/grpc/internal/xds/clients/xdsclient" @@ -279,12 +281,16 @@ func buildResourceName(typeName, auth, id string, ctxParams map[string]string) s // have taken place. type testMetricsReporter struct { metricsCh *testutils.Channel + + mu sync.Mutex + asyncReporters map[clients.AsyncReporter]struct{} } // newTestMetricsReporter returns a new testMetricsReporter. func newTestMetricsReporter() *testMetricsReporter { return &testMetricsReporter{ - metricsCh: testutils.NewChannelWithSize(1), + metricsCh: testutils.NewChannelWithSize(50), + asyncReporters: make(map[clients.AsyncReporter]struct{}), } } @@ -302,7 +308,40 @@ func (r *testMetricsReporter) waitForMetric(ctx context.Context, metricsDataWant return nil } +func (r *testMetricsReporter) waitForSpecificMetric(ctx context.Context, metricsDataWant any) error { + for { + got, err := r.metricsCh.Receive(ctx) + if err != nil { + return fmt.Errorf("timeout waiting for metric: %v (want %T)", err, metricsDataWant) + } + if diff := cmp.Diff(got, metricsDataWant); diff == "" { + return nil + } + // Continue if mismatch. + + } +} + // ReportMetric sends the metrics data to the metricsCh channel. func (r *testMetricsReporter) ReportMetric(m any) { - r.metricsCh.Replace(m) + r.metricsCh.Send(m) +} + +func (r *testMetricsReporter) RegisterAsyncReporter(reporter clients.AsyncReporter) func() { + r.mu.Lock() + defer r.mu.Unlock() + r.asyncReporters[reporter] = struct{}{} + return func() { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.asyncReporters, reporter) + } +} + +func (r *testMetricsReporter) triggerAsyncMetrics() { + r.mu.Lock() + defer r.mu.Unlock() + for reporter := range r.asyncReporters { + reporter.Report(r) + } } diff --git a/internal/xds/clients/xdsclient/test/metrics_test.go b/internal/xds/clients/xdsclient/test/metrics_test.go index 8332cc57faef..8bfae5506d34 100644 --- a/internal/xds/clients/xdsclient/test/metrics_test.go +++ b/internal/xds/clients/xdsclient/test/metrics_test.go @@ -22,6 +22,7 @@ import ( "context" "errors" "net" + "sync" "testing" "github.com/google/uuid" @@ -311,3 +312,314 @@ func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) { t.Fatal(err.Error()) } } + +func (s) TestConnectedMetric(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + tmr := newTestMetricsReporter() + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l}) + nodeID := uuid.New().String() + + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServer.Address, + Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID}, + TransportBuilder: grpctransport.NewBuilder(configs), + ResourceTypes: resourceTypes, + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + MetricsReporter: tmr, + } + client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Initial State: Not connected (until watch starts or channel created?). + // The client creates channels on demand. WatchResource triggers channel creation. + + // Watch a resource to trigger connection. + client.WatchResource(listenerType.TypeURL, "foo", noopListenerWatcher{}) + + // Wait for connection. + const listenerName = "test-listener-resource" + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, "route-config")}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server: %v", err) + } + client.WatchResource(listenerType.TypeURL, listenerName, noopListenerWatcher{}) + + // Wait for the update to ensure we are connected. + if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil { + t.Fatal(err.Error()) + } + + // Now trigger async metrics. + tmr.triggerAsyncMetrics() + if err := tmr.waitForSpecificMetric(ctx, &metrics.XDSClientConnected{ServerURI: mgmtServer.Address, Value: 1}); err != nil { + t.Fatal(err.Error()) + } + + // Stop server to force disconnect. + mgmtServer.Stop() + + // Wait a bit for disconnect to be detected. + // xDS client should detect it on stream failure. + // We can wait for ServerFailure metric (synchronous). + if err := tmr.waitForSpecificMetric(ctx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err != nil { + t.Fatal(err.Error()) + } +} + +func (s) TestResourceMetrics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + tmr := newTestMetricsReporter() + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l}) + nodeID := uuid.New().String() + + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServer.Address, + Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID}, + TransportBuilder: grpctransport.NewBuilder(configs), + ResourceTypes: resourceTypes, + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + MetricsReporter: tmr, + } + client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + const listenerName = "test-listener-resource" + const routeConfigName = "test-route-configuration-resource" + + // Requested state. + client.WatchResource(listenerType.TypeURL, listenerName, noopListenerWatcher{}) + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server: %v", err) + } + + // Wait for Valid update. + if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil { + t.Fatal(err.Error()) + } + + // Trigger async metrics. + tmr.triggerAsyncMetrics() + if err := tmr.waitForSpecificMetric(ctx, &metrics.XDSClientResourceStats{ + Authority: "", // Default authority + ResourceType: "ListenerResource", + CacheState: "acked", + Count: 1, + }); err != nil { + t.Fatal(err.Error()) + } + + // Nacked but cached. + // Update with bad resource. + resources.Listeners[0].ApiListener = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server: %v", err) + } + + // Wait for Invalid update. + if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateInvalid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil { + t.Fatal(err.Error()) + } + + tmr.triggerAsyncMetrics() + if err := tmr.waitForSpecificMetric(ctx, &metrics.XDSClientResourceStats{ + Authority: "", + ResourceType: "ListenerResource", + CacheState: "nacked_but_cached", + Count: 1, + }); err != nil { + t.Fatal(err.Error()) + } +} + +// TestResourceMetrics_Extended tests multiple resources in different states: +// - 2 in "requested" state +// - 2 in "nacked" state +// - 1 in "does_not_exist" state +func (s) TestResourceMetrics_Extended(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + tmr := newTestMetricsReporter() + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l}) + nodeID := uuid.New().String() + + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServer.Address, + Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID}, + TransportBuilder: grpctransport.NewBuilder(configs), + ResourceTypes: resourceTypes, + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + MetricsReporter: tmr, + } + client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Resource names + resRequested1 := "res-requested-1" + resRequested2 := "res-requested-2" + resNacked1 := "res-nacked-1" + resNacked2 := "res-nacked-2" + resNotExist := "res-not-exist" + + resList := []string{resRequested1, resRequested2, resNacked1, resNacked2, resNotExist} + for _, res := range resList { + client.WatchResource(listenerType.TypeURL, res, noopListenerWatcher{}) + } + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{ + e2e.DefaultClientListener(resNacked1, "route-config"), + e2e.DefaultClientListener(resNacked2, "route-config"), + e2e.DefaultClientListener(resNotExist, "route-config"), + }, + SkipValidation: true, + } + // Make Nacked resources invalid + resources.Listeners[0].ApiListener = nil + resources.Listeners[1].ApiListener = nil + + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server: %v", err) + } + + // Wait for Res5 to be valid. + if err := tmr.waitForSpecificMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil { + t.Fatal(err.Error()) + } + resourcesEmpty := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resourcesEmpty); err != nil { + t.Fatalf("Failed to update management server: %v", err) + } + + // Verify "requested" count 2 + tmr.triggerAsyncMetrics() + if err := tmr.waitForSpecificMetric(ctx, &metrics.XDSClientResourceStats{ + Authority: "", + ResourceType: "ListenerResource", + CacheState: "requested", + Count: 2, + }); err != nil { + t.Fatalf("Failed to verify requested count: %v", err) + } + + // Verify "nacked" count 2 + tmr.triggerAsyncMetrics() + if err := tmr.waitForSpecificMetric(ctx, &metrics.XDSClientResourceStats{ + Authority: "", + ResourceType: "ListenerResource", + CacheState: "nacked", + Count: 2, + }); err != nil { + t.Fatalf("Failed to verify nacked count: %v", err) + } + + // Wait for the does_not_exist state. + // We need to wait for the client to process the empty update and mark the resource as removed. + // We use a watcher to detect the specific error. + removed := make(chan struct{}) + var closeOnce sync.Once + client.WatchResource(listenerType.TypeURL, resNotExist, &testWatcher{ + onError: func(err error) { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + closeOnce.Do(func() { close(removed) }) + } + }, + }) + + // Verify "does_not_exist" count 1 + select { + case <-removed: + case <-ctx.Done(): + t.Fatal("timeout waiting for resource removal") + } + + tmr.triggerAsyncMetrics() + if err := tmr.waitForSpecificMetric(ctx, &metrics.XDSClientResourceStats{ + Authority: "", + ResourceType: "ListenerResource", + CacheState: "does_not_exist", + Count: 1, + }); err != nil { + t.Fatalf("Failed to verify does_not_exist count: %v", err) + } +} + +type testWatcher struct { + onError func(error) +} + +func (w *testWatcher) ResourceChanged(_ xdsclient.ResourceData, onDone func()) { onDone() } +func (w *testWatcher) ResourceError(err error, onDone func()) { + if w.onError != nil { + w.onError(err) + } + onDone() +} +func (w *testWatcher) AmbientError(_ error, onDone func()) { onDone() } diff --git a/internal/xds/clients/xdsclient/xdsclient.go b/internal/xds/clients/xdsclient/xdsclient.go index b1c6955484dc..67f365e1d647 100644 --- a/internal/xds/clients/xdsclient/xdsclient.go +++ b/internal/xds/clients/xdsclient/xdsclient.go @@ -95,6 +95,8 @@ type XDSClient struct { // Once all references to a channel are dropped, the channel is closed. channelsMu sync.Mutex xdsActiveChannels map[ServerConfig]*channelState // Map from server config to in-use xdsChannels. + + metricsCleanup func() } // New returns a new xDS Client configured with the provided config. @@ -114,6 +116,11 @@ func New(config Config) (*XDSClient, error) { if err != nil { return nil, err } + + // Register this client instance as an Async Reporter. + if client.metricsReporter != nil { + client.metricsCleanup = client.metricsReporter.RegisterAsyncReporter(client) + } return client, nil } @@ -171,6 +178,9 @@ func (c *XDSClient) Close() { if c.done.HasFired() { return } + if c.metricsCleanup != nil { + c.metricsCleanup() + } c.done.Fire() c.topLevelAuthority.close() @@ -441,3 +451,113 @@ func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName return a.resourceWatchStateForTesting(rType, resourceName) } + +// Report implements clients.AsyncReporter. +// This is the entry point invoked by the metrics system during a scrape. +func (c *XDSClient) Report(rec clients.AsyncMetricsRecorder) error { + c.reportConnectedState(rec) + c.reportResourceStats(rec) + return nil +} + +// reportConnectedState handles the "grpc.xds_client.connected" metric. +func (c *XDSClient) reportConnectedState(rec clients.AsyncMetricsRecorder) { + c.channelsMu.Lock() + defer c.channelsMu.Unlock() + + for _, cs := range c.xdsActiveChannels { + val := int64(0) + if cs.channel.ads != nil && cs.channel.ads.fc != nil && !cs.channel.ads.fc.isStopped() { + val = 1 + } + + rec.ReportMetric(&metrics.XDSClientConnected{ + ServerURI: cs.serverConfig.ServerIdentifier.ServerURI, + Value: val, + }) + } +} + +// reportResourceStats handles the "grpc.xds_client.resources" metric. +func (c *XDSClient) reportResourceStats(rec clients.AsyncMetricsRecorder) { + allAuthorities := make([]*authority, 0, len(c.authorities)+1) + if c.topLevelAuthority != nil { + allAuthorities = append(allAuthorities, c.topLevelAuthority) + } + for _, a := range c.authorities { + allAuthorities = append(allAuthorities, a) + } + + for _, a := range allAuthorities { + stats := a.resourceStats() + for typeURL, stateCounts := range stats { + for cacheState, count := range stateCounts { + if count > 0 { + rec.ReportMetric(&metrics.XDSClientResourceStats{ + Authority: a.name, + ResourceType: typeURL, + CacheState: cacheState, + Count: int64(count), + }) + } + } + } + } +} + +func (a *authority) resourceStats() map[string]map[string]int { + // Create a channel to receive the result + ret := make(chan map[string]map[string]int, 1) + + op := func(context.Context) { + // Map: ResourceType (String) -> CacheState (String) -> Count (Int) + summary := make(map[string]map[string]int) + for rType, resourceMap := range a.resources { + rName := rType.TypeName + if _, ok := summary[rName]; !ok { + summary[rName] = make(map[string]int) + } + for _, state := range resourceMap { + s := getCacheState(state) + summary[rName][s]++ + } + } + + ret <- summary + } + + // Schedule the operation. + // If the serializer is closed/context canceled, the second func (onFailure) runs. + a.xdsClientSerializer.ScheduleOr(op, func() { + ret <- nil + }) + + return <-ret +} + +// getCacheState determines the metrics label string for a given resource state. +func getCacheState(r *resourceState) string { + switch r.md.Status { + case xdsresource.ServiceStatusRequested: + return "requested" + + case xdsresource.ServiceStatusNotExist: + return "does_not_exist" + + case xdsresource.ServiceStatusACKed: + return "acked" + + case xdsresource.ServiceStatusNACKed: + // If the status is NACKed, it means the *latest* update failed. + // However, if 'r.cache' is not nil, it means we are still holding onto + // a previously ACKed version of the resource. + if r.cache != nil { + return "nacked_but_cached" + } + return "nacked" + + default: + // Fallback for initialization states + return "requested" + } +} diff --git a/internal/xds/xdsclient/clientimpl.go b/internal/xds/xdsclient/clientimpl.go index b20886470115..ab0256f0080d 100644 --- a/internal/xds/xdsclient/clientimpl.go +++ b/internal/xds/xdsclient/clientimpl.go @@ -75,6 +75,20 @@ var ( Labels: []string{"grpc.target", "grpc.xds.server"}, Default: false, }) + xdsClientConnectedMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{ + Name: "grpc.xds_client.connected", + Description: "Experimental. 1 if the xDS stream is connected, 0 otherwise.", + Unit: "1", + Type: estats.MetricTypeIntAsyncGauge, + Labels: []string{"grpc.target", "grpc.xds.server"}, + }) + xdsClientResourcesMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{ + Name: "grpc.xds_client.resources", + Description: "Experimental. Number of xDS resources currently cached.", + Unit: "{resource}", + Type: estats.MetricTypeIntAsyncGauge, + Labels: []string{"grpc.target", "grpc.xds.authority", "grpc.xds.cache_state", "grpc.xds.resource_type"}, + }) ) // clientImpl embed xdsclient.XDSClient and implement internal XDSClient @@ -263,3 +277,59 @@ func populateGRPCTransportConfigsFromServerConfig(sc *bootstrap.ServerConfig, gr } return nil } + +// RegisterAsyncReporter adapts the generic clients.AsyncReporter to the +// estats.AsyncMetricReporter interface and registers it. +func (mr *metricsReporter) RegisterAsyncReporter(reporter clients.AsyncReporter) func() { + if mr.recorder == nil { + return func() {} + } + + // Define which metrics we intend to report for OTel registration. + descriptors := []estats.AsyncMetric{ + xdsClientConnectedMetric, + xdsClientResourcesMetric, + } + + // Create the callback wrapper. + // This function is invoked by the stats system during a scrape. + cbWrapper := func(rec estats.AsyncMetricsRecorder) error { + wrapper := &asyncMetricsRecorderAdapter{ + target: mr.target, + delegate: rec, + } + reporter.Report(wrapper) + return nil + } + + // Register with the underlying gRPC stats recorder. + return mr.recorder.RegisterAsyncReporter(estats.AsyncMetricReporterFunc(cbWrapper), descriptors...) +} + +// asyncMetricsRecorderAdapter adapts estats.AsyncMetricsRecorder to clients.AsyncMetricsRecorder. +type asyncMetricsRecorderAdapter struct { + target string + delegate estats.AsyncMetricsRecorder +} + +func (a *asyncMetricsRecorderAdapter) ReportMetric(metric any) { + switch m := metric.(type) { + case *metrics.XDSClientConnected: + // Record: grpc.xds_client.connected + // Labels: grpc.target, grpc.xds.authority + a.delegate.RecordInt64AsyncGauge(xdsClientConnectedMetric, m.Value, + a.target, + m.ServerURI, + ) + + case *metrics.XDSClientResourceStats: + // Record: grpc.xds_client.resources + // Labels: grpc.target, grpc.xds.authority, grpc.xds.cache_state, grpc.xds.resource_type + a.delegate.RecordInt64AsyncGauge(xdsClientResourcesMetric, m.Count, + a.target, + m.Authority, + m.CacheState, + m.ResourceType, + ) + } +}