Skip to content

Commit 583fa70

Browse files
committed
add request ID injection to context to enable tracking reqeusts across downstream services
Signed-off-by: Erlan Zholdubai uulu <[email protected]>
1 parent f7d19ca commit 583fa70

File tree

13 files changed

+203
-22
lines changed

13 files changed

+203
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
* [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
5757
* [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835
5858
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
59+
* [ENHANCEMENT] API: add request ID injection to context to enable tracking reqeusts across downstream services. #6895
5960
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
6061
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
6162
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ api:
102102
# CLI flag: -api.http-request-headers-to-log
103103
[http_request_headers_to_log: <list of string> | default = []]
104104

105+
# HTTP header that can be used as request id
106+
# CLI flag: -api.request-id-header
107+
[request_id_header: <string> | default = ""]
108+
105109
# Regex for CORS origin. It is fully anchored. Example:
106110
# 'https?://(domain1|domain2)\.com'
107111
# CLI flag: -server.cors-origin

pkg/api/api.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ type Config struct {
7171
// Allows and is used to configure the addition of HTTP Header fields to logs
7272
HTTPRequestHeadersToLog flagext.StringSlice `yaml:"http_request_headers_to_log"`
7373

74+
// HTTP header that can be used as request id.
75+
// If it's not provided, or this header is empty, then random requestId will be generated
76+
RequestIdHeader string `yaml:"request_id_header"`
77+
7478
// This sets the Origin header value
7579
corsRegexString string `yaml:"cors_origin"`
7680

@@ -87,6 +91,7 @@ var (
8791
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8892
f.BoolVar(&cfg.ResponseCompression, "api.response-compression-enabled", false, "Use GZIP compression for API responses. Some endpoints serve large YAML or JSON blobs which can benefit from compression.")
8993
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
94+
f.StringVar(&cfg.RequestIdHeader, "api.request-id-header", "", "HTTP header that can be used as request id")
9095
f.BoolVar(&cfg.buildInfoEnabled, "api.build-info-enabled", false, "If enabled, build Info API will be served by query frontend or querier.")
9196
f.StringVar(&cfg.QuerierDefaultCodec, "api.querier-default-codec", "json", "Choose default codec for querier response serialization. Supports 'json' and 'protobuf'.")
9297
cfg.RegisterFlagsWithPrefix("", f)
@@ -169,8 +174,9 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
169174
if cfg.HTTPAuthMiddleware == nil {
170175
api.AuthMiddleware = middleware.AuthenticateUser
171176
}
172-
if len(cfg.HTTPRequestHeadersToLog) > 0 {
173-
api.HTTPHeaderMiddleware = &HTTPHeaderMiddleware{TargetHeaders: cfg.HTTPRequestHeadersToLog}
177+
api.HTTPHeaderMiddleware = &HTTPHeaderMiddleware{
178+
TargetHeaders: cfg.HTTPRequestHeadersToLog,
179+
RequestIdHeader: cfg.RequestIdHeader,
174180
}
175181

176182
return api, nil

pkg/api/api_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func TestNewApiWithHeaderLogging(t *testing.T) {
8989

9090
}
9191

92+
// HTTPHeaderMiddleware should be added even if no headers are specified to log because it also handles request ID injection.
9293
func TestNewApiWithoutHeaderLogging(t *testing.T) {
9394
cfg := Config{
9495
HTTPRequestHeadersToLog: []string{},
@@ -102,7 +103,8 @@ func TestNewApiWithoutHeaderLogging(t *testing.T) {
102103

103104
api, err := New(cfg, serverCfg, server, &FakeLogger{})
104105
require.NoError(t, err)
105-
require.Nil(t, api.HTTPHeaderMiddleware)
106+
require.NotNil(t, api.HTTPHeaderMiddleware)
107+
require.Empty(t, api.HTTPHeaderMiddleware.TargetHeaders)
106108

107109
}
108110

pkg/api/middlewares.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
11
package api
22

33
import (
4-
"context"
54
"net/http"
65

6+
"github.com/google/uuid"
7+
78
util_log "github.com/cortexproject/cortex/pkg/util/log"
9+
requestutil "github.com/cortexproject/cortex/pkg/util/request"
810
)
911

1012
// HTTPHeaderMiddleware adds specified HTTPHeaders to the request context
1113
type HTTPHeaderMiddleware struct {
12-
TargetHeaders []string
14+
TargetHeaders []string
15+
RequestIdHeader string
1316
}
1417

1518
// InjectTargetHeadersIntoHTTPRequest injects specified HTTPHeaders into the request context
16-
func (h HTTPHeaderMiddleware) InjectTargetHeadersIntoHTTPRequest(r *http.Request) context.Context {
19+
func (h HTTPHeaderMiddleware) InjectTargetHeadersIntoHTTPRequest(r *http.Request) *http.Request {
1720
headerMap := make(map[string]string)
1821

1922
// Check to make sure that Headers have not already been injected
2023
checkMapInContext := util_log.HeaderMapFromContext(r.Context())
2124
if checkMapInContext != nil {
22-
return r.Context()
25+
return r
2326
}
2427

2528
for _, target := range h.TargetHeaders {
@@ -28,13 +31,30 @@ func (h HTTPHeaderMiddleware) InjectTargetHeadersIntoHTTPRequest(r *http.Request
2831
headerMap[target] = contents
2932
}
3033
}
31-
return util_log.ContextWithHeaderMap(r.Context(), headerMap)
34+
ctx := util_log.ContextWithHeaderMap(r.Context(), headerMap)
35+
return r.WithContext(ctx)
36+
}
37+
38+
// InjectRequestIdHeader injects a request ID into the request context if one is not already present
39+
func (h HTTPHeaderMiddleware) InjectRequestIdHeader(r *http.Request) *http.Request {
40+
41+
if requestutil.RequestIdFromContext(r.Context()) != "" {
42+
return r
43+
}
44+
45+
reqId := r.Header.Get(h.RequestIdHeader)
46+
if reqId == "" {
47+
reqId = uuid.NewString()
48+
}
49+
ctx := requestutil.ContextWithRequestIdHeader(r.Context(), reqId)
50+
return r.WithContext(ctx)
3251
}
3352

3453
// Wrap implements Middleware
3554
func (h HTTPHeaderMiddleware) Wrap(next http.Handler) http.Handler {
3655
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
37-
ctx := h.InjectTargetHeadersIntoHTTPRequest(r)
38-
next.ServeHTTP(w, r.WithContext(ctx))
56+
r = h.InjectTargetHeadersIntoHTTPRequest(r)
57+
r = h.InjectRequestIdHeader(r)
58+
next.ServeHTTP(w, r)
3959
})
4060
}

pkg/api/middlewares_test.go

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/stretchr/testify/require"
99

1010
util_log "github.com/cortexproject/cortex/pkg/util/log"
11+
requestutil "github.com/cortexproject/cortex/pkg/util/request"
1112
)
1213

1314
var HTTPTestMiddleware = HTTPHeaderMiddleware{TargetHeaders: []string{"TestHeader1", "TestHeader2", "Test3"}}
@@ -32,9 +33,9 @@ func TestHeaderInjection(t *testing.T) {
3233
}
3334

3435
req = req.WithContext(ctx)
35-
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
36+
req = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
3637

37-
headerMap := util_log.HeaderMapFromContext(ctx)
38+
headerMap := util_log.HeaderMapFromContext(req.Context())
3839
require.NotNil(t, headerMap)
3940

4041
for _, header := range HTTPTestMiddleware.TargetHeaders {
@@ -67,8 +68,74 @@ func TestExistingHeaderInContextIsNotOverridden(t *testing.T) {
6768
}
6869

6970
req = req.WithContext(ctx)
70-
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
71+
req = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
7172

72-
require.Equal(t, contentsMap, util_log.HeaderMapFromContext(ctx))
73+
require.Equal(t, contentsMap, util_log.HeaderMapFromContext(req.Context()))
7374

7475
}
76+
77+
func TestRequestIdInjection(t *testing.T) {
78+
middleware := HTTPHeaderMiddleware{
79+
RequestIdHeader: "X-Request-ID",
80+
}
81+
82+
req := &http.Request{
83+
Method: "GET",
84+
RequestURI: "/test",
85+
Body: http.NoBody,
86+
Header: http.Header{},
87+
}
88+
req = req.WithContext(context.Background())
89+
req = middleware.InjectRequestIdHeader(req)
90+
91+
requestID := requestutil.RequestIdFromContext(req.Context())
92+
require.NotEmpty(t, requestID, "Request ID should be generated if not provided")
93+
}
94+
95+
func TestRequestIdFromHeaderIsUsed(t *testing.T) {
96+
const providedID = "my-test-id-123"
97+
98+
middleware := HTTPHeaderMiddleware{
99+
RequestIdHeader: "X-Request-ID",
100+
}
101+
102+
h := http.Header{}
103+
h.Add("X-Request-ID", providedID)
104+
105+
req := &http.Request{
106+
Method: "GET",
107+
RequestURI: "/test",
108+
Body: http.NoBody,
109+
Header: h,
110+
}
111+
req = req.WithContext(context.Background())
112+
req = middleware.InjectRequestIdHeader(req)
113+
114+
requestID := requestutil.RequestIdFromContext(req.Context())
115+
require.Equal(t, providedID, requestID, "Request ID from header should be used")
116+
}
117+
118+
func TestExistingRequestIdIsPreserved(t *testing.T) {
119+
const existingID = "already-present-id"
120+
121+
middleware := HTTPHeaderMiddleware{
122+
RequestIdHeader: "X-Request-ID",
123+
}
124+
125+
ctx := requestutil.ContextWithRequestIdHeader(context.Background(), existingID)
126+
127+
h := http.Header{}
128+
h.Add("X-Request-ID", "should-be-ignored")
129+
130+
req := &http.Request{
131+
Method: "GET",
132+
RequestURI: "/test",
133+
Body: http.NoBody,
134+
Header: h,
135+
}
136+
req = req.WithContext(ctx)
137+
req = middleware.InjectRequestIdHeader(req)
138+
139+
requestID := requestutil.RequestIdFromContext(req.Context())
140+
require.Equal(t, existingID, requestID, "Existing request ID in context should be preserved")
141+
}

pkg/cortex/cortex.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -392,10 +392,8 @@ func (t *Cortex) setupThanosTracing() {
392392
// setupGRPCHeaderForwarding appends a gRPC middleware used to enable the propagation of
393393
// HTTP Headers through child gRPC calls
394394
func (t *Cortex) setupGRPCHeaderForwarding() {
395-
if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
396-
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcutil.HTTPHeaderPropagationServerInterceptor)
397-
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcutil.HTTPHeaderPropagationStreamServerInterceptor)
398-
}
395+
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcutil.HTTPHeaderPropagationServerInterceptor)
396+
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcutil.HTTPHeaderPropagationStreamServerInterceptor)
399397
}
400398

401399
func (t *Cortex) setupRequestSigning() {

pkg/cortex/modules.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,9 +402,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
402402
// request context.
403403
internalQuerierRouter = t.API.AuthMiddleware.Wrap(internalQuerierRouter)
404404

405-
if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
406-
internalQuerierRouter = t.API.HTTPHeaderMiddleware.Wrap(internalQuerierRouter)
407-
}
405+
internalQuerierRouter = t.API.HTTPHeaderMiddleware.Wrap(internalQuerierRouter)
408406
}
409407

