Skip to content

Commit 68c09af

Browse files
committed
Reusing grpc membuffer for QueryStreamResponse
Signed-off-by: alanprot <[email protected]>
1 parent aea76ee commit 68c09af

File tree

7 files changed

+221
-93
lines changed

7 files changed

+221
-93
lines changed

integration/grpc_server_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,12 @@ func TestConcurrentGrpcCalls(t *testing.T) {
202202
require.NoError(t, err)
203203
resp, err := s.Recv()
204204
require.NoError(t, err)
205+
require.NotEmpty(t, resp.Buffers())
205206
expected := createStreamResponse(i)
207+
expected.MessageWithBufRef = resp.MessageWithBufRef
206208
require.Equal(t, expected.String(), resp.String())
209+
resp.Free()
210+
207211
}(i)
208212
}
209213

pkg/cortexpb/codec.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ func init() {
1919

2020
type ReleasableMessage interface {
2121
RegisterBuffer(mem.Buffer)
22+
buffers() []mem.Buffer
23+
unref()
2224
}
2325

2426
type GogoProtoMessage interface {
@@ -130,10 +132,25 @@ type MessageWithBufRef struct {
130132
bs mem.BufferSlice
131133
}
132134

135+
func (m *MessageWithBufRef) unref() {
136+
m.bs = nil
137+
}
138+
139+
func (m *MessageWithBufRef) buffers() []mem.Buffer {
140+
return m.bs
141+
}
142+
133143
func (m *MessageWithBufRef) RegisterBuffer(buffer mem.Buffer) {
134144
m.bs = append(m.bs, buffer)
135145
}
136146

147+
func (m *MessageWithBufRef) MergeBuffers(o any) {
148+
if fm, ok := o.(ReleasableMessage); ok {
149+
m.bs = append(m.bs, fm.buffers()...)
150+
fm.unref()
151+
}
152+
}
153+
137154
func (m *MessageWithBufRef) Free() {
138155
m.bs.Free()
139156
m.bs = m.bs[:0]

pkg/cortexpb/codec_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package cortexpb
2+
3+
import (
4+
"testing"
5+
"unsafe"
6+
7+
"github.com/stretchr/testify/require"
8+
"google.golang.org/grpc/mem"
9+
)
10+
11+
func TestMergingBuffers(t *testing.T) {
12+
codec := &cortexCodec{}
13+
wr := &WriteRequest{}
14+
15+
data, err := codec.Marshal(wr)
16+
require.NoError(t, err)
17+
18+
wr1 := &WriteRequest{}
19+
wr2 := &WriteRequest{}
20+
require.NoError(t, codec.Unmarshal(data, wr1))
21+
require.NoError(t, codec.Unmarshal(data, wr2))
22+
// should not be the same reference but should have the same value
23+
require.False(t, isSameReference(wr1.bs, wr2.bs))
24+
require.NotEmpty(t, wr1.bs)
25+
require.Equal(t, wr1.bs, wr2.bs)
26+
finalWr := &WriteRequest{}
27+
finalWr.MergeBuffers(wr1)
28+
require.Empty(t, wr1.bs)
29+
finalWr.MergeBuffers(wr2)
30+
require.Empty(t, wr2.bs)
31+
require.Len(t, finalWr.bs, 2)
32+
finalWr.Free()
33+
require.Empty(t, finalWr.bs)
34+
}
35+
36+
func isSameReference(a, b mem.BufferSlice) bool {
37+
return len(a) > 0 && len(b) > 0 &&
38+
unsafe.Pointer(&a[0]) == unsafe.Pointer(&b[0])
39+
}

pkg/distributor/query.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,11 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
297297
defer span.Finish()
298298
hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
299299

300+
resp := &ingester_client.QueryStreamResponse{}
301+
300302
for _, result := range results {
301303
response := result.(*ingester_client.QueryStreamResponse)
304+
resp.MergeBuffers(response)
302305

303306
// Parse any chunk series
304307
for _, series := range response.Chunkseries {
@@ -310,9 +313,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
310313
}
311314
}
312315

313-
resp := &ingester_client.QueryStreamResponse{
314-
Chunkseries: make([]ingester_client.TimeSeriesChunk, 0, len(hashToChunkseries)),
315-
}
316+
resp.Chunkseries = make([]ingester_client.TimeSeriesChunk, 0, len(hashToChunkseries))
317+
316318
for _, series := range hashToChunkseries {
317319
resp.Chunkseries = append(resp.Chunkseries, series)
318320
}

pkg/ingester/client/ingester.pb.go

Lines changed: 147 additions & 90 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ingester/client/ingester.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ message QueryStreamResponse {
5757
repeated TimeSeriesChunk chunkseries = 1 [(gogoproto.nullable) = false];
5858
// Not used anymore
5959
reserved 2;
60+
cortexpb.MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/cortexpb.MessageWithBufRef", (gogoproto.nullable) = false];
6061
}
6162

6263
message ExemplarQueryResponse {

pkg/querier/distributor_queryable.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ type distributorQuerier struct {
9292
queryIngestersWithin time.Duration
9393
isPartialDataEnabled partialdata.IsCfgEnabledFunc
9494
ingesterQueryMaxAttempts int
95+
96+
freeBuffers func()
9597
}
9698

9799
// Select implements storage.Querier interface.
@@ -166,6 +168,8 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa
166168
return storage.ErrSeriesSet(err)
167169
}
168170

171+
q.freeBuffers = results.Free
172+
169173
serieses := make([]storage.Series, 0, len(results.Chunkseries))
170174
for _, result := range results.Chunkseries {
171175
// Sometimes the ingester can send series that have no data.
@@ -357,6 +361,10 @@ func (q *distributorQuerier) labelNamesWithMatchers(ctx context.Context, hints *
357361
}
358362

359363
func (q *distributorQuerier) Close() error {
364+
if q.freeBuffers != nil {
365+
q.freeBuffers()
366+
q.freeBuffers = nil
367+
}
360368
return nil
361369
}
362370

0 commit comments

Comments
 (0)