Skip to content
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
1ba2e3b
added missing components to state stream backend, added execution dat…
AndriiDiachuk Sep 30, 2025
05acbf1
Update cmd/access/node_builder/access_node_builder.go
AndriiDiachuk Oct 2, 2025
cdbb793
Update cmd/observer/node_builder/observer_builder.go
AndriiDiachuk Oct 2, 2025
fab2478
Update cmd/observer/node_builder/observer_builder.go
AndriiDiachuk Oct 2, 2025
20e6f6b
Update cmd/access/node_builder/access_node_builder.go
AndriiDiachuk Oct 2, 2025
0d88fb7
Linted
AndriiDiachuk Oct 2, 2025
c411bb3
Added check if GetIncludeExecutorMetadata is true for GetExecutionDat…
AndriiDiachuk Oct 2, 2025
eacc8e2
Added check for errNot found
AndriiDiachuk Oct 2, 2025
50cb0c0
Added BlockExecutionData to the Snapshot interface, changed implement…
AndriiDiachuk Oct 2, 2025
1152695
Merged conflicts
AndriiDiachuk Oct 6, 2025
af2c148
Added new interface for execution data reading, fixed corresponding p…
AndriiDiachuk Oct 7, 2025
5bb7569
Fixed executionData tests
AndriiDiachuk Oct 8, 2025
d4b4830
Fixed TestGetExecutionDataByBlockID in handler tests
AndriiDiachuk Oct 8, 2025
c4cd2ce
Added checks if events storage is not empty
AndriiDiachuk Oct 9, 2025
9bafca1
Added check to see if CI will fail
AndriiDiachuk Oct 10, 2025
ad1c42c
Added check for builder.events
AndriiDiachuk Oct 10, 2025
4ce817c
Moved events init above calling snapshot
AndriiDiachuk Oct 10, 2025
96b8a84
Initing events ion separate module
AndriiDiachuk Oct 10, 2025
3293f9c
Moved back events storage init
AndriiDiachuk Oct 10, 2025
424380f
Removed check if Events are non empty
AndriiDiachuk Oct 10, 2025
6dd50bc
Removed not nil checks foir new snapshot mock
AndriiDiachuk Oct 10, 2025
ee10ca1
Added again storage event init at the top of the build function
AndriiDiachuk Oct 10, 2025
6b23d30
Added check for notNil for events
AndriiDiachuk Oct 11, 2025
beb8d0c
Removed not nol check as one of the tests expects events to be nil
AndriiDiachuk Oct 11, 2025
c1da4d2
Added error handilng to the test
AndriiDiachuk Oct 14, 2025
dbc1972
Linted
AndriiDiachuk Oct 14, 2025
d0dd55e
Removed commented code
AndriiDiachuk Oct 14, 2025
6964e5e
Linted again
AndriiDiachuk Oct 14, 2025
78a76e8
Fixed comments regarding to errors in the GetExecutionDataByBlockID
AndriiDiachuk Oct 15, 2025
4bdd5df
Added additional checks in the tests
AndriiDiachuk Oct 15, 2025
742c0ce
Extended checks for handler test< added godoc
AndriiDiachuk Oct 15, 2025
1ee4ba4
Removed notNil check for ExecutionStateCache
AndriiDiachuk Oct 15, 2025
e730298
Removed other notNil check for executionStateCache
AndriiDiachuk Oct 15, 2025
257f514
Moved init of executionResultInfoProvider to the top of the Build fun…
AndriiDiachuk Oct 16, 2025
7ad32c5
Moved init of executionResultInfoProvider to the top of the Build fun…
AndriiDiachuk Oct 16, 2025
c8e7038
Added not nil checks for the execution result info provider
AndriiDiachuk Oct 16, 2025
2dba2af
Created separate function for result info provider
AndriiDiachuk Oct 16, 2025
616324c
Fixed mocked values, added error handling for missing error
AndriiDiachuk Oct 17, 2025
8a293ef
Removed separate config for the optimistic_sync package
AndriiDiachuk Oct 17, 2025
dc1e1a8
Fixed init of NewSnapshotMock in observer builder
AndriiDiachuk Oct 17, 2025
5adc47d
Added not nil check for events in NewSnapshotMock
AndriiDiachuk Oct 17, 2025
44fca74
Changed check for builder events
AndriiDiachuk Oct 17, 2025
b14cd1c
Added check for the collections storage
AndriiDiachuk Oct 17, 2025
a44b079
added check for nil for transactions and executionDataStoreCache
AndriiDiachuk Oct 17, 2025
b86afba
Moved lightTransactionResults and transactionResultErrorMessages init…
AndriiDiachuk Oct 17, 2025
beff0ad
Fixed notNil function
AndriiDiachuk Oct 20, 2025
bf29402
Changed impl of notNil
AndriiDiachuk Oct 20, 2025
7c4062a
Added function to check storages
AndriiDiachuk Oct 20, 2025
fc9ef3c
Added godoc, and refactored one test
AndriiDiachuk Oct 21, 2025
386498a
Added more test cases, added godoc woth expected errors
AndriiDiachuk Oct 27, 2025
47bbf72
Removed empty line
AndriiDiachuk Oct 27, 2025
29cbe4d
Merged both builders
AndriiDiachuk Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ packages:
all: False
interfaces:
Core:
Snapshot:
BlockExecutionDataReader:
github.com/onflow/flow-go/module/executiondatasync/tracker:
github.com/onflow/flow-go/module/forest:
github.com/onflow/flow-go/module/mempool:
Expand Down
95 changes: 63 additions & 32 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,10 @@ type FlowAccessNodeBuilder struct {
unsecureGrpcServer *grpcserver.GrpcServer
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend
stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend
executionResultInfoProvider optimistic_sync.ExecutionResultInfoProvider
executionStateCache optimistic_sync.ExecutionStateCache

ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
Expand Down Expand Up @@ -650,6 +652,21 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

return nil
}).
Module("execution state cache", func(node *cmd.NodeConfig) error {
// TODO: use real objects instead of mocks once they're implemented
snapshot := osyncsnapshot.NewSnapshotMock(
builder.events,
builder.collections,
builder.transactions,
builder.lightTransactionResults,
builder.transactionResultErrorMessages,
nil,
executionDataStoreCache,
)
builder.executionStateCache = execution_state.NewExecutionStateCacheMock(snapshot)

return nil
}).
Component("execution data service", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
opts := []network.BlobServiceOption{
blob.WithBitswapOptions(
Expand Down Expand Up @@ -1084,6 +1101,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.stateStreamConf.ClientSendBufferSize,
),
executionDataTracker,
notNil(builder.executionResultInfoProvider),
builder.executionStateCache,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
Expand Down Expand Up @@ -1714,10 +1733,50 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
})
}

