Skip to content

Commit d4159bf

Browse files
committed
add test and fix cache state
1 parent 8c685cc commit d4159bf

File tree

4 files changed

+184
-51
lines changed

4 files changed

+184
-51
lines changed

src/core/xds/grpc/xds_server_grpc.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ void GrpcXdsServer::JsonPostLoad(const Json& json, const JsonArgs& args,
135135
for (const Json& feature_json : array) {
136136
if (feature_json.type() == Json::Type::kString &&
137137
(feature_json.string() == kServerFeatureIgnoreResourceDeletion ||
138+
// FIXME: env var guard
138139
feature_json.string() ==
139140
kServerFeatureResourceTimerIsTransientFailure ||
140141
feature_json.string() == kServerFeatureTrustedXdsServer)) {

src/core/xds/xds_client/xds_client.cc

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -242,16 +242,17 @@ class XdsClient::XdsChannel::AdsCall final
242242
name_.authority, type_->type_url(), name_.key)
243243
<< "} from xds server";
244244
resource_seen_ = true;
245-
state.SetDoesNotExist();
246-
absl::Status status =
247-
ads_call_->xds_channel()->server_
248-
.ResourceTimerIsTransientFailure()
249-
? absl::UnavailableError(absl::StrCat(
250-
"xDS server ", ads_call_->xds_channel()->server_uri(),
251-
" not responding"))
252-
: absl::NotFoundError("does not exist");
245+
if (ads_call_->xds_channel()->server_
246+
.ResourceTimerIsTransientFailure()) {
247+
state.SetTransientError(absl::StrCat(
248+
"xDS server ", ads_call_->xds_channel()->server_uri(),
249+
" not responding"));
250+
} else {
251+
state.SetDoesNotExist();
252+
}
253253
ads_call_->xds_client()->NotifyWatchersOnResourceChanged(
254-
std::move(status), state.watchers(), ReadDelayHandle::NoWait());
254+
state.WatcherStatus(), state.watchers(),
255+
ReadDelayHandle::NoWait());
255256
}
256257
}
257258
ads_call_->xds_client()->work_serializer_.DrainQueue();
@@ -1019,20 +1020,18 @@ void XdsClient::XdsChannel::AdsCall::ParseResource(
10191020
resource_state.set_ignored_deletion(false);
10201021
}
10211022
// Update resource state based on whether the resource is valid.
1022-
absl::Status status = absl::InvalidArgumentError(
1023-
absl::StrCat("invalid resource: ", decode_status.ToString()));
10241023
if (!decode_status.ok()) {
1024+
resource_state.SetNacked(context->version, decode_status.message(),
1025+
context->update_time_);
10251026
if (!resource_state.HasResource()) {
1026-
xds_client()->NotifyWatchersOnResourceChanged(std::move(status),
1027-
resource_state.watchers(),
1028-
context->read_delay_handle);
1027+
xds_client()->NotifyWatchersOnResourceChanged(
1028+
resource_state.WatcherStatus(), resource_state.watchers(),
1029+
context->read_delay_handle);
10291030
} else {
1030-
xds_client()->NotifyWatchersOnAmbientError(std::move(status),
1031-
resource_state.watchers(),
1032-
context->read_delay_handle);
1031+
xds_client()->NotifyWatchersOnAmbientError(
1032+
resource_state.WatcherStatus(), resource_state.watchers(),
1033+
context->read_delay_handle);
10331034
}
1034-
resource_state.SetNacked(context->version, decode_status.ToString(),
1035-
context->update_time_);
10361035
++context->num_invalid_resources;
10371036
return;
10381037
}
@@ -1235,8 +1234,8 @@ void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
12351234
} else {
12361235
resource_state.SetDoesNotExist();
12371236
xds_client()->NotifyWatchersOnResourceChanged(
1238-
absl::NotFoundError("does not exist"),
1239-
resource_state.watchers(), context.read_delay_handle);
1237+
resource_state.WatcherStatus(), resource_state.watchers(),
1238+
context.read_delay_handle);
12401239
}
12411240
}
12421241
}
@@ -1338,7 +1337,7 @@ void XdsClient::ResourceState::SetAcked(
13381337
}
13391338

