From bd8885e5991305f558332d448b5e3a1d2003fbad Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Fri, 27 Jun 2025 12:37:03 -0700 Subject: [PATCH 01/10] add zstd and snappy compression for query api Signed-off-by: Ahmed Hassan --- pkg/api/queryapi/compression.go | 90 +++++++++++++++++++ pkg/api/queryapi/query_api.go | 4 +- pkg/frontend/transport/handler.go | 4 +- .../tripperware/instantquery/instant_query.go | 22 ++++- pkg/querier/tripperware/query.go | 67 ++++++++++---- pkg/querier/tripperware/query_test.go | 50 ----------- .../tripperware/queryrange/query_range.go | 21 ++++- 7 files changed, 182 insertions(+), 76 deletions(-) create mode 100644 pkg/api/queryapi/compression.go diff --git a/pkg/api/queryapi/compression.go b/pkg/api/queryapi/compression.go new file mode 100644 index 00000000000..7dd6fcbacab --- /dev/null +++ b/pkg/api/queryapi/compression.go @@ -0,0 +1,90 @@ +package queryapi + +import ( + "io" + "net/http" + "strings" + + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zlib" + "github.com/klauspost/compress/zstd" +) + +const ( + acceptEncodingHeader = "Accept-Encoding" + contentEncodingHeader = "Content-Encoding" + gzipEncoding = "gzip" + deflateEncoding = "deflate" + snappyEncoding = "snappy" + zstdEncoding = "zstd" +) + +// Wrapper around http.Handler which adds suitable response compression based +// on the client's Accept-Encoding headers. +type compressedResponseWriter struct { + http.ResponseWriter + writer io.Writer +} + +// Writes HTTP response content data. +func (c *compressedResponseWriter) Write(p []byte) (int, error) { + return c.writer.Write(p) +} + +// Closes the compressedResponseWriter and ensures to flush all data before. +func (c *compressedResponseWriter) Close() { + if zstdWriter, ok := c.writer.(*zstd.Encoder); ok { + zstdWriter.Flush() + } + if snappyWriter, ok := c.writer.(*snappy.Writer); ok { + snappyWriter.Flush() + } + if zlibWriter, ok := c.writer.(*zlib.Writer); ok { + zlibWriter.Flush() + } + if gzipWriter, ok := c.writer.(*gzip.Writer); ok { + gzipWriter.Flush() + } + if closer, ok := c.writer.(io.Closer); ok { + defer closer.Close() + } +} + +// Constructs a new compressedResponseWriter based on client request headers. +func newCompressedResponseWriter(writer http.ResponseWriter, req *http.Request) *compressedResponseWriter { + encodings := strings.Split(req.Header.Get(acceptEncodingHeader), ",") + for _, encoding := range encodings { + switch strings.TrimSpace(encoding) { + case zstdEncoding: + encoder, err := zstd.NewWriter(writer) + if err == nil { + writer.Header().Set(contentEncodingHeader, zstdEncoding) + return &compressedResponseWriter{ResponseWriter: writer, writer: encoder} + } + case snappyEncoding: + writer.Header().Set(contentEncodingHeader, snappyEncoding) + return &compressedResponseWriter{ResponseWriter: writer, writer: snappy.NewBufferedWriter(writer)} + case gzipEncoding: + writer.Header().Set(contentEncodingHeader, gzipEncoding) + return &compressedResponseWriter{ResponseWriter: writer, writer: gzip.NewWriter(writer)} + case deflateEncoding: + writer.Header().Set(contentEncodingHeader, deflateEncoding) + return &compressedResponseWriter{ResponseWriter: writer, writer: zlib.NewWriter(writer)} + } + } + return &compressedResponseWriter{ResponseWriter: writer, writer: writer} +} + +// CompressionHandler is a wrapper around http.Handler which adds suitable +// response compression based on the client's Accept-Encoding headers. +type CompressionHandler struct { + Handler http.Handler +} + +// ServeHTTP adds compression to the original http.Handler's ServeHTTP() method. +func (c CompressionHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) { + compWriter := newCompressedResponseWriter(writer, req) + c.Handler.ServeHTTP(compWriter, req) + compWriter.Close() +} diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index e3793ef5bee..5dd125a6c39 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strconv" "time" "github.com/go-kit/log" @@ -208,7 +209,7 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc { w.WriteHeader(http.StatusNoContent) } - return httputil.CompressionHandler{ + return CompressionHandler{ Handler: http.HandlerFunc(hf), }.ServeHTTP } @@ -237,6 +238,7 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interf } w.Header().Set("Content-Type", codec.ContentType().String()) + w.Header().Set("X-Uncompressed-Length", strconv.Itoa(len(b))) w.WriteHeader(http.StatusOK) if n, err := w.Write(b); err != nil { level.Error(q.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index c988209fb0a..0f843e9b9cb 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -76,8 +76,6 @@ const ( limitBytesStoreGateway = `exceeded bytes limit` ) -var noopResponseSizeLimiter = limiter.NewResponseSizeLimiter(0) - // Config for a Handler. type HandlerConfig struct { LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"` @@ -308,7 +306,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // If the response status code is not 2xx, try to get the // error message from response body. if resp.StatusCode/100 != 2 { - body, err2 := tripperware.BodyBytes(resp, noopResponseSizeLimiter, f.log) + body, err2 := tripperware.BodyBytes(resp, f.log) if err2 == nil { err = httpgrpc.Errorf(resp.StatusCode, "%s", string(body)) } diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 54fe4aeba0d..0d75a777763 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "net/url" + "strconv" "strings" "time" @@ -45,8 +46,15 @@ type instantQueryCodec struct { func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec { compression := tripperware.NonCompression // default - if compressionStr == string(tripperware.GzipCompression) { + switch compressionStr { + case string(tripperware.GzipCompression): compression = tripperware.GzipCompression + + case string(tripperware.SnappyCompression): + compression = tripperware.SnappyCompression + + case string(tripperware.ZstdCompression): + compression = tripperware.ZstdCompression } defaultCodecType := tripperware.JsonCodecType // default @@ -100,8 +108,18 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, return nil, err } + responseSize, err := strconv.Atoi(r.Header.Get("X-Uncompressed-Length")) + if err != nil { + log.Error(err) + return nil, err + } + responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx) - body, err := tripperware.BodyBytes(r, responseSizeLimiter, log) + if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil { + return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) + } + + body, err := tripperware.BodyBytes(r, log) if err != nil { log.Error(err) return nil, err diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index e20ab6e3c4e..059fa0deb3f 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "context" - "encoding/binary" "fmt" "io" "net/http" @@ -16,6 +15,8 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" jsoniter "github.com/json-iterator/go" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zstd" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" @@ -27,7 +28,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/runutil" ) @@ -44,6 +44,8 @@ type Compression string const ( GzipCompression Compression = "gzip" + ZstdCompression Compression = "zstd" + SnappyCompression Compression = "snappy" NonCompression Compression = "" JsonCodecType CodecType = "json" ProtobufCodecType CodecType = "protobuf" @@ -434,7 +436,7 @@ type Buffer interface { Bytes() []byte } -func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimiter, logger log.Logger) ([]byte, error) { +func BodyBytes(res *http.Response, logger log.Logger) ([]byte, error) { var buf *bytes.Buffer // Attempt to cast the response body to a Buffer and use it if possible. @@ -452,11 +454,6 @@ func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimi } } - responseSize := getResponseSize(res, buf) - if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil { - return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) - } - // if the response is gzipped, lets unzip it here if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") { gReader, err := gzip.NewReader(buf) @@ -468,15 +465,33 @@ func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimi return io.ReadAll(gReader) } + // if the response is snappy compressed, decode it here + if strings.EqualFold(res.Header.Get("Content-Encoding"), "snappy") { + sReader := snappy.NewReader(buf) + return io.ReadAll(sReader) + } + + // if the response is zstd compressed, decode it here + if strings.EqualFold(res.Header.Get("Content-Encoding"), "zstd") { + zReader, err := zstd.NewReader(buf) + if err != nil { + return nil, err + } + defer runutil.CloseWithLogOnErr(logger, io.NopCloser(zReader), "close zstd decoder") + + return io.ReadAll(zReader) + } + return buf.Bytes(), nil } func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) { - // if the response is gzipped, lets unzip it here headers := http.Header{} for _, h := range res.Headers { headers[h.Key] = h.Values } + + // if the response is gzipped, lets unzip it here if strings.EqualFold(headers.Get("Content-Encoding"), "gzip") { gReader, err := gzip.NewReader(bytes.NewBuffer(res.Body)) if err != nil { @@ -487,16 +502,24 @@ func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger return io.ReadAll(gReader) } - return res.Body, nil -} + // if the response is snappy compressed, decode it here + if strings.EqualFold(headers.Get("Content-Encoding"), "snappy") { + sReader := snappy.NewReader(bytes.NewBuffer(res.Body)) + return io.ReadAll(sReader) + } + + // if the response is zstd compressed, decode it here + if strings.EqualFold(headers.Get("Content-Encoding"), "zstd") { + zReader, err := zstd.NewReader(bytes.NewBuffer(res.Body)) + if err != nil { + return nil, err + } + defer runutil.CloseWithLogOnErr(logger, io.NopCloser(zReader), "close zstd decoder") -func getResponseSize(res *http.Response, buf *bytes.Buffer) int { - if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") && len(buf.Bytes()) >= 4 { - // GZIP body contains the size of the original (uncompressed) input data - // modulo 2^32 in the last 4 bytes (https://www.ietf.org/rfc/rfc1952.txt). - return int(binary.LittleEndian.Uint32(buf.Bytes()[len(buf.Bytes())-4:])) + return io.ReadAll(zReader) } - return len(buf.Bytes()) + + return res.Body, nil } // UnmarshalJSON implements json.Unmarshaler. @@ -755,9 +778,17 @@ func (s *PrometheusResponseStats) MarshalJSON() ([]byte, error) { } func SetRequestHeaders(h http.Header, defaultCodecType CodecType, compression Compression) { - if compression == GzipCompression { + switch compression { + case GzipCompression: h.Set("Accept-Encoding", string(GzipCompression)) + + case SnappyCompression: + h.Set("Accept-Encoding", string(SnappyCompression)) + + case ZstdCompression: + h.Set("Accept-Encoding", string(ZstdCompression)) } + if defaultCodecType == ProtobufCodecType { h.Set("Accept", ApplicationProtobuf+", "+ApplicationJson) } else { diff --git a/pkg/querier/tripperware/query_test.go b/pkg/querier/tripperware/query_test.go index 04606df99e6..08f149f43b0 100644 --- a/pkg/querier/tripperware/query_test.go +++ b/pkg/querier/tripperware/query_test.go @@ -1,10 +1,7 @@ package tripperware import ( - "bytes" - "compress/gzip" "math" - "net/http" "strconv" "testing" "time" @@ -196,50 +193,3 @@ func generateData(timeseries, datapoints int) (floatMatrix, histogramMatrix []*S } return } - -func Test_getResponseSize(t *testing.T) { - tests := []struct { - body []byte - useGzip bool - }{ - { - body: []byte(`foo`), - useGzip: false, - }, - { - body: []byte(`foo`), - useGzip: true, - }, - { - body: []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`), - useGzip: false, - }, - { - body: []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`), - useGzip: true, - }, - } - - for i, test := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - expectedBodyLength := len(test.body) - buf := &bytes.Buffer{} - response := &http.Response{} - - if test.useGzip { - response = &http.Response{ - Header: http.Header{"Content-Encoding": []string{"gzip"}}, - } - w := gzip.NewWriter(buf) - _, err := w.Write(test.body) - require.NoError(t, err) - w.Close() - } else { - buf = bytes.NewBuffer(test.body) - } - - bodyLength := getResponseSize(response, buf) - require.Equal(t, expectedBodyLength, bodyLength) - }) - } -} diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 9d82031fc0b..8ac81ef6e31 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -63,8 +63,15 @@ type prometheusCodec struct { func NewPrometheusCodec(sharded bool, compressionStr string, defaultCodecTypeStr string) *prometheusCodec { //nolint:revive compression := tripperware.NonCompression // default - if compressionStr == string(tripperware.GzipCompression) { + switch compressionStr { + case string(tripperware.GzipCompression): compression = tripperware.GzipCompression + + case string(tripperware.SnappyCompression): + compression = tripperware.SnappyCompression + + case string(tripperware.ZstdCompression): + compression = tripperware.ZstdCompression } defaultCodecType := tripperware.JsonCodecType // default @@ -196,8 +203,18 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return nil, err } + responseSize, err := strconv.Atoi(r.Header.Get("X-Uncompressed-Length")) + if err != nil { + log.Error(err) + return nil, err + } + responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx) - body, err := tripperware.BodyBytes(r, responseSizeLimiter, log) + if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil { + return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) + } + + body, err := tripperware.BodyBytes(r, log) if err != nil { log.Error(err) return nil, err From c4a6b86fc0d5f7ef212f8c81163c9245fa55c35a Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Tue, 8 Jul 2025 17:28:30 -0700 Subject: [PATCH 02/10] parse X-Uncompressed-Length only if header exists Signed-off-by: Ahmed Hassan --- .../tripperware/instantquery/instant_query.go | 13 +++++++++---- pkg/querier/tripperware/queryrange/query_range.go | 13 +++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 0d75a777763..09c0bacd8ac 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -108,10 +108,15 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, return nil, err } - responseSize, err := strconv.Atoi(r.Header.Get("X-Uncompressed-Length")) - if err != nil { - log.Error(err) - return nil, err + responseSize := 0 + responseSizeHeader := r.Header.Get("X-Uncompressed-Length") + if responseSizeHeader != "" { + var err error + responseSize, err = strconv.Atoi(responseSizeHeader) + if err != nil { + log.Error(err) + return nil, err + } } responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx) diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 8ac81ef6e31..513df803c05 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -203,10 +203,15 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return nil, err } - responseSize, err := strconv.Atoi(r.Header.Get("X-Uncompressed-Length")) - if err != nil { - log.Error(err) - return nil, err + responseSize := 0 + responseSizeHeader := r.Header.Get("X-Uncompressed-Length") + if responseSizeHeader != "" { + var err error + responseSize, err = strconv.Atoi(responseSizeHeader) + if err != nil { + log.Error(err) + return nil, err + } } responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx) From 4f76eea5f59e0aaa17353f1308a2d863d7a9fc7e Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Tue, 8 Jul 2025 17:39:17 -0700 Subject: [PATCH 03/10] fix formatting Signed-off-by: Ahmed Hassan --- pkg/querier/tripperware/instantquery/instant_query.go | 2 +- pkg/querier/tripperware/queryrange/query_range.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 09c0bacd8ac..a16303aa741 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -108,7 +108,7 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, return nil, err } - responseSize := 0 + responseSize := 0 responseSizeHeader := r.Header.Get("X-Uncompressed-Length") if responseSizeHeader != "" { var err error diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 513df803c05..84670f8028f 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -203,7 +203,7 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return nil, err } - responseSize := 0 + responseSize := 0 responseSizeHeader := r.Header.Get("X-Uncompressed-Length") if responseSizeHeader != "" { var err error From ecafc447f71e3de5785fb54bd2c3c8f275a90dec Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Thu, 17 Jul 2025 13:32:41 -0700 Subject: [PATCH 04/10] refactor query decompression Signed-off-by: Ahmed Hassan --- pkg/querier/tripperware/query.go | 53 ++++++++++---------------------- 1 file changed, 17 insertions(+), 36 deletions(-) diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 059fa0deb3f..4270d718312 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -454,35 +454,9 @@ func BodyBytes(res *http.Response, logger log.Logger) ([]byte, error) { } } - // if the response is gzipped, lets unzip it here - if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") { - gReader, err := gzip.NewReader(buf) - if err != nil { - return nil, err - } - defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader") - - return io.ReadAll(gReader) - } - - // if the response is snappy compressed, decode it here - if strings.EqualFold(res.Header.Get("Content-Encoding"), "snappy") { - sReader := snappy.NewReader(buf) - return io.ReadAll(sReader) - } - - // if the response is zstd compressed, decode it here - if strings.EqualFold(res.Header.Get("Content-Encoding"), "zstd") { - zReader, err := zstd.NewReader(buf) - if err != nil { - return nil, err - } - defer runutil.CloseWithLogOnErr(logger, io.NopCloser(zReader), "close zstd decoder") - - return io.ReadAll(zReader) - } - - return buf.Bytes(), nil + // Handle decoding response if it was compressed + encoding := res.Header.Get("Content-Encoding") + return decode(buf, encoding, logger) } func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) { @@ -491,9 +465,16 @@ func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger headers[h.Key] = h.Values } + // Handle decoding response if it was compressed + encoding := headers.Get("Content-Encoding") + buf := bytes.NewBuffer(res.Body) + return decode(buf, encoding, logger) +} + +func decode(buf *bytes.Buffer, encoding string, logger log.Logger) ([]byte, error) { // if the response is gzipped, lets unzip it here - if strings.EqualFold(headers.Get("Content-Encoding"), "gzip") { - gReader, err := gzip.NewReader(bytes.NewBuffer(res.Body)) + if strings.EqualFold(encoding, "gzip") { + gReader, err := gzip.NewReader(buf) if err != nil { return nil, err } @@ -503,14 +484,14 @@ func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger } // if the response is snappy compressed, decode it here - if strings.EqualFold(headers.Get("Content-Encoding"), "snappy") { - sReader := snappy.NewReader(bytes.NewBuffer(res.Body)) + if strings.EqualFold(encoding, "snappy") { + sReader := snappy.NewReader(buf) return io.ReadAll(sReader) } // if the response is zstd compressed, decode it here - if strings.EqualFold(headers.Get("Content-Encoding"), "zstd") { - zReader, err := zstd.NewReader(bytes.NewBuffer(res.Body)) + if strings.EqualFold(encoding, "zstd") { + zReader, err := zstd.NewReader(buf) if err != nil { return nil, err } @@ -519,7 +500,7 @@ func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger return io.ReadAll(zReader) } - return res.Body, nil + return buf.Bytes(), nil } // UnmarshalJSON implements json.Unmarshaler. From cdbb153c02b7dbfeef06297da8a9f8b39ca0adfa Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Fri, 18 Jul 2025 12:45:05 -0700 Subject: [PATCH 05/10] ensure zstd reader is closed after decompression Signed-off-by: Ahmed Hassan --- pkg/querier/tripperware/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 4270d718312..dd54d13a8cb 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -495,7 +495,7 @@ func decode(buf *bytes.Buffer, encoding string, logger log.Logger) ([]byte, erro if err != nil { return nil, err } - defer runutil.CloseWithLogOnErr(logger, io.NopCloser(zReader), "close zstd decoder") + defer runutil.CloseWithLogOnErr(logger, zReader.IOReadCloser(), "close zstd decoder") return io.ReadAll(zReader) } From 370cdde6b96a1650a900c79bd6504adba1ca4748 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Fri, 25 Jul 2025 17:44:26 -0700 Subject: [PATCH 06/10] add tests for zstd and snappy compression Signed-off-by: Ahmed Hassan --- integration/query_frontend_test.go | 24 +++- pkg/api/queryapi/compression_test.go | 159 +++++++++++++++++++++++++++ pkg/querier/querier.go | 6 +- 3 files changed, 184 insertions(+), 5 deletions(-) create mode 100644 pkg/api/queryapi/compression_test.go diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 6d7b0651d7a..9167f954b45 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -216,14 +216,34 @@ func TestQueryFrontendProtobufCodec(t *testing.T) { require.NoError(t, s.StartAndWaitReady(minio)) flags = mergeFlags(e2e.EmptyFlags(), map[string]string{ - "-api.querier-default-codec": "protobuf", - "-querier.response-compression": "gzip", + "-api.querier-default-codec": "protobuf", }) return cortexConfigFile, flags }, }) } +func TestQuerierToQueryFrontendCompression(t *testing.T) { + for _, compression := range []string{"gzip","zstd","snappy",""} { + runQueryFrontendTest(t, queryFrontendTestConfig{ + testMissingMetricName: false, + querySchedulerEnabled: true, + queryStatsEnabled: true, + setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) + + minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + flags = mergeFlags(e2e.EmptyFlags(), map[string]string{ + "-querier.response-compression": compression, + }) + return cortexConfigFile, flags + }, + }) + } +} + func TestQueryFrontendRemoteRead(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ remoteReadEnabled: true, diff --git a/pkg/api/queryapi/compression_test.go b/pkg/api/queryapi/compression_test.go new file mode 100644 index 00000000000..f4be2e0181b --- /dev/null +++ b/pkg/api/queryapi/compression_test.go @@ -0,0 +1,159 @@ +package queryapi + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zlib" + "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/require" +) + +func decompress(t *testing.T, encoding string, b []byte) []byte { + t.Helper() + + switch encoding { + case gzipEncoding: + r, err := gzip.NewReader(bytes.NewReader(b)) + require.NoError(t, err) + defer r.Close() + data, err := io.ReadAll(r) + require.NoError(t, err) + return data + case deflateEncoding: + r, err := zlib.NewReader(bytes.NewReader(b)) + require.NoError(t, err) + defer r.Close() + data, err := io.ReadAll(r) + require.NoError(t, err) + return data + case snappyEncoding: + data, err := io.ReadAll(snappy.NewReader(bytes.NewReader(b))) + require.NoError(t, err) + return data + case zstdEncoding: + r, err := zstd.NewReader(bytes.NewReader(b)) + require.NoError(t, err) + defer r.Close() + data, err := io.ReadAll(r) + require.NoError(t, err) + return data + default: + return b + } +} + +func TestNewCompressedResponseWriter_SupportedEncodings(t *testing.T) { + for _, tc := range []string{gzipEncoding, deflateEncoding, snappyEncoding, zstdEncoding} { + t.Run(tc, func(t *testing.T) { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.Header.Set(acceptEncodingHeader, tc) + + cw := newCompressedResponseWriter(rec, req) + payload := []byte("hello world") + _, err := cw.Write(payload) + require.NoError(t, err) + cw.Close() + + require.Equal(t, tc, rec.Header().Get(contentEncodingHeader)) + + decompressed := decompress(t, tc, rec.Body.Bytes()) + require.Equal(t, payload, decompressed) + + switch tc { + case gzipEncoding: + _, ok := cw.writer.(*gzip.Writer) + require.True(t, ok) + case deflateEncoding: + _, ok := cw.writer.(*zlib.Writer) + require.True(t, ok) + case snappyEncoding: + _, ok := cw.writer.(*snappy.Writer) + require.True(t, ok) + case zstdEncoding: + _, ok := cw.writer.(*zstd.Encoder) + require.True(t, ok) + } + }) + } +} + +func TestNewCompressedResponseWriter_UnsupportedEncoding(t *testing.T) { + for _, tc := range []string{"", "br", "unknown"} { + t.Run(tc, func(t *testing.T) { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/", nil) + if tc != "" { + req.Header.Set(acceptEncodingHeader, tc) + } + + cw := newCompressedResponseWriter(rec, req) + payload := []byte("data") + _, err := cw.Write(payload) + require.NoError(t, err) + cw.Close() + + require.Empty(t, rec.Header().Get(contentEncodingHeader)) + require.Equal(t, payload, rec.Body.Bytes()) + require.Same(t, rec, cw.writer) + }) + } +} + +func TestNewCompressedResponseWriter_MultipleEncodings(t *testing.T) { + tests := []struct { + header string + expectEnc string + expectType interface{} + }{ + {"snappy, gzip", snappyEncoding, &snappy.Writer{}}, + {"unknown, gzip", gzipEncoding, &gzip.Writer{}}, + } + + for _, tc := range tests { + t.Run(tc.header, func(t *testing.T) { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.Header.Set(acceptEncodingHeader, tc.header) + + cw := newCompressedResponseWriter(rec, req) + _, err := cw.Write([]byte("payload")) + require.NoError(t, err) + cw.Close() + + require.Equal(t, tc.expectEnc, rec.Header().Get(contentEncodingHeader)) + decompressed := decompress(t, tc.expectEnc, rec.Body.Bytes()) + require.Equal(t, []byte("payload"), decompressed) + + switch tc.expectEnc { + case gzipEncoding: + require.IsType(t, &gzip.Writer{}, cw.writer) + case snappyEncoding: + require.IsType(t, &snappy.Writer{}, cw.writer) + } + }) + } +} + +func TestCompressionHandler_ServeHTTP(t *testing.T) { + handler := CompressionHandler{Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, err := w.Write([]byte("hello")) + require.NoError(t, err) + })} + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.Header.Set(acceptEncodingHeader, gzipEncoding) + + handler.ServeHTTP(rec, req) + + require.Equal(t, gzipEncoding, rec.Header().Get(contentEncodingHeader)) + decompressed := decompress(t, gzipEncoding, rec.Body.Bytes()) + require.Equal(t, []byte("hello"), decompressed) +} \ No newline at end of file diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b1a94f1d40f..e8adaa43229 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -103,7 +103,7 @@ var ( errBadLookbackConfigs = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent") errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater or equal than the configured 'query store after'") errEmptyTimeRange = errors.New("empty time range") - errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)") + errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)") errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1") errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1") errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet") @@ -128,7 +128,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.") f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.") - f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "gzip", "Use compression for metrics query API or instant and range query APIs. Supports 'gzip' and '' (disable compression)") + f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "gzip", "Use compression for metrics query API or instant and range query APIs. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)") f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") @@ -157,7 +157,7 @@ func (cfg *Config) Validate() error { } } - if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" { + if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" && cfg.ResponseCompression != "snappy" && cfg.ResponseCompression != "zstd" { return errUnsupportedResponseCompression } From cde5f41dac4a8ff2b219df440b618ceea811c471 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Fri, 25 Jul 2025 17:47:44 -0700 Subject: [PATCH 07/10] update changelog Signed-off-by: Ahmed Hassan --- CHANGELOG.md | 1 + integration/query_frontend_test.go | 2 +- pkg/api/queryapi/compression_test.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dc2b3498e5..8ea75e48a7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738 * [FEATURE] Querier: Allow choosing PromQL engine via header. #6777 +* [ENHANCEMENT] Querier: Support snappy and zstd response compression for `-querier.response-compression` flag. #6848 * [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845 * [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715 diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 9167f954b45..b77bfa64756 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -224,7 +224,7 @@ func TestQueryFrontendProtobufCodec(t *testing.T) { } func TestQuerierToQueryFrontendCompression(t *testing.T) { - for _, compression := range []string{"gzip","zstd","snappy",""} { + for _, compression := range []string{"gzip", "zstd", "snappy", ""} { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, diff --git a/pkg/api/queryapi/compression_test.go b/pkg/api/queryapi/compression_test.go index f4be2e0181b..bcd36a3728c 100644 --- a/pkg/api/queryapi/compression_test.go +++ b/pkg/api/queryapi/compression_test.go @@ -156,4 +156,4 @@ func TestCompressionHandler_ServeHTTP(t *testing.T) { require.Equal(t, gzipEncoding, rec.Header().Get(contentEncodingHeader)) decompressed := decompress(t, gzipEncoding, rec.Body.Bytes()) require.Equal(t, []byte("hello"), decompressed) -} \ No newline at end of file +} From 4a51ea26c9b3c42c203ea6b10de9934ec45c4e69 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Fri, 25 Jul 2025 20:20:16 -0700 Subject: [PATCH 08/10] update docs Signed-off-by: Ahmed Hassan --- docs/blocks-storage/querier.md | 2 +- docs/configuration/config-file-reference.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index ac9dd92d291..4845a620371 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -127,7 +127,7 @@ querier: [per_step_stats_enabled: | default = false] # Use compression for metrics query API or instant and range query APIs. - # Supports 'gzip' and '' (disable compression) + # Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression) # CLI flag: -querier.response-compression [response_compression: | default = "gzip"] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2963a87348c..f1f74db9c6e 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4177,7 +4177,7 @@ The `querier_config` configures the Cortex querier. [per_step_stats_enabled: | default = false] # Use compression for metrics query API or instant and range query APIs. -# Supports 'gzip' and '' (disable compression) +# Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression) # CLI flag: -querier.response-compression [response_compression: | default = "gzip"] From 100bc4d3435e6e8fd4d182c14b1b58fb389aa856 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Sat, 26 Jul 2025 00:20:05 -0700 Subject: [PATCH 09/10] apply query response size limit after decompression if header is missing Signed-off-by: Ahmed Hassan --- .../tripperware/instantquery/instant_query.go | 28 ++++++++++--------- pkg/querier/tripperware/query.go | 11 ++++++++ .../tripperware/queryrange/query_range.go | 26 +++++++++-------- 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index d86afe92463..c8b41b165e1 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -6,7 +6,6 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "time" @@ -110,20 +109,17 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, return nil, err } - responseSize := 0 responseSizeHeader := r.Header.Get("X-Uncompressed-Length") - if responseSizeHeader != "" { - var err error - responseSize, err = strconv.Atoi(responseSizeHeader) - if err != nil { - log.Error(err) - return nil, err - } - } - responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx) - if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil { - return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) + responseSize, hasSizeHeader, err := tripperware.ParseResponseSizeHeader(responseSizeHeader) + if err != nil { + log.Error(err) + return nil, err + } + if hasSizeHeader { + if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil { + return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) + } } body, err := tripperware.BodyBytes(r, log) @@ -132,6 +128,12 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, return nil, err } + if !hasSizeHeader { + if err := responseSizeLimiter.AddResponseBytes(len(body)); err != nil { + return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) + } + } + if r.StatusCode/100 != 2 { return nil, httpgrpc.Errorf(r.StatusCode, "%s", string(body)) } diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 70254c9b2c3..180ce1c27d0 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -789,6 +789,17 @@ func SetRequestHeaders(h http.Header, defaultCodecType CodecType, compression Co } } +func ParseResponseSizeHeader(header string) (int, bool, error) { + if header == "" { + return 0, false, nil + } + size, err := strconv.Atoi(header) + if err != nil { + return 0, false, err + } + return size, true, nil +} + func UnmarshalResponse(r *http.Response, buf []byte, resp *PrometheusResponse) error { if r.Header == nil { return json.Unmarshal(buf, resp) diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 453aefe1b6d..704ae745d8a 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -225,21 +225,19 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return nil, err } - responseSize := 0 responseSizeHeader := r.Header.Get("X-Uncompressed-Length") - if responseSizeHeader != "" { - var err error - responseSize, err = strconv.Atoi(responseSizeHeader) - if err != nil { - log.Error(err) - return nil, err + responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx) + responseSize, hasSizeHeader, err := tripperware.ParseResponseSizeHeader(responseSizeHeader) + if err != nil { + log.Error(err) + return nil, err + } + if hasSizeHeader { + if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil { + return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) } } - responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx) - if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil { - return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) - } body, err := tripperware.BodyBytes(r, log) if err != nil { @@ -247,6 +245,12 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return nil, err } + if !hasSizeHeader { + if err := responseSizeLimiter.AddResponseBytes(len(body)); err != nil { + return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error()) + } + } + if r.StatusCode/100 != 2 { return nil, httpgrpc.Errorf(r.StatusCode, "%s", string(body)) } From 4ec242bcc76c123082a5f87a2c09131a3ef93c21 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Mon, 28 Jul 2025 20:32:59 -0700 Subject: [PATCH 10/10] fix formatting Signed-off-by: Ahmed Hassan --- pkg/querier/tripperware/queryrange/query_range.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 704ae745d8a..f0b11db6121 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -238,7 +238,6 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ } } - body, err := tripperware.BodyBytes(r, log) if err != nil { log.Error(err)