// buildExecutionResultInfoProvider registers a module that wires the
// optimistic_sync.ExecutionResultInfoProvider on the builder.
func (builder *FlowAccessNodeBuilder) buildExecutionResultInfoProvider() *FlowAccessNodeBuilder {
builder.Module("execution result info provider", func(node *cmd.NodeConfig) error {
backendConfig := builder.rpcConf.BackendConfig

preferredENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

execNodeSelector := execution_result.NewExecutionNodeSelector(
preferredENIdentifiers,
fixedENIdentifiers,
)

builder.executionResultInfoProvider = execution_result.NewExecutionResultInfoProvider(
node.Logger,
node.State,
node.Storage.Receipts,
execNodeSelector,
optimistic_sync.DefaultCriteria,
)

return nil
})
return builder
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer

builder.Module("events storage", func(node *cmd.NodeConfig) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Events storage is the only one that can be initialized at this point. Other storages are initialized later or based on flags like storeTxResultErrorMessages and executionDataSyncEnabled. That’s why it’s a separate module instead of being initialized through a function.

builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
})
builder.buildExecutionResultInfoProvider()

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
}
Expand Down Expand Up @@ -1924,10 +1983,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.RegistersAsyncStore = execution.NewRegistersAsyncStore()
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
}).
Module("reporter", func(node *cmd.NodeConfig) error {
builder.Reporter = index.NewReporter()
return nil
Expand Down Expand Up @@ -2113,30 +2168,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
notNil(builder.ExecNodeIdentitiesProvider),
)

execNodeSelector := execution_result.NewExecutionNodeSelector(
preferredENIdentifiers,
fixedENIdentifiers,
)

execResultInfoProvider := execution_result.NewExecutionResultInfoProvider(
node.Logger,
node.State,
node.Storage.Receipts,
execNodeSelector,
optimistic_sync.DefaultCriteria,
)

// TODO: use real objects instead of mocks once they're implemented
snapshot := osyncsnapshot.NewSnapshotMock(
builder.events,
builder.collections,
builder.transactions,
builder.lightTransactionResults,
builder.transactionResultErrorMessages,
nil,
)
execStateCache := execution_state.NewExecutionStateCacheMock(snapshot)

builder.nodeBackend, err = backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC, // might be nil
Expand Down Expand Up @@ -2177,8 +2208,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
VersionControl: notNil(builder.VersionControl),
ExecNodeIdentitiesProvider: notNil(builder.ExecNodeIdentitiesProvider),
TxErrorMessageProvider: notNil(builder.txResultErrorMessageProvider),
ExecutionResultInfoProvider: execResultInfoProvider,
ExecutionStateCache: execStateCache,
ExecutionResultInfoProvider: notNil(builder.executionResultInfoProvider),
ExecutionStateCache: builder.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
MaxScriptAndArgumentSize: config.BackendConfig.AccessConfig.MaxRequestMsgSize,
ScheduledCallbacksEnabled: builder.scheduledCallbacksEnabled,
Expand Down
92 changes: 61 additions & 31 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,9 @@ type ObserverServiceBuilder struct {
unsecureGrpcServer *grpcserver.GrpcServer
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *statestreambackend.StateStreamBackend
stateStreamBackend *statestreambackend.StateStreamBackend
executionResultInfoProvider optimistic_sync.ExecutionResultInfoProvider
executionStateCache optimistic_sync.ExecutionStateCache
}

// deriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters.
Expand Down Expand Up @@ -1092,10 +1094,45 @@ func (builder *ObserverServiceBuilder) initObserverLocal() func(node *cmd.NodeCo
}
}

