Skip to content

Commit 69a3144

Browse files
committed
added tracing headers injection + add test
1 parent fe048af commit 69a3144

File tree

2 files changed

+82
-17
lines changed

2 files changed

+82
-17
lines changed

contrib/haproxy/stream-processing-offload/haproxy_test.go

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestAppSec(t *testing.T) {
6969
handler, mt, cleanup := setup()
7070
defer cleanup()
7171

72-
_, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "dd-test-scanner-log-block"}, "GET", "/", 0)
72+
_, _, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "dd-test-scanner-log-block"}, "GET", "/", 0)
7373

7474
require.Equal(t, 403, blockedAct.statusCode)
7575
require.Equal(t, "application/json", blockedAct.headers["Content-Type"])
@@ -87,7 +87,7 @@ func TestAppSec(t *testing.T) {
8787
handler, mt, cleanup := setup()
8888
defer cleanup()
8989

90-
_, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Mistake Not..."}, "GET", "/hello?match=match-request-query", 0)
90+
_, _, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Mistake Not..."}, "GET", "/hello?match=match-request-query", 0)
9191

9292
require.Equal(t, 418, blockedAct.statusCode)
9393
require.Equal(t, "application/json", blockedAct.headers["Content-Type"])
@@ -105,7 +105,7 @@ func TestAppSec(t *testing.T) {
105105
handler, mt, cleanup := setup()
106106
defer cleanup()
107107

108-
_, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"Cookie": "foo=jdfoSDGFkivRG_234"}, "OPTIONS", "/", 0)
108+
_, _, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"Cookie": "foo=jdfoSDGFkivRG_234"}, "OPTIONS", "/", 0)
109109

110110
require.Equal(t, 418, blockedAct.statusCode)
111111
require.Equal(t, "application/json", blockedAct.headers["Content-Type"])
@@ -144,7 +144,7 @@ func TestAppSec(t *testing.T) {
144144
handler, mt, cleanup := setup()
145145
defer cleanup()
146146

147-
_, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Mistake not...", "X-Forwarded-For": "111.222.111.222"}, "GET", "/", 0)
147+
_, _, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Mistake not...", "X-Forwarded-For": "111.222.111.222"}, "GET", "/", 0)
148148

149149
// Handle the immediate response
150150
require.Equal(t, 403, blockedAct.statusCode)
@@ -230,7 +230,7 @@ func TestAppSecBodyParsingEnabled(t *testing.T) {
230230
handler, mt, cleanup := setup()
231231
defer cleanup()
232232

233-
spanId, requestedRequestBody, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, "GET", "/", 0)
233+
spanId, requestedRequestBody, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, "GET", "/", 0)
234234
require.False(t, requestedRequestBody)
235235
require.Nil(t, blockedAct)
236236

@@ -258,7 +258,7 @@ func TestAppSecBodyParsingEnabled(t *testing.T) {
258258
defer cleanup()
259259

260260
body := []byte(`{ "name": "$globals" }`)
261-
spanId, requestedRequestBody, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, "GET", "/", len(body))
261+
spanId, requestedRequestBody, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, "GET", "/", len(body))
262262
require.True(t, requestedRequestBody)
263263
require.Nil(t, blockedAct)
264264

@@ -335,7 +335,7 @@ func TestAppSecBodyParsingEnabled(t *testing.T) {
335335
}
336336
requestBody := fmt.Sprintf(`{ "name": "$globals", "text": "%s" }`, largeText)
337337

338-
spanId, bodyRequested, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, "GET", "/", len(requestBody))
338+
spanId, bodyRequested, _, blockedAct := sendProcessingRequestHeaders(t, handler, map[string]string{"User-Agent": "Chromium", "Content-Type": "application/json"}, "GET", "/", len(requestBody))
339339
require.True(t, bodyRequested)
340340
require.Nil(t, blockedAct)
341341

