From a287146334151268da755949f7fd21ee07ee8fac Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Tue, 25 Nov 2025 17:55:02 +0000 Subject: [PATCH 1/2] ext_proc: log gRPC status if openStram failed Signed-off-by: Yanjun Xiang --- .../filters/http/ext_proc/ext_proc.cc | 24 +++++-- .../filters/http/ext_proc/ext_proc.h | 10 +++ .../filters/http/ext_proc/processor_state.h | 1 + .../ext_proc/ext_proc_integration_test.cc | 31 +++++++- .../filters/http/ext_proc/filter_test.cc | 71 +++++++++++++++---- 5 files changed, 118 insertions(+), 19 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 517ca33c2f6db..a87c162867b75 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -73,6 +73,7 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status"; constexpr absl::string_view BytesSentField = "bytes_sent"; constexpr absl::string_view BytesReceivedField = "bytes_received"; +constexpr absl::string_view GrpcStatusBeforeFirstCallField = "grpc_status_before_first_call"; absl::optional initProcessingMode(const ExtProcPerRoute& config) { if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) { @@ -416,6 +417,8 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const { static_cast(bytes_sent_)); (*struct_msg->mutable_fields())[BytesReceivedField].set_number_value( static_cast(bytes_received_)); + (*struct_msg->mutable_fields())[GrpcStatusBeforeFirstCallField].set_number_value( + static_cast(static_cast(grpc_status_before_first_call_))); return struct_msg; } @@ -457,6 +460,7 @@ absl::optional ExtProcLoggingInfo::serializeAsString() const { } parts.push_back(absl::StrCat("bs:", bytes_sent_)); parts.push_back(absl::StrCat("br:", bytes_received_)); + parts.push_back(absl::StrCat("os:", static_cast(grpc_status_before_first_call_))); return absl::StrJoin(parts, ","); } @@ -520,6 +524,9 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const { if (field_name == BytesReceivedField) { return static_cast(bytes_received_); } + if (field_name == GrpcStatusBeforeFirstCallField) { + return static_cast(grpc_status_before_first_call_); + } return {}; } @@ -639,8 +646,7 @@ void Filter::onError() { } else { // Return an error and stop processing the current stream. processing_complete_ = true; - decoding_state_.onFinishProcessorCall(Grpc::Status::Aborted); - encoding_state_.onFinishProcessorCall(Grpc::Status::Aborted); + onFinishProcessorCalls(Grpc::Status::Aborted); ImmediateResponse errorResponse; errorResponse.mutable_status()->set_code( static_cast(static_cast(config_->statusOnError()))); @@ -1911,8 +1917,7 @@ void Filter::onMessageTimeout() { // Return an error and stop processing the current stream. processing_complete_ = true; closeStream(); - decoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded); - encoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded); + onFinishProcessorCalls(Grpc::Status::DeadlineExceeded); ImmediateResponse errorResponse; errorResponse.mutable_status()->set_code(StatusCode::GatewayTimeout); errorResponse.set_details(absl::StrFormat("%s_per-message_timeout_exceeded", ErrorPrefix)); @@ -1920,9 +1925,19 @@ void Filter::onMessageTimeout() { } } +void Filter::recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status) { + if (!decoding_state_.getCallStartTime().has_value() && + !encoding_state_.getCallStartTime().has_value()) { + if (loggingInfo() != nullptr) { + loggingInfo()->recordGrpcStatusBeforeFirstCall(call_status); + } + } +} + // Regardless of the current filter state, reset it to "IDLE", continue // the current callback, and reset timers. This is used in a few error-handling situations. void Filter::clearAsyncState(Grpc::Status::GrpcStatus call_status) { + recordGrpcStatusBeforeFirstCall(call_status); decoding_state_.clearAsyncState(call_status); encoding_state_.clearAsyncState(call_status); } @@ -1930,6 +1945,7 @@ void Filter::clearAsyncState(Grpc::Status::GrpcStatus call_status) { // Regardless of the current state, ensure that the timers won't fire // again. void Filter::onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status) { + recordGrpcStatusBeforeFirstCall(call_status); decoding_state_.onFinishProcessorCall(call_status); encoding_state_.onFinishProcessorCall(call_status); } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 4d3081daf1af5..28d7860fa40da 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -96,6 +96,13 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, ProcessorState::CallbackState callback_state, envoy::config::core::v3::TrafficDirection traffic_direction); + void recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status) { + grpc_status_before_first_call_ = call_status; + } + Grpc::Status::GrpcStatus getGrpcStatusBeforeFirstCall() const { + return grpc_status_before_first_call_; + } + void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; } void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; } void setClusterInfo(absl::optional cluster_info) { @@ -144,6 +151,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { Upstream::HostDescriptionConstSharedPtr upstream_host_; // The status details of the underlying HTTP/2 stream. Envoy gRPC only. std::string http_response_code_details_; + // The gRPC status when the openStream() operation fails. + Grpc::Status::GrpcStatus grpc_status_before_first_call_ = Grpc::Status::Ok; }; class ThreadLocalStreamManager; @@ -548,6 +557,7 @@ class Filter : public Logger::Loggable, void closeStream(); void halfCloseAndWaitForRemoteClose(); + void recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status); void onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status); void clearAsyncState(Grpc::Status::GrpcStatus call_status = Grpc::Status::Aborted); void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response); diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index f413cd5d6edb8..3f015b505a94b 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -166,6 +166,7 @@ class ProcessorState : public Logger::Loggable { virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE; const Http::HeaderMap* responseTrailers() const { return trailers_; } + const absl::optional& getCallStartTime() const { return call_start_time_; } void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, CallbackState callback_state); void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 8ca1c458a3e2d..9b4eb04088be2 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -73,7 +73,6 @@ using namespace std::chrono_literals; INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDeferredProcessing, ExtProcIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); - // Test the filter using the default configuration by connecting to // an ext_proc server that responds to the request_headers message // by immediately closing the stream. @@ -4965,6 +4964,36 @@ TEST_P(ExtProcIntegrationTest, AccessLogExtProcInCompositeFilter) { EXPECT_THAT(log_content, testing::HasSubstr("response_header_latency_us")); } +TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoWithWrongCluster) { + if (!IsEnvoyGrpc()) { + GTEST_SKIP() << "Google gRPC stream open does not fail immediately with wrong ext_proc cluster"; + } + auto access_log_path = TestEnvironment::temporaryPath("ext_proc_open_stream_wrong_cluster.log"); + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + auto* access_log = cm.add_access_log(); + access_log->set_name("accesslog"); + envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; + access_log_config.set_path(access_log_path); + auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); + + (*json_format->mutable_fields())["field_grpc_status_before_first_call"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:grpc_status_before_first_call)%"); + access_log->mutable_typed_config()->PackFrom(access_log_config); + }); + proto_config_.set_failure_mode_allow(false); + proto_config_.mutable_message_timeout()->set_nanos(200000000); + ConfigOptions config_option = {}; + config_option.valid_grpc_server = false; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + verifyDownstreamResponse(*response, 500); + std::string log_result = waitForAccessLog(access_log_path, 0, true); + auto json_log = Json::Factory::loadFromString(log_result).value(); + auto field_request_header_status = json_log->getString("field_grpc_status_before_first_call"); + EXPECT_EQ(*field_request_header_status, "14"); +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index c4efeb73df397..36a5552b3edee 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -97,6 +97,11 @@ static const std::string filter_config_name = "scooby.dooby.doo"; class HttpFilterTest : public testing::Test { protected: + enum DoStartOption { + DEFAULT = 1, + ON_GRPC_ERROR = 2, + ON_GRPC_CLOSE = 3, + }; void initialize(std::string&& yaml, bool is_upstream_filter = false) { scoped_runtime_.mergeValues( {{"envoy.reloadable_features.ext_proc_stream_close_optimization", "true"}}); @@ -190,6 +195,16 @@ class HttpFilterTest : public testing::Test { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&) { + if (do_start_option_ == ON_GRPC_ERROR) { + callbacks.onGrpcError(Grpc::Status::Internal, "foo"); + return nullptr; + } + + if (do_start_option_ == ON_GRPC_CLOSE) { + callbacks.onGrpcClose(); + return nullptr; + } + if (final_expected_grpc_service_.has_value()) { EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), config_with_hash_key.config())); @@ -460,17 +475,16 @@ class HttpFilterTest : public testing::Test { stream_callbacks_->onReceiveMessage(std::move(response)); } + const ExtProcLoggingInfo* getExtProcLoggingInfo() { + return stream_info_.filterState() + ->getDataReadOnly( + filter_config_name); + } + // Get the gRPC call stats data from the filter state. const ExtProcLoggingInfo::GrpcCalls& getGrpcCalls(const envoy::config::core::v3::TrafficDirection traffic_direction) { - // The number of processor grpc calls made in the encoding and decoding path. - const ExtProcLoggingInfo::GrpcCalls& grpc_calls = - stream_info_.filterState() - ->getDataReadOnly< - Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo>( - filter_config_name) - ->grpcCalls(traffic_direction); - return grpc_calls; + return getExtProcLoggingInfo()->grpcCalls(traffic_direction); } // Check gRPC call stats for headers and trailers. @@ -631,12 +645,7 @@ class HttpFilterTest : public testing::Test { // The metadata configured as part of ext_proc filter should be in the filter state. // In addition, bytes sent/received should also be stored. void expectFilterState(const Envoy::Protobuf::Struct& expected_metadata) { - const auto* filterState = - stream_info_.filterState() - ->getDataReadOnly< - Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo>( - filter_config_name); - const Envoy::Protobuf::Struct& loggedMetadata = filterState->filterMetadata(); + const Envoy::Protobuf::Struct& loggedMetadata = getExtProcLoggingInfo()->filterMetadata(); EXPECT_THAT(loggedMetadata, ProtoEq(expected_metadata)); } @@ -668,6 +677,7 @@ class HttpFilterTest : public testing::Test { NiceMock factory_context_; Extensions::Filters::Common::Expr::BuilderInstanceSharedConstPtr builder_; TestScopedRuntime scoped_runtime_; + DoStartOption do_start_option_ = DEFAULT; }; // Using the default configuration, test the filter with a processor that @@ -1773,6 +1783,7 @@ TEST_F(HttpFilterTest, StreamingSendRequestDataGrpcFail) { Unused) { modify_headers(immediate_response_headers); })); server_closed_stream_ = true; stream_callbacks_->onGrpcError(Grpc::Status::Internal, "error message"); + EXPECT_EQ(Grpc::Status::Ok, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall()); // Sending another chunk of data. No more gRPC call. EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false)); @@ -5749,6 +5760,38 @@ TEST_F(HttpFilterTest, FilterMetadataOverridesClusterMetadata) { filter_->onDestroy(); } +TEST_F(HttpFilterTest, GrpcErrorOnOpenStream) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + do_start_option_ = ON_GRPC_ERROR; + + // Create synthetic HTTP request + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + filter_->onDestroy(); + EXPECT_EQ(Grpc::Status::Internal, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall()); +} + +TEST_F(HttpFilterTest, GrpcCloseOnOpenStream) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + do_start_option_ = ON_GRPC_CLOSE; + + // Create synthetic HTTP request + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + filter_->onDestroy(); + EXPECT_EQ(Grpc::Status::Aborted, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall()); +} + } // namespace } // namespace ExternalProcessing } // namespace HttpFilters From 0ecbe06483aa7790c9cc427f2804d90c39b60f18 Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Tue, 25 Nov 2025 18:58:51 +0000 Subject: [PATCH 2/2] tests Signed-off-by: Yanjun Xiang --- .../filters/http/ext_proc/ext_proc_integration_test.cc | 4 +--- test/extensions/filters/http/ext_proc/filter_test.cc | 6 ------ 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 9b4eb04088be2..c490868a30390 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -4980,8 +4980,6 @@ TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoWithWrongCluster) { "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:grpc_status_before_first_call)%"); access_log->mutable_typed_config()->PackFrom(access_log_config); }); - proto_config_.set_failure_mode_allow(false); - proto_config_.mutable_message_timeout()->set_nanos(200000000); ConfigOptions config_option = {}; config_option.valid_grpc_server = false; initializeConfig(config_option); @@ -4991,7 +4989,7 @@ TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoWithWrongCluster) { std::string log_result = waitForAccessLog(access_log_path, 0, true); auto json_log = Json::Factory::loadFromString(log_result).value(); auto field_request_header_status = json_log->getString("field_grpc_status_before_first_call"); - EXPECT_EQ(*field_request_header_status, "14"); + EXPECT_NE(*field_request_header_status, "0"); } } // namespace ExternalProcessing diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 36a5552b3edee..05dbaed4b1578 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -5768,10 +5768,7 @@ TEST_F(HttpFilterTest, GrpcErrorOnOpenStream) { )EOF"); do_start_option_ = ON_GRPC_ERROR; - - // Create synthetic HTTP request EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - test_time_->advanceTimeWait(std::chrono::microseconds(10)); filter_->onDestroy(); EXPECT_EQ(Grpc::Status::Internal, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall()); } @@ -5784,10 +5781,7 @@ TEST_F(HttpFilterTest, GrpcCloseOnOpenStream) { )EOF"); do_start_option_ = ON_GRPC_CLOSE; - - // Create synthetic HTTP request EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); - test_time_->advanceTimeWait(std::chrono::microseconds(10)); filter_->onDestroy(); EXPECT_EQ(Grpc::Status::Aborted, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall()); }