Skip to content

Add test to verify query response compression (#6849) #6877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions integration/query_response_compression_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//go:build requires_docker
// +build requires_docker

package integration

import (
"compress/gzip"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestQueryResponseCompression(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-api.response-compression-enabled": "true",
})

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))

// Wait until the distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push series to Cortex.
now := time.Now()
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

series, _ := generateSeries("series_1", now)
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need store gateway in this test so let's remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, got it.

querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(storeGateway, querier))

// Wait until the querier has updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))

endpoint := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=series_1", querier.HTTPEndpoint())

t.Run("Compressed", func(t *testing.T) {
req, err := http.NewRequest("GET", endpoint, nil)
require.NoError(t, err)
req.Header.Set("X-Scope-OrgID", "user-1")
req.Header.Set("Accept-Encoding", "gzip")

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusOK, resp.StatusCode)
require.Equal(t, "gzip", resp.Header.Get("Content-Encoding"))

gzipReader, err := gzip.NewReader(resp.Body)
require.NoError(t, err)
defer gzipReader.Close()
})

t.Run("Uncompressed", func(t *testing.T) {
req, err := http.NewRequest("GET", endpoint, nil)
require.NoError(t, err)
req.Header.Set("X-Scope-OrgID", "user-1")

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusOK, resp.StatusCode)
require.Empty(t, resp.Header.Get("Content-Encoding"))
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current tests are good. But I'd like to see we have test cases covering queries against query frontend as well. Not just querier

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh alright, I'l add a new endpoint with query frontend and update the tests. Thanks a lot for the review !

Copy link
Contributor Author

@siddarth2810 siddarth2810 Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @yeya24 , after tweaking around with query frontend for a while, I realized that testing both query frontend and querier cannot be done with same instance of cortex. So I divided the code into two isolated functions.

Is this what you had in mind ?

  1. TestQuerierDirectResponseCompression - tests compression directly using querier endpoint
  2. TestQueryFrontendResponseCompression - tests compression via query frontend endpoint

The query frontend test is failing because the response is not being compressed

Error: Not equal:
expected: "gzip"
actual  : ""
Test: TestQueryFrontendResponseCompression/Compressed
func TestQueryFrontendResponseCompression(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, bucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-api.response-compression-enabled": "true",
	})

	// Start the query-frontend.
	queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
	require.NoError(t, s.Start(queryFrontend))

	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(distributor, ingester))

	// Wait until both the distributor updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	querier := e2ecortex.NewQuerier("querierWithFrontend", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
	}), "")

	require.NoError(t, s.StartAndWaitReady(querier))
	require.NoError(t, s.WaitReady(queryFrontend))

	now := time.Now()
	series, _ := generateSeries("series_1", now)

	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	endpoint := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=series_1", queryFrontend.HTTPEndpoint())

	runQueryResponseTest(t, endpoint) //func to run existing compressed and uncompressed logic
}

Been stuck on this for a while, Is this expected or am I totally mistaken ? Would be really really thankful for any inputs, thanks !!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after tweaking around with query frontend for a while, I realized that testing both query frontend and querier cannot be done with same instance of cortex

Can you elaborate more why we cannot reuse the same test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using the same test, I just moved the compression logic to runQueryResponseTest function. And yes I am able to test both endpoints using the same cortex instance. Apologies for the confusion.

	querierDirect := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

	querierWithFrontend := e2ecortex.NewQuerier("querierWithFrontend", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
	}), "")

	require.NoError(t, s.StartAndWaitReady(querierDirect))
	require.NoError(t, s.StartAndWaitReady(querierWithFrontend))
	require.NoError(t, s.WaitReady(queryFrontend))

	require.NoError(t, querierDirect.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, querierWithFrontend.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	querierWithFrontendEndpoint := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=series_1", queryFrontend.HTTPEndpoint())
	querierDirectEndpoint := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=series_1", querierDirect.HTTPEndpoint())

        runQueryResponseTest(t, querierDirectEndpoint)
        runQueryResponseTest(t, querierWithFrontendEndpoint)

so when using query frontend endpoint the existing compression test is failing. I logged the response headers and I dont see Content-Encoding header. I could not figure out why

req headers: [Accept-Encoding:[gzip]  X-Scope-Orgid:[user-1]]

resp headers: [Content-Length:[140]  Content-Type:[application/json]  Date:[Tue, 15 Jul 2025 11:53:27 GMT] Vary:[Accept-Encoding]]

 Error: Not equal:
       expected: "gzip"
       actual  : ""

}
Loading