Skip to content
Merged
90 changes: 90 additions & 0 deletions pkg/api/queryapi/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package queryapi

import (
"io"
"net/http"
"strings"

"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zlib"
"github.com/klauspost/compress/zstd"
)

const (
	// Header names used to negotiate the response encoding with the client.
	acceptEncodingHeader  = "Accept-Encoding"
	contentEncodingHeader = "Content-Encoding"
	// Supported Content-Encoding token values; the encoding actually used is
	// the first supported token found in the client's Accept-Encoding header.
	gzipEncoding    = "gzip"
	deflateEncoding = "deflate"
	snappyEncoding  = "snappy"
	zstdEncoding    = "zstd"
)

// compressedResponseWriter wraps an http.ResponseWriter so that the response
// body is routed through an optional compression encoder chosen from the
// client's Accept-Encoding header.
type compressedResponseWriter struct {
	http.ResponseWriter
	// writer receives the response body. It is either a compressing encoder
	// wrapping the embedded ResponseWriter, or the ResponseWriter itself when
	// no supported encoding was requested.
	writer io.Writer
}

// Write sends response body bytes through the (possibly compressing) writer.
func (c *compressedResponseWriter) Write(p []byte) (int, error) {
	n, err := c.writer.Write(p)
	return n, err
}

// Close flushes any buffered compressed data to the underlying
// ResponseWriter and releases resources held by the encoder.
func (c *compressedResponseWriter) Close() {
	// Every supported encoder (zstd, snappy, zlib, gzip) exposes the same
	// `Flush() error` method, so one interface assertion covers all of them.
	type flusher interface{ Flush() error }
	if f, ok := c.writer.(flusher); ok {
		// Best effort: Close has no way to surface a flush error.
		_ = f.Flush()
	}
	if closer, ok := c.writer.(io.Closer); ok {
		_ = closer.Close()
	}
}

// newCompressedResponseWriter builds a compressedResponseWriter whose body
// encoder is picked from the request's Accept-Encoding header: the first
// supported token (zstd, snappy, gzip or deflate) wins, and the matching
// Content-Encoding response header is set. If no supported encoding is
// requested (or the zstd encoder fails to initialize and no later token
// matches), the response is passed through uncompressed.
func newCompressedResponseWriter(writer http.ResponseWriter, req *http.Request) *compressedResponseWriter {
	// wrap records the negotiated encoding on the response and pairs the
	// ResponseWriter with the chosen body writer.
	wrap := func(encoding string, w io.Writer) *compressedResponseWriter {
		writer.Header().Set(contentEncodingHeader, encoding)
		return &compressedResponseWriter{ResponseWriter: writer, writer: w}
	}
	for _, accepted := range strings.Split(req.Header.Get(acceptEncodingHeader), ",") {
		switch strings.TrimSpace(accepted) {
		case zstdEncoding:
			// zstd.NewWriter can fail; on error keep scanning the remaining
			// tokens rather than giving up on compression entirely.
			if encoder, err := zstd.NewWriter(writer); err == nil {
				return wrap(zstdEncoding, encoder)
			}
		case snappyEncoding:
			return wrap(snappyEncoding, snappy.NewBufferedWriter(writer))
		case gzipEncoding:
			return wrap(gzipEncoding, gzip.NewWriter(writer))
		case deflateEncoding:
			return wrap(deflateEncoding, zlib.NewWriter(writer))
		}
	}
	// No supported encoding requested: write bytes straight through.
	return &compressedResponseWriter{ResponseWriter: writer, writer: writer}
}

// CompressionHandler is a wrapper around http.Handler which adds suitable
// response compression based on the client's Accept-Encoding headers.
type CompressionHandler struct {
	// Handler is the wrapped handler whose response body gets compressed.
	Handler http.Handler
}

// ServeHTTP wraps the ResponseWriter in a compressing writer negotiated from
// the request's Accept-Encoding header, runs the inner handler against it,
// and finally flushes and closes the compressed stream.
func (c CompressionHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
	cw := newCompressedResponseWriter(writer, req)
	c.Handler.ServeHTTP(cw, req)
	cw.Close()
}
4 changes: 3 additions & 1 deletion pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -208,7 +209,7 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
w.WriteHeader(http.StatusNoContent)
}

