Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la
constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status";
constexpr absl::string_view BytesSentField = "bytes_sent";
constexpr absl::string_view BytesReceivedField = "bytes_received";
constexpr absl::string_view GrpcStatusBeforeFirstCallField = "grpc_status_before_first_call";

absl::optional<ProcessingMode> initProcessingMode(const ExtProcPerRoute& config) {
if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) {
Expand Down Expand Up @@ -416,6 +417,8 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const {
static_cast<double>(bytes_sent_));
(*struct_msg->mutable_fields())[BytesReceivedField].set_number_value(
static_cast<double>(bytes_received_));
(*struct_msg->mutable_fields())[GrpcStatusBeforeFirstCallField].set_number_value(
static_cast<double>(static_cast<int>(grpc_status_before_first_call_)));
return struct_msg;
}

Expand Down Expand Up @@ -457,6 +460,7 @@ absl::optional<std::string> ExtProcLoggingInfo::serializeAsString() const {
}
parts.push_back(absl::StrCat("bs:", bytes_sent_));
parts.push_back(absl::StrCat("br:", bytes_received_));
parts.push_back(absl::StrCat("os:", static_cast<int>(grpc_status_before_first_call_)));

return absl::StrJoin(parts, ",");
}
Expand Down Expand Up @@ -520,6 +524,9 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const {
if (field_name == BytesReceivedField) {
return static_cast<int64_t>(bytes_received_);
}
if (field_name == GrpcStatusBeforeFirstCallField) {
return static_cast<int64_t>(grpc_status_before_first_call_);
}
return {};
}

Expand Down Expand Up @@ -639,8 +646,7 @@ void Filter::onError() {
} else {
// Return an error and stop processing the current stream.
processing_complete_ = true;
decoding_state_.onFinishProcessorCall(Grpc::Status::Aborted);
encoding_state_.onFinishProcessorCall(Grpc::Status::Aborted);
onFinishProcessorCalls(Grpc::Status::Aborted);
ImmediateResponse errorResponse;
errorResponse.mutable_status()->set_code(
static_cast<StatusCode>(static_cast<uint32_t>(config_->statusOnError())));
Expand Down Expand Up @@ -1911,25 +1917,35 @@ void Filter::onMessageTimeout() {
// Return an error and stop processing the current stream.
processing_complete_ = true;
closeStream();
decoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded);
encoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded);
onFinishProcessorCalls(Grpc::Status::DeadlineExceeded);
ImmediateResponse errorResponse;
errorResponse.mutable_status()->set_code(StatusCode::GatewayTimeout);
errorResponse.set_details(absl::StrFormat("%s_per-message_timeout_exceeded", ErrorPrefix));
sendImmediateResponse(errorResponse);
}
}

void Filter::recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status) {
if (!decoding_state_.getCallStartTime().has_value() &&
!encoding_state_.getCallStartTime().has_value()) {
if (loggingInfo() != nullptr) {
loggingInfo()->recordGrpcStatusBeforeFirstCall(call_status);
}
}
}

// Regardless of the current filter state, reset it to "IDLE", continue
// the current callback, and reset timers. This is used in a few error-handling situations.
void Filter::clearAsyncState(Grpc::Status::GrpcStatus call_status) {
recordGrpcStatusBeforeFirstCall(call_status);
decoding_state_.clearAsyncState(call_status);
encoding_state_.clearAsyncState(call_status);
}

// Regardless of the current state, ensure that the timers won't fire
// again.
void Filter::onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status) {
recordGrpcStatusBeforeFirstCall(call_status);
decoding_state_.onFinishProcessorCall(call_status);
encoding_state_.onFinishProcessorCall(call_status);
}
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction);
void recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status) {
grpc_status_before_first_call_ = call_status;
}
Grpc::Status::GrpcStatus getGrpcStatusBeforeFirstCall() const {
return grpc_status_before_first_call_;
}

void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; }
void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; }
void setClusterInfo(absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info) {
Expand Down Expand Up @@ -144,6 +151,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
// The status details of the underlying HTTP/2 stream. Envoy gRPC only.
std::string http_response_code_details_;
// The gRPC status when the openStream() operation fails.
Grpc::Status::GrpcStatus grpc_status_before_first_call_ = Grpc::Status::Ok;
};

class ThreadLocalStreamManager;
Expand Down Expand Up @@ -548,6 +557,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void closeStream();
void halfCloseAndWaitForRemoteClose();

void recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status);
void onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status);
void clearAsyncState(Grpc::Status::GrpcStatus call_status = Grpc::Status::Aborted);
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE;
const Http::HeaderMap* responseTrailers() const { return trailers_; }

const absl::optional<MonotonicTime>& getCallStartTime() const { return call_start_time_; }
void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
CallbackState callback_state);
void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ using namespace std::chrono_literals;

INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDeferredProcessing, ExtProcIntegrationTest,
GRPC_CLIENT_INTEGRATION_PARAMS);

// Test the filter using the default configuration by connecting to
// an ext_proc server that responds to the request_headers message
// by immediately closing the stream.
Expand Down Expand Up @@ -4965,6 +4964,34 @@ TEST_P(ExtProcIntegrationTest, AccessLogExtProcInCompositeFilter) {
EXPECT_THAT(log_content, testing::HasSubstr("response_header_latency_us"));
}

TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoWithWrongCluster) {
if (!IsEnvoyGrpc()) {
GTEST_SKIP() << "Google gRPC stream open does not fail immediately with wrong ext_proc cluster";
}
auto access_log_path = TestEnvironment::temporaryPath("ext_proc_open_stream_wrong_cluster.log");
config_helper_.addConfigModifier([&](HttpConnectionManager& cm) {
auto* access_log = cm.add_access_log();
access_log->set_name("accesslog");
envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config;
access_log_config.set_path(access_log_path);
auto* json_format = access_log_config.mutable_log_format()->mutable_json_format();

(*json_format->mutable_fields())["field_grpc_status_before_first_call"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:grpc_status_before_first_call)%");
access_log->mutable_typed_config()->PackFrom(access_log_config);
});
ConfigOptions config_option = {};
config_option.valid_grpc_server = false;
initializeConfig(config_option);
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequest(absl::nullopt);
verifyDownstreamResponse(*response, 500);
std::string log_result = waitForAccessLog(access_log_path, 0, true);
auto json_log = Json::Factory::loadFromString(log_result).value();
auto field_request_header_status = json_log->getString("field_grpc_status_before_first_call");
EXPECT_NE(*field_request_header_status, "0");
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
65 changes: 51 additions & 14 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ static const std::string filter_config_name = "scooby.dooby.doo";

class HttpFilterTest : public testing::Test {
protected:
enum DoStartOption {
DEFAULT = 1,
ON_GRPC_ERROR = 2,
ON_GRPC_CLOSE = 3,
};
void initialize(std::string&& yaml, bool is_upstream_filter = false) {
scoped_runtime_.mergeValues(
{{"envoy.reloadable_features.ext_proc_stream_close_optimization", "true"}});
Expand Down Expand Up @@ -190,6 +195,16 @@ class HttpFilterTest : public testing::Test {
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Envoy::Http::AsyncClient::StreamOptions&,
Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&) {
if (do_start_option_ == ON_GRPC_ERROR) {
callbacks.onGrpcError(Grpc::Status::Internal, "foo");
return nullptr;
}

if (do_start_option_ == ON_GRPC_CLOSE) {
callbacks.onGrpcClose();
return nullptr;
}

if (final_expected_grpc_service_.has_value()) {
EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(),
config_with_hash_key.config()));
Expand Down Expand Up @@ -460,17 +475,16 @@ class HttpFilterTest : public testing::Test {
stream_callbacks_->onReceiveMessage(std::move(response));
}

const ExtProcLoggingInfo* getExtProcLoggingInfo() {
return stream_info_.filterState()
->getDataReadOnly<Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo>(
filter_config_name);
}

// Get the gRPC call stats data from the filter state.
const ExtProcLoggingInfo::GrpcCalls&
getGrpcCalls(const envoy::config::core::v3::TrafficDirection traffic_direction) {
// The number of processor grpc calls made in the encoding and decoding path.
const ExtProcLoggingInfo::GrpcCalls& grpc_calls =
stream_info_.filterState()
->getDataReadOnly<
Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo>(
filter_config_name)
->grpcCalls(traffic_direction);
return grpc_calls;
return getExtProcLoggingInfo()->grpcCalls(traffic_direction);
}

// Check gRPC call stats for headers and trailers.
Expand Down Expand Up @@ -631,12 +645,7 @@ class HttpFilterTest : public testing::Test {
// The metadata configured as part of ext_proc filter should be in the filter state.
// In addition, bytes sent/received should also be stored.
void expectFilterState(const Envoy::Protobuf::Struct& expected_metadata) {
const auto* filterState =
stream_info_.filterState()
->getDataReadOnly<
Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo>(
filter_config_name);
const Envoy::Protobuf::Struct& loggedMetadata = filterState->filterMetadata();
const Envoy::Protobuf::Struct& loggedMetadata = getExtProcLoggingInfo()->filterMetadata();
EXPECT_THAT(loggedMetadata, ProtoEq(expected_metadata));
}

Expand Down Expand Up @@ -668,6 +677,7 @@ class HttpFilterTest : public testing::Test {
NiceMock<Server::Configuration::MockServerFactoryContext> factory_context_;
Extensions::Filters::Common::Expr::BuilderInstanceSharedConstPtr builder_;
TestScopedRuntime scoped_runtime_;
DoStartOption do_start_option_ = DEFAULT;
};

// Using the default configuration, test the filter with a processor that
Expand Down Expand Up @@ -1773,6 +1783,7 @@ TEST_F(HttpFilterTest, StreamingSendRequestDataGrpcFail) {
Unused) { modify_headers(immediate_response_headers); }));
server_closed_stream_ = true;
stream_callbacks_->onGrpcError(Grpc::Status::Internal, "error message");
EXPECT_EQ(Grpc::Status::Ok, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall());

// Sending another chunk of data. No more gRPC call.
EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false));
Expand Down Expand Up @@ -5749,6 +5760,32 @@ TEST_F(HttpFilterTest, FilterMetadataOverridesClusterMetadata) {
filter_->onDestroy();
}

TEST_F(HttpFilterTest, GrpcErrorOnOpenStream) {
initialize(R"EOF(
grpc_service:
envoy_grpc:
cluster_name: "ext_proc_server"
)EOF");

do_start_option_ = ON_GRPC_ERROR;
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
filter_->onDestroy();
EXPECT_EQ(Grpc::Status::Internal, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall());
}

TEST_F(HttpFilterTest, GrpcCloseOnOpenStream) {
initialize(R"EOF(
grpc_service:
envoy_grpc:
cluster_name: "ext_proc_server"
)EOF");

do_start_option_ = ON_GRPC_CLOSE;
EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false));
filter_->onDestroy();
EXPECT_EQ(Grpc::Status::Aborted, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall());
}

} // namespace
} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down