Skip to content

Commit 956ce98

Browse files
authored
add query frontend response compression integration test (#6909)
Signed-off-by: Ahmed Hassan <[email protected]>
1 parent 9861229 commit 956ce98

File tree

1 file changed

+120
-6
lines changed

1 file changed

+120
-6
lines changed

integration/query_response_compression_test.go

Lines changed: 120 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@ package integration
66
import (
77
"compress/gzip"
88
"fmt"
9+
"strings"
10+
911
"net/http"
12+
"net/url"
13+
1014
"testing"
1115
"time"
1216

17+
"github.com/prometheus/prometheus/prompb"
1318
"github.com/stretchr/testify/require"
1419

1520
"github.com/cortexproject/cortex/integration/e2e"
1621
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
1722
"github.com/cortexproject/cortex/integration/e2ecortex"
1823
)
1924

20-
func TestQueryResponseCompression(t *testing.T) {
25+
func TestQuerierResponseCompression(t *testing.T) {
2126
s, err := e2e.NewScenario(networkName)
2227
require.NoError(t, err)
2328
defer s.Close()
@@ -43,18 +48,127 @@ func TestQueryResponseCompression(t *testing.T) {
4348
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
4449
require.NoError(t, err)
4550

46-
series, _ := generateSeries("series_1", now)
47-
res, err := c.Push(series)
48-
require.NoError(t, err)
49-
require.Equal(t, 200, res.StatusCode)
51+
for i := 0; i < 200; i++ {
52+
series, _ := generateSeries(
53+
fmt.Sprintf("series_%d", i),
54+
now,
55+
prompb.Label{Name: fmt.Sprintf("label_%d", i), Value: strings.Repeat("val_", 10)},
56+
)
57+
res, err := c.Push(series)
58+
require.NoError(t, err)
59+
require.Equal(t, 200, res.StatusCode)
60+
}
5061

5162
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
5263
require.NoError(t, s.StartAndWaitReady(querier))
5364

5465
// Wait until the querier has updated the ring.
5566
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
5667

57-
endpoint := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=series_1", querier.HTTPEndpoint())
68+
query := `{__name__=~"series_.*"}`
69+
u := &url.URL{
70+
Scheme: "http",
71+
Path: fmt.Sprintf("%s/api/prom/api/v1/query", querier.HTTPEndpoint()),
72+
}
73+
q := u.Query()
74+
q.Set("query", query)
75+
q.Set("time", e2ecortex.FormatTime(now))
76+
u.RawQuery = q.Encode()
77+
endpoint := u.String()
78+
79+
t.Run("Compressed", func(t *testing.T) {
80+
req, err := http.NewRequest("GET", endpoint, nil)
81+
require.NoError(t, err)
82+
req.Header.Set("X-Scope-OrgID", "user-1")
83+
req.Header.Set("Accept-Encoding", "gzip")
84+
85+
resp, err := http.DefaultClient.Do(req)
86+
require.NoError(t, err)
87+
88+
defer resp.Body.Close()
89+
90+
require.Equal(t, http.StatusOK, resp.StatusCode)
91+
require.Equal(t, "gzip", resp.Header.Get("Content-Encoding"))
92+
93+
gzipReader, err := gzip.NewReader(resp.Body)
94+
require.NoError(t, err)
95+
defer gzipReader.Close()
96+
})
97+
98+
t.Run("Uncompressed", func(t *testing.T) {
99+
req, err := http.NewRequest("GET", endpoint, nil)
100+
require.NoError(t, err)
101+
req.Header.Set("X-Scope-OrgID", "user-1")
102+
103+
resp, err := http.DefaultClient.Do(req)
104+
require.NoError(t, err)
105+
defer resp.Body.Close()
106+
107+
require.Equal(t, http.StatusOK, resp.StatusCode)
108+
require.Empty(t, resp.Header.Get("Content-Encoding"))
109+
})
110+
}
111+
112+
func TestQueryFrontendResponseCompression(t *testing.T) {
113+
s, err := e2e.NewScenario(networkName)
114+
require.NoError(t, err)
115+
defer s.Close()
116+
117+
// Start dependencies.
118+
consul := e2edb.NewConsul()
119+
minio := e2edb.NewMinio(9000, bucketName)
120+
require.NoError(t, s.StartAndWaitReady(consul, minio))
121+
122+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
123+
"-api.response-compression-enabled": "true",
124+
})
125+
126+
// Start the query-frontend.
127+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
128+
require.NoError(t, s.Start(queryFrontend))
129+
130+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
131+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
132+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
133+
134+
// Wait until both the distributor updated the ring.
135+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
136+
137+
querier := e2ecortex.NewQuerier("querierWithFrontend", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
138+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
139+
}), "")
140+
141+
require.NoError(t, s.StartAndWaitReady(querier))
142+
require.NoError(t, s.WaitReady(queryFrontend))
143+
144+
now := time.Now()
145+
146+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
147+
require.NoError(t, err)
148+
149+
for i := 0; i < 200; i++ {
150+
series, _ := generateSeries(
151+
fmt.Sprintf("series_%d", i),
152+
now,
153+
prompb.Label{Name: fmt.Sprintf("label_%d", i), Value: strings.Repeat("val_", 10)},
154+
)
155+
res, err := c.Push(series)
156+
require.NoError(t, err)
157+
require.Equal(t, 200, res.StatusCode)
158+
}
159+
160+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
161+
162+
query := `{__name__=~"series_.*"}`
163+
u := &url.URL{
164+
Scheme: "http",
165+
Path: fmt.Sprintf("%s/api/prom/api/v1/query", queryFrontend.HTTPEndpoint()),
166+
}
167+
q := u.Query()
168+
q.Set("query", query)
169+
q.Set("time", e2ecortex.FormatTime(now))
170+
u.RawQuery = q.Encode()
171+
endpoint := u.String()
58172

59173
t.Run("Compressed", func(t *testing.T) {
60174
req, err := http.NewRequest("GET", endpoint, nil)

0 commit comments

Comments
 (0)