@@ -466,6 +466,32 @@ func TestGeneratedSpan(t *testing.T) {
466466
require.Equal(t, "00000000000000000000000000003039", span.Context().TraceID())
467467
require.Equal(t, uint64(67890), span.ParentID())
468468
})
469+
t.Run("span-with-propagation-headers-set", func(t *testing.T) {
470+
handler, mt, cleanup := setup()
471+
defer cleanup()
472+
473+
spanId, _, injectedHeaders, _ := sendProcessingRequestHeaders(t, handler, map[string]string{}, "GET", "/../../../resource-span/.?id=test", 0)
474+
475+
// Check for trace propagation headers injected
476+
require.Contains(t, injectedHeaders, "tracing_x_datadog_trace_id")
477+
require.Contains(t, injectedHeaders, "tracing_x_datadog_parent_id")
478+
require.Contains(t, injectedHeaders, "tracing_x_datadog_tags")
479+
480+
sendProcessingResponseHeaders(t, handler, nil, "200", spanId, 0)
481+
482+
finished := mt.FinishedSpans()
483+
require.Len(t, finished, 1)
484+
485+
// Check for tags
486+
span := finished[0]
487+
require.Equal(t, "http.request", span.OperationName())
488+
require.Equal(t, "https://datadoghq.com/../../../resource-span/.?id=test", span.Tag("http.url"))
489+
require.Equal(t, "GET", span.Tag("http.method"))
490+
require.Equal(t, "datadoghq.com", span.Tag("http.host"))
491+
require.Equal(t, "GET /resource-span", span.Tag("resource.name"))
492+
require.Equal(t, "server", span.Tag("span.kind"))
493+
require.Equal(t, "haproxy-spoa", span.Tag("component"))
494+
})
469495
}
470496

