Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/xds/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

Expand Down Expand Up @@ -171,7 +171,7 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

Expand Down Expand Up @@ -265,7 +265,7 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
ClusterSpecifierPluginName: "cspB",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down
23 changes: 4 additions & 19 deletions internal/xds/resolver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,36 +257,21 @@ func setupManagementServerForTest(t *testing.T, nodeID string) (*e2e.ManagementS
return mgmtServer, listenerResourceNamesCh, routeConfigResourceNamesCh, bootstrapContents
}

// Spins up an xDS management server and configures it with a default listener
// and route configuration resource. It also sets up an xDS bootstrap
// configuration file that points to the above management server.
func configureResourcesOnManagementServer(ctx context.Context, t *testing.T, mgmtServer *e2e.ManagementServer, nodeID string, listeners []*v3listenerpb.Listener, routes []*v3routepb.RouteConfiguration) {
// configureResources pushes the given listener, route, cluster and endpoint
// resources to the management server, and fails the test if the update cannot
// be applied before ctx expires. Resource validation on the management server
// is skipped, so resources may reference others that are not configured.
func configureResources(ctx context.Context, t *testing.T, mgmtServer *e2e.ManagementServer, nodeID string, listeners []*v3listenerpb.Listener, routes []*v3routepb.RouteConfiguration, clusters []*v3clusterpb.Cluster, endpoints []*v3endpointpb.ClusterLoadAssignment) {
	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      listeners,
		Routes:         routes,
		Clusters:       clusters,
		Endpoints:      endpoints,
		SkipValidation: true,
	}); err != nil {
		t.Fatal(err)
	}
}

// configureAllResourcesOnManagementServer pushes the given listener, route,
// cluster and endpoint resources to the management server, and fails the test
// if the update cannot be applied before ctx expires. Unlike
// configureResources, resource validation is performed by the server.
func configureAllResourcesOnManagementServer(ctx context.Context, t *testing.T, mgmtServer *e2e.ManagementServer, nodeID string, listeners []*v3listenerpb.Listener, routes []*v3routepb.RouteConfiguration, clusters []*v3clusterpb.Cluster, endpoints []*v3endpointpb.ClusterLoadAssignment) {
	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
		NodeID:    nodeID,
		Listeners: listeners,
		Routes:    routes,
		Clusters:  clusters,
		Endpoints: endpoints,
	}); err != nil {
		t.Fatal(err)
	}
}

// waitForResourceNames waits for the wantNames to be pushed on to namesCh.
// Fails the test by calling t.Fatal if the context expires before that.
func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) {
Expand Down
6 changes: 3 additions & 3 deletions internal/xds/resolver/watch_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s) TestServiceWatch_ListenerPointsToNewRouteConfiguration(t *testing.T) {

// Update the management server with the new route configuration resource.
resources.Routes = append(resources.Routes, e2e.DefaultRouteConfig(newTestRouteConfigName, defaultTestServiceName, resources.Clusters[0].Name))
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes)
configureResources(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes, nil, nil)

// Ensure update from the resolver.
verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(resources.Clusters[0].Name))
Expand Down Expand Up @@ -169,15 +169,15 @@ func (s) TestServiceWatch_ListenerPointsToInlineRouteConfiguration(t *testing.T)
}},
}},
}}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, resources.Listeners, nil)
configureResources(ctx, t, mgmtServer, nodeID, resources.Listeners, nil, nil, nil)

// Verify that the old route configuration is not requested anymore.
waitForResourceNames(ctx, t, routeCfgCh, []string{})
verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(resources.Clusters[0].Name))

// Update listener back to contain a route configuration name.
resources.Listeners = []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, resources.Routes[0].Name)}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes)
configureResources(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes, nil, nil)

// Verify that the route configuration resource is requested.
waitForResourceNames(ctx, t, routeCfgCh, []string{resources.Routes[0].Name})
Expand Down
16 changes: 16 additions & 0 deletions internal/xds/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,21 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
}
r.logger = prefixLogger(r)
r.logger.Infof("Creating resolver for target: %+v", target)

dmSet := make(chan struct{})
// Schedule a callback that blocks until r.dm is set, i.e., xdsdepmgr.New()
// returns. This acts as a gatekeeper: even if dependency manager sends the
// updates before the xdsdepmgr.New() has a chance to return, they will be
// queued behind this blocker and processed only after initialization is
// complete.
r.serializer.TrySchedule(func(ctx context.Context) {
select {
case <-dmSet:
case <-ctx.Done():
}
})
r.dm = xdsdepmgr.New(r.ldsResourceName, opts.Authority, r.xdsClient, r)
close(dmSet)
return r, nil
}

