Commit 49099a3

add zstd and snappy compression for query api (#6848)
* add zstd and snappy compression for query api
* parse X-Uncompressed-Length only if header exists
* fix formatting
* refactor query decompression
* ensure zstd reader is closed after decompression
* add tests for zstd and snappy compression
* update changelog
* update docs
* apply query response size limit after decompression if header is missing
* fix formatting

Signed-off-by: Ahmed Hassan <[email protected]>
1 parent 651c1dc commit 49099a3

13 files changed: +388 −95 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@
 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
 * [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
 * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
+* [ENHANCEMENT] Querier: Support snappy and zstd response compression for `-querier.response-compression` flag. #6848
 * [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
 * [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715

docs/blocks-storage/querier.md

Lines changed: 1 addition & 1 deletion
@@ -127,7 +127,7 @@ querier:
   [per_step_stats_enabled: <boolean> | default = false]
 
   # Use compression for metrics query API or instant and range query APIs.
-  # Supports 'gzip' and '' (disable compression)
+  # Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)
   # CLI flag: -querier.response-compression
   [response_compression: <string> | default = "gzip"]
 

docs/configuration/config-file-reference.md

Lines changed: 1 addition & 1 deletion
@@ -4283,7 +4283,7 @@ The `querier_config` configures the Cortex querier.
 [per_step_stats_enabled: <boolean> | default = false]
 
 # Use compression for metrics query API or instant and range query APIs.
-# Supports 'gzip' and '' (disable compression)
+# Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)
 # CLI flag: -querier.response-compression
 [response_compression: <string> | default = "gzip"]
 

integration/query_frontend_test.go

Lines changed: 22 additions & 2 deletions
@@ -216,14 +216,34 @@ func TestQueryFrontendProtobufCodec(t *testing.T) {
             require.NoError(t, s.StartAndWaitReady(minio))
 
             flags = mergeFlags(e2e.EmptyFlags(), map[string]string{
-                "-api.querier-default-codec":    "protobuf",
-                "-querier.response-compression": "gzip",
+                "-api.querier-default-codec": "protobuf",
             })
             return cortexConfigFile, flags
         },
     })
 }
 
+func TestQuerierToQueryFrontendCompression(t *testing.T) {
+    for _, compression := range []string{"gzip", "zstd", "snappy", ""} {
+        runQueryFrontendTest(t, queryFrontendTestConfig{
+            testMissingMetricName: false,
+            querySchedulerEnabled: true,
+            queryStatsEnabled:     true,
+            setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
+                require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
+
+                minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
+                require.NoError(t, s.StartAndWaitReady(minio))
+
+                flags = mergeFlags(e2e.EmptyFlags(), map[string]string{
+                    "-querier.response-compression": compression,
+                })
+                return cortexConfigFile, flags
+            },
+        })
+    }
+}
+
 func TestQueryFrontendRemoteRead(t *testing.T) {
     runQueryFrontendTest(t, queryFrontendTestConfig{
         remoteReadEnabled: true,

pkg/api/queryapi/compression.go

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+package queryapi
+
+import (
+    "io"
+    "net/http"
+    "strings"
+
+    "github.com/klauspost/compress/gzip"
+    "github.com/klauspost/compress/snappy"
+    "github.com/klauspost/compress/zlib"
+    "github.com/klauspost/compress/zstd"
+)
+
+const (
+    acceptEncodingHeader  = "Accept-Encoding"
+    contentEncodingHeader = "Content-Encoding"
+    gzipEncoding          = "gzip"
+    deflateEncoding       = "deflate"
+    snappyEncoding        = "snappy"
+    zstdEncoding          = "zstd"
+)
+
+// Wrapper around http.Handler which adds suitable response compression based
+// on the client's Accept-Encoding headers.
+type compressedResponseWriter struct {
+    http.ResponseWriter
+    writer io.Writer
+}
+
+// Writes HTTP response content data.
+func (c *compressedResponseWriter) Write(p []byte) (int, error) {
+    return c.writer.Write(p)
+}
+
+// Closes the compressedResponseWriter and ensures to flush all data before.
+func (c *compressedResponseWriter) Close() {
+    if zstdWriter, ok := c.writer.(*zstd.Encoder); ok {
+        zstdWriter.Flush()
+    }
+    if snappyWriter, ok := c.writer.(*snappy.Writer); ok {
+        snappyWriter.Flush()
+    }
+    if zlibWriter, ok := c.writer.(*zlib.Writer); ok {
+        zlibWriter.Flush()
+    }
+    if gzipWriter, ok := c.writer.(*gzip.Writer); ok {
+        gzipWriter.Flush()
+    }
+    if closer, ok := c.writer.(io.Closer); ok {
+        defer closer.Close()
+    }
+}
+
+// Constructs a new compressedResponseWriter based on client request headers.
+func newCompressedResponseWriter(writer http.ResponseWriter, req *http.Request) *compressedResponseWriter {
+    encodings := strings.Split(req.Header.Get(acceptEncodingHeader), ",")
+    for _, encoding := range encodings {
+        switch strings.TrimSpace(encoding) {
+        case zstdEncoding:
+            encoder, err := zstd.NewWriter(writer)
+            if err == nil {
+                writer.Header().Set(contentEncodingHeader, zstdEncoding)
+                return &compressedResponseWriter{ResponseWriter: writer, writer: encoder}
+            }
+        case snappyEncoding:
+            writer.Header().Set(contentEncodingHeader, snappyEncoding)
+            return &compressedResponseWriter{ResponseWriter: writer, writer: snappy.NewBufferedWriter(writer)}
+        case gzipEncoding:
+            writer.Header().Set(contentEncodingHeader, gzipEncoding)
+            return &compressedResponseWriter{ResponseWriter: writer, writer: gzip.NewWriter(writer)}
+        case deflateEncoding:
+            writer.Header().Set(contentEncodingHeader, deflateEncoding)
+            return &compressedResponseWriter{ResponseWriter: writer, writer: zlib.NewWriter(writer)}
+        }
+    }
+    return &compressedResponseWriter{ResponseWriter: writer, writer: writer}
+}
+
+// CompressionHandler is a wrapper around http.Handler which adds suitable
+// response compression based on the client's Accept-Encoding headers.
+type CompressionHandler struct {
+    Handler http.Handler
+}
+
+// ServeHTTP adds compression to the original http.Handler's ServeHTTP() method.
+func (c CompressionHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+    compWriter := newCompressedResponseWriter(writer, req)
+    c.Handler.ServeHTTP(compWriter, req)
+    compWriter.Close()
+}
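For orientation, a minimal sketch of how the new CompressionHandler composes with an ordinary handler. The route, port and payload below are illustrative assumptions, not part of this commit:

package main

import (
    "net/http"

    "github.com/cortexproject/cortex/pkg/api/queryapi"
)

func main() {
    // Any handler can be wrapped; the wrapper negotiates gzip, deflate, snappy
    // or zstd from the client's Accept-Encoding header and falls back to an
    // uncompressed response for unknown or missing encodings.
    api := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
        _, _ = w.Write([]byte(`{"status":"success"}`))
    })
    http.Handle("/api/v1/query", queryapi.CompressionHandler{Handler: api})
    _ = http.ListenAndServe(":8080", nil) // hypothetical wiring; error handling omitted
}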

pkg/api/queryapi/compression_test.go

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
+package queryapi
+
+import (
+    "bytes"
+    "io"
+    "net/http"
+    "net/http/httptest"
+    "testing"
+
+    "github.com/klauspost/compress/gzip"
+    "github.com/klauspost/compress/snappy"
+    "github.com/klauspost/compress/zlib"
+    "github.com/klauspost/compress/zstd"
+    "github.com/stretchr/testify/require"
+)
+
+func decompress(t *testing.T, encoding string, b []byte) []byte {
+    t.Helper()
+
+    switch encoding {
+    case gzipEncoding:
+        r, err := gzip.NewReader(bytes.NewReader(b))
+        require.NoError(t, err)
+        defer r.Close()
+        data, err := io.ReadAll(r)
+        require.NoError(t, err)
+        return data
+    case deflateEncoding:
+        r, err := zlib.NewReader(bytes.NewReader(b))
+        require.NoError(t, err)
+        defer r.Close()
+        data, err := io.ReadAll(r)
+        require.NoError(t, err)
+        return data
+    case snappyEncoding:
+        data, err := io.ReadAll(snappy.NewReader(bytes.NewReader(b)))
+        require.NoError(t, err)
+        return data
+    case zstdEncoding:
+        r, err := zstd.NewReader(bytes.NewReader(b))
+        require.NoError(t, err)
+        defer r.Close()
+        data, err := io.ReadAll(r)
+        require.NoError(t, err)
+        return data
+    default:
+        return b
+    }
+}
+
+func TestNewCompressedResponseWriter_SupportedEncodings(t *testing.T) {
+    for _, tc := range []string{gzipEncoding, deflateEncoding, snappyEncoding, zstdEncoding} {
+        t.Run(tc, func(t *testing.T) {
+            rec := httptest.NewRecorder()
+            req := httptest.NewRequest(http.MethodGet, "/", nil)
+            req.Header.Set(acceptEncodingHeader, tc)
+
+            cw := newCompressedResponseWriter(rec, req)
+            payload := []byte("hello world")
+            _, err := cw.Write(payload)
+            require.NoError(t, err)
+            cw.Close()
+
+            require.Equal(t, tc, rec.Header().Get(contentEncodingHeader))
+
+            decompressed := decompress(t, tc, rec.Body.Bytes())
+            require.Equal(t, payload, decompressed)
+
+            switch tc {
+            case gzipEncoding:
+                _, ok := cw.writer.(*gzip.Writer)
+                require.True(t, ok)
+            case deflateEncoding:
+                _, ok := cw.writer.(*zlib.Writer)
+                require.True(t, ok)
+            case snappyEncoding:
+                _, ok := cw.writer.(*snappy.Writer)
+                require.True(t, ok)
+            case zstdEncoding:
+                _, ok := cw.writer.(*zstd.Encoder)
+                require.True(t, ok)
+            }
+        })
+    }
+}
+
+func TestNewCompressedResponseWriter_UnsupportedEncoding(t *testing.T) {
+    for _, tc := range []string{"", "br", "unknown"} {
+        t.Run(tc, func(t *testing.T) {
+            rec := httptest.NewRecorder()
+            req := httptest.NewRequest(http.MethodGet, "/", nil)
+            if tc != "" {
+                req.Header.Set(acceptEncodingHeader, tc)
+            }
+
+            cw := newCompressedResponseWriter(rec, req)
+            payload := []byte("data")
+            _, err := cw.Write(payload)
+            require.NoError(t, err)
+            cw.Close()
+
+            require.Empty(t, rec.Header().Get(contentEncodingHeader))
+            require.Equal(t, payload, rec.Body.Bytes())
+            require.Same(t, rec, cw.writer)
+        })
+    }
+}
+
+func TestNewCompressedResponseWriter_MultipleEncodings(t *testing.T) {
+    tests := []struct {
+        header     string
+        expectEnc  string
+        expectType interface{}
+    }{
+        {"snappy, gzip", snappyEncoding, &snappy.Writer{}},
+        {"unknown, gzip", gzipEncoding, &gzip.Writer{}},
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.header, func(t *testing.T) {
+            rec := httptest.NewRecorder()
+            req := httptest.NewRequest(http.MethodGet, "/", nil)
+            req.Header.Set(acceptEncodingHeader, tc.header)
+
+            cw := newCompressedResponseWriter(rec, req)
+            _, err := cw.Write([]byte("payload"))
+            require.NoError(t, err)
+            cw.Close()
+
+            require.Equal(t, tc.expectEnc, rec.Header().Get(contentEncodingHeader))
+            decompressed := decompress(t, tc.expectEnc, rec.Body.Bytes())
+            require.Equal(t, []byte("payload"), decompressed)
+
+            switch tc.expectEnc {
+            case gzipEncoding:
+                require.IsType(t, &gzip.Writer{}, cw.writer)
+            case snappyEncoding:
+                require.IsType(t, &snappy.Writer{}, cw.writer)
+            }
+        })
+    }
+}
+
+func TestCompressionHandler_ServeHTTP(t *testing.T) {
+    handler := CompressionHandler{Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+        _, err := w.Write([]byte("hello"))
+        require.NoError(t, err)
+    })}
+
+    rec := httptest.NewRecorder()
+    req := httptest.NewRequest(http.MethodGet, "/", nil)
+    req.Header.Set(acceptEncodingHeader, gzipEncoding)
+
+    handler.ServeHTTP(rec, req)
+
+    require.Equal(t, gzipEncoding, rec.Header().Get(contentEncodingHeader))
+    decompressed := decompress(t, gzipEncoding, rec.Body.Bytes())
+    require.Equal(t, []byte("hello"), decompressed)
+}

pkg/api/queryapi/query_api.go

Lines changed: 3 additions & 1 deletion
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "net/http"
+    "strconv"
     "time"
 
     "github.com/go-kit/log"
@@ -208,7 +209,7 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
         w.WriteHeader(http.StatusNoContent)
     }
 
-    return httputil.CompressionHandler{
+    return CompressionHandler{
         Handler: http.HandlerFunc(hf),
     }.ServeHTTP
 }
@@ -237,6 +238,7 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interf
     }
 
     w.Header().Set("Content-Type", codec.ContentType().String())
+    w.Header().Set("X-Uncompressed-Length", strconv.Itoa(len(b)))
    w.WriteHeader(http.StatusOK)
     if n, err := w.Write(b); err != nil {
         level.Error(q.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
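The respond path now also advertises the pre-compression payload size in an X-Uncompressed-Length header. A minimal sketch of how a consumer might read that optional header; the helper name and fallback behaviour below are assumptions for illustration, not code from this commit:

package main

import (
    "net/http"
    "strconv"
)

// uncompressedLength is a hypothetical helper: it returns the advertised
// uncompressed size and whether the header was present, letting the caller
// decide whether a response size limit can be enforced before decompression
// or only afterwards.
func uncompressedLength(resp *http.Response) (int, bool) {
    v := resp.Header.Get("X-Uncompressed-Length")
    if v == "" {
        return 0, false // header missing: limit has to be applied after decompression
    }
    n, err := strconv.Atoi(v)
    if err != nil {
        return 0, false
    }
    return n, true
}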

pkg/frontend/transport/handler.go

Lines changed: 1 addition & 3 deletions
@@ -75,8 +75,6 @@ const (
     limitBytesStoreGateway = `exceeded bytes limit`
 )
 
-var noopResponseSizeLimiter = limiter.NewResponseSizeLimiter(0)
-
 // Config for a Handler.
 type HandlerConfig struct {
     LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
@@ -332,7 +330,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     // If the response status code is not 2xx, try to get the
     // error message from response body.
     if resp.StatusCode/100 != 2 {
-        body, err2 := tripperware.BodyBytes(resp, noopResponseSizeLimiter, f.log)
+        body, err2 := tripperware.BodyBytes(resp, f.log)
         if err2 == nil {
             err = httpgrpc.Errorf(resp.StatusCode, "%s", string(body))
         }
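With this change the no-op response size limiter is no longer threaded through the BodyBytes call. For context, a minimal sketch of what the decompression side of reading such a body can look like, assuming the same klauspost/compress packages used in compression.go above; the helper below is illustrative and does not mirror the actual tripperware.BodyBytes implementation:

package main

import (
    "io"
    "net/http"

    "github.com/klauspost/compress/gzip"
    "github.com/klauspost/compress/snappy"
    "github.com/klauspost/compress/zstd"
)

// decompressedBody wraps a response body in a reader that matches its
// Content-Encoding. Closing the returned reader releases the zstd decoder,
// in the spirit of the commit's "ensure zstd reader is closed" change.
func decompressedBody(resp *http.Response) (io.ReadCloser, error) {
    switch resp.Header.Get("Content-Encoding") {
    case "gzip":
        return gzip.NewReader(resp.Body)
    case "snappy":
        // Snappy reader construction cannot fail, so there is no error path.
        return io.NopCloser(snappy.NewReader(resp.Body)), nil
    case "zstd":
        d, err := zstd.NewReader(resp.Body)
        if err != nil {
            return nil, err
        }
        return d.IOReadCloser(), nil
    default:
        return resp.Body, nil
    }
}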

pkg/querier/querier.go

Lines changed: 3 additions & 3 deletions
@@ -102,7 +102,7 @@ var (
     errBadLookbackConfigs                             = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent")
     errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater or equal than the configured 'query store after'")
     errEmptyTimeRange                                 = errors.New("empty time range")
-    errUnsupportedResponseCompression                 = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)")
+    errUnsupportedResponseCompression                 = errors.New("unsupported response compression. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)")
     errInvalidConsistencyCheckAttempts                = errors.New("store gateway consistency check max attempts should be greater or equal than 1")
     errInvalidIngesterQueryMaxAttempts                = errors.New("ingester query max attempts should be greater or equal than 1")
     errInvalidParquetQueryableDefaultBlockStore       = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet")
@@ -129,7 +129,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
     f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
     f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.")
-    f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "gzip", "Use compression for metrics query API or instant and range query APIs. Supports 'gzip' and '' (disable compression)")
+    f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "gzip", "Use compression for metrics query API or instant and range query APIs. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)")
     f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
     f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
     f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
@@ -158,7 +158,7 @@ func (cfg *Config) Validate() error {
         }
     }
 
-    if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" {
+    if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" && cfg.ResponseCompression != "snappy" && cfg.ResponseCompression != "zstd" {
         return errUnsupportedResponseCompression
     }
 