471497
func TestMalformedHAProxyProcessing(t *testing.T) {
@@ -509,7 +535,7 @@ func TestMalformedHAProxyProcessing(t *testing.T) {
509535
handler, mt, cleanup := setup()
510536
defer cleanup()
511537

512-
spanId, requestBody, blockedAct := sendProcessingRequestHeaders(t, handler, nil, "GET", "/%u002e/%ZZ/%tt/%uuuu/%uwu/%%", 0)
538+
spanId, requestBody, _, blockedAct := sendProcessingRequestHeaders(t, handler, nil, "GET", "/%u002e/%ZZ/%tt/%uuuu/%uwu/%%", 0)
513539
require.False(t, requestBody)
514540
require.Nil(t, blockedAct)
515541
require.Empty(t, spanId)
@@ -544,7 +570,7 @@ type haproxyAppsecRig struct {
544570

545571
// Helper functions
546572

547-
func sendProcessingRequestHeaders(t *testing.T, handler func(*request.Request), headers map[string]string, method string, path string, bodyLength int) (string, bool, *blockedAction) {
573+
func sendProcessingRequestHeaders(t *testing.T, handler func(*request.Request), headers map[string]string, method string, path string, bodyLength int) (string, bool, map[string]string, *blockedAction) {
548574
t.Helper()
549575

550576
if headers == nil {
@@ -596,20 +622,28 @@ func sendProcessingRequestHeaders(t *testing.T, handler func(*request.Request),
596622
blockedAct, err := createBlockedAction(pRequest.Actions)
597623
require.NoError(t, err)
598624
if blockedAct != nil {
599-
return "", false, blockedAct
625+
return "", false, nil, blockedAct
600626
}
601627

602628
spanId, err := findVar(pRequest.Actions, "span_id")
603629
if err != nil {
604-
return "", false, nil
630+
return "", false, nil, nil
605631
}
606632

607633
requestedBody, err := findVar(pRequest.Actions, "request_body")
608634
if err != nil {
609635
requestedBody = false
610636
}
611637

612-
return spanId.(string), requestedBody.(bool), nil
638+
// Handle injected headers
639+
injectedValues := make(map[string]string, len(haproxyTracingHeaderActions))
640+
for _, actionName := range haproxyTracingHeaderActions {
641+
if v, err := findVar(pRequest.Actions, actionName); err == nil {
642+
injectedValues[actionName] = v.(string)
643+
}
644+
}
645+
646+
return spanId.(string), requestedBody.(bool), injectedValues, nil
613647
}
614648

615649
// sendProcessingRequestBody sends the request body
@@ -809,7 +843,7 @@ func end2EndStreamRequest(t *testing.T, handler func(*request.Request), path str
809843
// First part: request
810844
// 1- Send the headers
811845
requestBodyLength := len(requestBody)
812-
spanId, requestBodyRequested, blocked := sendProcessingRequestHeaders(t, handler, requestHeaders, method, path, requestBodyLength)
846+
spanId, requestBodyRequested, _, blocked := sendProcessingRequestHeaders(t, handler, requestHeaders, method, path, requestBodyLength)
813847
require.Nil(t, blocked, "expected no blocked action when sending request headers")
814848

815849
require.NotEmpty(t, spanId)

contrib/haproxy/stream-processing-offload/haproxy_utils.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func spanIDFromMessage(msg *message.Message) (uint64, error) {
8787
return spanId, nil
8888
}
8989

90-
// setHeadersResponseData sets HeadersResponseData data into the request variables answering a Request Headers message
90+
// continueActionFunc sets HeadersResponseData data into the request variables answering a Request Headers message
9191
func continueActionFunc(ctx context.Context, options proxy.ContinueActionOptions) error {
9292
requestContextData, _ := ctx.Value(haproxyRequestKey).(*haproxyContextRequestDataType)
9393
if requestContextData == nil {
@@ -111,17 +111,48 @@ func continueActionFunc(ctx context.Context, options proxy.ContinueActionOptions
111111
spanId := s.Context().SpanID()
112112
spanIdStr := strconv.FormatUint(spanId, 10)
113113
requestContextData.req.Actions.SetVar(action.ScopeTransaction, "span_id", spanIdStr)
114+
115+
injectTracingHeaders(options.HeaderMutations, &requestContextData.req.Actions)
114116
}
115117

116118
if options.Body {
117119
requestContextData.req.Actions.SetVar(action.ScopeTransaction, "request_body", true)
118120
}
119121

120-
if len(options.HeaderMutations) > 0 {
121-
// TODO: List all possible headers that can be mutated (trace injection)
122+
return nil
123+
}
124+
125+
const headerCount = 5
126+
127+
var haproxyTracingHeaderActions = [headerCount]string{
128+
"tracing_x_datadog_trace_id",
129+
"tracing_x_datadog_parent_id",
130+
"tracing_x_datadog_origin",
131+
"tracing_x_datadog_sampling_priority",
132+
"tracing_x_datadog_tags",
133+
}
134+
135+
var datadogTracingHeaders = [headerCount]string{
136+
tracer.DefaultTraceIDHeader,
137+
tracer.DefaultParentIDHeader,
138+
"x-datadog-origin",
139+
tracer.DefaultPriorityHeader,
140+
"x-datadog-tags",
141+
}
142+
143+
// injectTracingHeaders injects tracing headers when present. Supporting only the Datadog tracing format.
144+
// https://docs.datadoghq.com/tracing/trace_collection/trace_context_propagation/#datadog-format
145+
func injectTracingHeaders(headerMutations map[string][]string, actions *action.Actions) {
146+
if len(headerMutations) == 0 {
147+
return
122148
}
123149

124-
return nil
150+
for i := range haproxyTracingHeaderActions {
151+
mutationHeader := http.CanonicalHeaderKey(datadogTracingHeaders[i])
152+
if v, ok := headerMutations[mutationHeader]; ok {
153+
actions.SetVar(action.ScopeTransaction, haproxyTracingHeaderActions[i], strings.TrimSpace(strings.Join(v, ",")))
154+
}
155+
}
125156
}
126157

127158
// setBlockResponseData sets blocked data into the request variables when the request is blocked

0 commit comments

Comments
 (0)