Skip to content

Commit d22661a

Browse files
committed
Merge branch 'master' into leo/refactor-stored-chunk-data-pack
2 parents 7fa1ef6 + ca73541 commit d22661a

File tree

64 files changed

+1678
-631
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1678
-631
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ type AccessNodeConfig struct {
161161
rpcMetricsEnabled bool
162162
executionDataSyncEnabled bool
163163
publicNetworkExecutionDataEnabled bool
164-
executionDataDBMode string
165164
executionDataPrunerHeightRangeTarget uint64
166165
executionDataPrunerThreshold uint64
167166
executionDataPruningInterval time.Duration
@@ -277,7 +276,6 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
277276
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
278277
},
279278
executionDataIndexingEnabled: false,
280-
executionDataDBMode: execution_data.ExecutionDataDBModePebble.String(),
281279
executionDataPrunerHeightRangeTarget: 0,
282280
executionDataPrunerThreshold: pruner.DefaultThreshold,
283281
executionDataPruningInterval: pruner.DefaultPruningInterval,
@@ -591,7 +589,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
591589
Module("execution data datastore and blobstore", func(node *cmd.NodeConfig) error {
592590
var err error
593591
builder.ExecutionDatastoreManager, err = edstorage.CreateDatastoreManager(
594-
node.Logger, builder.executionDataDir, builder.executionDataDBMode)
592+
node.Logger, builder.executionDataDir)
595593
if err != nil {
596594
return fmt.Errorf("could not create execution data datastore manager: %w", err)
597595
}
@@ -1362,10 +1360,13 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
13621360
"execution-data-max-retry-delay",
13631361
defaultConfig.executionDataConfig.MaxRetryDelay,
13641362
"maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
1365-
flags.StringVar(&builder.executionDataDBMode,
1363+
1364+
var builderexecutionDataDBMode string
1365+
flags.StringVar(&builderexecutionDataDBMode,
13661366
"execution-data-db",
1367-
defaultConfig.executionDataDBMode,
1368-
"[experimental] the DB type for execution datastore. One of [badger, pebble]")
1367+
"pebble",
1368+
"[deprecated] the DB type for execution datastore")
1369+
13691370
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
13701371
"execution-data-height-range-target",
13711372
defaultConfig.executionDataPrunerHeightRangeTarget,
@@ -2060,7 +2061,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
20602061
// handles block-related operations.
20612062
blockTracker, err := subscriptiontracker.NewBlockTracker(
20622063
node.State,
2063-
builder.FinalizedRootBlock.Height,
2064+
builder.SealedRootBlock.Height,
20642065
node.Storage.Headers,
20652066
broadcaster,
20662067
)

cmd/bootstrap/cmd/key.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,27 +100,30 @@ func keyCmdRun(_ *cobra.Command, _ []string) {
100100
}
101101
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeID)
102102

103-
err = common.WriteJSON(fmt.Sprintf(model.PathNodeInfoPriv, nodeInfo.NodeID), flagOutdir, private)
103+
privKeyPath := fmt.Sprintf(model.PathNodeInfoPriv, nodeInfo.NodeID)
104+
err = common.WriteJSON(privKeyPath, flagOutdir, private)
104105
if err != nil {
105106
log.Fatal().Err(err).Msg("failed to write json")
106107
}
107-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeInfoPriv)
108+
log.Info().Msgf("wrote file %s/%s", flagOutdir, privKeyPath)
108109

109-
err = common.WriteText(fmt.Sprintf(model.PathSecretsEncryptionKey, nodeInfo.NodeID), flagOutdir, secretsDBKey)
110+
secretsKeyPath := fmt.Sprintf(model.PathSecretsEncryptionKey, nodeInfo.NodeID)
111+
err = common.WriteText(secretsKeyPath, flagOutdir, secretsDBKey)
110112
if err != nil {
111113
log.Fatal().Err(err).Msg("failed to write file")
112114
}
113-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathSecretsEncryptionKey)
115+
log.Info().Msgf("wrote file %s/%s", flagOutdir, secretsKeyPath)
114116

115117
public, err := nodeInfo.Public()
116118
if err != nil {
117119
log.Fatal().Err(err).Msg("could not access public keys")
118120
}
119-
err = common.WriteJSON(fmt.Sprintf(model.PathNodeInfoPub, nodeInfo.NodeID), flagOutdir, public)
121+
pubNodeInfoPath := fmt.Sprintf(model.PathNodeInfoPub, nodeInfo.NodeID)
122+
err = common.WriteJSON(pubNodeInfoPath, flagOutdir, public)
120123
if err != nil {
121124
log.Fatal().Err(err).Msg("failed to write json")
122125
}
123-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeInfoPub)
126+
log.Info().Msgf("wrote file %s/%s", flagOutdir, pubNodeInfoPath)
124127

