Skip to content

add request ID injection to context to enable tracking requests across services #6895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
* [ENHANCEMENT] Ring: Add zone label to ring_members metric. #6900
* [ENHANCEMENT] Ingester: Add new metric `cortex_ingester_push_errors_total` to track reasons for ingester request failures. #6901
* [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ api:
# CLI flag: -api.http-request-headers-to-log
[http_request_headers_to_log: <list of string> | default = []]

# HTTP header that can be used as request id
# CLI flag: -api.request-id-header
[request_id_header: <string> | default = ""]

# Regex for CORS origin. It is fully anchored. Example:
# 'https?://(domain1|domain2)\.com'
# CLI flag: -server.cors-origin
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ require (
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/oklog/ulid/v2 v2.1.1
Expand Down Expand Up @@ -170,7 +171,6 @@ require (
github.com/google/btree v1.1.3 // indirect
github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect
Expand Down
10 changes: 8 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type Config struct {
// Allows and is used to configure the addition of HTTP Header fields to logs
HTTPRequestHeadersToLog flagext.StringSlice `yaml:"http_request_headers_to_log"`

// HTTP header that can be used as request id. It will always be included in logs
// If it's not provided, or this header is empty, then random requestId will be generated
RequestIdHeader string `yaml:"request_id_header"`

// This sets the Origin header value
corsRegexString string `yaml:"cors_origin"`

Expand All @@ -87,6 +91,7 @@ var (
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ResponseCompression, "api.response-compression-enabled", false, "Use GZIP compression for API responses. Some endpoints serve large YAML or JSON blobs which can benefit from compression.")
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
f.StringVar(&cfg.RequestIdHeader, "api.request-id-header", "", "HTTP header that can be used as request id")
f.BoolVar(&cfg.buildInfoEnabled, "api.build-info-enabled", false, "If enabled, build Info API will be served by query frontend or querier.")
f.StringVar(&cfg.QuerierDefaultCodec, "api.querier-default-codec", "json", "Choose default codec for querier response serialization. Supports 'json' and 'protobuf'.")
cfg.RegisterFlagsWithPrefix("", f)
Expand Down Expand Up @@ -169,8 +174,9 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
if cfg.HTTPAuthMiddleware == nil {
api.AuthMiddleware = middleware.AuthenticateUser
}
if len(cfg.HTTPRequestHeadersToLog) > 0 {
api.HTTPHeaderMiddleware = &HTTPHeaderMiddleware{TargetHeaders: cfg.HTTPRequestHeadersToLog}
api.HTTPHeaderMiddleware = &HTTPHeaderMiddleware{
TargetHeaders: cfg.HTTPRequestHeadersToLog,
RequestIdHeader: cfg.RequestIdHeader,
}

return api, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestNewApiWithHeaderLogging(t *testing.T) {

}

// HTTPHeaderMiddleware should be added even if no headers are specified to log because it also handles request ID injection.
func TestNewApiWithoutHeaderLogging(t *testing.T) {
cfg := Config{
HTTPRequestHeadersToLog: []string{},
Expand All @@ -102,7 +103,8 @@ func TestNewApiWithoutHeaderLogging(t *testing.T) {

api, err := New(cfg, serverCfg, server, &FakeLogger{})
require.NoError(t, err)
require.Nil(t, api.HTTPHeaderMiddleware)
require.NotNil(t, api.HTTPHeaderMiddleware)
require.Empty(t, api.HTTPHeaderMiddleware.TargetHeaders)

}

Expand Down
37 changes: 24 additions & 13 deletions pkg/api/middlewares.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,51 @@
package api

import (
"context"
"net/http"

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/google/uuid"

"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

// HTTPHeaderMiddleware adds specified HTTPHeaders to the request context
type HTTPHeaderMiddleware struct {
TargetHeaders []string
TargetHeaders []string
RequestIdHeader string
}

// InjectTargetHeadersIntoHTTPRequest injects specified HTTPHeaders into the request context
func (h HTTPHeaderMiddleware) InjectTargetHeadersIntoHTTPRequest(r *http.Request) context.Context {
headerMap := make(map[string]string)
// injectRequestContext injects request related metadata into the request context
func (h HTTPHeaderMiddleware) injectRequestContext(r *http.Request) *http.Request {
requestContextMap := make(map[string]string)

// Check to make sure that Headers have not already been injected
checkMapInContext := util_log.HeaderMapFromContext(r.Context())
// Check to make sure that request context have not already been injected
checkMapInContext := requestmeta.MapFromContext(r.Context())
if checkMapInContext != nil {
return r.Context()
return r
}

for _, target := range h.TargetHeaders {
contents := r.Header.Get(target)
if contents != "" {
headerMap[target] = contents
requestContextMap[target] = contents
}
}
return util_log.ContextWithHeaderMap(r.Context(), headerMap)
requestContextMap[requestmeta.LoggingHeadersKey] = requestmeta.LoggingHeaderKeysToString(h.TargetHeaders)

reqId := r.Header.Get(h.RequestIdHeader)
if reqId == "" {
reqId = uuid.NewString()
}
requestContextMap[requestmeta.RequestIdKey] = reqId

ctx := requestmeta.ContextWithRequestMetadataMap(r.Context(), requestContextMap)
return r.WithContext(ctx)
}

// Wrap implements Middleware
func (h HTTPHeaderMiddleware) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := h.InjectTargetHeadersIntoHTTPRequest(r)
next.ServeHTTP(w, r.WithContext(ctx))
r = h.injectRequestContext(r)
next.ServeHTTP(w, r)
})
}
87 changes: 78 additions & 9 deletions pkg/api/middlewares_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (

"github.com/stretchr/testify/require"

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

var HTTPTestMiddleware = HTTPHeaderMiddleware{TargetHeaders: []string{"TestHeader1", "TestHeader2", "Test3"}}

func TestHeaderInjection(t *testing.T) {
middleware := HTTPHeaderMiddleware{TargetHeaders: []string{"TestHeader1", "TestHeader2", "Test3"}}
ctx := context.Background()
h := http.Header{}
contentsMap := make(map[string]string)
Expand All @@ -32,12 +31,12 @@ func TestHeaderInjection(t *testing.T) {
}

req = req.WithContext(ctx)
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
req = middleware.injectRequestContext(req)

headerMap := util_log.HeaderMapFromContext(ctx)
headerMap := requestmeta.MapFromContext(req.Context())
require.NotNil(t, headerMap)

for _, header := range HTTPTestMiddleware.TargetHeaders {
for _, header := range middleware.TargetHeaders {
require.Equal(t, contentsMap[header], headerMap[header])
}
for header, contents := range contentsMap {
Expand All @@ -46,6 +45,7 @@ func TestHeaderInjection(t *testing.T) {
}

func TestExistingHeaderInContextIsNotOverridden(t *testing.T) {
middleware := HTTPHeaderMiddleware{TargetHeaders: []string{"TestHeader1", "TestHeader2", "Test3"}}
ctx := context.Background()

h := http.Header{}
Expand All @@ -58,7 +58,7 @@ func TestExistingHeaderInContextIsNotOverridden(t *testing.T) {
h.Add("TestHeader2", "Fail2")
h.Add("Test3", "Fail3")

ctx = util_log.ContextWithHeaderMap(ctx, contentsMap)
ctx = requestmeta.ContextWithRequestMetadataMap(ctx, contentsMap)
req := &http.Request{
Method: "GET",
RequestURI: "/HTTPHeaderTest",
Expand All @@ -67,8 +67,77 @@ func TestExistingHeaderInContextIsNotOverridden(t *testing.T) {
}

req = req.WithContext(ctx)
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
req = middleware.injectRequestContext(req)

require.Equal(t, contentsMap, requestmeta.MapFromContext(req.Context()))

}

func TestRequestIdInjection(t *testing.T) {
middleware := HTTPHeaderMiddleware{
RequestIdHeader: "X-Request-ID",
}

req := &http.Request{
Method: "GET",
RequestURI: "/test",
Body: http.NoBody,
Header: http.Header{},
}
req = req.WithContext(context.Background())
req = middleware.injectRequestContext(req)

requestID := requestmeta.RequestIdFromContext(req.Context())
require.NotEmpty(t, requestID, "Request ID should be generated if not provided")
}

func TestRequestIdFromHeaderIsUsed(t *testing.T) {
const providedID = "my-test-id-123"

middleware := HTTPHeaderMiddleware{
RequestIdHeader: "X-Request-ID",
}

h := http.Header{}
h.Add("X-Request-ID", providedID)

require.Equal(t, contentsMap, util_log.HeaderMapFromContext(ctx))
req := &http.Request{
Method: "GET",
RequestURI: "/test",
Body: http.NoBody,
Header: h,
}
req = req.WithContext(context.Background())
req = middleware.injectRequestContext(req)

requestID := requestmeta.RequestIdFromContext(req.Context())
require.Equal(t, providedID, requestID, "Request ID from header should be used")
}

func TestTargetHeaderAndRequestIdHeaderOverlap(t *testing.T) {
const headerKey = "X-Request-ID"
const providedID = "overlap-id-456"

middleware := HTTPHeaderMiddleware{
TargetHeaders: []string{headerKey, "Other-Header"},
RequestIdHeader: headerKey,
}

h := http.Header{}
h.Add(headerKey, providedID)
h.Add("Other-Header", "some-value")

req := &http.Request{
Method: "GET",
RequestURI: "/test",
Body: http.NoBody,
Header: h,
}
req = req.WithContext(context.Background())
req = middleware.injectRequestContext(req)

ctxMap := requestmeta.MapFromContext(req.Context())
requestID := requestmeta.RequestIdFromContext(req.Context())
require.Equal(t, providedID, ctxMap[headerKey], "Header value should be correctly stored")
require.Equal(t, providedID, requestID, "Request ID should come from the overlapping header")
}
6 changes: 2 additions & 4 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,8 @@ func (t *Cortex) setupThanosTracing() {
// setupGRPCHeaderForwarding appends a gRPC middleware used to enable the propagation of
// HTTP Headers through child gRPC calls
func (t *Cortex) setupGRPCHeaderForwarding() {
if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcutil.HTTPHeaderPropagationServerInterceptor)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcutil.HTTPHeaderPropagationStreamServerInterceptor)
}
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcutil.HTTPHeaderPropagationServerInterceptor)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcutil.HTTPHeaderPropagationStreamServerInterceptor)
}

func (t *Cortex) setupRequestSigning() {
Expand Down
4 changes: 1 addition & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
// request context.
internalQuerierRouter = t.API.AuthMiddleware.Wrap(internalQuerierRouter)

if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
internalQuerierRouter = t.API.HTTPHeaderMiddleware.Wrap(internalQuerierRouter)
}
internalQuerierRouter = t.API.HTTPHeaderMiddleware.Wrap(internalQuerierRouter)
}

// If neither frontend address or scheduler address is configured, no worker is needed.
Expand Down
7 changes: 4 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -892,9 +893,9 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
// Get any HTTP headers that are supposed to be added to logs and add to localCtx for later use
if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
localCtx = util_log.ContextWithHeaderMap(localCtx, headerMap)
// Get any HTTP request metadata that are supposed to be added to logs and add to localCtx for later use
if requestContextMap := requestmeta.MapFromContext(ctx); requestContextMap != nil {
localCtx = requestmeta.ContextWithRequestMetadataMap(localCtx, requestContextMap)
}
// Get clientIP(s) from Context and add it to localCtx
source := util.GetSourceIPsFromOutgoingCtx(ctx)
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

const (
Expand Down Expand Up @@ -261,8 +261,8 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) {
return nil, err
}

if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
util_log.InjectHeadersIntoHTTPRequest(headerMap, request)
if requestMetadataMap := requestmeta.MapFromContext(ctx); requestMetadataMap != nil {
requestmeta.InjectMetadataIntoHTTPRequestHeaders(requestMetadataMap, request)
}

if err := user.InjectOrgIDIntoHTTPRequest(ctx, request); err != nil {
Expand Down
11 changes: 3 additions & 8 deletions pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util/backoff"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

var (
Expand Down Expand Up @@ -129,18 +130,12 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
for _, h := range request.Headers {
headers[h.Key] = h.Values[0]
}
headerMap := make(map[string]string, 0)
// Remove non-existent header.
for _, header := range fp.targetHeaders {
if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok {
headerMap[header] = v
}
}
ctx = requestmeta.ContextWithRequestMetadataMapFromHeaders(ctx, headers, fp.targetHeaders)

orgID, ok := headers[textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName)]
if ok {
ctx = user.InjectOrgID(ctx, orgID)
}
ctx = util_log.ContextWithHeaderMap(ctx, headerMap)
logger := util_log.WithContext(ctx, fp.log)
if statsEnabled {
level.Info(logger).Log("msg", "started running request")
Expand Down
Loading
Loading