// buildExecutionResultInfoProvider registers a module that wires the
// optimistic_sync.ExecutionResultInfoProvider on the builder.
func (builder *ObserverServiceBuilder) buildExecutionResultInfoProvider() *ObserverServiceBuilder {
builder.Module("execution result info provider", func(node *cmd.NodeConfig) error {
backendConfig := builder.rpcConf.BackendConfig

preferredENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

execNodeSelector := execution_result.NewExecutionNodeSelector(
preferredENIdentifiers,
fixedENIdentifiers,
)

builder.executionResultInfoProvider = execution_result.NewExecutionResultInfoProvider(
node.Logger,
node.State,
node.Storage.Receipts,
execNodeSelector,
optimistic_sync.DefaultCriteria,
)

return nil
})
return builder
}

// Build enqueues the sync engine and the follower engine for the observer.
// Currently, the observer only runs the follower engine.
func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
builder.BuildConsensusFollower()
builder.buildExecutionResultInfoProvider()

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1206,6 +1243,25 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS

return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
}).
Module("execution state cache", func(node *cmd.NodeConfig) error {
// TODO: use real objects instead of mocks once they're implemented
snapshot := osyncsnapshot.NewSnapshotMock(
builder.events,
nil,
nil,
builder.lightTransactionResults,
nil,
nil,
executionDataStoreCache,
)
builder.executionStateCache = execution_state.NewExecutionStateCacheMock(snapshot)

return nil
}).
Component("public execution data service", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
opts := []network.BlobServiceOption{
blob.WithBitswapOptions(
Expand Down Expand Up @@ -1593,6 +1649,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.stateStreamConf.ClientSendBufferSize,
),
executionDataTracker,
builder.executionResultInfoProvider,
builder.executionStateCache,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
Expand Down Expand Up @@ -1813,10 +1871,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.RegistersAsyncStore = execution.NewRegistersAsyncStore()
return nil
})
builder.Module("events storage", func(node *cmd.NodeConfig) error {
builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
})
builder.Module("reporter", func(node *cmd.NodeConfig) error {
builder.Reporter = index.NewReporter()
return nil
Expand Down Expand Up @@ -1972,30 +2026,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
fixedENIdentifiers,
)

execNodeSelector := execution_result.NewExecutionNodeSelector(
preferredENIdentifiers,
fixedENIdentifiers,
)

execResultInfoProvider := execution_result.NewExecutionResultInfoProvider(
node.Logger,
node.State,
node.Storage.Receipts,
execNodeSelector,
optimistic_sync.DefaultCriteria,
)

// TODO: use real objects instead of mocks once they're implemented
snapshot := osyncsnapshot.NewSnapshotMock(
builder.events,
nil,
nil,
builder.lightTransactionResults,
nil,
nil,
)
execStateCache := execution_state.NewExecutionStateCacheMock(snapshot)

backendParams := backend.Params{
State: node.State,
Blocks: node.Storage.Blocks,
Expand Down Expand Up @@ -2027,8 +2057,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
VersionControl: builder.VersionControl,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
MaxScriptAndArgumentSize: config.BackendConfig.AccessConfig.MaxRequestMsgSize,
ExecutionResultInfoProvider: execResultInfoProvider,
ExecutionStateCache: execStateCache,
ExecutionResultInfoProvider: builder.executionResultInfoProvider,
ExecutionStateCache: builder.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
}

Expand Down
2 changes: 2 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
state_stream.DefaultRegisterIDsRequestLimit,
subscriptionHandler,
suite.executionDataTracker,
suite.executionResultInfoProvider,
suite.executionStateCache,
)
assert.NoError(suite.T(), err)

Expand Down
15 changes: 10 additions & 5 deletions engine/access/state_stream/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
Expand Down Expand Up @@ -95,6 +96,8 @@ func New(
registerIDsRequestLimit int,
subscriptionHandler *subscription.SubscriptionHandler,
executionDataTracker tracker.ExecutionDataTracker,
executionResultProvider optimistic_sync.ExecutionResultInfoProvider,
executionStateCache optimistic_sync.ExecutionStateCache,
) (*StateStreamBackend, error) {
logger := log.With().Str("module", "state_stream_api").Logger()

Expand All @@ -113,11 +116,13 @@ func New(
}

b.ExecutionDataBackend = ExecutionDataBackend{
log: logger,
headers: headers,
subscriptionHandler: subscriptionHandler,
getExecutionData: b.getExecutionData,
executionDataTracker: executionDataTracker,
log: logger,
headers: headers,
subscriptionHandler: subscriptionHandler,
getExecutionData: b.getExecutionData,
executionDataTracker: executionDataTracker,
executionResultProvider: executionResultProvider,
executionStateCache: executionStateCache,
}

eventsProvider := EventsProvider{
Expand Down
Loading
Loading