125128
// write machine account info
126129
if role == flow.RoleCollection || role == flow.RoleConsensus {
@@ -134,11 +137,12 @@ func keyCmdRun(_ *cobra.Command, _ []string) {
134137
log.Debug().Str("address", flagAddress).Msg("assembling machine account information")
135138
// write the public key to terminal for entry in Flow Port
136139
machineAccountPriv := assembleNodeMachineAccountKey(machineKey)
137-
err = common.WriteJSON(fmt.Sprintf(model.PathNodeMachineAccountPrivateKey, nodeInfo.NodeID), flagOutdir, machineAccountPriv)
140+
privateKeyPath := fmt.Sprintf(model.PathNodeMachineAccountPrivateKey, nodeInfo.NodeID)
141+
err = common.WriteJSON(privateKeyPath, flagOutdir, machineAccountPriv)
138142
if err != nil {
139143
log.Fatal().Err(err).Msg("failed to write json")
140144
}
141-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeMachineAccountPrivateKey)
145+
log.Info().Msgf("wrote file %s/%s", flagOutdir, privateKeyPath)
142146
}
143147
}
144148

cmd/execution_builder.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ func (exeNode *ExecutionNode) LoadSyncCore(node *NodeConfig) error {
334334
func (exeNode *ExecutionNode) LoadExecutionStorage(
335335
node *NodeConfig,
336336
) error {
337+
var err error
337338
db := node.ProtocolDB
338339

339340
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
@@ -342,7 +343,10 @@ func (exeNode *ExecutionNode) LoadExecutionStorage(
342343
exeNode.results = store.NewExecutionResults(node.Metrics.Cache, db)
343344
exeNode.receipts = store.NewExecutionReceipts(node.Metrics.Cache, db, exeNode.results, storage.DefaultCacheSize)
344345
exeNode.myReceipts = store.NewMyExecutionReceipts(node.Metrics.Cache, db, exeNode.receipts)
345-
exeNode.txResults = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
346+
exeNode.txResults, err = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
347+
if err != nil {
348+
return err
349+
}
346350
exeNode.eventsReader = exeNode.events
347351
exeNode.commitsReader = exeNode.commits
348352
exeNode.resultsReader = exeNode.results
@@ -716,7 +720,7 @@ func (exeNode *ExecutionNode) LoadExecutionDataDatastore(
716720
node *NodeConfig,
717721
) (err error) {
718722
exeNode.executionDataDatastore, err = edstorage.CreateDatastoreManager(
719-
node.Logger, exeNode.exeConf.executionDataDir, exeNode.exeConf.executionDataDBMode)
723+
node.Logger, exeNode.exeConf.executionDataDir)
720724
if err != nil {
721725
return fmt.Errorf("could not create execution data datastore manager: %w", err)
722726
}

cmd/execution_config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ type ExecutionConfig struct {
5959
importCheckpointWorkerCount int
6060
transactionExecutionMetricsEnabled bool
6161
transactionExecutionMetricsBufferSize uint
62-
executionDataDBMode string
6362
scheduleCallbacksEnabled bool
6463

6564
computationConfig computation.ComputationConfig
@@ -136,10 +135,12 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
136135
flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap")
137136
flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics")
138137
flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine")
139-
flags.StringVar(&exeConf.executionDataDBMode,
138+
139+
var exeConfExecutionDataDBMode string
140+
flags.StringVar(&exeConfExecutionDataDBMode,
140141
"execution-data-db",
141142
execution_data.ExecutionDataDBModePebble.String(),
142-
"[experimental] the DB type for execution datastore. One of [badger, pebble]")
143+
"[deprecated] the DB type for execution datastore. it's been deprecated")
143144

144145
flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes")
145146
flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false")

cmd/observer/node_builder/observer_builder.go

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ type ObserverServiceConfig struct {
154154
logTxTimeToSealed bool
155155
executionDataSyncEnabled bool
156156
executionDataIndexingEnabled bool
157-
executionDataDBMode string
158157
executionDataPrunerHeightRangeTarget uint64
159158
executionDataPrunerThreshold uint64
160159
executionDataPruningInterval time.Duration
@@ -234,7 +233,6 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
234233
logTxTimeToSealed: false,
235234
executionDataSyncEnabled: false,
236235
executionDataIndexingEnabled: false,
237-
executionDataDBMode: execution_data.ExecutionDataDBModePebble.String(),
238236
executionDataPrunerHeightRangeTarget: 0,
239237
executionDataPrunerThreshold: pruner.DefaultThreshold,
240238
executionDataPruningInterval: pruner.DefaultPruningInterval,
@@ -712,10 +710,9 @@ func (builder *ObserverServiceBuilder) extraFlags() {
712710
flags.BoolVar(&builder.localServiceAPIEnabled, "local-service-api-enabled", defaultConfig.localServiceAPIEnabled, "whether to use local indexed data for api queries")
713711
flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database")
714712
flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file")
715-
flags.StringVar(&builder.executionDataDBMode,
716-
"execution-data-db",
717-
defaultConfig.executionDataDBMode,
718-
"[experimental] the DB type for execution datastore. One of [badger, pebble]")
713+
714+
var builderExecutionDataDBMode string
715+
flags.StringVar(&builderExecutionDataDBMode, "execution-data-db", "pebble", "[deprecated] the DB type for execution datastore.")
719716

720717
// Execution data pruner
721718
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
@@ -1110,7 +1107,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
11101107
var execDataDistributor *edrequester.ExecutionDataDistributor
11111108
var execDataCacheBackend *herocache.BlockExecutionData
11121109
var executionDataStoreCache *execdatacache.ExecutionDataCache
1113-
var executionDataDBMode execution_data.ExecutionDataDBMode
11141110

11151111
// setup dependency chain to ensure indexer starts after the requester
11161112
requesterDependable := module.NewProxiedReadyDoneAware()
@@ -1129,20 +1125,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
11291125
return err
11301126
}
11311127

1132-
executionDataDBMode, err = execution_data.ParseExecutionDataDBMode(builder.executionDataDBMode)
1128+
builder.ExecutionDatastoreManager, err = edstorage.NewPebbleDatastoreManager(
1129+
node.Logger.With().Str("pebbledb", "endata").Logger(),
1130+
datastoreDir, nil)
11331131
if err != nil {
1134-
return fmt.Errorf("could not parse execution data DB mode: %w", err)
1135-
}
1136-
1137-
if executionDataDBMode == execution_data.ExecutionDataDBModePebble {
1138-
builder.ExecutionDatastoreManager, err = edstorage.NewPebbleDatastoreManager(
1139-
node.Logger.With().Str("pebbledb", "endata").Logger(),
1140-
datastoreDir, nil)
1141-
if err != nil {
1142-
return fmt.Errorf("could not create PebbleDatastoreManager for execution data: %w", err)
1143-
}
1144-
} else {
1145-
return fmt.Errorf("datastore with badger has been deprecated, please use pebble instead")
1132+
return fmt.Errorf("could not create PebbleDatastoreManager for execution data: %w", err)
11461133
}
11471134
ds = builder.ExecutionDatastoreManager.Datastore()
11481135

@@ -1921,7 +1908,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
19211908
// handles block-related operations.
19221909
blockTracker, err := subscriptiontracker.NewBlockTracker(
19231910
node.State,
1924-
builder.FinalizedRootBlock.Height,
1911+
builder.SealedRootBlock.Height,
19251912
node.Storage.Headers,
19261913
broadcaster,
19271914
)

cmd/util/cmd/read-badger/cmd/transaction_results.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ var transactionResultsCmd = &cobra.Command{
2525
Short: "get transaction-result by block ID",
2626
RunE: func(cmd *cobra.Command, args []string) error {
2727
return common.WithStorage(flagDatadir, func(db storage.DB) error {
28-
transactionResults := store.NewTransactionResults(metrics.NewNoopCollector(), db, 1)
28+
transactionResults, err := store.NewTransactionResults(metrics.NewNoopCollector(), db, 1)
29+
if err != nil {
30+
return err
31+
}
2932
storages := common.InitStorages(db)
3033
log.Info().Msgf("got flag block id: %s", flagBlockID)
3134
blockID, err := flow.HexStringToIdentifier(flagBlockID)

cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ func runE(*cobra.Command, []string) error {
6767

6868
metrics := &metrics.NoopCollector{}
6969

70-
transactionResults := store.NewTransactionResults(metrics, db, badger.DefaultCacheSize)
70+
transactionResults, err := store.NewTransactionResults(metrics, db, badger.DefaultCacheSize)
71+
if err != nil {
72+
return err
73+
}
7174
commits := store.NewCommits(metrics, db)
7275
results := store.NewExecutionResults(metrics, db)
7376
receipts := store.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize)

cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func TestReExecuteBlock(t *testing.T) {
4141
all := store.InitAll(metrics, db)
4242
headers := all.Headers
4343
blocks := all.Blocks
44-
txResults := store.NewTransactionResults(metrics, db, store.DefaultCacheSize)
44+
txResults, err := store.NewTransactionResults(metrics, db, store.DefaultCacheSize)
45+
require.NoError(t, err)
4546
commits := store.NewCommits(metrics, db)
4647
storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), store.DefaultCacheSize)
4748
chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), storedChunkDataPacks, store.NewCollections(db, store.NewTransactions(metrics, db)), store.DefaultCacheSize)
@@ -203,7 +204,8 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) {
203204
collections := store.NewCollections(db, transactions)
204205
storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), bstorage.DefaultCacheSize)
205206
chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), storedChunkDataPacks, collections, bstorage.DefaultCacheSize)
206-
txResults := store.NewTransactionResults(metrics, db, bstorage.DefaultCacheSize)
207+
txResults, err := store.NewTransactionResults(metrics, db, bstorage.DefaultCacheSize)
208+
require.NoError(t, err)
207209

208210
err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
209211
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {

engine/access/access_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package access_test
33
import (
44
"context"
55
"encoding/json"
6-
"os"
76
"testing"
87

98
"github.com/cockroachdb/pebble/v2"
@@ -93,7 +92,7 @@ func TestAccess(t *testing.T) {
9392

9493
func (suite *Suite) SetupTest() {
9594
suite.lockManager = storage.NewTestingLockManager()
96-
suite.log = zerolog.New(os.Stderr)
95+
suite.log = unittest.Logger()
9796
suite.net = new(mocknetwork.EngineRegistry)
9897
suite.state = new(protocol.State)
9998
suite.finalSnapshot = new(protocol.Snapshot)
@@ -774,6 +773,10 @@ func (suite *Suite) TestGetSealedTransaction() {
774773
ctx := irrecoverable.NewMockSignalerContext(suite.T(), background)
775774
ingestEng.Start(ctx)
776775
<-ingestEng.Ready()
776+
defer func() {
777+
cancel()
778+
<-ingestEng.Done()
779+
}()
777780

778781
// 2. Ingest engine was notified by the follower engine about a new block.
779782
// Follower engine --> Ingest engine

engine/access/handle_irrecoverable_state_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"io"
7-
"os"
87
"testing"
98
"time"
109

@@ -45,10 +44,12 @@ import (
4544
// IrrecoverableStateTestSuite tests that Access node indicate an inconsistent or corrupted node state
4645
type IrrecoverableStateTestSuite struct {
4746
suite.Suite
47+
log zerolog.Logger
48+
cancel context.CancelFunc
49+
4850
state *protocol.State
4951
snapshot *protocol.Snapshot
5052
epochQuery *protocol.EpochQuery
51-
log zerolog.Logger
5253
net *mocknetwork.EngineRegistry
5354
request *module.Requester
5455
collClient *accessmock.AccessAPIClient
@@ -72,7 +73,7 @@ type IrrecoverableStateTestSuite struct {
7273
}
7374

7475
func (suite *IrrecoverableStateTestSuite) SetupTest() {
75-
suite.log = zerolog.New(os.Stdout)
76+
suite.log = unittest.Logger()
7677
suite.net = mocknetwork.NewEngineRegistry(suite.T())
7778
suite.state = protocol.NewState(suite.T())
7879
suite.snapshot = protocol.NewSnapshot(suite.T())
@@ -186,13 +187,17 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {
186187
assert.NoError(suite.T(), err)
187188

188189
err = fmt.Errorf("inconsistent node's state")
190+
191+
ctx, cancel := context.WithCancel(context.Background())
192+
suite.cancel = cancel
193+
189194
signCtxErr := irrecoverable.NewExceptionf("failed to lookup sealed header: %w", err)
190-
ctx := irrecoverable.NewMockSignalerContextExpectError(suite.T(), context.Background(), signCtxErr)
195+
signalCtx := irrecoverable.NewMockSignalerContextExpectError(suite.T(), ctx, signCtxErr)
191196

192-
suite.rpcEng.Start(ctx)
197+
suite.rpcEng.Start(signalCtx)
193198

194-
suite.secureGrpcServer.Start(ctx)
195-
suite.unsecureGrpcServer.Start(ctx)
199+
suite.secureGrpcServer.Start(signalCtx)
200+
suite.unsecureGrpcServer.Start(signalCtx)
196201

197202
// wait for the servers to startup
198203
unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Ready(), 2*time.Second)
@@ -202,6 +207,13 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {
202207
unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second)
203208
}
204209

210+
func (suite *IrrecoverableStateTestSuite) TearDownTest() {
211+
suite.cancel()
212+
unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Done(), 2*time.Second)
213+
unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Done(), 2*time.Second)
214+
unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Done(), 2*time.Second)
215+
}
216+
205217
func TestIrrecoverableState(t *testing.T) {
206218
suite.Run(t, new(IrrecoverableStateTestSuite))
207219
}

0 commit comments

Comments
 (0)