Skip to content

Commit 2d1f56b

Browse files
committed
implement ReadStateBytes
1 parent b8b455b commit 2d1f56b

File tree

4 files changed

+107
-1
lines changed

4 files changed

+107
-1
lines changed

tfprotov6/internal/fromproto/state_store.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ func ConfigureStateStoreRequest(in *tfplugin6.ConfigureStateStore_Request) *tfpr
3030
}
3131
}
3232

33+
func ReadStateBytesRequest(in *tfplugin6.ReadStateBytes_Request) *tfprotov6.ReadStateBytesRequest {
34+
if in == nil {
35+
return nil
36+
}
37+
38+
return &tfprotov6.ReadStateBytesRequest{
39+
TypeName: in.TypeName,
40+
StateId: in.StateId,
41+
}
42+
}
43+
3344
func GetStatesRequest(in *tfplugin6.GetStates_Request) *tfprotov6.GetStatesRequest {
3445
if in == nil {
3546
return nil

tfprotov6/internal/toproto/state_store.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,22 @@ func ConfigureStateStore_Response(in *tfprotov6.ConfigureStateStoreResponse) *tf
2828
}
2929
}
3030

31+
func ReadStateBytes_Response(in *tfprotov6.ReadStateByteChunk) *tfplugin6.ReadStateBytes_Response {
32+
if in == nil {
33+
return nil
34+
}
35+
36+
return &tfplugin6.ReadStateBytes_Response{
37+
Diagnostics: Diagnostics(in.Diagnostics),
38+
Bytes: in.Bytes,
39+
TotalLength: in.TotalLength,
40+
Range: &tfplugin6.StateRange{
41+
Start: in.Range.Start,
42+
End: in.Range.End,
43+
},
44+
}
45+
}
46+
3147
func GetStates_Response(in *tfprotov6.GetStatesResponse) *tfplugin6.GetStates_Response {
3248
if in == nil {
3349
return nil

tfprotov6/state_store.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33

44
package tfprotov6
55

6-
import "context"
6+
import (
7+
"context"
8+
"iter"
9+
)
710

811
// StateStoreServer is an interface containing the methods an list resource
912
// implementation needs to fill.
@@ -14,6 +17,11 @@ type StateStoreServer interface {
1417
// ConfigureStateStore configures the state store, such as S3 connection in the context of already configured provider
1518
ConfigureStateStore(context.Context, *ConfigureStateStoreRequest) (*ConfigureStateStoreResponse, error)
1619

20+
// ReadStateBytes streams byte chunks of a given state file from a state store
21+
ReadStateBytes(context.Context, *ReadStateBytesRequest) (*ReadStateBytesStream, error)
22+
23+
WriteStateBytes(context.Context, *WriteStateBytesStream) (*WriteStateBytesResponse, error)
24+
1725
// GetStates returns a list of all states (i.e. CE workspaces) managed by a given state store
1826
GetStates(context.Context, *GetStatesRequest) (*GetStatesResponse, error)
1927

@@ -39,6 +47,30 @@ type ConfigureStateStoreResponse struct {
3947
Diagnostics []*Diagnostic
4048
}
4149

50+
type ReadStateBytesRequest struct {
51+
TypeName string
52+
StateId string
53+
}
54+
55+
type ReadStateBytesStream struct {
56+
Chunks iter.Seq[ReadStateByteChunk]
57+
}
58+
59+
type ReadStateByteChunk struct {
60+
StateByteChunk
61+
Diagnostics []*Diagnostic
62+
}
63+
64+
type StateByteChunk struct {
65+
Bytes []byte
66+
TotalLength int64
67+
Range StateByteRange
68+
}
69+
70+
type StateByteRange struct {
71+
Start, End int64
72+
}
73+
4274
type GetStatesRequest struct {
4375
TypeName string
4476
}

tfprotov6/tf6server/server.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,6 +1577,53 @@ func (s *server) ConfigureStateStore(ctx context.Context, protoReq *tfplugin6.Co
15771577
return protoResp, nil
15781578
}
15791579

1580+
func (s *server) ReadStateBytes(protoReq *tfplugin6.ReadStateBytes_Request, protoStream grpc.ServerStreamingServer[tfplugin6.ReadStateBytes_Response]) error {
1581+
rpc := "ReadStateBytes"
1582+
ctx := protoStream.Context()
1583+
ctx = s.loggingContext(ctx)
1584+
ctx = logging.RpcContext(ctx, rpc)
1585+
ctx = logging.StateStoreContext(ctx, protoReq.TypeName)
1586+
ctx = s.stoppableContext(ctx)
1587+
logging.ProtocolTrace(ctx, "Received request")
1588+
defer logging.ProtocolTrace(ctx, "Served request")
1589+
1590+
req := fromproto.ReadStateBytesRequest(protoReq)
1591+
logging.ProtocolData(ctx, s.protocolDataDir, rpc, "Request", "StateId", req.StateId)
1592+
1593+
ctx = tf6serverlogging.DownstreamRequest(ctx)
1594+
1595+
server, ok := s.downstream.(tfprotov6.StateStoreServer)
1596+
if !ok {
1597+
err := status.Error(codes.Unimplemented, "ProviderServer does not implement ReadStateBytes")
1598+
logging.ProtocolError(ctx, err.Error())
1599+
return err
1600+
}
1601+
1602+
stream, err := server.ReadStateBytes(ctx, req)
1603+
if err != nil {
1604+
logging.ProtocolError(ctx, "Error from downstream", map[string]interface{}{logging.KeyError: err})
1605+
return err
1606+
}
1607+
1608+
for chunk := range stream.Chunks {
1609+
select {
1610+
// TODO: check how interruptions are handled
1611+
case <-ctx.Done():
1612+
logging.ProtocolTrace(ctx, "all chunks sent")
1613+
return nil
1614+
1615+
default:
1616+
protoChunk := toproto.ReadStateBytes_Response(&chunk)
1617+
if err := protoStream.Send(protoChunk); err != nil {
1618+
logging.ProtocolError(ctx, "Error sending chunk", map[string]any{logging.KeyError: err})
1619+
return err
1620+
}
1621+
}
1622+
}
1623+
1624+
return nil
1625+
}
1626+
15801627
func (s *server) GetStates(ctx context.Context, protoReq *tfplugin6.GetStates_Request) (*tfplugin6.GetStates_Response, error) {
15811628
rpc := "GetStates"
15821629
ctx = s.loggingContext(ctx)

0 commit comments

Comments
 (0)