Skip to content

Commit cc48e77

Browse files
authored
Do not use buffer pool when message is not releasable (#6881)
* Do not use buffer pool when message is not releasable Signed-off-by: alanprot <[email protected]> * Adding test Signed-off-by: alanprot <[email protected]> * fix lint Signed-off-by: alanprot <[email protected]> * fix lint Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent f7d19ca commit cc48e77

File tree

2 files changed

+109
-8
lines changed

2 files changed

+109
-8
lines changed

pkg/cortexpb/codec.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
const Name = "proto"
1515

1616
func init() {
17-
encoding.RegisterCodecV2(&cortexCodec{})
17+
encoding.RegisterCodecV2(&cortexCodec{
18+
noOpBufferPool: mem.NopBufferPool{},
19+
defaultBufferPool: mem.DefaultBufferPool(),
20+
})
1821
}
1922

2023
type ReleasableMessage interface {
@@ -25,7 +28,10 @@ type GogoProtoMessage interface {
2528
MarshalToSizedBuffer(dAtA []byte) (int, error)
2629
}
2730

28-
type cortexCodec struct{}
31+
type cortexCodec struct {
32+
noOpBufferPool mem.BufferPool
33+
defaultBufferPool mem.BufferPool
34+
}
2935

3036
func (c cortexCodec) Name() string {
3137
return Name
@@ -64,7 +70,7 @@ func (c *cortexCodec) Marshal(v any) (data mem.BufferSlice, err error) {
6470

6571
data = append(data, buf)
6672
} else {
67-
pool := mem.DefaultBufferPool()
73+
pool := c.defaultBufferPool
6874
buf := pool.Get(size)
6975

7076
// If v implements MarshalToSizedBuffer we should use it as it is more optimized
@@ -94,10 +100,17 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
94100
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
95101
}
96102

97-
// To be in the safe side, we will never automatically release the buffer used to Unmarshal the message automatically.
98-
// This should simulate the same behavior of grpc v1.65.0 and before.
99-
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
103+
// To be safe, we avoid automatically releasing the buffer used to unmarshal the message.
104+
// Additionally, we avoid using a pooled byte slice unless the message implements ReleasableMessage.
105+
// This mimics the behavior of gRPC versions 1.65.0 and earlier.
106+
rm, ok := v.(ReleasableMessage)
107+
bufferPool := c.defaultBufferPool
108+
109+
if !ok {
110+
bufferPool = c.noOpBufferPool
111+
}
100112

113+
buf := data.MaterializeToBuffer(bufferPool)
101114
err := proto.Unmarshal(buf.ReadOnlyData(), vv)
102115

103116
if err != nil {
@@ -106,8 +119,8 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
106119
}
107120

108121
// If v implements ReleasableMessage interface, we add the buff to be freed later when the request is no longer being used
109-
if fm, ok := v.(ReleasableMessage); ok {
110-
fm.RegisterBuffer(buf)
122+
if rm != nil {
123+
rm.RegisterBuffer(buf)
111124
}
112125

113126
return err

pkg/cortexpb/codec_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package cortexpb
2+
3+
import (
4+
"strings"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
"google.golang.org/grpc/mem"
9+
)
10+
11+
type wrappedBufferPool struct {
12+
inner mem.BufferPool
13+
getCount int
14+
putCount int
15+
}
16+
17+
func (w *wrappedBufferPool) Get(length int) *[]byte {
18+
w.getCount++
19+
return w.inner.Get(length)
20+
}
21+
22+
func (w *wrappedBufferPool) Put(b *[]byte) {
23+
w.putCount++
24+
w.inner.Put(b)
25+
}
26+
27+
func (w *wrappedBufferPool) Reset() {
28+
w.getCount = 0
29+
w.putCount = 0
30+
}
31+
32+
func TestNoopBufferWhenNotReleasableMessage(t *testing.T) {
33+
codec := &cortexCodec{
34+
noOpBufferPool: &wrappedBufferPool{inner: mem.NopBufferPool{}},
35+
defaultBufferPool: &wrappedBufferPool{inner: mem.DefaultBufferPool()},
36+
}
37+
38+
tc := map[string]struct {
39+
noopBufferGets int
40+
defaultBufferGets int
41+
m any
42+
}{
43+
"releasable": {
44+
noopBufferGets: 0,
45+
defaultBufferGets: 1,
46+
m: &WriteRequest{
47+
Metadata: []*MetricMetadata{
48+
{
49+
Unit: strings.Repeat("a", 10000),
50+
},
51+
},
52+
},
53+
},
54+
"not releasable": {
55+
noopBufferGets: 1,
56+
defaultBufferGets: 0,
57+
m: &MetricMetadata{
58+
Unit: strings.Repeat("a", 10000),
59+
},
60+
},
61+
}
62+
63+
for name, tc := range tc {
64+
t.Run(name, func(t *testing.T) {
65+
data, err := codec.Marshal(tc.m)
66+
require.NoError(t, err)
67+
68+
// lets split the buffer into 2 so we force get another buffer from the pool
69+
r := data.Reader()
70+
size := r.Remaining()
71+
b1 := make([]byte, size/2)
72+
b2 := make([]byte, (size/2)+1)
73+
buffer1 := mem.NewBuffer(&b1, mem.NopBufferPool{})
74+
buffer2 := mem.NewBuffer(&b2, mem.NopBufferPool{})
75+
76+
_, err = r.Read(b1)
77+
require.NoError(t, err)
78+
_, err = r.Read(b2)
79+
require.NoError(t, err)
80+
81+
codec.noOpBufferPool.(*wrappedBufferPool).Reset()
82+
codec.defaultBufferPool.(*wrappedBufferPool).Reset()
83+
require.NoError(t, codec.Unmarshal(mem.BufferSlice{buffer1, buffer2}, tc.m))
84+
require.Equal(t, tc.noopBufferGets, codec.noOpBufferPool.(*wrappedBufferPool).getCount)
85+
require.Equal(t, tc.defaultBufferGets, codec.defaultBufferPool.(*wrappedBufferPool).getCount)
86+
})
87+
}
88+
}

0 commit comments

Comments
 (0)