Skip to content

Commit 12d5a13

Browse files
authored
vtprotocodec: user codecV2 for grpc client/server (#8316)
Switch to use codecV2 backed by the default buffer pool to handle all Marshaling/Unmarshaling by grpc clients/servers. The default buffer pool is already setup here. ``` server/util/grpc_client/grpc_client.go 254: experimental.WithBufferPool(mem.DefaultBufferPool()), server/util/grpc_server/grpc_server.go 240: experimental.BufferPool(mem.DefaultBufferPool()), ``` This does not affect the manual Marshaling/Unmarshaling that we does via `server/util/proto.{Marshal,Unmarshal}()`. These should still use V1 for the time being. References: - planetscale/vtprotobuf#138 - vitessio/vitess#16790
1 parent b11a498 commit 12d5a13

File tree

3 files changed

+53
-22
lines changed

3 files changed

+53
-22
lines changed

server/util/proto/proto.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,26 @@ var Bool = gproto.Bool
2020
type Message = gproto.Message
2121
type MarshalOptions = gproto.MarshalOptions
2222

23-
type vtprotoMessage interface {
23+
type VTProtoMessage interface {
2424
MarshalVT() ([]byte, error)
2525
UnmarshalVT([]byte) error
2626
CloneMessageVT() Message
27+
28+
// For vtprotoCodecV2
29+
MarshalToSizedBufferVT(data []byte) (int, error)
30+
SizeVT() int
2731
}
2832

2933
func Marshal(v Message) ([]byte, error) {
30-
vt, ok := v.(vtprotoMessage)
34+
vt, ok := v.(VTProtoMessage)
3135
if ok {
3236
return vt.MarshalVT()
3337
}
3438
return MarshalOld(v)
3539
}
3640

3741
func Unmarshal(b []byte, v Message) error {
38-
vt, ok := v.(vtprotoMessage)
42+
vt, ok := v.(VTProtoMessage)
3943
if ok {
4044
return vt.UnmarshalVT(b)
4145
}
@@ -44,7 +48,7 @@ func Unmarshal(b []byte, v Message) error {
4448
}
4549

4650
func Clone(v Message) Message {
47-
vt, ok := v.(vtprotoMessage)
51+
vt, ok := v.(VTProtoMessage)
4852
if ok {
4953
return vt.CloneMessageVT()
5054
}

server/util/vtprotocodec/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ go_library(
99
"//server/util/proto",
1010
"@org_golang_google_grpc//encoding",
1111
"@org_golang_google_grpc//encoding/proto",
12+
"@org_golang_google_grpc//mem",
1213
],
1314
)
Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,68 @@
11
package vtprotocodec
22

33
import (
4-
"fmt"
5-
64
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
75

86
"google.golang.org/grpc/encoding"
7+
"google.golang.org/grpc/mem"
8+
99
_ "google.golang.org/grpc/encoding/proto" // for default proto registration purposes
1010
)
1111

1212
const Name = "proto"
1313

14-
// vtprotoCodec represents a codec able to encode and decode vt enabled
15-
// proto messages.
16-
type vtprotoCodec struct{}
14+
// CodecV2 implements encoding.CodecV2 and uses vtproto and default buffer pool
15+
// to encode/decode proto messages. The implementation is heavily inspired by
16+
// https://github.com/planetscale/vtprotobuf/pull/138
17+
// and https://github.com/vitessio/vitess/pull/16790.
18+
type CodecV2 struct {
19+
fallback encoding.CodecV2
20+
}
1721

18-
func (vtprotoCodec) Marshal(v any) ([]byte, error) {
19-
vv, ok := v.(proto.Message)
20-
if !ok {
21-
return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
22-
}
23-
return proto.Marshal(vv)
22+
func (CodecV2) Name() string {
23+
return Name
2424
}
2525

26-
func (vtprotoCodec) Unmarshal(data []byte, v any) error {
27-
vv, ok := v.(proto.Message)
26+
func (c *CodecV2) Marshal(v any) (mem.BufferSlice, error) {
27+
m, ok := v.(proto.VTProtoMessage)
2828
if !ok {
29-
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
29+
return c.fallback.Marshal(v)
3030
}
31-
return proto.Unmarshal(data, vv)
31+
size := m.SizeVT()
32+
if mem.IsBelowBufferPoolingThreshold(size) {
33+
buf := make([]byte, size)
34+
n, err := m.MarshalToSizedBufferVT(buf)
35+
if err != nil {
36+
return nil, err
37+
}
38+
return mem.BufferSlice{mem.SliceBuffer(buf[:n])}, nil
39+
}
40+
pool := mem.DefaultBufferPool()
41+
buf := pool.Get(size)
42+
n, err := m.MarshalToSizedBufferVT(*buf)
43+
if err != nil {
44+
pool.Put(buf)
45+
return nil, err
46+
}
47+
*buf = (*buf)[:n]
48+
return mem.BufferSlice{mem.NewBuffer(buf, pool)}, nil
3249
}
3350

34-
func (vtprotoCodec) Name() string {
35-
return Name
51+
func (c *CodecV2) Unmarshal(data mem.BufferSlice, v any) error {
52+
m, ok := v.(proto.VTProtoMessage)
53+
if !ok {
54+
return c.fallback.Unmarshal(data, v)
55+
}
56+
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
57+
defer buf.Free()
58+
return m.UnmarshalVT(buf.ReadOnlyData())
3659
}
3760

3861
// RegisterCodec registers the vtprotoCodec to encode/decode proto messages with
3962
// all gRPC clients and servers.
4063
func Register() {
41-
encoding.RegisterCodec(vtprotoCodec{})
64+
encoding.RegisterCodecV2(&CodecV2{
65+
// the default codecv2 implemented in @org_golang_google_grpc//encoding/proto.
66+
fallback: encoding.GetCodecV2("proto"),
67+
})
4268
}

0 commit comments

Comments
 (0)