Skip to content

Commit 3164fe0

Browse files
committed
skeleton code, no operands
1 parent b30bb6c commit 3164fe0

File tree

6 files changed

+202
-8
lines changed

6 files changed

+202
-8
lines changed

enterprise/server/routing/routing_action_cache_client/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ go_library(
66
importpath = "github.com/buildbuddy-io/buildbuddy/enterprise/server/routing/routing_action_cache_client",
77
visibility = ["//visibility:public"],
88
deps = [
9+
"//enterprise/server/batch_operator",
910
"//proto:remote_execution_go_proto",
1011
"//server/environment",
1112
"//server/interfaces",
13+
"//server/util/log",
1214
"//server/util/status",
1315
"@org_golang_google_grpc//:grpc",
1416
],

enterprise/server/routing/routing_action_cache_client/routing_action_cache_client.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,23 @@ package routing_action_cache_client
22

33
import (
44
"context"
5+
"math/rand/v2"
56

67
"google.golang.org/grpc"
78

9+
"github.com/buildbuddy-io/buildbuddy/enterprise/server/batch_operator"
810
repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
911
"github.com/buildbuddy-io/buildbuddy/server/environment"
1012
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
13+
"github.com/buildbuddy-io/buildbuddy/server/util/log"
1114
"github.com/buildbuddy-io/buildbuddy/server/util/status"
1215
)
1316

1417
type RoutingACClient struct {
15-
router interfaces.CacheRoutingService
18+
router interfaces.CacheRoutingService
19+
copyOp batch_operator.BatchDigestOperator
20+
readOp batch_operator.BatchDigestOperator
21+
verifyOp batch_operator.BatchDigestOperator
1622
}
1723

1824
func New(env environment.Env) (repb.ActionCacheClient, error) {
@@ -31,13 +37,45 @@ func (r *RoutingACClient) GetActionResult(ctx context.Context, req *repb.GetActi
3137
if err != nil {
3238
return nil, status.InternalErrorf("Failed to get primary AC client: %s", err)
3339
}
34-
return primaryClient.GetActionResult(ctx, req, opts...)
40+
rsp, err := primaryClient.GetActionResult(ctx, req, opts...)
41+
if err != nil {
42+
return rsp, err
43+
}
44+
45+
c, err := r.router.GetCacheRoutingConfig(ctx)
46+
if err != nil {
47+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
48+
return rsp, nil
49+
}
50+
singleRandValue := rand.Float32()
51+
if singleRandValue < c.GetBackgroundReadVerifyFraction() {
52+
r.verifyOp.Enqueue(ctx, req.GetInstanceName(), []*repb.Digest{req.GetActionDigest()}, req.GetDigestFunction())
53+
} else if singleRandValue < c.GetBackgroundReadFraction() {
54+
r.readOp.Enqueue(ctx, req.GetInstanceName(), []*repb.Digest{req.GetActionDigest()}, req.GetDigestFunction())
55+
} else if (singleRandValue) < c.GetBackgroundCopyFraction() {
56+
r.copyOp.Enqueue(ctx, req.GetInstanceName(), []*repb.Digest{req.GetActionDigest()}, req.GetDigestFunction())
57+
}
58+
return rsp, nil
3559
}
3660

3761
func (r *RoutingACClient) UpdateActionResult(ctx context.Context, req *repb.UpdateActionResultRequest, opts ...grpc.CallOption) (*repb.ActionResult, error) {
3862
primaryClient, err := r.router.GetPrimaryACClient(ctx)
3963
if err != nil {
4064
return nil, status.InternalErrorf("Failed to get primary AC client: %s", err)
4165
}
42-
return primaryClient.UpdateActionResult(ctx, req, opts...)
66+
rsp, err := primaryClient.UpdateActionResult(ctx, req, opts...)
67+
if err != nil {
68+
return rsp, err
69+
}
70+
71+
c, err := r.router.GetCacheRoutingConfig(ctx)
72+
if err != nil {
73+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
74+
return rsp, nil
75+
}
76+
singleRandValue := rand.Float32()
77+
if (singleRandValue) < c.GetBackgroundCopyFraction() {
78+
r.copyOp.Enqueue(ctx, req.GetInstanceName(), []*repb.Digest{req.GetActionDigest()}, req.GetDigestFunction())
79+
}
80+
return rsp, nil
4381
}

enterprise/server/routing/routing_byte_stream_client/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ go_library(
66
importpath = "github.com/buildbuddy-io/buildbuddy/enterprise/server/routing/routing_byte_stream_client",
77
visibility = ["//visibility:public"],
88
deps = [
9+
"//enterprise/server/batch_operator",
10+
"//proto:remote_execution_go_proto",
911
"//server/environment",
1012
"//server/interfaces",
13+
"//server/remote_cache/digest",
14+
"//server/util/log",
1115
"//server/util/status",
1216
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
1317
"@org_golang_google_grpc//:grpc",

enterprise/server/routing/routing_byte_stream_client/routing_byte_stream_client.go

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,16 @@ package routing_byte_stream_client
22

33
import (
44
"context"
5+
"math/rand/v2"
56

67
"google.golang.org/grpc"
78

9+
"github.com/buildbuddy-io/buildbuddy/enterprise/server/batch_operator"
10+
repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
811
"github.com/buildbuddy-io/buildbuddy/server/environment"
912
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
13+
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
14+
"github.com/buildbuddy-io/buildbuddy/server/util/log"
1015
"github.com/buildbuddy-io/buildbuddy/server/util/status"
1116

1217
bspb "google.golang.org/genproto/googleapis/bytestream"
@@ -15,6 +20,9 @@ import (
1520
type RoutingByteStreamClient struct {
1621
acClient map[string]bspb.ByteStreamClient
1722
router interfaces.CacheRoutingService
23+
copyOp batch_operator.BatchDigestOperator
24+
readOp batch_operator.BatchDigestOperator
25+
verifyOp batch_operator.BatchDigestOperator
1826
}
1927

2028
func New(env environment.Env) (bspb.ByteStreamClient, error) {
@@ -41,13 +49,70 @@ func (r *RoutingByteStreamClient) Read(ctx context.Context, req *bspb.ReadReques
4149
if err != nil {
4250
return nil, status.InternalErrorf("Failed to get primary AC client: %s", err)
4351
}
44-
return primaryClient.Read(ctx, req, opts...)
52+
rsp, err := primaryClient.Read(ctx, req, opts...)
53+
if err != nil {
54+
return rsp, err
55+
}
56+
57+
c, err := r.router.GetCacheRoutingConfig(ctx)
58+
if err != nil {
59+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
60+
return rsp, nil
61+
}
62+
singleRandValue := rand.Float32()
63+
if singleRandValue < c.GetBackgroundReadVerifyFraction() {
64+
if rn, err := digest.ParseDownloadResourceName(req.GetResourceName()); err == nil {
65+
r.verifyOp.Enqueue(ctx, rn.GetInstanceName(), []*repb.Digest{rn.GetDigest()}, rn.GetDigestFunction())
66+
}
67+
} else if singleRandValue < c.GetBackgroundReadFraction() {
68+
if rn, err := digest.ParseDownloadResourceName(req.GetResourceName()); err == nil {
69+
r.readOp.Enqueue(ctx, rn.GetInstanceName(), []*repb.Digest{rn.GetDigest()}, rn.GetDigestFunction())
70+
}
71+
} else if (singleRandValue) < c.GetBackgroundCopyFraction() {
72+
if rn, err := digest.ParseDownloadResourceName(req.GetResourceName()); err == nil {
73+
r.copyOp.Enqueue(ctx, rn.GetInstanceName(), []*repb.Digest{rn.GetDigest()}, rn.GetDigestFunction())
74+
}
75+
}
76+
return rsp, nil
77+
}
78+
79+
type wrappedWriteStream struct {
80+
ctx context.Context
81+
copyOp batch_operator.BatchDigestOperator
82+
bspb.ByteStream_WriteClient
83+
}
84+
85+
func (w *wrappedWriteStream) Send(req *bspb.WriteRequest) error {
86+
if req.GetResourceName() != "" {
87+
if rn, err := digest.ParseDownloadResourceName(req.GetResourceName()); err == nil {
88+
w.copyOp.Enqueue(w.ctx, rn.GetInstanceName(), []*repb.Digest{rn.GetDigest()}, rn.GetDigestFunction())
89+
}
90+
}
91+
return w.ByteStream_WriteClient.Send(req)
4592
}
4693

4794
func (r *RoutingByteStreamClient) Write(ctx context.Context, opts ...grpc.CallOption) (bspb.ByteStream_WriteClient, error) {
4895
primaryClient, err := r.router.GetPrimaryBSClient(ctx)
4996
if err != nil {
5097
return nil, status.InternalErrorf("Failed to get primary AC client: %s", err)
5198
}
52-
return primaryClient.Write(ctx, opts...)
99+
rsp, err := primaryClient.Write(ctx, opts...)
100+
if err != nil {
101+
return rsp, err
102+
}
103+
104+
c, err := r.router.GetCacheRoutingConfig(ctx)
105+
if err != nil {
106+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
107+
return rsp, nil
108+
}
109+
singleRandValue := rand.Float32()
110+
if (singleRandValue) < c.GetBackgroundCopyFraction() {
111+
return &wrappedWriteStream{
112+
ctx: ctx,
113+
copyOp: r.copyOp,
114+
ByteStream_WriteClient: rsp,
115+
}, nil
116+
}
117+
return rsp, nil
53118
}

enterprise/server/routing/routing_content_addressable_storage_client/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ go_library(
66
importpath = "github.com/buildbuddy-io/buildbuddy/enterprise/server/routing/routing_content_addressable_storage_client",
77
visibility = ["//visibility:public"],
88
deps = [
9+
"//enterprise/server/batch_operator",
910
"//proto:remote_execution_go_proto",
1011
"//server/environment",
1112
"//server/interfaces",
13+
"//server/util/log",
1214
"//server/util/status",
1315
"@org_golang_google_grpc//:grpc",
1416
],

enterprise/server/routing/routing_content_addressable_storage_client/routing_content_addressable_storage_client.go

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,25 @@ package routing_content_addressable_storage_client
22

33
import (
44
"context"
5+
"math/rand/v2"
56

67
"google.golang.org/grpc"
78

9+
"github.com/buildbuddy-io/buildbuddy/enterprise/server/batch_operator"
810
repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
911
"github.com/buildbuddy-io/buildbuddy/server/environment"
1012
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
13+
"github.com/buildbuddy-io/buildbuddy/server/util/log"
1114
"github.com/buildbuddy-io/buildbuddy/server/util/status"
1215
)
1316

1417
type RoutingCASClient struct {
1518
casClients map[string]repb.ContentAddressableStorageClient
1619
router interfaces.CacheRoutingService
20+
copyOp batch_operator.BatchDigestOperator
21+
readOp batch_operator.BatchDigestOperator
22+
verifyOp batch_operator.BatchDigestOperator
23+
treeOp batch_operator.BatchDigestOperator
1724
}
1825

1926
func New(env environment.Env) (*RoutingCASClient, error) {
@@ -32,6 +39,31 @@ func (r *RoutingCASClient) FindMissingBlobs(ctx context.Context, req *repb.FindM
3239
if err != nil {
3340
return nil, status.InternalErrorf("Failed to get primary CAS client: %s", err)
3441
}
42+
rsp, err := primaryClient.FindMissingBlobs(ctx, req, opts...)
43+
if err != nil {
44+
return rsp, err
45+
}
46+
47+
c, err := r.router.GetCacheRoutingConfig(ctx)
48+
if err != nil {
49+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
50+
return rsp, nil
51+
}
52+
53+
if rand.Float32() < c.GetBackgroundCopyFraction() {
54+
foundDigestsMap := map[string]*repb.Digest{}
55+
for _, d := range req.BlobDigests {
56+
foundDigestsMap[d.GetHash()] = d
57+
}
58+
for _, missing := range rsp.MissingBlobDigests {
59+
delete(foundDigestsMap, missing.GetHash())
60+
}
61+
digestsToSync := make([]*repb.Digest, 0, len(foundDigestsMap))
62+
for _, d := range foundDigestsMap {
63+
digestsToSync = append(digestsToSync, d)
64+
}
65+
r.copyOp.Enqueue(ctx, req.GetInstanceName(), digestsToSync, req.GetDigestFunction())
66+
}
3567
return primaryClient.FindMissingBlobs(ctx, req, opts...)
3668
}
3769

@@ -40,23 +72,74 @@ func (r *RoutingCASClient) BatchUpdateBlobs(ctx context.Context, req *repb.Batch
4072
if err != nil {
4173
return nil, status.InternalErrorf("Failed to get primary CAS client: %s", err)
4274
}
43-
return primaryClient.BatchUpdateBlobs(ctx, req, opts...)
75+
rsp, err := primaryClient.BatchUpdateBlobs(ctx, req, opts...)
76+
if err != nil {
77+
return rsp, err
78+
}
79+
80+
c, err := r.router.GetCacheRoutingConfig(ctx)
81+
if err != nil {
82+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
83+
return rsp, nil
84+
}
85+
86+
if rand.Float32() < c.GetBackgroundCopyFraction() {
87+
digestsToSync := make([]*repb.Digest, 0, len(req.GetRequests()))
88+
for _, d := range req.GetRequests() {
89+
digestsToSync = append(digestsToSync, d.GetDigest())
90+
}
91+
r.copyOp.Enqueue(ctx, req.GetInstanceName(), digestsToSync, req.GetDigestFunction())
92+
}
93+
94+
return rsp, nil
4495
}
4596

4697
func (r *RoutingCASClient) BatchReadBlobs(ctx context.Context, req *repb.BatchReadBlobsRequest, opts ...grpc.CallOption) (*repb.BatchReadBlobsResponse, error) {
4798
primaryClient, err := r.router.GetPrimaryCASClient(ctx)
4899
if err != nil {
49100
return nil, status.InternalErrorf("Failed to get primary CAS client: %s", err)
50101
}
51-
return primaryClient.BatchReadBlobs(ctx, req, opts...)
102+
rsp, err := primaryClient.BatchReadBlobs(ctx, req, opts...)
103+
if err != nil {
104+
return rsp, err
105+
}
106+
107+
c, err := r.router.GetCacheRoutingConfig(ctx)
108+
if err != nil {
109+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
110+
return rsp, nil
111+
}
112+
singleRandValue := rand.Float32()
113+
if singleRandValue < c.GetBackgroundReadVerifyFraction() {
114+
r.verifyOp.Enqueue(ctx, req.GetInstanceName(), req.GetDigests(), req.GetDigestFunction())
115+
} else if singleRandValue < c.GetBackgroundReadFraction() {
116+
r.readOp.Enqueue(ctx, req.GetInstanceName(), req.GetDigests(), req.GetDigestFunction())
117+
} else if (singleRandValue) < c.GetBackgroundCopyFraction() {
118+
r.copyOp.Enqueue(ctx, req.GetInstanceName(), req.GetDigests(), req.GetDigestFunction())
119+
}
120+
121+
return rsp, nil
52122
}
53123

54124
func (r *RoutingCASClient) GetTree(ctx context.Context, req *repb.GetTreeRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[repb.GetTreeResponse], error) {
55125
primaryClient, err := r.router.GetPrimaryCASClient(ctx)
56126
if err != nil {
57127
return nil, status.InternalErrorf("Failed to get primary CAS client: %s", err)
58128
}
59-
return primaryClient.GetTree(ctx, req, opts...)
129+
rsp, err := primaryClient.GetTree(ctx, req, opts...)
130+
if err != nil {
131+
return rsp, err
132+
}
133+
134+
c, err := r.router.GetCacheRoutingConfig(ctx)
135+
if err != nil {
136+
log.CtxWarningf(ctx, "Failed to fetch routing config: %s", err)
137+
return rsp, nil
138+
}
139+
if rand.Float32() < c.GetBackgroundCopyFraction() {
140+
r.treeOp.Enqueue(ctx, req.GetInstanceName(), []*repb.Digest{req.GetRootDigest()}, req.GetDigestFunction())
141+
}
142+
return rsp, nil
60143
}
61144

62145
func (r *RoutingCASClient) SpliceBlob(ctx context.Context, req *repb.SpliceBlobRequest, opts ...grpc.CallOption) (*repb.SpliceBlobResponse, error) {

0 commit comments

Comments
 (0)