Skip to content

Commit 6db4230

Browse files
committed
fix for headers responses data (+directions) + haproxy needed changes (pointers)
1 parent 1940e53 commit 6db4230

File tree

5 files changed

+61
-37
lines changed

5 files changed

+61
-37
lines changed

contrib/envoyproxy/go-control-plane/envoy.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,17 +185,21 @@ func (s *appsecEnvoyExternalProcessorServer) processMessage(ctx context.Context,
185185
switch v := req.Request.(type) {
186186
case *envoyextproc.ProcessingRequest_RequestHeaders:
187187
var (
188-
action message_processor.Action
189-
err error
188+
action message_processor.Action
189+
err error
190+
reqState *message_processor.RequestState
190191
)
191-
*currentRequest, action, err = s.messageProcessor.OnRequestHeaders(ctx, &requestHeadersEnvoy{v, s.config.Integration})
192+
reqState, action, err = s.messageProcessor.OnRequestHeaders(ctx, &requestHeadersEnvoy{v, s.config.Integration})
193+
if err == nil {
194+
*currentRequest = *reqState
195+
}
192196
return action, err
193197

194198
case *envoyextproc.ProcessingRequest_RequestBody:
195199
if !currentRequest.Ongoing {
196200
return message_processor.Action{}, status.Errorf(codes.InvalidArgument, "Received request body without request headers")
197201
}
198-
return s.messageProcessor.OnRequestBody(&requestBodyEnvoy{v}, *currentRequest)
202+
return s.messageProcessor.OnRequestBody(&requestBodyEnvoy{v}, currentRequest)
199203

200204
case *envoyextproc.ProcessingRequest_ResponseHeaders:
201205
if !currentRequest.Ongoing {
@@ -204,13 +208,13 @@ func (s *appsecEnvoyExternalProcessorServer) processMessage(ctx context.Context,
204208
" and can happen when a malformed request is sent to Envoy where the header Host is missing. See link to issue https://github.com/envoyproxy/envoy/issues/38022")
205209
return message_processor.Action{}, status.Errorf(codes.InvalidArgument, "Error processing response headers from ext_proc: can't process the response")
206210
}
207-
return s.messageProcessor.OnResponseHeaders(&responseHeadersEnvoy{v}, *currentRequest)
211+
return s.messageProcessor.OnResponseHeaders(&responseHeadersEnvoy{v}, currentRequest)
208212

209213
case *envoyextproc.ProcessingRequest_ResponseBody:
210214
if !currentRequest.Ongoing {
211215
return message_processor.Action{}, status.Errorf(codes.InvalidArgument, "Received response body without request context")
212216
}
213-
return s.messageProcessor.OnResponseBody(&responseBodyEnvoy{v}, *currentRequest)
217+
return s.messageProcessor.OnResponseBody(&responseBodyEnvoy{v}, currentRequest)
214218

215219
case *envoyextproc.ProcessingRequest_RequestTrailers:
216220
instr.Logger().Debug("external_processing: received unexpected message of type RequestTrailers, ignoring it. " +

contrib/envoyproxy/go-control-plane/envoy_utils.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,34 @@ func buildImmediateResponse(data *message_processor.BlockResponseData) *envoyext
4949
}
5050
}
5151

52-
// buildHeadersResponse creates an Envoy HeadersResponse from provided data answering a RequestHeaders message
52+
// buildHeadersResponse creates an Envoy HeadersResponse from provided data answering a RequestHeaders or ResponseHeaders message
5353
func buildHeadersResponse(data *message_processor.HeadersResponseData) *envoyextproc.ProcessingResponse {
5454
var modeOverride *envoyextprocfilter.ProcessingMode
5555
if data.RequestBody {
5656
modeOverride = &envoyextprocfilter.ProcessingMode{RequestBodyMode: envoyextprocfilter.ProcessingMode_STREAMED}
5757
}
5858

59-
return &envoyextproc.ProcessingResponse{
60-
Response: &envoyextproc.ProcessingResponse_RequestHeaders{
61-
RequestHeaders: &envoyextproc.HeadersResponse{
62-
Response: &envoyextproc.CommonResponse{
63-
Status: envoyextproc.CommonResponse_CONTINUE,
64-
HeaderMutation: &envoyextproc.HeaderMutation{
65-
SetHeaders: convertHeadersToEnvoy(data.HeaderMutation),
66-
},
67-
},
59+
processingResponse := &envoyextproc.ProcessingResponse{ModeOverride: modeOverride}
60+
headersResponse := &envoyextproc.HeadersResponse{
61+
Response: &envoyextproc.CommonResponse{
62+
Status: envoyextproc.CommonResponse_CONTINUE,
63+
HeaderMutation: &envoyextproc.HeaderMutation{
64+
SetHeaders: convertHeadersToEnvoy(data.HeaderMutation),
6865
},
6966
},
70-
ModeOverride: modeOverride,
7167
}
68+
69+
if data.Direction == message_processor.DirectionRequest {
70+
processingResponse.Response = &envoyextproc.ProcessingResponse_RequestHeaders{
71+
RequestHeaders: headersResponse,
72+
}
73+
} else {
74+
processingResponse.Response = &envoyextproc.ProcessingResponse_ResponseHeaders{
75+
ResponseHeaders: headersResponse,
76+
}
77+
}
78+
79+
return processingResponse
7280
}
7381

7482
// convertHeadersToEnvoy converts standard HTTP headers to Envoy HeaderValueOption format

contrib/envoyproxy/go-control-plane/message_processor/action.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@ func newContinueAction() Action {
2020

2121
// newContinueActionWithResponseData creates an ActionTypeContinue with header mutations and requestBody flag.
2222
// The requestBody flag indicates whether the body should be requested from the proxy to the external processing service.
23-
func newContinueActionWithResponseData(mutations http.Header, requestBody bool) Action {
23+
func newContinueActionWithResponseData(mutations http.Header, requestBody bool, direction Direction) Action {
2424
return Action{
25-
Type: ActionTypeContinue,
26-
Response: &HeadersResponseData{HeaderMutation: mutations, RequestBody: requestBody},
25+
Type: ActionTypeContinue,
26+
Response: &HeadersResponseData{
27+
HeaderMutation: mutations,
28+
RequestBody: requestBody,
29+
Direction: direction,
30+
},
2731
}
2832
}
2933

@@ -46,10 +50,19 @@ func newFinishAction() Action {
4650
}
4751
}
4852

53+
// Direction indicates the direction of the message being processed.
54+
type Direction int
55+
56+
const (
57+
DirectionRequest Direction = iota // DirectionRequest indicates a request message.
58+
DirectionResponse // DirectionResponse indicates a response message.
59+
)
60+
4961
// HeadersResponseData is the data for a headers response.
5062
type HeadersResponseData struct {
5163
HeaderMutation http.Header
5264
RequestBody bool
65+
Direction Direction
5366
}
5467

5568
// BlockResponseData is the data for a blocking response.

contrib/envoyproxy/go-control-plane/message_processor/message_processor.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,17 @@ func NewMessageProcessor(config MessageProcessorConfig, instr *instrumentation.I
3535
}
3636

3737
// OnRequestHeaders handles incoming request headers
38-
func (mp *MessageProcessor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (RequestState, Action, error) {
38+
func (mp *MessageProcessor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (*RequestState, Action, error) {
3939
httpReq, err := req.NewRequest(ctx)
4040
if err != nil {
41-
return RequestState{}, Action{}, err
41+
return nil, Action{}, err
4242
}
4343

4444
componentName := req.Component(ctx)
4545
framework := req.Framework()
4646
reqState, blocked := newRequestState(httpReq, mp.instr, mp.config.BodyParsingSizeLimit, componentName, framework)
47-
if reqState.span == nil {
48-
reqState.Close()
49-
return RequestState{}, Action{}, fmt.Errorf("error getting span from context")
47+
if reqState == nil {
48+
return nil, Action{}, fmt.Errorf("error getting span from context")
5049
}
5150

5251
if !mp.config.BlockingUnavailable && blocked {
@@ -57,7 +56,7 @@ func (mp *MessageProcessor) OnRequestHeaders(ctx context.Context, req RequestHea
5756
headerMutation, err := reqState.PropagationHeaders()
5857
if err != nil {
5958
reqState.Close()
60-
return RequestState{}, Action{}, err
59+
return nil, Action{}, err
6160
}
6261

6362
// Determine if we instruct the proxy to send the body
@@ -68,11 +67,11 @@ func (mp *MessageProcessor) OnRequestHeaders(ctx context.Context, req RequestHea
6867
// Todo: Set telemetry body size (using content-length)
6968
}
7069

71-
return reqState, newContinueActionWithResponseData(headerMutation, requestBody), nil
70+
return reqState, newContinueActionWithResponseData(headerMutation, requestBody, DirectionRequest), nil
7271
}
7372

7473
// OnRequestBody handles incoming request body chunks
75-
func (mp *MessageProcessor) OnRequestBody(req RequestBody, reqState RequestState) (Action, error) {
74+
func (mp *MessageProcessor) OnRequestBody(req RequestBody, reqState *RequestState) (Action, error) {
7675
if mp.config.BodyParsingSizeLimit <= 0 || !reqState.AwaitingRequestBody {
7776
mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. " +
7877
"Please refer to the official documentation for guidance on the proper settings or contact support.")
@@ -89,7 +88,7 @@ func (mp *MessageProcessor) OnRequestBody(req RequestBody, reqState RequestState
8988
}
9089

9190
// OnResponseHeaders handles incoming response headers
92-
func (mp *MessageProcessor) OnResponseHeaders(res ResponseHeaders, reqState RequestState) (Action, error) {
91+
func (mp *MessageProcessor) OnResponseHeaders(res ResponseHeaders, reqState *RequestState) (Action, error) {
9392
mp.instr.Logger().Debug("message_processor: received response headers")
9493
reqState.AwaitingRequestBody = false
9594

@@ -128,11 +127,11 @@ func (mp *MessageProcessor) OnResponseHeaders(res ResponseHeaders, reqState Requ
128127

129128
// Todo: Set telemetry body size (using content-length)
130129

131-
return newContinueActionWithResponseData(nil, true), nil
130+
return newContinueActionWithResponseData(nil, true, DirectionResponse), nil
132131
}
133132

134133
// OnResponseBody handles incoming response body chunks
135-
func (mp *MessageProcessor) OnResponseBody(res ResponseBody, reqState RequestState) (Action, error) {
134+
func (mp *MessageProcessor) OnResponseBody(res ResponseBody, reqState *RequestState) (Action, error) {
136135
mp.instr.Logger().Debug("message_processor: received response body: %v - EOS: %v\n", len(res.Body()), res.EndOfStream())
137136

138137
if mp.config.BodyParsingSizeLimit <= 0 || !reqState.AwaitingResponseBody {

contrib/envoyproxy/go-control-plane/message_processor/request_state.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ var _ io.Closer = (*RequestState)(nil)
2222
// RequestState manages the state of a single request through its lifecycle
2323
type RequestState struct {
2424
ctx context.Context
25-
span *tracer.Span
25+
Span *tracer.Span
2626
afterHandle func()
2727

2828
instr *instrumentation.Instrumentation
@@ -43,7 +43,7 @@ type RequestState struct {
4343
}
4444

4545
// newRequestState creates a new request state
46-
func newRequestState(request *http.Request, instr *instrumentation.Instrumentation, bodyLimit int, componentName string, framework string) (RequestState, bool) {
46+
func newRequestState(request *http.Request, instr *instrumentation.Instrumentation, bodyLimit int, componentName string, framework string) (*RequestState, bool) {
4747
fakeResponseWriter := newFakeResponseWriter()
4848
wrappedResponseWriter, spanRequest, afterHandle, blocked := httptrace.BeforeHandle(&httptrace.ServeConfig{
4949
Framework: framework,
@@ -56,7 +56,7 @@ func newRequestState(request *http.Request, instr *instrumentation.Instrumentati
5656

5757
span, ok := tracer.SpanFromContext(spanRequest.Context())
5858
if !ok {
59-
return RequestState{}, false
59+
return nil, false
6060
}
6161

6262
var requestBuffer *bodyBuffer
@@ -69,9 +69,9 @@ func newRequestState(request *http.Request, instr *instrumentation.Instrumentati
6969
responseBuffer = newBodyBuffer(bodyLimit)
7070
}
7171

72-
return RequestState{
72+
return &RequestState{
7373
ctx: spanRequest.Context(),
74-
span: span,
74+
Span: span,
7575
afterHandle: afterHandle,
7676
instr: instr,
7777
fakeResponseWriter: fakeResponseWriter,
@@ -85,7 +85,7 @@ func newRequestState(request *http.Request, instr *instrumentation.Instrumentati
8585
// PropagationHeaders creates header mutations for trace propagation
8686
func (rs *RequestState) PropagationHeaders() (http.Header, error) {
8787
newHeaders := make(http.Header)
88-
if err := tracer.Inject(rs.span.Context(), tracer.HTTPHeadersCarrier(newHeaders)); err != nil {
88+
if err := tracer.Inject(rs.Span.Context(), tracer.HTTPHeadersCarrier(newHeaders)); err != nil {
8989
return nil, err
9090
}
9191

0 commit comments

Comments
 (0)