Expand Down Expand Up @@ -329,6 +343,8 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool {
state := iresolver.SetConfigSelector(resolver.State{
ServiceConfig: r.cc.ParseServiceConfig(string(sc)),
}, cs)
state = xdsresource.SetXDSConfig(state, r.xdsConfig)
state = xdsdepmgr.SetXDSClusterSubscriber(state, r.dm)
if err := r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient)); err != nil {
if r.logger.V(2) {
r.logger.Infof("Channel rejected new state: %+v with error: %v", state, err)
Expand Down
155 changes: 143 additions & 12 deletions internal/xds/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
xxhash "github.com/cespare/xxhash/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
iresolver "google.golang.org/grpc/internal/resolver"
iringhash "google.golang.org/grpc/internal/ringhash"
"google.golang.org/grpc/internal/testutils"
Expand Down Expand Up @@ -235,7 +237,7 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

// Wait for a discovery request for a route configuration resource.
stateCh, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, contents)
Expand Down Expand Up @@ -303,7 +305,7 @@ func (s) TestNoMatchingVirtualHost(t *testing.T) {
listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
route.VirtualHosts = nil
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{listener}, []*v3routepb.RouteConfiguration{route})
configureResources(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{listener}, []*v3routepb.RouteConfiguration{route}, nil, nil)

// Build the resolver inline (duplicating buildResolverForTarget internals)
// to avoid issues with blocked channel writes when NACKs occur.
Expand Down Expand Up @@ -376,7 +378,7 @@ func (s) TestResolverBadServiceUpdate_NACKedWithoutCache(t *testing.T) {
}},
}},
}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
configureResources(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil, nil, nil)

// Build the resolver inline (duplicating buildResolverForTarget internals)
// to avoid issues with blocked channel writes when NACKs occur.
Expand Down Expand Up @@ -473,7 +475,7 @@ func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) {

// Since the resource is cached, it should be received as an ambient error
// and so the RPCs should continue passing.
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
configureResources(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil, nil, nil)

// "Make an RPC" by invoking the config selector which should succeed by
// continuing to use the previously cached resource.
Expand Down Expand Up @@ -563,7 +565,7 @@ func (s) TestResolverGoodServiceUpdate(t *testing.T) {
// route configuration resource, as specified by the test case.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
routes := []*v3routepb.RouteConfiguration{tt.routeConfig}
configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes, tt.clusterConfig, tt.endpointConfig)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, tt.clusterConfig, tt.endpointConfig)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

Expand Down Expand Up @@ -632,7 +634,7 @@ func (s) TestResolverRequestHash(t *testing.T) {
}}
cluster := []*v3clusterpb.Cluster{e2e.DefaultCluster(defaultTestClusterName, defaultTestEndpointName, e2e.SecurityLevelNone)}
endpoints := []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(defaultTestEndpointName, defaultTestHostname, defaultTestPort)}
configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes, cluster, endpoints)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, cluster, endpoints)

// Build the resolver and read the config selector out of it.
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)
Expand Down Expand Up @@ -738,7 +740,7 @@ waitForStateUpdate:
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()

configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes)
configureResources(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes, nil, nil)

select {
case state = <-stateCh:
Expand Down Expand Up @@ -945,7 +947,7 @@ func (s) TestResolverMaxStreamDuration(t *testing.T) {
e2e.DefaultEndpoint("endpoint_B", defaultTestHostname, defaultTestPort),
e2e.DefaultEndpoint("endpoint_C", defaultTestHostname, defaultTestPort),
}
configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes, cluster, endpoints)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, cluster, endpoints)

// Read the update pushed by the resolver to the ClientConn.
cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
Expand Down Expand Up @@ -1121,7 +1123,7 @@ func (s) TestResolverMultipleLDSUpdates(t *testing.T) {
// Configure the management server with a listener resource, but no route
// configuration resource.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
configureResources(ctx, t, mgmtServer, nodeID, listeners, nil, nil, nil)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

Expand Down Expand Up @@ -1155,7 +1157,7 @@ func (s) TestResolverMultipleLDSUpdates(t *testing.T) {
}},
}},
}}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
configureResources(ctx, t, mgmtServer, nodeID, listeners, nil, nil, nil)

// Ensure that there is no update from the resolver.
verifyNoUpdateFromResolver(ctx, t, stateCh)
Expand Down Expand Up @@ -1194,7 +1196,7 @@ func (s) TestResolverWRR(t *testing.T) {
e2e.DefaultEndpoint("endpoint_A", defaultTestHostname, defaultTestPort),
e2e.DefaultEndpoint("endpoint_B", defaultTestHostname, defaultTestPort),
}
configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes, clusters, endpoints)
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, clusters, endpoints)

// Read the update pushed by the resolver to the ClientConn.
cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
Expand Down Expand Up @@ -1288,7 +1290,7 @@ func (s) TestConfigSelector_FailureCases(t *testing.T) {

// Update the management server with a listener resource that
// contains inline route configuration.
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{test.listener}, nil)
configureResources(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{test.listener}, nil, nil, nil)

// Ensure that the resolver pushes a state update to the channel.
cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
Expand Down Expand Up @@ -1455,3 +1457,132 @@ func (s) TestResolver_AutoHostRewrite(t *testing.T) {
})
}
}