410408
// If neither frontend address or scheduler address is configured, no worker is needed.

pkg/querier/tripperware/roundtrip.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cortexproject/cortex/pkg/util"
3737
"github.com/cortexproject/cortex/pkg/util/limiter"
3838
util_log "github.com/cortexproject/cortex/pkg/util/log"
39+
requestutil "github.com/cortexproject/cortex/pkg/util/request"
3940
)
4041

4142
const (
@@ -269,6 +270,10 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) {
269270
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
270271
}
271272

273+
if requestId := requestutil.RequestIdFromContext(ctx); requestId != "" {
274+
requestutil.InjectRequestIdIntoHTTPRequest(requestId, request)
275+
}
276+
272277
response, err := q.next.RoundTrip(request)
273278
if err != nil {
274279
return nil, err

pkg/querier/worker/frontend_processor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
1818
"github.com/cortexproject/cortex/pkg/util/backoff"
1919
util_log "github.com/cortexproject/cortex/pkg/util/log"
20+
requestutil "github.com/cortexproject/cortex/pkg/util/request"
2021
)
2122

2223
var (
@@ -141,6 +142,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
141142
ctx = user.InjectOrgID(ctx, orgID)
142143
}
143144
ctx = util_log.ContextWithHeaderMap(ctx, headerMap)
145+
requestId := headers[textproto.CanonicalMIMEHeaderKey(requestutil.RequestIdPropagationString)]
146+
ctx = requestutil.ContextWithRequestIdHeader(ctx, requestId)
147+
144148
logger := util_log.WithContext(ctx, fp.log)
145149
if statsEnabled {
146150
level.Info(logger).Log("msg", "started running request")

0 commit comments

Comments
 (0)