Skip to content

Commit 1e39e03

Browse files
committed
vtprotocodec: user codecV2 for grpc client/server
Switch to use codecV2 backed by the default buffer pool to handle all Marshaling/Unmarshaling by grpc clients/servers. The default buffer pool is already setup here. ``` server/util/grpc_client/grpc_client.go 254: experimental.WithBufferPool(mem.DefaultBufferPool()), server/util/grpc_server/grpc_server.go 240: experimental.BufferPool(mem.DefaultBufferPool()), ``` This does not affect the manual Marshaling/Unmarshaling that we does via `server/util/proto.{Marshal,Unmarshal}()`. These should still use V1 for the time being.
1 parent 3297657 commit 1e39e03

File tree

3 files changed

+51
-21
lines changed

3 files changed

+51
-21
lines changed

server/util/proto/proto.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,26 @@ var Bool = gproto.Bool
2020
type Message = gproto.Message
2121
type MarshalOptions = gproto.MarshalOptions
2222

23-
type vtprotoMessage interface {
23+
type VtprotoMessage interface {
2424
MarshalVT() ([]byte, error)
2525
UnmarshalVT([]byte) error
2626
CloneMessageVT() Message
27+
28+
// For vtprotoCodecV2
29+
MarshalToSizedBufferVT(data []byte) (int, error)
30+
SizeVT() int
2731
}
2832

2933
func Marshal(v Message) ([]byte, error) {
30-
vt, ok := v.(vtprotoMessage)
34+
vt, ok := v.(VtprotoMessage)
3135
if ok {
3236
return vt.MarshalVT()
3337
}
3438
return MarshalOld(v)
3539
}
3640

3741
func Unmarshal(b []byte, v Message) error {
38-
vt, ok := v.(vtprotoMessage)
42+
vt, ok := v.(VtprotoMessage)
3943
if ok {
4044
return vt.UnmarshalVT(b)
4145
}
@@ -44,7 +48,7 @@ func Unmarshal(b []byte, v Message) error {
4448
}
4549

4650
func Clone(v Message) Message {
47-
vt, ok := v.(vtprotoMessage)
51+
vt, ok := v.(VtprotoMessage)
4852
if ok {
4953
return vt.CloneMessageVT()
5054
}

server/util/vtprotocodec/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ go_library(
99
"//server/util/proto",
1010
"@org_golang_google_grpc//encoding",
1111
"@org_golang_google_grpc//encoding/proto",
12+
"@org_golang_google_grpc//mem",
1213
],
1314
)
Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,67 @@
11
package vtprotocodec
22

33
import (
4-
"fmt"
5-
64
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
75

86
"google.golang.org/grpc/encoding"
7+
"google.golang.org/grpc/mem"
8+
99
_ "google.golang.org/grpc/encoding/proto" // for default proto registration purposes
1010
)
1111

1212
const Name = "proto"
1313

14-
// vtprotoCodec represents a codec able to encode and decode vt enabled
15-
// proto messages.
16-
type vtprotoCodec struct{}
14+
// vtprotoCodecV2 implements encoding.CodecV2 and uses vtproto and default buffer pool
15+
// to encode/decode proto messages. The implementation is heavily inspired by
16+
// https://github.com/planetscale/vtprotobuf/pull/138
17+
// and https://github.com/vitessio/vitess/pull/16790.
18+
type vtprotoCodecV2 struct {
19+
fallback encoding.CodecV2
20+
}
21+
22+
func (vtprotoCodecV2) Name() string {
23+
return Name
24+
}
1725

18-
func (vtprotoCodec) Marshal(v any) ([]byte, error) {
19-
vv, ok := v.(proto.Message)
26+
func (c *vtprotoCodecV2) Marshal(v any) (mem.BufferSlice, error) {
27+
m, ok := v.(proto.VtprotoMessage)
2028
if !ok {
21-
return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
29+
return c.fallback.Marshal(v)
2230
}
23-
return proto.Marshal(vv)
31+
32+
size := m.SizeVT()
33+
if mem.IsBelowBufferPoolingThreshold(size) {
34+
buf := make([]byte, 0, size)
35+
if _, err := m.MarshalToSizedBufferVT(buf[:size]); err != nil {
36+
return nil, err
37+
}
38+
return mem.BufferSlice{mem.SliceBuffer(buf)}, nil
39+
}
40+
41+
pool := mem.DefaultBufferPool()
42+
buf := pool.Get(size)
43+
if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil {
44+
pool.Put(buf)
45+
return nil, err
46+
}
47+
return mem.BufferSlice{mem.NewBuffer(buf, pool)}, nil
2448
}
2549

26-
func (vtprotoCodec) Unmarshal(data []byte, v any) error {
27-
vv, ok := v.(proto.Message)
50+
func (c *vtprotoCodecV2) Unmarshal(data mem.BufferSlice, v any) error {
51+
m, ok := v.(proto.VtprotoMessage)
2852
if !ok {
29-
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
53+
return c.fallback.Unmarshal(data, v)
3054
}
31-
return proto.Unmarshal(data, vv)
32-
}
3355

34-
func (vtprotoCodec) Name() string {
35-
return Name
56+
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
57+
defer buf.Free()
58+
return m.UnmarshalVT(buf.ReadOnlyData())
3659
}
3760

3861
// RegisterCodec registers the vtprotoCodec to encode/decode proto messages with
3962
// all gRPC clients and servers.
4063
func Register() {
41-
encoding.RegisterCodec(vtprotoCodec{})
64+
encoding.RegisterCodecV2(&vtprotoCodecV2{
65+
fallback: encoding.GetCodecV2("proto"),
66+
})
4267
}

0 commit comments

Comments
 (0)