// resourcesMatch reports whether got and want contain the same resource
// names, ignoring order (i.e. multiset equality). Neither slice is mutated.
//
// The previous implementation built a full textual diff via cmp.Diff with
// cmpopts.SortSlices just to check it for emptiness; a count map gives the
// same answer in O(n) without the diff-string allocation or the go-cmp
// dependency.
func resourcesMatch(got, want []string) bool {
	if len(got) != len(want) {
		return false
	}
	counts := make(map[string]int, len(want))
	for _, name := range want {
		counts[name]++
	}
	for _, name := range got {
		counts[name]--
		if counts[name] < 0 {
			// got contains a name more times than want does.
			return false
		}
	}
	// Lengths are equal and no count went negative, so all counts are zero.
	return true
}

// TestResolverKeepWatchOpen_ActiveRPCs tests that the dependency manager keeps
// a cluster watch open when there are active RPCs using that cluster, even if
// the cluster is no longer referenced by the current route configuration.
func (s) TestResolverKeepWatchOpen_ActiveRPCs(t *testing.T) {
t.Skip("Will be enabled when all the watchers have shifted to dependency manager")
// Fired when the management server sees a CDS request naming both clusters,
// and a subsequent CDS request naming only cluster B, respectively.
gotBothClusterRequest := grpcsync.NewEvent()
gotOnlySecondCluster := grpcsync.NewEvent()

// These are only accessed in the callback, which is executed serially.
seenBothClusters := false
seenSecondClusterOnly := false

clusterA := "cluster-A"
clusterB := "cluster-B"
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
// Only CDS requests are of interest; ignore all other resource types.
if req.GetTypeUrl() != version.V3ClusterURL {
return nil
}
resourceNames := req.GetResourceNames()
// Phase 1: wait for a request that names both clusters before looking
// for the cluster-B-only request, so the two events fire in order.
if !seenBothClusters {
if resourcesMatch(resourceNames, []string{clusterA, clusterB}) {
seenBothClusters = true
gotBothClusterRequest.Fire()
}
return nil
}

if seenSecondClusterOnly {
return nil
}
// Phase 2: a request naming only cluster B means the watch on cluster A
// has been cancelled.
if resourcesMatch(resourceNames, []string{clusterB}) {
seenSecondClusterOnly = true
gotOnlySecondCluster.Fire()
}
return nil
},
AllowResourceSubset: true,
})

nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

// Configure initial resources: Route -> ClusterA.
// Both clusters (and their endpoints) are configured up front so that
// switching routes later needs no further CDS/EDS configuration.
route := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, clusterA)}
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
clusters := []*v3clusterpb.Cluster{
e2e.DefaultCluster(clusterA, "endpoint-A", e2e.SecurityLevelNone),
e2e.DefaultCluster(clusterB, "endpoint-B", e2e.SecurityLevelNone),
}
endpoints := []*v3endpointpb.ClusterLoadAssignment{
e2e.DefaultEndpoint("endpoint-A", "localhost", []uint32{8080}),
e2e.DefaultEndpoint("endpoint-B", "localhost", []uint32{8081}),
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

configureResources(ctx, t, mgmtServer, nodeID, listeners, route, clusters, endpoints)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

cs := verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(clusterA))

// Start RPC (Ref Counts ClusterA).
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}

// Switch Configuration to ClusterB.
route[0] = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, clusterB)
configureResources(ctx, t, mgmtServer, nodeID, listeners, route, clusters, endpoints)

// Resolver should request BOTH A (due to active RPC) and B (due to new
// config).
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for updated CDS request including clusters A and B")
case <-gotBothClusterRequest.Done():
}

// Use a short timeout here: the expectation is that the cluster-B-only
// request does NOT arrive while the RPC is still active, so the test only
// waits briefly before concluding it was (correctly) not sent.
// NOTE(review): this `cancel` shadows the one deferred above; the earlier
// defer already captured the outer cancel func, so both contexts are
// cleaned up.
sctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()

select {
case <-sctx.Done():
case <-gotOnlySecondCluster.Done():
t.Fatalf("CDS request only included cluster B, expected both clusters")
}

// Verify Service Config has both clusters.
const wantServiceRaw = `{
"loadBalancingConfig": [{
"xds_cluster_manager_experimental": {
"children": {
"cluster:cluster-A": {
"childPolicy": [{"cds_experimental": {"cluster": "cluster-A"}}]
},
"cluster:cluster-B": {
"childPolicy": [{"cds_experimental": {"cluster": "cluster-B"}}]
}
}
}
}]
}`
verifyUpdateFromResolver(ctx, t, stateCh, wantServiceRaw)

// Finish RPC (Drops Ref to ClusterA).
res.OnCommitted()

// ONLY cluster B should be requested now that there are no references to
// cluster A.
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for updated CDS request including only cluster B")
case <-gotOnlySecondCluster.Done():
}

// ServiceConfig update should also contain only cluster B.
verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(clusterB))
}
Loading