13401339
void XdsClient::ResourceState::SetNacked(const std::string& version,
1341-
const std::string& details,
1340+
absl::string_view details,
13421341
Timestamp update_time) {
13431342
client_status_ = ClientResourceStatus::NACKED;
13441343
failed_version_ = version;
@@ -1351,9 +1350,15 @@ void XdsClient::ResourceState::SetDoesNotExist() {
13511350
client_status_ = ClientResourceStatus::DOES_NOT_EXIST;
13521351
}
13531352

1353+
void XdsClient::ResourceState::SetTransientError(const std::string& details) {
1354+
client_status_ = ClientResourceStatus::TRANSIENT_ERROR;
1355+
failed_details_ = details;
1356+
}
1357+
13541358
absl::string_view XdsClient::ResourceState::CacheStateString() const {
13551359
switch (client_status_) {
13561360
case ClientResourceStatus::REQUESTED:
1361+
case ClientResourceStatus::TRANSIENT_ERROR:
13571362
return "requested";
13581363
case ClientResourceStatus::DOES_NOT_EXIST:
13591364
return "does_not_exist";
@@ -1365,6 +1370,20 @@ absl::string_view XdsClient::ResourceState::CacheStateString() const {
13651370
Crash("unknown resource state");
13661371
}
13671372

1373+
absl::Status XdsClient::ResourceState::WatcherStatus() const {
1374+
switch (client_status_) {
1375+
case ClientResourceStatus::TRANSIENT_ERROR:
1376+
return absl::UnavailableError(failed_details_);
1377+
case ClientResourceStatus::DOES_NOT_EXIST:
1378+
return absl::NotFoundError("does not exist");
1379+
case ClientResourceStatus::NACKED:
1380+
return absl::InvalidArgumentError(
1381+
absl::StrCat("invalid resource: ", failed_details_));
1382+
default:
1383+
return absl::OkStatus();
1384+
}
1385+
}
1386+
13681387
namespace {
13691388

13701389
google_protobuf_Timestamp* EncodeTimestamp(Timestamp value, upb_Arena* arena) {
@@ -1567,25 +1586,16 @@ void XdsClient::WatchResource(const XdsResourceType* type,
15671586
NotifyWatchersOnResourceChanged(resource_state.resource(), {watcher},
15681587
ReadDelayHandle::NoWait());
15691588
notified_watcher = true;
1570-
} else if (resource_state.client_status() ==
1571-
ResourceState::ClientResourceStatus::DOES_NOT_EXIST) {
1572-
GRPC_TRACE_LOG(xds_client, INFO)
1573-
<< "[xds_client " << this
1574-
<< "] reporting cached does-not-exist for " << name;
1575-
NotifyWatchersOnResourceChanged(absl::NotFoundError("does not exist"),
1576-
{watcher}, ReadDelayHandle::NoWait());
1577-
notified_watcher = true;
1578-
} else if (resource_state.client_status() ==
1579-
ResourceState::ClientResourceStatus::NACKED) {
1580-
GRPC_TRACE_LOG(xds_client, INFO)
1581-
<< "[xds_client " << this
1582-
<< "] reporting cached validation failure for " << name << ": "
1583-
<< resource_state.failed_details();
1584-
NotifyWatchersOnResourceChanged(
1585-
absl::InvalidArgumentError(absl::StrCat(
1586-
"invalid resource: ", resource_state.failed_details())),
1587-
{watcher}, ReadDelayHandle::NoWait());
1588-
notified_watcher = true;
1589+
} else {
1590+
absl::Status status = resource_state.WatcherStatus();
1591+
if (!status.ok()) {
1592+
GRPC_TRACE_LOG(xds_client, INFO)
1593+
<< "[xds_client " << this
1594+
<< "] reporting cached status for " << name << ": " << status;
1595+
NotifyWatchersOnResourceChanged(std::move(status),
1596+
{watcher}, ReadDelayHandle::NoWait());
1597+
notified_watcher = true;
1598+
}
15891599
}
15901600
}
15911601
// If the channel is not connected, report an error to the watcher.

src/core/xds/xds_client/xds_client.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
279279
ACKED,
280280
// Client received this resource and replied with NACK.
281281
NACKED,
282+
// Transient error talking to xDS server.
283+
TRANSIENT_ERROR,
282284
};
283285
static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_REQUESTED) ==
284286
ClientResourceStatus::REQUESTED,
@@ -306,9 +308,10 @@ class XdsClient : public DualRefCounted<XdsClient> {
306308
void SetAcked(std::shared_ptr<const XdsResourceType::ResourceData> resource,
307309
std::string serialized_proto, std::string version,
308310
Timestamp update_time);
309-
void SetNacked(const std::string& version, const std::string& details,
311+
void SetNacked(const std::string& version, absl::string_view details,
310312
Timestamp update_time);
311313
void SetDoesNotExist();
314+
void SetTransientError(const std::string& details);
312315

313316
void set_ignored_deletion(bool value) { ignored_deletion_ = value; }
314317
bool ignored_deletion() const { return ignored_deletion_; }
@@ -322,6 +325,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
322325
}
323326

324327
absl::string_view failed_details() const { return failed_details_; }
328+
absl::Status WatcherStatus() const;
325329

326330
void FillGenericXdsConfig(
327331
upb_StringView type_url, upb_StringView resource_name, upb_Arena* arena,
@@ -341,7 +345,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
341345
std::string version_;
342346
// The rejected version string of the last failed update attempt.
343347
std::string failed_version_;
344-
// Details about the last failed update attempt.
348+
// Details about the last failed update attempt or transient error.
345349
std::string failed_details_;
346350
// Timestamp of the last failed update attempt.
347351
Timestamp failed_update_time_;

test/core/xds/xds_client_test.cc

Lines changed: 126 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,19 @@ class XdsClientTest : public ::testing::Test {
130130
public:
131131
explicit FakeXdsServer(
132132
absl::string_view server_uri = kDefaultXdsServerUrl,
133-
bool ignore_resource_deletion = false)
133+
bool ignore_resource_deletion = false,
134+
bool resource_timer_is_transient_failure = false)
134135
: server_uri_(server_uri),
135-
ignore_resource_deletion_(ignore_resource_deletion) {}
136+
ignore_resource_deletion_(ignore_resource_deletion),
137+
resource_timer_is_transient_failure_(
138+
resource_timer_is_transient_failure) {}
139+
136140
const std::string& server_uri() const override { return server_uri_; }
137141
bool IgnoreResourceDeletion() const override {
138142
return ignore_resource_deletion_;
139143
}
140144
bool ResourceTimerIsTransientFailure() const override {
141-
return false; // FIXME
145+
return resource_timer_is_transient_failure_;
142146
}
143147
bool Equals(const XdsServer& other) const override {
144148
const auto& o = static_cast<const FakeXdsServer&>(other);
@@ -152,6 +156,7 @@ class XdsClientTest : public ::testing::Test {
152156
private:
153157
std::string server_uri_;
154158
bool ignore_resource_deletion_ = false;
159+
bool resource_timer_is_transient_failure_ = false;
155160
};
156161

157162
class FakeAuthority : public Authority {
@@ -1527,7 +1532,7 @@ TEST_F(XdsClientTest, ResourceValidationFailure) {
15271532
ASSERT_TRUE(error.has_value());
15281533
EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
15291534
EXPECT_EQ(error->message(),
1530-
"invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1535+
"invalid resource: errors validating JSON: "
15311536
"[field:value error:is not a number] (node ID:xds_client_test)")
15321537
<< *error;
15331538
// Check metric data.
@@ -1564,7 +1569,7 @@ TEST_F(XdsClientTest, ResourceValidationFailure) {
15641569
ASSERT_TRUE(error.has_value());
15651570
EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
15661571
EXPECT_EQ(error->message(),
1567-
"invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1572+
"invalid resource: errors validating JSON: "
15681573
"[field:value error:is not a number] (node ID:xds_client_test)")
15691574
<< *error;
15701575
// Now server sends an updated version of the resource.
@@ -1728,14 +1733,14 @@ TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
17281733
ASSERT_TRUE(error.has_value());
17291734
EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
17301735
EXPECT_EQ(error->message(),
1731-
"invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1736+
"invalid resource: errors validating JSON: "
17321737
"[field:value error:is not a number] (node ID:xds_client_test)")
17331738
<< *error;
17341739
error = watcher3->WaitForNextError();
17351740
ASSERT_TRUE(error.has_value());
17361741
EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
17371742
EXPECT_EQ(error->message(),
1738-
"invalid resource: INVALID_ARGUMENT: JSON parsing failed: "
1743+
"invalid resource: JSON parsing failed: "
17391744
"[JSON parse error at index 15] (node ID:xds_client_test)")
17401745
<< *error;
17411746
// It cannot delivery an error for foo2, because the client doesn't know
@@ -1874,7 +1879,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
18741879
ASSERT_TRUE(error.has_value());
18751880
EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
18761881
EXPECT_EQ(error->message(),
1877-
"invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1882+
"invalid resource: errors validating JSON: "
18781883
"[field:value error:is not a number] (node ID:xds_client_test)")
18791884
<< *error;
18801885
// Check metric data.
@@ -2635,6 +2640,14 @@ TEST_F(XdsClientTest, ConnectionFailsWithCachedResource) {
26352640
}
26362641

26372642
TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
2643+
event_engine_->SetRunAfterDurationCallback(
2644+
[&](grpc_event_engine::experimental::EventEngine::Duration duration) {
2645+
grpc_event_engine::experimental::EventEngine::Duration expected =
2646+
std::chrono::seconds(15);
2647+
EXPECT_EQ(duration, expected)
2648+
<< "Expected: " << expected.count()
2649+
<< "\nActual: " << duration.count();
2650+
});
26382651
InitXdsClient();
26392652
// Start a watch for "foo1".
26402653
auto watcher = StartFooWatch("foo1");
@@ -2720,6 +2733,111 @@ TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
27202733
EXPECT_TRUE(stream->IsOrphaned());
27212734
}
27222735

2736+
TEST_F(XdsClientTest, ResourceTimerIsTransientFailure) {
2737+
event_engine_->SetRunAfterDurationCallback(
2738+
[&](grpc_event_engine::experimental::EventEngine::Duration duration) {
2739+
grpc_event_engine::experimental::EventEngine::Duration expected =
2740+
std::chrono::seconds(30);
2741+
EXPECT_EQ(duration, expected)
2742+
<< "Expected: " << expected.count()
2743+
<< "\nActual: " << duration.count();
2744+
});
2745+
InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
2746+
{FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, false, true)}));
2747+
// Start a watch for "foo1".
2748+
auto watcher = StartFooWatch("foo1");
2749+
// Watcher should initially not see any resource reported.
2750+
EXPECT_FALSE(watcher->HasEvent());
2751+
// Check metric data.
2752+
EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2753+
::testing::ElementsAre());
2754+
EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2755+
::testing::ElementsAre());
2756+
EXPECT_THAT(GetResourceCounts(),
2757+
::testing::ElementsAre(::testing::Pair(
2758+
ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2759+
XdsFooResourceType::Get()->type_url(),
2760+
"requested"),
2761+
1)));
2762+
// XdsClient should have created an ADS stream.
2763+
auto stream = WaitForAdsStream();
2764+
ASSERT_TRUE(stream != nullptr);
2765+
// XdsClient should have sent a subscription request on the ADS stream.
2766+
auto request = WaitForRequest(stream.get());
2767+
ASSERT_TRUE(request.has_value());
2768+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2769+
/*version_info=*/"", /*response_nonce=*/"",
2770+
/*error_detail=*/absl::OkStatus(),
2771+
/*resource_names=*/{"foo1"});
2772+
CheckRequestNode(*request); // Should be present on the first request.
2773+
// Do not send a response, but wait for the resource to be reported as
2774+
// not existing.
2775+
auto error = watcher->WaitForNextError();
2776+
ASSERT_TRUE(error.has_value());
2777+
EXPECT_EQ(error,
2778+
absl::UnavailableError(absl::StrCat(
2779+
"xDS server ", kDefaultXdsServerUrl,
2780+
" not responding (node ID:xds_client_test)")));
2781+
// Check metric data.
2782+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2783+
::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
2784+
EXPECT_THAT(GetResourceCounts(),
2785+
::testing::ElementsAre(::testing::Pair(
2786+
ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2787+
XdsFooResourceType::Get()->type_url(),
2788+
"requested"),
2789+
1)));
2790+
// Start a new watcher for the same resource. It should immediately
2791+
// receive the same does-not-exist notification.
2792+
auto watcher2 = StartFooWatch("foo1");
2793+
error = watcher2->WaitForNextError();
2794+
ASSERT_TRUE(error.has_value());
2795+
EXPECT_EQ(error,
2796+
absl::UnavailableError(absl::StrCat(
2797+
"xDS server ", kDefaultXdsServerUrl,
2798+
" not responding (node ID:xds_client_test)")));
2799+
// Now server sends a response.
2800+
stream->SendMessageToClient(
2801+
ResponseBuilder(XdsFooResourceType::Get()->type_url())
2802+
.set_version_info("1")
2803+
.set_nonce("A")
2804+
.AddFooResource(XdsFooResource("foo1", 6))
2805+
.Serialize());
2806+
// XdsClient should have delivered the response to the watchers.
2807+
auto resource = watcher->WaitForNextResource();
2808+
ASSERT_NE(resource, nullptr);
2809+
EXPECT_EQ(resource->name, "foo1");
2810+
EXPECT_EQ(resource->value, 6);
2811+
resource = watcher2->WaitForNextResource();
2812+
ASSERT_NE(resource, nullptr);
2813+
EXPECT_EQ(resource->name, "foo1");
2814+
EXPECT_EQ(resource->value, 6);
2815+
// Check metric data.
2816+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2817+
::testing::ElementsAre(::testing::Pair(
2818+
::testing::Pair(kDefaultXdsServerUrl,
2819+
XdsFooResourceType::Get()->type_url()),
2820+
1)),
2821+
::testing::ElementsAre(), ::testing::_));
2822+
EXPECT_THAT(
2823+
GetResourceCounts(),
2824+
::testing::ElementsAre(::testing::Pair(
2825+
ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2826+
XdsFooResourceType::Get()->type_url(), "acked"),
2827+
1)));
2828+
// XdsClient should have sent an ACK message to the xDS server.
2829+
request = WaitForRequest(stream.get());
2830+
ASSERT_TRUE(request.has_value());
2831+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2832+
/*version_info=*/"1", /*response_nonce=*/"A",
2833+
/*error_detail=*/absl::OkStatus(),
2834+
/*resource_names=*/{"foo1"});
2835+
// Cancel watch.
2836+
CancelFooWatch(watcher.get(), "foo1");
2837+
CancelFooWatch(watcher2.get(), "foo1");
2838+
EXPECT_TRUE(stream->IsOrphaned());
2839+
}
2840+
27232841
TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
27242842
InitXdsClient();
27252843
// Metrics should initially be empty.

0 commit comments

Comments
 (0)