return httputil.CompressionHandler{
return CompressionHandler{
Handler: http.HandlerFunc(hf),
}.ServeHTTP
}
Expand Down Expand Up @@ -237,6 +238,7 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interf
}

w.Header().Set("Content-Type", codec.ContentType().String())
w.Header().Set("X-Uncompressed-Length", strconv.Itoa(len(b)))
w.WriteHeader(http.StatusOK)
if n, err := w.Write(b); err != nil {
level.Error(q.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
Expand Down
4 changes: 1 addition & 3 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ const (
limitBytesStoreGateway = `exceeded bytes limit`
)

var noopResponseSizeLimiter = limiter.NewResponseSizeLimiter(0)

// Config for a Handler.
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
Expand Down Expand Up @@ -308,7 +306,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If the response status code is not 2xx, try to get the
// error message from response body.
if resp.StatusCode/100 != 2 {
body, err2 := tripperware.BodyBytes(resp, noopResponseSizeLimiter, f.log)
body, err2 := tripperware.BodyBytes(resp, f.log)
if err2 == nil {
err = httpgrpc.Errorf(resp.StatusCode, "%s", string(body))
}
Expand Down
27 changes: 25 additions & 2 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -45,8 +46,15 @@ type instantQueryCodec struct {

func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec {
compression := tripperware.NonCompression // default
if compressionStr == string(tripperware.GzipCompression) {
switch compressionStr {
case string(tripperware.GzipCompression):
compression = tripperware.GzipCompression

case string(tripperware.SnappyCompression):
compression = tripperware.SnappyCompression

case string(tripperware.ZstdCompression):
compression = tripperware.ZstdCompression
}

defaultCodecType := tripperware.JsonCodecType // default
Expand Down Expand Up @@ -100,8 +108,23 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response,
return nil, err
}

responseSize := 0
responseSizeHeader := r.Header.Get("X-Uncompressed-Length")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change means that we need a two phase deployment for querier to return this header for Query Frontend to properly apply the response size limit. Otherwise, the value is 0.

I think we should find a way (maybe a flag) to properly maintain the previous behavior during rollout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite get it, do you mean during deployment if Query Frontend is updated first then the limit wouldn't apply because the header is not being returned from Querier?

The old behaviour only works for gzip which is why I changed it. I can reverse it to use the old behaviour when gzip is used and the new one for other compression types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If we use gzip then this limiter won't work until Querier is updated, which is a breaking behavior.

We just need a way to fallback to the old behavior if it is gzip. Other types are fine since they are new

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the header is missing, we now fall back to applying the limit using the size of the response after decoding it. I think this is a cleaner way to still apply the limit and avoid a breaking change when the header is not there.

Applying the limit before decoding response was an optimization in the old behaviour, but I don't think it is considered breaking if we are still applying the limit.

if responseSizeHeader != "" {
var err error
responseSize, err = strconv.Atoi(responseSizeHeader)
if err != nil {
log.Error(err)
return nil, err
}
}

responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx)
body, err := tripperware.BodyBytes(r, responseSizeLimiter, log)
if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil {
return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error())
}

body, err := tripperware.BodyBytes(r, log)
if err != nil {
log.Error(err)
return nil, err
Expand Down
67 changes: 49 additions & 18 deletions pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
"io"
"net/http"
Expand All @@ -16,6 +15,8 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
jsoniter "github.com/json-iterator/go"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
Expand All @@ -27,7 +28,6 @@ import (

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/runutil"
)

Expand All @@ -44,6 +44,8 @@ type Compression string

const (
GzipCompression Compression = "gzip"
ZstdCompression Compression = "zstd"
SnappyCompression Compression = "snappy"
NonCompression Compression = ""
JsonCodecType CodecType = "json"
ProtobufCodecType CodecType = "protobuf"
Expand Down Expand Up @@ -434,7 +436,7 @@ type Buffer interface {
Bytes() []byte
}

func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimiter, logger log.Logger) ([]byte, error) {
func BodyBytes(res *http.Response, logger log.Logger) ([]byte, error) {
var buf *bytes.Buffer

// Attempt to cast the response body to a Buffer and use it if possible.
Expand All @@ -452,11 +454,6 @@ func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimi
}
}

responseSize := getResponseSize(res, buf)
if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil {
return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error())
}

// if the response is gzipped, lets unzip it here
if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") {
gReader, err := gzip.NewReader(buf)
Expand All @@ -468,15 +465,33 @@ func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimi
return io.ReadAll(gReader)
}

// if the response is snappy compressed, decode it here
if strings.EqualFold(res.Header.Get("Content-Encoding"), "snappy") {
sReader := snappy.NewReader(buf)
return io.ReadAll(sReader)
}

// if the response is zstd compressed, decode it here
if strings.EqualFold(res.Header.Get("Content-Encoding"), "zstd") {
zReader, err := zstd.NewReader(buf)
if err != nil {
return nil, err
}
defer runutil.CloseWithLogOnErr(logger, io.NopCloser(zReader), "close zstd decoder")

return io.ReadAll(zReader)
}

return buf.Bytes(), nil
}

func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) {
// if the response is gzipped, lets unzip it here
headers := http.Header{}
for _, h := range res.Headers {
headers[h.Key] = h.Values
}

// if the response is gzipped, lets unzip it here
if strings.EqualFold(headers.Get("Content-Encoding"), "gzip") {
gReader, err := gzip.NewReader(bytes.NewBuffer(res.Body))
if err != nil {
Expand All @@ -487,16 +502,24 @@ func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger
return io.ReadAll(gReader)
}

return res.Body, nil
}
// if the response is snappy compressed, decode it here
if strings.EqualFold(headers.Get("Content-Encoding"), "snappy") {
sReader := snappy.NewReader(bytes.NewBuffer(res.Body))
return io.ReadAll(sReader)
}

// if the response is zstd compressed, decode it here
if strings.EqualFold(headers.Get("Content-Encoding"), "zstd") {
zReader, err := zstd.NewReader(bytes.NewBuffer(res.Body))
if err != nil {
return nil, err
}
defer runutil.CloseWithLogOnErr(logger, io.NopCloser(zReader), "close zstd decoder")

func getResponseSize(res *http.Response, buf *bytes.Buffer) int {
if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") && len(buf.Bytes()) >= 4 {
// GZIP body contains the size of the original (uncompressed) input data
// modulo 2^32 in the last 4 bytes (https://www.ietf.org/rfc/rfc1952.txt).
return int(binary.LittleEndian.Uint32(buf.Bytes()[len(buf.Bytes())-4:]))
return io.ReadAll(zReader)
}
return len(buf.Bytes())

return res.Body, nil
}

// UnmarshalJSON implements json.Unmarshaler.
Expand Down Expand Up @@ -755,9 +778,17 @@ func (s *PrometheusResponseStats) MarshalJSON() ([]byte, error) {
}

func SetRequestHeaders(h http.Header, defaultCodecType CodecType, compression Compression) {
if compression == GzipCompression {
switch compression {
case GzipCompression:
h.Set("Accept-Encoding", string(GzipCompression))

case SnappyCompression:
h.Set("Accept-Encoding", string(SnappyCompression))

case ZstdCompression:
h.Set("Accept-Encoding", string(ZstdCompression))
}

if defaultCodecType == ProtobufCodecType {
h.Set("Accept", ApplicationProtobuf+", "+ApplicationJson)
} else {
Expand Down
50 changes: 0 additions & 50 deletions pkg/querier/tripperware/query_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package tripperware

import (
"bytes"
"compress/gzip"
"math"
"net/http"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -196,50 +193,3 @@ func generateData(timeseries, datapoints int) (floatMatrix, histogramMatrix []*S
}
return
}

func Test_getResponseSize(t *testing.T) {
tests := []struct {
body []byte
useGzip bool
}{
{
body: []byte(`foo`),
useGzip: false,
},
{
body: []byte(`foo`),
useGzip: true,
},
{
body: []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`),
useGzip: false,
},
{
body: []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`),
useGzip: true,
},
}

for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
expectedBodyLength := len(test.body)
buf := &bytes.Buffer{}
response := &http.Response{}

if test.useGzip {
response = &http.Response{
Header: http.Header{"Content-Encoding": []string{"gzip"}},
}
w := gzip.NewWriter(buf)
_, err := w.Write(test.body)
require.NoError(t, err)
w.Close()
} else {
buf = bytes.NewBuffer(test.body)
}

bodyLength := getResponseSize(response, buf)
require.Equal(t, expectedBodyLength, bodyLength)
})
}
}
Loading
Loading