diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 767975cae8e..5439fda1205 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -762,8 +762,12 @@ func (exeNode *ExecutionNode) LoadExecutionState( } return nil }) + + chunkDB := pebbleimpl.ToDB(chunkDataPackDB) + storedChunkDataPacks := store.NewStoredChunkDataPacks( + node.Metrics.Cache, chunkDB, exeNode.exeConf.chunkDataPackCacheSize) chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache, - pebbleimpl.ToDB(chunkDataPackDB), exeNode.collections, exeNode.exeConf.chunkDataPackCacheSize) + chunkDB, storedChunkDataPacks, exeNode.collections, exeNode.exeConf.chunkDataPackCacheSize) getLatestFinalized := func() (uint64, error) { final, err := node.State.Final().Head() diff --git a/cmd/util/cmd/read-badger/cmd/chunk_data_pack.go b/cmd/util/cmd/read-badger/cmd/chunk_data_pack.go index 72dcb9aeadb..3cf522851f4 100644 --- a/cmd/util/cmd/read-badger/cmd/chunk_data_pack.go +++ b/cmd/util/cmd/read-badger/cmd/chunk_data_pack.go @@ -35,8 +35,9 @@ var chunkDataPackCmd = &cobra.Command{ metrics := metrics.NewNoopCollector() collections := store.NewCollections(db, store.NewTransactions(metrics, db)) + storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, db, 1) chunkDataPacks := store.NewChunkDataPacks(metrics, - db, collections, 1) + db, storedChunkDataPacks, collections, 1) log.Info().Msgf("getting chunk data pack by chunk id: %v", chunkID) chunkDataPack, err := chunkDataPacks.ByChunkID(chunkID) diff --git a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go index c2fc729fa21..65c811845eb 100644 --- a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go +++ b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go @@ -87,16 +87,14 @@ func runE(*cobra.Command, []string) error { return fmt.Errorf("could not open chunk data pack DB at %v: %w", flagChunkDataPackDir, err) } chunkDataPacksDB := pebbleimpl.ToDB(chunkDataPacksPebbleDB) - chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, collections, 1000) - chunkBatch := chunkDataPacksDB.NewBatch() - defer chunkBatch.Close() - - writeBatch := db.NewBatch() - defer writeBatch.Close() - - err = removeExecutionResultsFromHeight( - writeBatch, - chunkBatch, + storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, chunkDataPacksDB, 1000) + chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, storedChunkDataPacks, collections, 1000) + protocolDBBatch := db.NewBatch() + defer protocolDBBatch.Close() + + // collect chunk IDs to be removed + chunkIDs, err := removeExecutionResultsFromHeight( + protocolDBBatch, state, transactionResults, commits, @@ -112,12 +110,14 @@ func runE(*cobra.Command, []string) error { } // remove chunk data packs first, because otherwise the index to find chunk data pack will be removed. 
- err = chunkBatch.Commit() - if err != nil { - return fmt.Errorf("could not commit chunk batch at %v: %w", flagHeight, err) + if len(chunkIDs) > 0 { + _, err := chunkDataPacks.BatchRemove(chunkIDs, protocolDBBatch) + if err != nil { + return fmt.Errorf("could not remove chunk data packs at %v: %w", flagHeight, err) + } } - err = writeBatch.Commit() + err = protocolDBBatch.Commit() if err != nil { return fmt.Errorf("could not flush write batch at %v: %w", flagHeight, err) } @@ -138,11 +138,13 @@ func runE(*cobra.Command, []string) error { }) } -// use badger instances directly instead of stroage interfaces so that the interface don't -// need to include the Remove methods +// removeExecutionResultsFromHeight removes all execution results and related data +// from the specified block height onward to roll back the protocol state. +// It returns the chunk IDs removed from the protocol state DB, +// which can then be used to delete the corresponding chunk data packs from chunk +// data pack database. func removeExecutionResultsFromHeight( - writeBatch storage.Batch, - chunkBatch storage.Batch, + protocolDBBatch storage.Batch, protoState protocol.State, transactionResults storage.TransactionResults, commits storage.Commits, @@ -151,40 +153,43 @@ func removeExecutionResultsFromHeight( myReceipts storage.MyExecutionReceipts, events storage.Events, serviceEvents storage.ServiceEvents, - fromHeight uint64) error { + fromHeight uint64, +) ([]flow.Identifier, error) { log.Info().Msgf("removing results for blocks from height: %v", fromHeight) root := protoState.Params().FinalizedRoot() if fromHeight <= root.Height { - return fmt.Errorf("can only remove results for block above root block. fromHeight: %v, rootHeight: %v", fromHeight, root.Height) + return nil, fmt.Errorf("can only remove results for block above root block. fromHeight: %v, rootHeight: %v", fromHeight, root.Height) } final, err := protoState.Final().Head() if err != nil { - return fmt.Errorf("could get not finalized height: %w", err) + return nil, fmt.Errorf("could get not finalized height: %w", err) } if fromHeight > final.Height { - return fmt.Errorf("could not remove results for unfinalized height: %v, finalized height: %v", fromHeight, final.Height) + return nil, fmt.Errorf("could not remove results for unfinalized height: %v, finalized height: %v", fromHeight, final.Height) } finalRemoved := 0 total := int(final.Height-fromHeight) + 1 + var allChunkIDs []flow.Identifier // removing for finalized blocks for height := fromHeight; height <= final.Height; height++ { head, err := protoState.AtHeight(height).Head() if err != nil { - return fmt.Errorf("could not get header at height: %w", err) + return nil, fmt.Errorf("could not get header at height: %w", err) } blockID := head.ID() - err = removeForBlockID(writeBatch, chunkBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID) + chunkIDs, err := removeForBlockID(protocolDBBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID) if err != nil { - return fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err) + return nil, fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err) } + allChunkIDs = append(allChunkIDs, chunkIDs...) finalRemoved++ log.Info().Msgf("result at height %v has been removed. 
progress (%v/%v)", height, finalRemoved, total) @@ -193,18 +198,18 @@ func removeExecutionResultsFromHeight( // removing for pending blocks pendings, err := protoState.Final().Descendants() if err != nil { - return fmt.Errorf("could not get pending block: %w", err) + return nil, fmt.Errorf("could not get pending block: %w", err) } pendingRemoved := 0 total = len(pendings) for _, pending := range pendings { - err = removeForBlockID(writeBatch, chunkBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending) - + chunkIDs, err := removeForBlockID(protocolDBBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending) if err != nil { - return fmt.Errorf("could not remove result for pending block %v: %w", pending, err) + return nil, fmt.Errorf("could not remove result for pending block %v: %w", pending, err) } + allChunkIDs = append(allChunkIDs, chunkIDs...) pendingRemoved++ log.Info().Msgf("result for pending block %v has been removed. progress (%v/%v) ", pending, pendingRemoved, total) @@ -213,15 +218,14 @@ func removeExecutionResultsFromHeight( log.Info().Msgf("removed height from %v. removed for %v finalized blocks, and %v pending blocks", fromHeight, finalRemoved, pendingRemoved) - return nil + return allChunkIDs, nil } // removeForBlockID remove block execution related data for a given block. // All data to be removed will be removed in a batch write. // It bubbles up any error encountered func removeForBlockID( - writeBatch storage.Batch, - chunkBatch storage.Batch, + protocolDBBatch storage.Batch, commits storage.Commits, transactionResults storage.TransactionResults, results storage.ExecutionResults, @@ -230,74 +234,70 @@ func removeForBlockID( events storage.Events, serviceEvents storage.ServiceEvents, blockID flow.Identifier, -) error { +) ([]flow.Identifier, error) { result, err := results.ByBlockID(blockID) if errors.Is(err, storage.ErrNotFound) { log.Info().Msgf("result not found for block %v", blockID) - return nil + return nil, nil } if err != nil { - return fmt.Errorf("could not find result for block %v: %w", blockID, err) + return nil, fmt.Errorf("could not find result for block %v: %w", blockID, err) } + chunkIDs := make([]flow.Identifier, 0, len(result.Chunks)) for _, chunk := range result.Chunks { chunkID := chunk.ID() - // remove chunk data pack - err := chunks.BatchRemove(chunkID, chunkBatch) - if err != nil { - return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err) - } - + chunkIDs = append(chunkIDs, chunkID) } // remove commits - err = commits.BatchRemoveByBlockID(blockID, writeBatch) + err = commits.BatchRemoveByBlockID(blockID, protocolDBBatch) if err != nil { if errors.Is(err, storage.ErrNotFound) { - return fmt.Errorf("could not remove by block ID %v: %w", blockID, err) + return nil, fmt.Errorf("could not remove by block ID %v: %w", blockID, err) } log.Warn().Msgf("statecommitment not found for block %v", blockID) } // remove transaction results - err = transactionResults.BatchRemoveByBlockID(blockID, writeBatch) + err = transactionResults.BatchRemoveByBlockID(blockID, protocolDBBatch) if err != nil { - return fmt.Errorf("could not remove transaction results by BlockID %v: %w", blockID, err) + return nil, fmt.Errorf("could not remove transaction results by BlockID %v: %w", blockID, err) } // remove own execution results index - err = myReceipts.BatchRemoveIndexByBlockID(blockID, writeBatch) + err = 
myReceipts.BatchRemoveIndexByBlockID(blockID, protocolDBBatch) if err != nil { if !errors.Is(err, storage.ErrNotFound) { - return fmt.Errorf("could not remove own receipt by BlockID %v: %w", blockID, err) + return nil, fmt.Errorf("could not remove own receipt by BlockID %v: %w", blockID, err) } log.Warn().Msgf("own receipt not found for block %v", blockID) } // remove events - err = events.BatchRemoveByBlockID(blockID, writeBatch) + err = events.BatchRemoveByBlockID(blockID, protocolDBBatch) if err != nil { - return fmt.Errorf("could not remove events by BlockID %v: %w", blockID, err) + return nil, fmt.Errorf("could not remove events by BlockID %v: %w", blockID, err) } // remove service events - err = serviceEvents.BatchRemoveByBlockID(blockID, writeBatch) + err = serviceEvents.BatchRemoveByBlockID(blockID, protocolDBBatch) if err != nil { - return fmt.Errorf("could not remove service events by blockID %v: %w", blockID, err) + return nil, fmt.Errorf("could not remove service events by blockID %v: %w", blockID, err) } // remove execution result index - err = results.BatchRemoveIndexByBlockID(blockID, writeBatch) + err = results.BatchRemoveIndexByBlockID(blockID, protocolDBBatch) if err != nil { if !errors.Is(err, storage.ErrNotFound) { - return fmt.Errorf("could not remove result by BlockID %v: %w", blockID, err) + return nil, fmt.Errorf("could not remove result by BlockID %v: %w", blockID, err) } log.Warn().Msgf("result not found for block %v", blockID) } - return nil + return chunkIDs, nil } diff --git a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go index daab8797e86..386acefd145 100644 --- a/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go +++ b/cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "errors" "testing" "github.com/cockroachdb/pebble/v2" @@ -44,7 +45,8 @@ func TestReExecuteBlock(t *testing.T) { txResults, err := store.NewTransactionResults(metrics, db, store.DefaultCacheSize) require.NoError(t, err) commits := store.NewCommits(metrics, db) - chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), store.NewCollections(db, store.NewTransactions(metrics, db)), store.DefaultCacheSize) + storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), store.DefaultCacheSize) + chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), storedChunkDataPacks, store.NewCollections(db, store.NewTransactions(metrics, db)), store.DefaultCacheSize) results := all.Results receipts := all.Receipts myReceipts := store.NewMyExecutionReceipts(metrics, db, receipts) @@ -101,13 +103,10 @@ func TestReExecuteBlock(t *testing.T) { batch := db.NewBatch() defer batch.Close() - chunkBatch := pebbleimpl.ToDB(pdb).NewBatch() - defer chunkBatch.Close() - // remove execution results - err = removeForBlockID( + var cdpIDs []flow.Identifier + cdpIDs, err = removeForBlockID( batch, - chunkBatch, commits, txResults, results, @@ -121,9 +120,8 @@ func TestReExecuteBlock(t *testing.T) { require.NoError(t, err) // remove again, to make sure missing entires are handled properly - err = removeForBlockID( + additionalCdpIDs, err := removeForBlockID( batch, - chunkBatch, commits, txResults, results, @@ -135,21 +133,26 @@ func TestReExecuteBlock(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, chunkBatch.Commit()) + // combine chunk data pack IDs from 
both calls + cdpIDs = append(cdpIDs, additionalCdpIDs...) + require.NoError(t, storedChunkDataPacks.Remove(cdpIDs)) err2 := batch.Commit() require.NoError(t, err2) + // verify that chunk data packs are no longer in stored chunk data pack database + for _, cdpID := range cdpIDs { + _, err := storedChunkDataPacks.ByID(cdpID) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrNotFound)) + } + batch = db.NewBatch() defer batch.Close() - chunkBatch = pebbleimpl.ToDB(pdb).NewBatch() - defer chunkBatch.Close() - // remove again after flushing - err = removeForBlockID( + cdpIDs, err = removeForBlockID( batch, - chunkBatch, commits, txResults, results, @@ -160,12 +163,18 @@ func TestReExecuteBlock(t *testing.T) { header.ID(), ) require.NoError(t, err) - - require.NoError(t, chunkBatch.Commit()) + require.NoError(t, storedChunkDataPacks.Remove(cdpIDs)) err2 = batch.Commit() require.NoError(t, err2) + // verify that chunk data packs are no longer in stored chunk data pack database + for _, cdpID := range cdpIDs { + _, err := storedChunkDataPacks.ByID(cdpID) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrNotFound)) + } + // re execute result err = es.SaveExecutionResults(context.Background(), computationResult) require.NoError(t, err) @@ -201,7 +210,8 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) { serviceEvents := store.NewServiceEvents(metrics, db) transactions := store.NewTransactions(metrics, db) collections := store.NewCollections(db, transactions) - chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), collections, bstorage.DefaultCacheSize) + storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), bstorage.DefaultCacheSize) + chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), storedChunkDataPacks, collections, bstorage.DefaultCacheSize) txResults, err := store.NewTransactionResults(metrics, db, bstorage.DefaultCacheSize) require.NoError(t, err) @@ -262,13 +272,9 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) { batch := db.NewBatch() defer batch.Close() - chunkBatch := db.NewBatch() - defer chunkBatch.Close() - // remove execution results - err = removeForBlockID( + cdpIDs, err := removeForBlockID( batch, - chunkBatch, commits, txResults, results, @@ -280,20 +286,23 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, chunkBatch.Commit()) + require.NoError(t, storedChunkDataPacks.Remove(cdpIDs)) err2 := batch.Commit() require.NoError(t, err2) + // verify that chunk data packs are no longer in stored chunk data pack database + for _, cdpID := range cdpIDs { + _, err := storedChunkDataPacks.ByID(cdpID) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrNotFound)) + } + batch = db.NewBatch() defer batch.Close() - chunkBatch = db.NewBatch() - defer chunkBatch.Close() - // remove again to test for duplicates handling - err = removeForBlockID( + additionalCdpIDs, err := removeForBlockID( batch, - chunkBatch, commits, txResults, results, @@ -305,11 +314,20 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, chunkBatch.Commit()) + // combine chunk data pack IDs from both calls + cdpIDs = append(cdpIDs, additionalCdpIDs...) 
+ require.NoError(t, storedChunkDataPacks.Remove(cdpIDs)) err2 = batch.Commit() require.NoError(t, err2) + // verify that chunk data packs are no longer in stored chunk data pack database + for _, cdpID := range cdpIDs { + _, err := storedChunkDataPacks.ByID(cdpID) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrNotFound)) + } + computationResult2 := testutil.ComputationResultFixture(t) computationResult2.ExecutableBlock = executableBlock computationResult2.ExecutionReceipt.ExecutionResult.BlockID = blockID diff --git a/engine/execution/pruner/core_test.go b/engine/execution/pruner/core_test.go index 16d1eaa5540..20c0b49b5cb 100644 --- a/engine/execution/pruner/core_test.go +++ b/engine/execution/pruner/core_test.go @@ -40,7 +40,8 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) { transactions := store.NewTransactions(metrics, db) collections := store.NewCollections(db, transactions) - chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), collections, 1000) + storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, db, 1000) + chunkDataPacks := store.NewChunkDataPacks(metrics, db, storedChunkDataPacks, collections, 1000) lastSealedHeight := 30 lastFinalizedHeight := lastSealedHeight + 2 // 2 finalized but unsealed @@ -75,8 +76,14 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) { require.NoError(t, err) require.NoError(t, results.Store(chunk.Result)) require.NoError(t, results.Index(chunk.Result.BlockID, chunk.Result.ID())) - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { - return chunkDataPacks.StoreByChunkID(lctx, []*flow.ChunkDataPack{chunk.ChunkDataPack}) + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + storeFunc, err := chunkDataPacks.Store([]*flow.ChunkDataPack{chunk.ChunkDataPack}) + if err != nil { + return err + } + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return storeFunc(lctx, rw) + }) })) _, storeErr := collections.Store(chunk.ChunkDataPack.Collection) require.NoError(t, storeErr) diff --git a/engine/execution/state/state.go b/engine/execution/state/state.go index 47a7accb8fc..7ad41b8efb7 100644 --- a/engine/execution/state/state.go +++ b/engine/execution/state/state.go @@ -346,7 +346,7 @@ func (s *state) StateCommitmentByBlockID(blockID flow.Identifier) (flow.StateCom func (s *state) ChunkDataPackByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) { chunkDataPack, err := s.chunkDataPacks.ByChunkID(chunkID) if err != nil { - return nil, fmt.Errorf("could not retrieve chunk data pack: %w", err) + return nil, fmt.Errorf("could not retrieve chunk data pack for chunk ID %v: %w", chunkID, err) } return chunkDataPack, nil @@ -363,6 +363,9 @@ func (s *state) GetExecutionResultID(ctx context.Context, blockID flow.Identifie return result.ID(), nil } +// SaveExecutionResults saves all data related to the execution of a block. +// It is concurrent safe because chunk data packs store is conflict-free (storing data by hash), and protocol data requires a lock to store, which will be synchronized. +// It returns [storage.ErrDataMismatch] if there is data already stored for the same block ID but with different content. 
func (s *state) SaveExecutionResults( ctx context.Context, result *execution.ComputationResult, ) error { @@ -410,31 +413,46 @@ func (s *state) saveExecutionResults( return fmt.Errorf("can not retrieve chunk data packs: %w", err) } - // Acquire both locks to ensure it's concurrent safe when inserting the execution results and chunk data packs. - return storage.WithLocks(s.lockManager, []string{storage.LockInsertOwnReceipt, storage.LockInsertChunkDataPack}, func(lctx lockctx.Context) error { - err := s.chunkDataPacks.StoreByChunkID(lctx, chunks) - if err != nil { - return fmt.Errorf("can not store multiple chunk data pack: %w", err) - } - - // Save entire execution result (including all chunk data packs) within one batch to minimize - // the number of database interactions. + // Within the following `Store` call, the chunk data packs themselves are going to be persisted into their + // dedicated database. However, we have not yet persisted that this execution node is committing to the + // result represented by the chunk data packs. Populating the index from chunk ID to chunk data pack ID + // in the protocol database (signifying the node's slashable commitment to the respective result) is + // done by the functor returned by `Store`. The functor is invoked as part of the atomic batch update + // of the protocol database below. + storeChunkDataPacksFunc, err := s.chunkDataPacks.Store(chunks) + if err != nil { + return fmt.Errorf("can not store chunk data packs for block ID: %v: %w", blockID, err) + } + + return storage.WithLocks(s.lockManager, []string{ + storage.LockIndexChunkDataPackByChunkID, + storage.LockInsertOwnReceipt, + }, func(lctx lockctx.Context) error { + // The batch update writes all execution result data (except chunk data pack!) atomically. + // Since the chunk data pack itself was already stored in a separate database (s.chunkDataPacks) + // during the previous step, this step stores only the mapping between chunk ID + // and chunk data pack ID together with the execution result data in the same batch. + // + // This design guarantees consistency in two scenarios: + // + // Case 1: If the batch update is interrupted, the mapping has not yet been saved. + // Later, if we attempt to store another execution result that references a different + // chunk data pack but the same chunk ID, there is no conflict, because no previous mapping + // exists. By convention, a node should only share information once it has persisted its + // commitment in the database. Therefore, if the database write was interrupted, none of the + // information is stored and no binding commitment to a different result could have been made. + // + // Case 2: If the batch update succeeds, the mapping is saved. Later, if we + // attempt to store another execution result that references a different + // chunk data pack with the same chunk ID, the conflict is detected, preventing + // overwriting of the previously stored mapping. return s.db.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error { - batch.AddCallback(func(err error) { - // Rollback if an error occurs during batch operations - // Chunk data packs are saved in a separate database, there is a chance - // that execution result was failed to save, but chunk data packs was saved and - // didnt get removed.
- // TODO(leo): when retrieving chunk data packs, we need to add a check to ensure the block - // has been executed before returning chunk data packs - if err != nil { - chunkIDs := make([]flow.Identifier, 0, len(chunks)) - for _, chunk := range chunks { - chunkIDs = append(chunkIDs, chunk.ChunkID) - } - _ = s.chunkDataPacks.Remove(chunkIDs) - } - }) + // store the ChunkID -> StoredChunkDataPack.ID() mapping + // in s.db (protocol database along with other execution data in a single batch) + err := storeChunkDataPacksFunc(lctx, batch) + if err != nil { + return fmt.Errorf("cannot store chunk data packs: %w", err) + } err = s.events.BatchStore(blockID, []flow.EventsList{result.AllEvents()}, batch) if err != nil { diff --git a/engine/execution/state/state_storehouse_test.go b/engine/execution/state/state_storehouse_test.go index 59b618e4031..cae0abeae3a 100644 --- a/engine/execution/state/state_storehouse_test.go +++ b/engine/execution/state/state_storehouse_test.go @@ -61,7 +61,7 @@ func prepareStorehouseTest(f func(t *testing.T, es state.ExecutionState, l *ledg txResults := storagemock.NewTransactionResults(t) txResults.On("BatchStore", mock.Anything, mock.Anything, mock.Anything).Return(nil) chunkDataPacks := storagemock.NewChunkDataPacks(t) - chunkDataPacks.On("StoreByChunkID", mock.Anything, mock.Anything).Return(nil) + chunkDataPacks.On("Store", mock.Anything).Return(func(lockctx.Proof, storage.ReaderBatchWriter) error { return nil }, nil) results := storagemock.NewExecutionResults(t) results.On("BatchIndex", mock.Anything, mock.Anything, mock.Anything).Return(nil) myReceipts := storagemock.NewMyExecutionReceipts(t) diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index c0f969b43c9..37717cf8fbc 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -551,7 +551,8 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ide txResultStorage, err := store.NewTransactionResults(node.Metrics, db, storagebadger.DefaultCacheSize) require.NoError(t, err) commitsStorage := store.NewCommits(node.Metrics, db) - chunkDataPackStorage := store.NewChunkDataPacks(node.Metrics, db, collectionsStorage, 100) + storedChunkDataPacks := store.NewStoredChunkDataPacks(node.Metrics, db, 100) + chunkDataPackStorage := store.NewChunkDataPacks(node.Metrics, db, storedChunkDataPacks, collectionsStorage, 100) results := store.NewExecutionResults(node.Metrics, db) receipts := store.NewExecutionReceipts(node.Metrics, db, results, storagebadger.DefaultCacheSize) myReceipts := store.NewMyExecutionReceipts(node.Metrics, db, receipts) diff --git a/engine/verification/requester/qualifier.go b/engine/verification/requester/qualifier.go index 7ec62a19742..d0018feb22d 100644 --- a/engine/verification/requester/qualifier.go +++ b/engine/verification/requester/qualifier.go @@ -1,6 +1,7 @@ package requester import ( + "fmt" "time" ) @@ -8,18 +9,27 @@ import ( // the last time it has been requested, and the duration at which the chunk can be retried after, returns either true or false. // // The return value of this function determines whether the chunk request can be dispatched to the network. -type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) bool +type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) (requestQualifies bool, reasonMsg string) // MaxAttemptQualifier only qualifies a chunk request if it has been requested less than the specified number of attempts. 
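// For example (illustration of the new two-value signature below): MaxAttemptQualifier(3)
// returns (true, "") while the recorded attempts are 0, 1 or 2, and returns
// (false, "max attempts (3) reached: 3") once the attempts counter reaches 3.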
func MaxAttemptQualifier(maxAttempts uint64) RequestQualifierFunc { - return func(attempts uint64, _ time.Time, _ time.Duration) bool { - return attempts < maxAttempts + return func(attempts uint64, _ time.Time, _ time.Duration) (bool, string) { + qualified := attempts < maxAttempts + if qualified { + return true, "" + } + return false, fmt.Sprintf("max attempts (%d) reached: %d", maxAttempts, attempts) } } // RetryAfterQualifier only qualifies a chunk request if its retryAfter duration has been elapsed since the last time this // request has been dispatched. -func RetryAfterQualifier(_ uint64, lastAttempt time.Time, retryAfter time.Duration) bool { +func RetryAfterQualifier(_ uint64, lastAttempt time.Time, retryAfter time.Duration) (bool, string) { nextTry := lastAttempt.Add(retryAfter) - return nextTry.Before(time.Now()) + qualified := nextTry.Before(time.Now()) + if qualified { + return true, "" + } + return false, fmt.Sprintf("retry after not elapsed: next try at %s, in %v", + nextTry.Format(time.RFC3339), time.Until(nextTry)) } diff --git a/engine/verification/requester/requester.go b/engine/verification/requester/requester.go index 0d166dbc63c..938e1752dad 100644 --- a/engine/verification/requester/requester.go +++ b/engine/verification/requester/requester.go @@ -298,9 +298,9 @@ func (e *Engine) handleChunkDataPackRequest(ctx context.Context, request *verifi return 0 } - qualified := e.canDispatchRequest(request.ChunkID) + qualified, reason := e.canDispatchRequest(request.ChunkID) if !qualified { - lg.Debug().Msg("chunk data pack request is not qualified for dispatching at this round") + lg.Debug().Msg("chunk data pack request is not qualified for dispatching at this round: " + reason) return 0 } @@ -351,10 +351,10 @@ func (e *Engine) requestChunkDataPack(request *verification.ChunkDataPackRequest } // canDispatchRequest returns whether chunk data request for this chunk ID can be dispatched. -func (e *Engine) canDispatchRequest(chunkID flow.Identifier) bool { +func (e *Engine) canDispatchRequest(chunkID flow.Identifier) (canDispatch bool, reasonMsg string) { attempts, lastAttempt, retryAfter, exists := e.pendingRequests.RequestHistory(chunkID) if !exists { - return false + return false, fmt.Sprintf("not found in mempool for chunk id: %s", chunkID) } return e.reqQualifierFunc(attempts, lastAttempt, retryAfter) diff --git a/engine/verification/verifier/verifiers.go b/engine/verification/verifier/verifiers.go index afb49aa401a..8119c2be17e 100644 --- a/engine/verification/verifier/verifiers.go +++ b/engine/verification/verifier/verifiers.go @@ -253,8 +253,9 @@ func initStorages( if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("could not open chunk data pack DB: %w", err) } + storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics.NewNoopCollector(), pebbleimpl.ToDB(chunkDataPackDB), 1000) chunkDataPacks := store.NewChunkDataPacks(metrics.NewNoopCollector(), - pebbleimpl.ToDB(chunkDataPackDB), storages.Collections, 1000) + pebbleimpl.ToDB(chunkDataPackDB), storedChunkDataPacks, storages.Collections, 1000) verifier := makeVerifier(log.Logger, chainID, storages.Headers, transactionFeesDisabled, scheduledCallbacksEnabled) closer := func() error { diff --git a/model/flow/chunk.go b/model/flow/chunk.go index f54caf754e5..76064753847 100644 --- a/model/flow/chunk.go +++ b/model/flow/chunk.go @@ -129,6 +129,39 @@ func (ch *Chunk) ID() Identifier { return MakeID(ch) } +// ChunkDataPackHeader is a reduced representation of ChunkDataPack. 
In a nutshell, we substitute +// the larger [ChunkDataPack.Proof] and [ChunkDataPack.Collection] with their collision-resistant hashes. +// Note, ChunkDataPackHeader.ID() is the same as ChunkDataPack.ID(). +// +//structwrite:immutable - mutations allowed only within the constructor +type ChunkDataPackHeader struct { + ChunkID Identifier // ID of the chunk this data pack is for + StartState StateCommitment // commitment for starting state + Proof Identifier // Hash of the proof for all registers touched (read or written) during the chunk execution + Collection Identifier // ID of collection executed in this chunk; [flow.ZeroID] for system chunk + + // ExecutionDataRoot is the root data structure of an execution_data.BlockExecutionData. + // It contains the necessary information for a verification node to validate that the + // BlockExecutionData produced is valid. + ExecutionDataRoot BlockExecutionDataRoot +} + +// NewChunkDataPackHeader instantiates an "immutable" ChunkDataPackHeader. +// The `CollectionID` field is set to [flow.ZeroID] for system chunks. +func NewChunkDataPackHeader(ChunkID Identifier, StartState StateCommitment, ProofID Identifier, CollectionID Identifier, ExecutionDataRoot BlockExecutionDataRoot) *ChunkDataPackHeader { + return &ChunkDataPackHeader{ + ChunkID: ChunkID, + StartState: StartState, + Proof: ProofID, + Collection: CollectionID, + ExecutionDataRoot: ExecutionDataRoot, + } +} + +func (c *ChunkDataPackHeader) ID() Identifier { + return MakeID(c) +} + // ChunkDataPack holds all register touches (any read, or write). // // Note that we have to include merkle paths as storage proof for all registers touched (read or written) for @@ -146,11 +179,13 @@ func (ch *Chunk) ID() Identifier { // during the execution of the chunk. // Register proofs order must not be correlated to the order of register reads during // the chunk execution in order to enforce the SPoCK secret high entropy. +// +//structwrite:immutable - mutations allowed only within the constructor type ChunkDataPack struct { ChunkID Identifier // ID of the chunk this data pack is for StartState StateCommitment // commitment for starting state Proof StorageProof // proof for all registers touched (read or written) during the chunk execution - Collection *Collection // collection executed in this chunk + Collection *Collection // collection executed in this chunk; nil for system chunk // ExecutionDataRoot is the root data structure of an execution_data.BlockExecutionData. // It contains the necessary information for a verification node to validate that the @@ -169,8 +204,9 @@ type ChunkDataPack struct { // a trusted ChunkDataPack using NewChunkDataPack constructor. type UntrustedChunkDataPack ChunkDataPack -// NewChunkDataPack returns an initialized chunk data pack. -// Construction ChunkDataPack allowed only within the constructor. +// NewChunkDataPack converts a chunk data pack from an untrusted source +// into its canonical representation. Here, basic structural validation is performed. +// Construction of ChunkDataPacks is ONLY allowed via THIS CONSTRUCTOR. // // All errors indicate a valid ChunkDataPack cannot be constructed from the input. func NewChunkDataPack(untrusted UntrustedChunkDataPack) (*ChunkDataPack, error) { @@ -205,7 +241,14 @@ func NewChunkDataPack(untrusted UntrustedChunkDataPack) (*ChunkDataPack, error) // ID returns a collision-resistant hash of the ChunkDataPack struct. 
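// The hash is computed via the corresponding ChunkDataPackHeader, i.e. the Proof and the
// Collection are first reduced to their own collision-resistant hashes. Consequently,
// ChunkDataPack.ID(), ChunkDataPackHeader.ID() and StoredChunkDataPack.ID() all yield the
// same identifier for the same chunk data pack.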
func (c *ChunkDataPack) ID() Identifier { - return MakeID(c) + var collectionID Identifier + if c.Collection != nil { + collectionID = c.Collection.ID() + } else { + collectionID = ZeroID + } + + return NewChunkDataPackHeader(c.ChunkID, c.StartState, MakeID(c.Proof), collectionID, c.ExecutionDataRoot).ID() } // TODO: This is the basic version of the list, we need to substitute it with something like Merkle tree at some point @@ -365,12 +408,10 @@ func stringsToCids(strs []string) ([]cid.Cid, error) { // Equals returns true if and only if receiver BlockExecutionDataRoot is equal to the `other`. func (b BlockExecutionDataRoot) Equals(other BlockExecutionDataRoot) bool { - // Compare BlockID fields if b.BlockID != other.BlockID { return false } - // Compare ChunkExecutionDataIDs slices if len(b.ChunkExecutionDataIDs) != len(other.ChunkExecutionDataIDs) { return false } diff --git a/model/flow/chunk_test.go b/model/flow/chunk_test.go index 2b472fd96e0..6455f1c79bd 100644 --- a/model/flow/chunk_test.go +++ b/model/flow/chunk_test.go @@ -173,7 +173,7 @@ func TestChunkDataPackMalleability(t *testing.T) { // // 9. Empty ExecutionDataRoot.ChunkExecutionDataIDs: // - Ensures an error is returned when ChunkExecutionDataIDs is empty. -func TestNewChunkDataPack(t *testing.T) { +func TestFromUntrustedChunkDataPack(t *testing.T) { chunkID := unittest.IdentifierFixture() startState := unittest.StateCommitmentFixture() proof := []byte{0x1, 0x2} diff --git a/module/block_iterator/executor/executor_test.go b/module/block_iterator/executor/executor_test.go index 92bc9f86684..ae72b55ea6b 100644 --- a/module/block_iterator/executor/executor_test.go +++ b/module/block_iterator/executor/executor_test.go @@ -16,8 +16,8 @@ import ( "github.com/onflow/flow-go/module/block_iterator/executor" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/operation" "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -27,33 +27,39 @@ func TestExecute(t *testing.T) { lockManager := storage.NewTestingLockManager() blockCount := 10 - // prepare data - cdps := make([]*flow.ChunkDataPack, 0, blockCount) + // prepare data - create execution receipts for blocks + receipts := make([]*flow.ExecutionReceipt, 0, blockCount) bs := make([]flow.Identifier, 0, blockCount) for i := 0; i < blockCount; i++ { - cdp := unittest.ChunkDataPackFixture(unittest.IdentifierFixture()) - cdps = append(cdps, cdp) - bs = append(bs, cdp.ChunkID) + block := unittest.BlockFixture() + receipt := unittest.ReceiptForBlockFixture(block) + receipts = append(receipts, receipt) + bs = append(bs, block.ID()) } pdb := pebbleimpl.ToDB(db) - // store the chunk data packs to be pruned later - for _, cdp := range cdps { - sc := storage.ToStoredChunkDataPack(cdp) - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { + // create my execution receipts storage + metrics := metrics.NewNoopCollector() + results := store.NewExecutionResults(metrics, pdb) + executionReceipts := store.NewExecutionReceipts(metrics, pdb, results, 100) + myReceipts := store.NewMyExecutionReceipts(metrics, pdb, executionReceipts) + + // store the execution receipts to be pruned later + for _, receipt := range receipts { + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertOwnReceipt, func(lctx lockctx.Context) error { return pdb.WithReaderBatchWriter(func(rw 
storage.ReaderBatchWriter) error { - return operation.InsertChunkDataPack(lctx, rw, sc) + return myReceipts.BatchStoreMyReceipt(lctx, receipt, rw) }) })) } - // it's ok the chunk ids is used as block ids, because the iterator + // it's ok the block ids are used as identifiers, because the iterator // basically just iterate over identifiers iter := &iterator{blocks: bs} pr := &testExecutor{ executeByBlockID: func(id flow.Identifier, batch storage.ReaderBatchWriter) error { - return operation.RemoveChunkDataPack(batch.Writer(), id) + return myReceipts.BatchRemoveIndexByBlockID(id, batch) }, } @@ -61,13 +67,12 @@ func TestExecute(t *testing.T) { batchSize := uint(3) nosleep := time.Duration(0) require.NoError(t, executor.IterateExecuteAndCommitInBatch( - context.Background(), unittest.Logger(), metrics.NewNoopCollector(), iter, pr, pdb, batchSize, nosleep)) + context.Background(), unittest.Logger(), metrics, iter, pr, pdb, batchSize, nosleep)) // expect all blocks are pruned for _, b := range bs { // verify they are pruned - var c storage.StoredChunkDataPack - err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c) + _, err := myReceipts.MyReceipt(b) require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound but got %v", err) } }) @@ -79,27 +84,34 @@ func TestExecuteCanBeResumed(t *testing.T) { lockManager := storage.NewTestingLockManager() blockCount := 10 - cdps := make([]*flow.ChunkDataPack, 0, blockCount) + // prepare data - create execution receipts for blocks + receipts := make([]*flow.ExecutionReceipt, 0, blockCount) bs := make([]flow.Identifier, 0, blockCount) for i := 0; i < blockCount; i++ { - cdp := unittest.ChunkDataPackFixture(unittest.IdentifierFixture()) - cdps = append(cdps, cdp) - bs = append(bs, cdp.ChunkID) + block := unittest.BlockFixture() + receipt := unittest.ReceiptForBlockFixture(block) + receipts = append(receipts, receipt) + bs = append(bs, block.ID()) } pdb := pebbleimpl.ToDB(db) - // store the chunk data packs to be pruned later - for _, cdp := range cdps { - sc := storage.ToStoredChunkDataPack(cdp) - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { + // create my execution receipts storage + metrics := metrics.NewNoopCollector() + results := store.NewExecutionResults(metrics, pdb) + executionReceipts := store.NewExecutionReceipts(metrics, pdb, results, 100) + myReceipts := store.NewMyExecutionReceipts(metrics, pdb, executionReceipts) + + // store the execution receipts to be pruned later + for _, receipt := range receipts { + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertOwnReceipt, func(lctx lockctx.Context) error { return pdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - return operation.InsertChunkDataPack(lctx, rw, sc) + return myReceipts.BatchStoreMyReceipt(lctx, receipt, rw) }) })) } - // it's ok the chunk ids is used as block ids, because the iterator + // it's ok the block ids are used as identifiers, because the iterator // basically just iterate over identifiers iter := &iterator{blocks: bs} interrupted := fmt.Errorf("interrupted") @@ -112,7 +124,7 @@ func TestExecuteCanBeResumed(t *testing.T) { if id == bs[5] { return interrupted // return sentinel error to interrupt the pruning } - return operation.RemoveChunkDataPack(batch.Writer(), id) + return myReceipts.BatchRemoveIndexByBlockID(id, batch) }, } @@ -120,24 +132,22 @@ func TestExecuteCanBeResumed(t *testing.T) { batchSize := uint(3) nosleep := time.Duration(0) err 
:= executor.IterateExecuteAndCommitInBatch( - context.Background(), unittest.Logger(), metrics.NewNoopCollector(), iter, pruneUntilInterrupted, pdb, batchSize, nosleep) + context.Background(), unittest.Logger(), metrics, iter, pruneUntilInterrupted, pdb, batchSize, nosleep) require.True(t, errors.Is(err, interrupted), fmt.Errorf("expected %v but got %v", interrupted, err)) // expect all blocks are pruned for i, b := range bs { - // verify they are pruned - var c storage.StoredChunkDataPack - if i < 3 { // the first 3 blocks in the first batch are pruned - err = operation.RetrieveChunkDataPack(pdb.Reader(), b, &c) + _, err := myReceipts.MyReceipt(b) require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound for block %v but got %v", i, err) continue } // verify the remaining blocks are not pruned yet - require.NoError(t, operation.RetrieveChunkDataPack(pdb.Reader(), b, &c)) + _, err := myReceipts.MyReceipt(b) + require.NoError(t, err) } // now resume the pruning @@ -145,18 +155,17 @@ func TestExecuteCanBeResumed(t *testing.T) { pr := &testExecutor{ executeByBlockID: func(id flow.Identifier, batch storage.ReaderBatchWriter) error { - return operation.RemoveChunkDataPack(batch.Writer(), id) + return myReceipts.BatchRemoveIndexByBlockID(id, batch) }, } require.NoError(t, executor.IterateExecuteAndCommitInBatch( - context.Background(), unittest.Logger(), metrics.NewNoopCollector(), iterToAll, pr, pdb, batchSize, nosleep)) + context.Background(), unittest.Logger(), metrics, iterToAll, pr, pdb, batchSize, nosleep)) // verify all blocks are pruned for _, b := range bs { - var c storage.StoredChunkDataPack // the first 5 blocks are pruned - err = operation.RetrieveChunkDataPack(pdb.Reader(), b, &c) + _, err := myReceipts.MyReceipt(b) require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound but got %v", err) } }) diff --git a/module/mempool/stdmap/chunk_requests_test.go b/module/mempool/stdmap/chunk_requests_test.go index 9143d1ad76f..6d9f3a897fd 100644 --- a/module/mempool/stdmap/chunk_requests_test.go +++ b/module/mempool/stdmap/chunk_requests_test.go @@ -26,8 +26,9 @@ func TestChunkRequests_UpdateRequestHistory(t *testing.T) { expectedAttempts := 10 withUpdaterScenario(t, chunks, expectedAttempts, incUpdater, func(t *testing.T, attempts uint64, lastTried time.Time, retryAfter time.Duration) { - require.Equal(t, expectedAttempts, int(attempts)) // each chunk request should be attempted 10 times. - require.True(t, qualifier(attempts, lastTried, retryAfter)) // request should be immediately qualified for retrial. + require.Equal(t, expectedAttempts, int(attempts)) // each chunk request should be attempted 10 times. + qualified, _ := qualifier(attempts, lastTried, retryAfter) + require.True(t, qualified) // request should be immediately qualified for retrial. }) }) @@ -44,7 +45,8 @@ func TestChunkRequests_UpdateRequestHistory(t *testing.T) { require.Equal(t, expectedAttempts, int(attempts)) // each chunk request should be attempted 10 times. // request should NOT be immediately qualified for retrial due to exponential backoff. - require.True(t, !qualifier(attempts, lastTried, retryAfter)) + qualified, _ := qualifier(attempts, lastTried, retryAfter) + require.True(t, !qualified) // retryAfter should be equal to 2^(attempts-1) * minInterval. // note that after the first attempt, retry after is set to minInterval. 
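The test hunks above and below exercise the new two-value RequestQualifierFunc signature. As a rough sketch of how a caller can surface the reason string (illustration only: the helper canDispatch and the limit of 3 attempts are invented here, while RequestQualifierFunc, MaxAttemptQualifier and RetryAfterQualifier are the names from engine/verification/requester shown earlier in this diff):

package main

import (
	"fmt"
	"time"

	"github.com/onflow/flow-go/engine/verification/requester"
)

// canDispatch chains both qualifiers and reports the first reason a chunk request is held back.
func canDispatch(attempts uint64, lastRequested time.Time, retryAfter time.Duration) (bool, string) {
	qualifiers := []requester.RequestQualifierFunc{
		requester.MaxAttemptQualifier(3),
		requester.RetryAfterQualifier,
	}
	for _, qualifies := range qualifiers {
		if ok, reason := qualifies(attempts, lastRequested, retryAfter); !ok {
			return false, reason
		}
	}
	return true, ""
}

func main() {
	ok, reason := canDispatch(3, time.Now(), time.Minute)
	fmt.Println(ok, reason) // prints: false max attempts (3) reached: 3
}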
@@ -69,7 +71,8 @@ func TestChunkRequests_UpdateRequestHistory(t *testing.T) { require.Equal(t, expectedAttempts, int(attempts)) // each chunk request should be attempted 10 times. // request should NOT be immediately qualified for retrial due to exponential backoff. - require.True(t, !qualifier(attempts, lastTried, retryAfter)) + qualified, _ := qualifier(attempts, lastTried, retryAfter) + require.True(t, !qualified) // expected retry after should be equal to the min interval, since updates should always underflow due // to the very small multiplier. @@ -91,7 +94,8 @@ func TestChunkRequests_UpdateRequestHistory(t *testing.T) { require.Equal(t, expectedAttempts, int(attempts)) // each chunk request should be attempted 10 times. // request should NOT be immediately qualified for retrial due to exponential backoff. - require.True(t, !qualifier(attempts, lastTried, retryAfter)) + qualified, _ := qualifier(attempts, lastTried, retryAfter) + require.True(t, !qualified) // expected retry after should be equal to the maxInterval, since updates should eventually overflow due // to the very small maxInterval and quite noticeable multiplier (2). diff --git a/module/metrics/labels.go b/module/metrics/labels.go index b79f3dbd785..53ca7da6307 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -128,6 +128,7 @@ const ( ResourceBlockVoteQueue = "vote_aggregator_queue" // consensus/collection node, vote aggregator ResourceTimeoutObjectQueue = "timeout_aggregator_queue" // consensus/collection node, timeout aggregator ResourceCollectionGuaranteesQueue = "ingestion_col_guarantee_queue" // consensus node, ingestion engine + ResourceChunkIDToChunkDataPackIndex = "chunk_data_pack_index" // execution node ResourceChunkDataPack = "chunk_data_pack" // execution node ResourceChunkDataPackRequests = "chunk_data_pack_request" // execution node ResourceEvents = "events" // execution node diff --git a/module/pruner/pruners/chunk_data_pack.go b/module/pruner/pruners/chunk_data_pack.go index 71517a4522a..70f8cc8734e 100644 --- a/module/pruner/pruners/chunk_data_pack.go +++ b/module/pruner/pruners/chunk_data_pack.go @@ -32,16 +32,17 @@ func (p *ChunkDataPackPruner) PruneByBlockID(blockID flow.Identifier, batchWrite return fmt.Errorf("failed to get execution result by block ID: %w", err) } + // collect all chunk IDs to remove + chunkIDs := make([]flow.Identifier, 0, len(result.Chunks)) for _, chunk := range result.Chunks { - chunkID := chunk.ID() - // remove chunk data pack - err := p.chunkDataPacks.BatchRemove(chunkID, batchWriter) - if errors.Is(err, storage.ErrNotFound) { - continue - } + chunkIDs = append(chunkIDs, chunk.ID()) + } + // remove all chunk data packs in a single batch operation + if len(chunkIDs) > 0 { + err := p.chunkDataPacks.BatchRemoveChunkDataPacksOnly(chunkIDs, batchWriter) if err != nil { - return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err) + return fmt.Errorf("could not remove chunk data packs for block id %v: %w", blockID, err) } } diff --git a/module/pruner/pruners/chunk_data_pack_test.go b/module/pruner/pruners/chunk_data_pack_test.go index f595ed30bcd..10c0d347609 100644 --- a/module/pruner/pruners/chunk_data_pack_test.go +++ b/module/pruner/pruners/chunk_data_pack_test.go @@ -26,13 +26,20 @@ func TestChunkDataPackPruner(t *testing.T) { transactions := store.NewTransactions(m, db) collections := store.NewCollections(db, transactions) byChunkIDCacheSize := uint(10) - chunks := store.NewChunkDataPacks(m, db, collections, 
byChunkIDCacheSize) + storedChunkDataPacks := store.NewStoredChunkDataPacks(m, db, byChunkIDCacheSize) + chunks := store.NewChunkDataPacks(m, db, storedChunkDataPacks, collections, byChunkIDCacheSize) // store the chunks cdp1, result1 := unittest.ChunkDataPacksFixtureAndResult() require.NoError(t, results.Store(result1)) - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { - return chunks.StoreByChunkID(lctx, cdp1) + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + storeFunc, err := chunks.Store(cdp1) + if err != nil { + return err + } + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return storeFunc(lctx, rw) + }) })) pruner := NewChunkDataPackPruner(chunks, results) diff --git a/storage/chunk_data_packs.go b/storage/chunk_data_packs.go index 4371b27b16a..01b8e368b27 100644 --- a/storage/chunk_data_packs.go +++ b/storage/chunk_data_packs.go @@ -1,8 +1,6 @@ package storage import ( - "bytes" - "github.com/jordanschalm/lockctx" "github.com/onflow/flow-go/model/flow" @@ -11,72 +9,64 @@ import ( // ChunkDataPacks represents persistent storage for chunk data packs. type ChunkDataPacks interface { - // Store stores multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch. - // No errors are expected during normal operation, but it may return generic error - StoreByChunkID(lctx lockctx.Proof, cs []*flow.ChunkDataPack) error - - // Remove removes multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch. - // No errors are expected during normal operation, but it may return generic error - Remove(cs []flow.Identifier) error + // Store persists multiple ChunkDataPacks in a two-phase process: + // 1. Store chunk data packs (StoredChunkDataPack) by its hash (chunkDataPackID) in chunk data pack database. + // 2. Populate index mapping from ChunkID to chunkDataPackID in protocol database. + // + // Reasoning for two-phase approach: the chunk data pack and the other execution data are stored in different databases. + // - Chunk data pack content is stored in the chunk data pack database by its hash (ID). Conceptually, it would be possible + // to store multiple different (disagreeing) chunk data packs here. Each chunk data pack is stored using its own collision + // resistant hash as key, so different chunk data packs will be stored under different keys. So from the perspective of the + // storage layer, we _could_ in phase 1 store all known chunk data packs. However, an Execution Node may only commit to a single + // chunk data pack (or it will get slashed). This mapping from chunk ID to the ID of the chunk data pack that the Execution Node + // actually committed to is stored in the protocol database, in the following phase 2. + // - In the second phase, we populate the index mappings from ChunkID to one "distinguished" chunk data pack ID. This mapping + // is stored in the protocol database. Typically, an Execution Node uses this for indexing its own chunk data packs which it + // publicly committed to. + // + // ATOMICITY: + // [ChunkDataPacks.Store] executes phase 1 immediately, persisting the chunk data packs in their dedicated database. However, + // the index mappings in phase 2 is deferred to the caller, who must invoke the returned functor to perform phase 2. 
This + // approach has the following benefits: + // - Our API reflects that we are writing to two different databases here, with the chunk data pack database containing largely + // specialized data subject to pruning. In contrast, the protocol database persists the commitments a node makes (subject to + // slashing). The caller receives the ability to persist this commitment in the form of the returned functor. The functor + // may be discarded by the caller without corrupting the state (if anything, we have just stored some additional chunk data + // packs). + // - The serialization and storage of the comparatively large chunk data packs are separated from the protocol database writes. + // - The locking duration of the protocol database is reduced. + // + // The Store method returns: + // - func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error: Function for populating the index mapping from chunkID + // to chunk data pack ID in the protocol database. This mapping records that the Execution Node committed to the result + // represented by this chunk data pack. This function returns [storage.ErrDataMismatch] when a _different_ chunk data pack + // ID for the same chunk ID has already been stored (changing which result an Execution Node committed to would be a + // slashable protocol violation). The caller must acquire [storage.LockIndexChunkDataPackByChunkID] and hold it until the database + // write has been committed. + // - error: No error should be returned during normal operation. Any error indicates a failure in the first phase. Store(cs []*flow.ChunkDataPack) (func(lctx lockctx.Proof, protocolDBBatch ReaderBatchWriter) error, error) - // ByChunkID returns the chunk data for the given a chunk ID. + // ByChunkID returns the chunk data for the given chunk ID. + // It returns [storage.ErrNotFound] if no entry exists for the given chunk ID. ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) - // BatchRemove removes ChunkDataPack c keyed by its ChunkID in provided batch + // BatchRemove schedules all ChunkDataPacks with the given chunk IDs to be deleted from the databases, + // as part of the provided write batch. Unknown IDs are silently ignored. + // It returns the list of chunk data pack IDs (chunkDataPackID) identifying the entries to be removed from the chunk data pack database. + // It performs a two-phase removal: + // 1. First phase: Remove index mappings from ChunkID to chunkDataPackID in the protocol database. + // 2. Second phase: Remove chunk data packs (StoredChunkDataPack) by their hashes (chunkDataPackID) from the chunk data pack database. + // This second phase is left to the caller, who can remove the returned chunkDataPackIDs from the chunk data pack database (for example via [StoredChunkDataPacks.Remove]). + // + // Note: it does not remove the collection referenced by the chunk data pack. + // This method is useful for the rollback execution tool to batch remove chunk data packs associated with a set of blocks. No errors are expected during normal operation, even if no entries are matched. - If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned. - BatchRemove(chunkID flow.Identifier, batch ReaderBatchWriter) error -} - -// StoredChunkDataPack is an in-storage representation of chunk data pack. -// Its prime difference is instead of an actual collection, it keeps a collection ID hence relying on maintaining -// the collection on a secondary storage.
-type StoredChunkDataPack struct { - ChunkID flow.Identifier - StartState flow.StateCommitment - Proof flow.StorageProof - CollectionID flow.Identifier - SystemChunk bool - ExecutionDataRoot flow.BlockExecutionDataRoot -} + BatchRemove(chunkIDs []flow.Identifier, rw ReaderBatchWriter) (chunkDataPackIDs []flow.Identifier, err error) -func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack { - sc := &StoredChunkDataPack{ - ChunkID: c.ChunkID, - StartState: c.StartState, - Proof: c.Proof, - SystemChunk: false, - ExecutionDataRoot: c.ExecutionDataRoot, - } - - if c.Collection != nil { - // non system chunk - sc.CollectionID = c.Collection.ID() - } else { - sc.SystemChunk = true - } - - return sc -} - -func (c StoredChunkDataPack) Equals(other StoredChunkDataPack) error { - if c.ChunkID != other.ChunkID { - return ErrDataMismatch - } - if c.StartState != other.StartState { - return ErrDataMismatch - } - if !c.ExecutionDataRoot.Equals(other.ExecutionDataRoot) { - return ErrDataMismatch - } - if c.SystemChunk != other.SystemChunk { - return ErrDataMismatch - } - if !bytes.Equal(c.Proof, other.Proof) { - return ErrDataMismatch - } - if c.CollectionID != other.CollectionID { - return ErrDataMismatch - } - return nil + // BatchRemoveChunkDataPacksOnly removes multiple ChunkDataPacks with the given chunk IDs from chunk data pack database only. + // It does not remove the index mappings from ChunkID to chunkDataPackID in the protocol database. + // This method is useful for the runtime chunk data pack pruner to batch remove chunk data packs associated with a set of blocks. + // CAUTION: the chunk data pack batch is for chunk data pack database only, DO NOT pass a batch writer for protocol database. + // No errors are expected during normal operation, even if no entries are matched. + BatchRemoveChunkDataPacksOnly(chunkIDs []flow.Identifier, chunkDataPackBatch ReaderBatchWriter) error } diff --git a/storage/chunk_data_packs_stored.go b/storage/chunk_data_packs_stored.go new file mode 100644 index 00000000000..8ddad5d1312 --- /dev/null +++ b/storage/chunk_data_packs_stored.go @@ -0,0 +1,120 @@ +package storage + +import ( + "bytes" + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +// StoredChunkDataPacks represents persistent storage for chunk data packs. +// It works with the reduced representation `StoredChunkDataPack` for chunk data packs, +// where instead of the full collection data, only the collection's hash (ID) is contained. +type StoredChunkDataPacks interface { + // StoreChunkDataPacks stores multiple StoredChunkDataPacks cs in a batch. + // It returns the chunk data pack IDs + // No error returns are expected during normal operation. + StoreChunkDataPacks(cs []*StoredChunkDataPack) ([]flow.Identifier, error) + + // ByID returns the StoredChunkDataPack for the given ID. + // It returns [storage.ErrNotFound] if no entry exists for the given ID. + ByID(id flow.Identifier) (*StoredChunkDataPack, error) + + // Remove removes multiple ChunkDataPacks cs keyed by their IDs in a batch. + // No error returns are expected during normal operation, even if none of the referenced objects exist in storage. + Remove(chunkDataPackIDs []flow.Identifier) error + + // BatchRemove removes multiple ChunkDataPacks with the given IDs from storage as part of the provided write batch. + // No error returns are expected during normal operation, even if no entries are matched. 
+ BatchRemove(chunkDataPackIDs []flow.Identifier, rw ReaderBatchWriter) error +} + +// StoredChunkDataPack is an in-storage representation of chunk data pack. Its prime difference is instead of an +// actual collection, it keeps a collection ID hence relying on maintaining the collection on a secondary storage. +// Note, StoredChunkDataPack.ID() is the same as ChunkDataPack.ID() +// +//structwrite:immutable - mutations allowed only within the constructor +type StoredChunkDataPack struct { + ChunkID flow.Identifier + StartState flow.StateCommitment + Proof flow.StorageProof + CollectionID flow.Identifier // flow.ZeroID for system chunks + ExecutionDataRoot flow.BlockExecutionDataRoot +} + +// NewStoredChunkDataPack instantiates an "immutable" [StoredChunkDataPack]. +// The `collectionID` field is set to [flow.ZeroID] for system chunks. +func NewStoredChunkDataPack( + chunkID flow.Identifier, + startState flow.StateCommitment, + proof flow.StorageProof, + collectionID flow.Identifier, + executionDataRoot flow.BlockExecutionDataRoot, +) *StoredChunkDataPack { + return &StoredChunkDataPack{ + ChunkID: chunkID, + StartState: startState, + Proof: proof, + CollectionID: collectionID, + ExecutionDataRoot: executionDataRoot, + } +} + +// IsSystemChunk returns true if this chunk data pack is for a system chunk. +func (s *StoredChunkDataPack) IsSystemChunk() bool { + return s.CollectionID == flow.ZeroID +} + +// ToStoredChunkDataPack converts the given Chunk Data Pack to its reduced representation. +// (Collections are stored separately and don't need to be included again here). +func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack { + collectionID := flow.ZeroID + if c.Collection != nil { + collectionID = c.Collection.ID() + } + return NewStoredChunkDataPack( + c.ChunkID, + c.StartState, + c.Proof, + collectionID, + c.ExecutionDataRoot, + ) +} + +// ToStoredChunkDataPacks converts the given Chunk Data Packs to their reduced representation. +// (Collections are stored separately and don't need to be included again here). +func ToStoredChunkDataPacks(cs []*flow.ChunkDataPack) []*StoredChunkDataPack { + scs := make([]*StoredChunkDataPack, 0, len(cs)) + for _, c := range cs { + scs = append(scs, ToStoredChunkDataPack(c)) + } + return scs +} + +// Equals compares two StoredChunkDataPack for equality. +// It returns (true, "") if they are equal, otherwise (false, reason) where reason is the first +// found reason for the mismatch. +func (c StoredChunkDataPack) Equals(other StoredChunkDataPack) (equal bool, diffReason string) { + if c.ChunkID != other.ChunkID { + return false, fmt.Errorf("chunk ID mismatch: %s != %s", c.ChunkID, other.ChunkID).Error() + } + if c.StartState != other.StartState { + return false, fmt.Errorf("start state mismatch: %s != %s", c.StartState, other.StartState).Error() + } + if !c.ExecutionDataRoot.Equals(other.ExecutionDataRoot) { + return false, fmt.Sprintf("execution data root mismatch: %s != %s", c.ExecutionDataRoot, other.ExecutionDataRoot) + } + if !bytes.Equal(c.Proof, other.Proof) { + return false, "storage proof mismatch" + } + if c.CollectionID != other.CollectionID { + return false, fmt.Sprintf("collection ID mismatch: %s != %s", c.CollectionID, other.CollectionID) + } + return true, "" +} + +// ID returns the identifier of the chunk data pack, which is derived from its contents. 
+// Note, StoredChunkDataPack.ID() is the same as ChunkDataPack.ID() +func (c StoredChunkDataPack) ID() flow.Identifier { + return flow.NewChunkDataPackHeader(c.ChunkID, c.StartState, flow.MakeID(c.Proof), c.CollectionID, c.ExecutionDataRoot).ID() +} diff --git a/storage/locks.go b/storage/locks.go index da3c2532fba..c01827c4eb5 100644 --- a/storage/locks.go +++ b/storage/locks.go @@ -27,8 +27,8 @@ const ( LockInsertCollection = "lock_insert_collection" // LockBootstrapping protects data that is *exclusively* written during bootstrapping. LockBootstrapping = "lock_bootstrapping" - // LockInsertChunkDataPack protects the insertion of chunk data packs (not yet used anywhere) - LockInsertChunkDataPack = "lock_insert_chunk_data_pack" + // LockIndexChunkDataPackByChunkID protects the insertion of chunk data packs + LockIndexChunkDataPackByChunkID = "lock_index_chunk_data_pack_by_chunk_id" // LockInsertTransactionResultErrMessage protects the insertion of transaction result error messages LockInsertTransactionResultErrMessage = "lock_insert_transaction_result_message" // LockInsertLightTransactionResult protects the insertion of light transaction results @@ -51,7 +51,7 @@ func Locks() []string { LockInsertOwnReceipt, LockInsertCollection, LockBootstrapping, - LockInsertChunkDataPack, + LockIndexChunkDataPackByChunkID, LockInsertTransactionResultErrMessage, LockInsertLightTransactionResult, LockInsertExecutionForkEvidence, @@ -84,7 +84,7 @@ func makeLockPolicy() lockctx.Policy { Add(LockBootstrapping, LockInsertSafetyData). Add(LockInsertSafetyData, LockInsertLivenessData). Add(LockInsertOrFinalizeClusterBlock, LockInsertSafetyData). - Add(LockInsertOwnReceipt, LockInsertChunkDataPack). + Add(LockIndexChunkDataPackByChunkID, LockInsertOwnReceipt). // module/executiondatasync/optimistic_sync/persisters/block.go#Persist Add(LockInsertCollection, LockInsertLightTransactionResult). 
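[Reviewer note, not part of this change set] The lock rename above pairs with the new two-phase ChunkDataPacks.Store API. The following minimal Go sketch illustrates the intended call pattern under stated assumptions: a storage.LockManager (lockManager), a protocol database handle (protocolDB), and a storage.ChunkDataPacks instance are already wired up (as in the execution builder), and the lock is acquired via lockctx's AcquireLock. The helper name persistChunkDataPacks is hypothetical.

package example // illustrative sketch only; not part of this diff

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// persistChunkDataPacks (hypothetical helper) shows the intended call pattern:
// phase 1 writes the chunk data packs to their dedicated database, outside any lock;
// phase 2 indexes chunkID -> chunkDataPackID in the protocol database while holding
// LockIndexChunkDataPackByChunkID, via the functor returned by Store.
func persistChunkDataPacks(
	lockManager storage.LockManager,
	protocolDB storage.DB,
	chunkDataPacks storage.ChunkDataPacks,
	cdps []*flow.ChunkDataPack,
) error {
	// Phase 1: store the (potentially large) chunk data packs; no lock required.
	indexInProtocolDB, err := chunkDataPacks.Store(cdps)
	if err != nil {
		return fmt.Errorf("could not store chunk data packs: %w", err)
	}

	// Phase 2: acquire the lock and commit the chunkID -> chunkDataPackID index
	// as part of a protocol database batch; hold the lock until the batch commits.
	lctx := lockManager.NewContext()
	defer lctx.Release()
	if err := lctx.AcquireLock(storage.LockIndexChunkDataPackByChunkID); err != nil {
		return fmt.Errorf("could not acquire lock: %w", err)
	}
	return protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		return indexInProtocolDB(lctx, rw)
	})
}

Discarding the returned functor leaves only some extra (unreferenced) chunk data packs behind, which is why phase 1 can safely run before the lock is taken.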
diff --git a/storage/mock/chunk_data_packs.go b/storage/mock/chunk_data_packs.go index 653eba59cd5..6faa2c09cd4 100644 --- a/storage/mock/chunk_data_packs.go +++ b/storage/mock/chunk_data_packs.go @@ -16,17 +16,47 @@ type ChunkDataPacks struct { mock.Mock } -// BatchRemove provides a mock function with given fields: chunkID, batch -func (_m *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.ReaderBatchWriter) error { - ret := _m.Called(chunkID, batch) +// BatchRemove provides a mock function with given fields: chunkIDs, rw +func (_m *ChunkDataPacks) BatchRemove(chunkIDs []flow.Identifier, rw storage.ReaderBatchWriter) ([]flow.Identifier, error) { + ret := _m.Called(chunkIDs, rw) if len(ret) == 0 { panic("no return value specified for BatchRemove") } + var r0 []flow.Identifier + var r1 error + if rf, ok := ret.Get(0).(func([]flow.Identifier, storage.ReaderBatchWriter) ([]flow.Identifier, error)); ok { + return rf(chunkIDs, rw) + } + if rf, ok := ret.Get(0).(func([]flow.Identifier, storage.ReaderBatchWriter) []flow.Identifier); ok { + r0 = rf(chunkIDs, rw) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]flow.Identifier) + } + } + + if rf, ok := ret.Get(1).(func([]flow.Identifier, storage.ReaderBatchWriter) error); ok { + r1 = rf(chunkIDs, rw) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// BatchRemoveChunkDataPacksOnly provides a mock function with given fields: chunkIDs, chunkDataPackBatch +func (_m *ChunkDataPacks) BatchRemoveChunkDataPacksOnly(chunkIDs []flow.Identifier, chunkDataPackBatch storage.ReaderBatchWriter) error { + ret := _m.Called(chunkIDs, chunkDataPackBatch) + + if len(ret) == 0 { + panic("no return value specified for BatchRemoveChunkDataPacksOnly") + } + var r0 error - if rf, ok := ret.Get(0).(func(flow.Identifier, storage.ReaderBatchWriter) error); ok { - r0 = rf(chunkID, batch) + if rf, ok := ret.Get(0).(func([]flow.Identifier, storage.ReaderBatchWriter) error); ok { + r0 = rf(chunkIDs, chunkDataPackBatch) } else { r0 = ret.Error(0) } @@ -64,40 +94,34 @@ func (_m *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPac return r0, r1 } -// Remove provides a mock function with given fields: cs -func (_m *ChunkDataPacks) Remove(cs []flow.Identifier) error { +// Store provides a mock function with given fields: cs +func (_m *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) (func(lockctx.Proof, storage.ReaderBatchWriter) error, error) { ret := _m.Called(cs) if len(ret) == 0 { - panic("no return value specified for Remove") + panic("no return value specified for Store") } - var r0 error - if rf, ok := ret.Get(0).(func([]flow.Identifier) error); ok { + var r0 func(lockctx.Proof, storage.ReaderBatchWriter) error + var r1 error + if rf, ok := ret.Get(0).(func([]*flow.ChunkDataPack) (func(lockctx.Proof, storage.ReaderBatchWriter) error, error)); ok { + return rf(cs) + } + if rf, ok := ret.Get(0).(func([]*flow.ChunkDataPack) func(lockctx.Proof, storage.ReaderBatchWriter) error); ok { r0 = rf(cs) } else { - r0 = ret.Error(0) - } - - return r0 -} - -// StoreByChunkID provides a mock function with given fields: lctx, cs -func (_m *ChunkDataPacks) StoreByChunkID(lctx lockctx.Proof, cs []*flow.ChunkDataPack) error { - ret := _m.Called(lctx, cs) - - if len(ret) == 0 { - panic("no return value specified for StoreByChunkID") + if ret.Get(0) != nil { + r0 = ret.Get(0).(func(lockctx.Proof, storage.ReaderBatchWriter) error) + } } - var r0 error - if rf, ok := ret.Get(0).(func(lockctx.Proof, []*flow.ChunkDataPack) error); ok { - r0 = 
rf(lctx, cs) + if rf, ok := ret.Get(1).(func([]*flow.ChunkDataPack) error); ok { + r1 = rf(cs) } else { - r0 = ret.Error(0) + r1 = ret.Error(1) } - return r0 + return r0, r1 } // NewChunkDataPacks creates a new instance of ChunkDataPacks. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. diff --git a/storage/mock/stored_chunk_data_packs.go b/storage/mock/stored_chunk_data_packs.go new file mode 100644 index 00000000000..a88c298e91a --- /dev/null +++ b/storage/mock/stored_chunk_data_packs.go @@ -0,0 +1,125 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" + + storage "github.com/onflow/flow-go/storage" +) + +// StoredChunkDataPacks is an autogenerated mock type for the StoredChunkDataPacks type +type StoredChunkDataPacks struct { + mock.Mock +} + +// BatchRemove provides a mock function with given fields: chunkDataPackIDs, rw +func (_m *StoredChunkDataPacks) BatchRemove(chunkDataPackIDs []flow.Identifier, rw storage.ReaderBatchWriter) error { + ret := _m.Called(chunkDataPackIDs, rw) + + if len(ret) == 0 { + panic("no return value specified for BatchRemove") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]flow.Identifier, storage.ReaderBatchWriter) error); ok { + r0 = rf(chunkDataPackIDs, rw) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ByID provides a mock function with given fields: id +func (_m *StoredChunkDataPacks) ByID(id flow.Identifier) (*storage.StoredChunkDataPack, error) { + ret := _m.Called(id) + + if len(ret) == 0 { + panic("no return value specified for ByID") + } + + var r0 *storage.StoredChunkDataPack + var r1 error + if rf, ok := ret.Get(0).(func(flow.Identifier) (*storage.StoredChunkDataPack, error)); ok { + return rf(id) + } + if rf, ok := ret.Get(0).(func(flow.Identifier) *storage.StoredChunkDataPack); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage.StoredChunkDataPack) + } + } + + if rf, ok := ret.Get(1).(func(flow.Identifier) error); ok { + r1 = rf(id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Remove provides a mock function with given fields: chunkDataPackIDs +func (_m *StoredChunkDataPacks) Remove(chunkDataPackIDs []flow.Identifier) error { + ret := _m.Called(chunkDataPackIDs) + + if len(ret) == 0 { + panic("no return value specified for Remove") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]flow.Identifier) error); ok { + r0 = rf(chunkDataPackIDs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StoreChunkDataPacks provides a mock function with given fields: cs +func (_m *StoredChunkDataPacks) StoreChunkDataPacks(cs []*storage.StoredChunkDataPack) ([]flow.Identifier, error) { + ret := _m.Called(cs) + + if len(ret) == 0 { + panic("no return value specified for StoreChunkDataPacks") + } + + var r0 []flow.Identifier + var r1 error + if rf, ok := ret.Get(0).(func([]*storage.StoredChunkDataPack) ([]flow.Identifier, error)); ok { + return rf(cs) + } + if rf, ok := ret.Get(0).(func([]*storage.StoredChunkDataPack) []flow.Identifier); ok { + r0 = rf(cs) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]flow.Identifier) + } + } + + if rf, ok := ret.Get(1).(func([]*storage.StoredChunkDataPack) error); ok { + r1 = rf(cs) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewStoredChunkDataPacks creates a new instance of StoredChunkDataPacks. 
It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewStoredChunkDataPacks(t interface { + mock.TestingT + Cleanup(func()) +}) *StoredChunkDataPacks { + mock := &StoredChunkDataPacks{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/operation/chunk_data_packs.go b/storage/operation/chunk_data_packs.go index 6502ab49f59..bc791db8fb8 100644 --- a/storage/operation/chunk_data_packs.go +++ b/storage/operation/chunk_data_packs.go @@ -10,50 +10,61 @@ import ( "github.com/onflow/flow-go/storage" ) -// InsertChunkDataPack inserts a [storage.StoredChunkDataPack] into the database, keyed by its chunk ID. -// The function ensures data integrity by first checking if a chunk data pack already exists for the given -// chunk ID and rejecting overwrites with different values. This function is idempotent, i.e. repeated calls -// with the *initially* stored value are no-ops. -// -// CAUTION: -// - Confirming that no value is already stored and the subsequent write must be atomic to prevent data corruption. -// The caller must acquire the [storage.LockInsertChunkDataPack] and hold it until the database write has been committed. -// -// Expected error returns during normal operations: -// - [storage.ErrDataMismatch] if a *different* chunk data pack is already stored for the same chunk ID -func InsertChunkDataPack(lctx lockctx.Proof, rw storage.ReaderBatchWriter, c *storage.StoredChunkDataPack) error { - if !lctx.HoldsLock(storage.LockInsertChunkDataPack) { - return fmt.Errorf("InsertChunkDataPack requires lock: %s", storage.LockInsertChunkDataPack) +// IndexChunkDataPackByChunkID inserts a mapping from chunk ID to stored chunk data pack ID. It requires +// the [storage.LockIndexChunkDataPackByChunkID] lock to be acquired by the caller and held until the write batch has been committed. +// Returns [storage.ErrDataMismatch] if a different chunk data pack ID already exists for the given chunk ID. +func IndexChunkDataPackByChunkID(lctx lockctx.Proof, rw storage.ReaderBatchWriter, chunkID flow.Identifier, chunkDataPackID flow.Identifier) error { + if !lctx.HoldsLock(storage.LockIndexChunkDataPackByChunkID) { + return fmt.Errorf("missing required lock: %s", storage.LockIndexChunkDataPackByChunkID) } - - key := MakePrefix(codeChunkDataPack, c.ChunkID) - - var existing storage.StoredChunkDataPack + key := MakePrefix(codeIndexChunkDataPackByChunkID, chunkID) + var existing flow.Identifier err := RetrieveByKey(rw.GlobalReader(), key, &existing) if err == nil { - err := c.Equals(existing) - if err != nil { - return fmt.Errorf("attempting to store conflicting chunk data pack (chunk ID: %v): storing: %+v, stored: %+v, err: %s. 
%w", - c.ChunkID, c, &existing, err, storage.ErrDataMismatch) + if existing == chunkDataPackID { + // already exists, nothing to do + return nil } - return nil // already stored, nothing to do + return fmt.Errorf("cannot insert chunk data pack ID for chunk %s, different one exist: existing: %v, new: %v: %w", + chunkID, existing, chunkDataPackID, storage.ErrDataMismatch) + } else if !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("cannot check existing chunk data pack ID for chunk %s: %w", chunkID, err) } - if !errors.Is(err, storage.ErrNotFound) { - return fmt.Errorf("checking for existing chunk data pack (chunk ID: %v): %w", c.ChunkID, err) - } + return UpsertByKey(rw.Writer(), key, &chunkDataPackID) +} - return UpsertByKey(rw.Writer(), key, c) +// RetrieveChunkDataPackID retrieves the stored chunk data pack ID for a given chunk ID. +// Returns [storage.ErrNotFound] if no chunk data pack has been indexed as result for the given chunk ID. +func RetrieveChunkDataPackID(r storage.Reader, chunkID flow.Identifier, chunkDataPackID *flow.Identifier) error { + return RetrieveByKey(r, MakePrefix(codeIndexChunkDataPackByChunkID, chunkID), chunkDataPackID) +} + +// RemoveChunkDataPackID removes the mapping from chunk ID to stored chunk data pack ID. +// Non-existing keys are no-ops. Any errors are exceptions. +func RemoveChunkDataPackID(w storage.Writer, chunkID flow.Identifier) error { + return RemoveByKey(w, MakePrefix(codeIndexChunkDataPackByChunkID, chunkID)) +} + +// InsertChunkDataPack inserts a [storage.StoredChunkDataPack] into the database, keyed by its own ID. +// +// CAUTION: The caller must ensure `storeChunkDataPackID` is the same as `c.ID()`, ie. a collision-resistant +// hash of the chunk data pack! This method silently overrides existing data, which is safe only if for the +// same key, we always write the same value. +// +// No error returns expected during normal operations. +func InsertChunkDataPack(rw storage.ReaderBatchWriter, storeChunkDataPackID flow.Identifier, c *storage.StoredChunkDataPack) error { + return UpsertByKey(rw.Writer(), MakePrefix(codeChunkDataPack, storeChunkDataPackID), c) } -// RetrieveChunkDataPack retrieves a chunk data pack by chunk ID. -// it returns storage.ErrNotFound if the chunk data pack is not found -func RetrieveChunkDataPack(r storage.Reader, chunkID flow.Identifier, c *storage.StoredChunkDataPack) error { - return RetrieveByKey(r, MakePrefix(codeChunkDataPack, chunkID), c) +// RetrieveChunkDataPack retrieves a chunk data pack by stored chunk data pack ID. +// It returns [storage.ErrNotFound] if no chunk data pack with the given ID is known. +func RetrieveChunkDataPack(r storage.Reader, storeChunkDataPackID flow.Identifier, c *storage.StoredChunkDataPack) error { + return RetrieveByKey(r, MakePrefix(codeChunkDataPack, storeChunkDataPackID), c) } -// RemoveChunkDataPack removes the chunk data pack with the given chunk ID. -// any error are exceptions -func RemoveChunkDataPack(w storage.Writer, chunkID flow.Identifier) error { - return RemoveByKey(w, MakePrefix(codeChunkDataPack, chunkID)) +// RemoveChunkDataPack removes the chunk data pack with the given stored chunk data pack ID. +// Non-existing keys are no-ops. Any errors are exceptions. 
+func RemoveChunkDataPack(w storage.Writer, chunkDataPackID flow.Identifier) error { + return RemoveByKey(w, MakePrefix(codeChunkDataPack, chunkDataPackID)) } diff --git a/storage/operation/chunk_data_packs_test.go b/storage/operation/chunk_data_packs_test.go index b2843154a2c..96ba338e57d 100644 --- a/storage/operation/chunk_data_packs_test.go +++ b/storage/operation/chunk_data_packs_test.go @@ -7,15 +7,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" "github.com/onflow/flow-go/storage/operation/dbtest" "github.com/onflow/flow-go/utils/unittest" ) +// TestChunkDataPack tests basic operation for storing ChunkDataPacks themselves. +// The data is typically stored in the chunk data packs database. func TestChunkDataPack(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { - lockManager := storage.NewTestingLockManager() collectionID := unittest.IdentifierFixture() expected := &storage.StoredChunkDataPack{ ChunkID: unittest.IdentifierFixture(), @@ -26,19 +28,17 @@ func TestChunkDataPack(t *testing.T) { t.Run("Retrieve non-existent", func(t *testing.T) { var actual storage.StoredChunkDataPack - err := operation.RetrieveChunkDataPack(db.Reader(), expected.ChunkID, &actual) + err := operation.RetrieveChunkDataPack(db.Reader(), expected.ID(), &actual) assert.Error(t, err) }) t.Run("Save", func(t *testing.T) { - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { - return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - return operation.InsertChunkDataPack(lctx, rw, expected) - }) + require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertChunkDataPack(rw, expected.ID(), expected) })) var actual storage.StoredChunkDataPack - err := operation.RetrieveChunkDataPack(db.Reader(), expected.ChunkID, &actual) + err := operation.RetrieveChunkDataPack(db.Reader(), expected.ID(), &actual) assert.NoError(t, err) assert.Equal(t, *expected, actual) @@ -46,13 +46,239 @@ func TestChunkDataPack(t *testing.T) { t.Run("Remove", func(t *testing.T) { err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - return operation.RemoveChunkDataPack(rw.Writer(), expected.ChunkID) + return operation.RemoveChunkDataPack(rw.Writer(), expected.ID()) }) require.NoError(t, err) var actual storage.StoredChunkDataPack - err = operation.RetrieveChunkDataPack(db.Reader(), expected.ChunkID, &actual) + err = operation.RetrieveChunkDataPack(db.Reader(), expected.ID(), &actual) assert.Error(t, err) }) }) } + +// TestIndexChunkDataPackByChunkID tests populating the index mapping from chunkID to the resulting chunk data pack's ID. +// Essentially, the chunk ID describes the work to be done, and the chunk data pack describes the result of that work. +// The index from chunk ID to chunk data pack ID is typically stored in the protocol database. 
+func TestIndexChunkDataPackByChunkID(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + chunkID := unittest.IdentifierFixture() + chunkDataPackID := unittest.IdentifierFixture() + + t.Run("successful insert", func(t *testing.T) { + err := unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, chunkID, chunkDataPackID) + }) + }) + require.NoError(t, err) + + // Verify the chunk data pack ID was indexed + var retrievedID flow.Identifier + err = operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.NoError(t, err) + assert.Equal(t, chunkDataPackID, retrievedID) + }) + + t.Run("idempotent insert", func(t *testing.T) { + // Insert the same chunk data pack ID again should be idempotent + err := unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, chunkID, chunkDataPackID) + }) + }) + require.NoError(t, err) + + // Verify the chunk data pack ID is still the same + var retrievedID flow.Identifier + err = operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.NoError(t, err) + assert.Equal(t, chunkDataPackID, retrievedID) + }) + + t.Run("data mismatch error", func(t *testing.T) { + differentStoredChunkDataPackID := unittest.IdentifierFixture() + + err := unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, chunkID, differentStoredChunkDataPackID) + }) + }) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrDataMismatch) + assert.Contains(t, err.Error(), "different one exist") + + // Verify the chunk data pack ID is unchanged + var retrievedID flow.Identifier + err = operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.NoError(t, err) + assert.Equal(t, chunkDataPackID, retrievedID) + }) + + t.Run("missing required lock", func(t *testing.T) { + newChunkID := unittest.IdentifierFixture() + newStoredChunkDataPackID := unittest.IdentifierFixture() + + // Create a context without any locks + lctx := lockManager.NewContext() + defer lctx.Release() + + err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, newChunkID, newStoredChunkDataPackID) + }) + require.Error(t, err) + require.NotErrorIs(t, err, storage.ErrDataMismatch) // missing lock should not be erroneously represented as mismatching data + assert.Contains(t, err.Error(), "missing required lock") + assert.Contains(t, err.Error(), storage.LockIndexChunkDataPackByChunkID) + }) + + t.Run("wrong lock type", func(t *testing.T) { + newChunkID := unittest.IdentifierFixture() + newStoredChunkDataPackID := unittest.IdentifierFixture() + + // Use wrong lock type + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, newChunkID, newStoredChunkDataPackID) + }) + }) + require.Error(t, 
err) + require.NotErrorIs(t, err, storage.ErrDataMismatch) // wrong lock should not be erroneously represented as mismatching data + assert.Contains(t, err.Error(), "missing required lock") + assert.Contains(t, err.Error(), storage.LockIndexChunkDataPackByChunkID) + }) + }) +} + +// TestRetrieveChunkDataPackID tests reading the index mapping from chunkID to the resulting chunk data pack's ID. +// Essentially, the chunk ID describes the work to be done, and the chunk data pack describes the result of that work. +// The index from chunk ID to chunk data pack ID is typically stored in the protocol database. +func TestRetrieveChunkDataPackID(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + chunkID := unittest.IdentifierFixture() + chunkDataPackID := unittest.IdentifierFixture() + + t.Run("retrieve non-existent", func(t *testing.T) { + var retrievedID flow.Identifier + err := operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrNotFound) + }) + + t.Run("retrieve existing", func(t *testing.T) { + // First insert a chunk data pack ID + err := unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, chunkID, chunkDataPackID) + }) + }) + require.NoError(t, err) + + // Then retrieve it + var retrievedID flow.Identifier + err = operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.NoError(t, err) + assert.Equal(t, chunkDataPackID, retrievedID) + }) + + t.Run("retrieve after removal", func(t *testing.T) { + // Insert a chunk data pack ID + err := unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, chunkID, chunkDataPackID) + }) + }) + require.NoError(t, err) + + // Remove it + err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.RemoveChunkDataPackID(rw.Writer(), chunkID) + }) + require.NoError(t, err) + + // Try to retrieve it again - should fail + var retrievedID flow.Identifier + err = operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrNotFound) + }) + }) +} + +// TestRemoveChunkDataPackID tests the RemoveChunkDataPackID operation +func TestRemoveChunkDataPackID(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + chunkID := unittest.IdentifierFixture() + chunkDataPackID := unittest.IdentifierFixture() + + t.Run("remove non-existent", func(t *testing.T) { + // Removing a non-existent chunk data pack ID should not error + err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.RemoveChunkDataPackID(rw.Writer(), chunkID) + }) + require.NoError(t, err) + }) + + t.Run("remove existing", func(t *testing.T) { + // First insert a chunk data pack ID + err := unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, chunkID, chunkDataPackID) + }) + }) + 
require.NoError(t, err) + + // Verify it exists + var retrievedID flow.Identifier + err = operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.NoError(t, err) + assert.Equal(t, chunkDataPackID, retrievedID) + + // Remove it + err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.RemoveChunkDataPackID(rw.Writer(), chunkID) + }) + require.NoError(t, err) + + // Verify it's gone + err = operation.RetrieveChunkDataPackID(db.Reader(), chunkID, &retrievedID) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrNotFound) + }) + + t.Run("remove multiple", func(t *testing.T) { + chunkIDs := unittest.IdentifierListFixture(3) + chunkDataPackIDs := unittest.IdentifierListFixture(3) + + // Insert multiple chunk data pack IDs + for i := 0; i < 3; i++ { + + err := unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.IndexChunkDataPackByChunkID(lctx, rw, chunkIDs[i], chunkDataPackIDs[i]) + }) + }) + require.NoError(t, err) + } + + // Remove all of them + for i := 0; i < 3; i++ { + err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.RemoveChunkDataPackID(rw.Writer(), chunkIDs[i]) + }) + require.NoError(t, err) + } + + // Verify all are gone + for i := 0; i < 3; i++ { + var retrievedID flow.Identifier + err := operation.RetrieveChunkDataPackID(db.Reader(), chunkIDs[i], &retrievedID) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrNotFound) + } + }) + }) +} diff --git a/storage/operation/prefix.go b/storage/operation/prefix.go index ce6f438c4c9..5ff5c79a411 100644 --- a/storage/operation/prefix.go +++ b/storage/operation/prefix.go @@ -98,8 +98,15 @@ const ( codeTransactionIDByScheduledTransactionID = 82 // index of transaction ID by scheduled transaction ID codeBlockIDByScheduledTransactionID = 83 // index of block ID by scheduled transaction ID + // The storage prefixes `codeChunkDataPack` and `codeIndexChunkDataPackByChunkID` are used primarily by execution nodes + // to persist their own results for chunks they executed. + // - `codeIndexChunkDataPackByChunkID` stores the chunkID → chunkDataPackID index, and + // - `codeChunkDataPack` stores the chunk data pack by its own ID. + // This breakup allows us to store chunk data packs in a different database in a concurrent safe way. 
+ codeIndexChunkDataPackByChunkID = 99 + codeChunkDataPack = 100 + // legacy codes (should be cleaned up) - codeChunkDataPack = 100 codeCommit = 101 codeEvent = 102 codeExecutionStateInteractions = 103 diff --git a/storage/operation/stats_test.go b/storage/operation/stats_test.go index ff05671b1c3..27e233b15fa 100644 --- a/storage/operation/stats_test.go +++ b/storage/operation/stats_test.go @@ -3,7 +3,6 @@ package operation_test import ( "testing" - "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/storage" @@ -14,8 +13,6 @@ import ( func TestSummarizeKeysByFirstByteConcurrent(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { - lockManager := storage.NewTestingLockManager() - err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { // insert random events b := unittest.IdentifierFixture() @@ -36,9 +33,10 @@ func TestSummarizeKeysByFirstByteConcurrent(t *testing.T) { Proof: []byte{'p'}, CollectionID: collectionID, } - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { - return operation.InsertChunkDataPack(lctx, rw, cdp) - })) + err := operation.InsertChunkDataPack(rw, cdp.ID(), cdp) + if err != nil { + return err + } } // insert 20 results @@ -63,11 +61,11 @@ func TestSummarizeKeysByFirstByteConcurrent(t *testing.T) { for i := 0; i < 256; i++ { count := 0 - if i == 102 { // events + if i == 102 { // events (codeEvent) count = 30 - } else if i == 100 { // CDP + } else if i == 100 { // CDP (codeChunkDataPack) count = 100 - } else if i == 36 { // results + } else if i == 36 { // results (codeExecutionResult) count = 20 } require.Equal(t, count, stats[byte(i)].Count, "byte %d", i) diff --git a/storage/store/chunk_data_packs.go b/storage/store/chunk_data_packs.go index 0568dd8f5c9..1aa7adf5b9f 100644 --- a/storage/store/chunk_data_packs.go +++ b/storage/store/chunk_data_packs.go @@ -1,9 +1,11 @@ package store import ( + "errors" "fmt" "github.com/jordanschalm/lockctx" + "github.com/rs/zerolog/log" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" @@ -12,119 +14,271 @@ import ( "github.com/onflow/flow-go/storage/operation" ) +// ChunkDataPacks manages storage and retrieval of ChunkDataPacks, primarily serving the use case of EXECUTION NODES persisting +// and indexing chunk data packs for their OWN RESULTS. Essentially, the chunk describes a batch of work to be done, and the +// chunk data pack describes the result of that work. The storage of chunk data packs is segregated across different +// storage components for efficiency and modularity reasons: +// 0. Usually (ignoring the system chunk for a moment), the batch of work is given by the collection referenced in the chunk +// data pack. For any chunk data pack being stored, we assume that the executed collection has *previously* been persisted +// in [storage.Collections]. It is useful to persist the collections individually, so we can individually retrieve them. +// 1. The actual chunk data pack itself is stored in a dedicated storage component `cdpStorage`. Note that for this storage +// component, no atomicity is required, as we are storing chunk data packs by their collision-resistant hashes, so +// different chunk data packs will be stored under different keys. +// Theoretically, nodes could store persist multiple different (disagreeing) chunk data packs for the same +// chunk in this step. 
However, for efficiency, Execution Nodes only store their own chunk data packs. +// 2. The index mapping from ChunkID to chunkDataPackID is stored in the protocol database for fast retrieval. +// This index is intended to be populated by execution nodes when they commit to a specific result represented by the chunk +// data pack. Here, we require atomicity, as an execution node should not be changing / overwriting which chunk data pack +// it committed to (during normal operations). +// +// Since the executed collections are stored separately (step 0, above), we can just use the collection ID in context of the +// chunk data pack storage (step 1, above). Therefore, we utilize the reduced representation [storage.StoredChunkDataPack] +// internally. While removing redundant data from storage, it takes 3 look-ups to return chunk data pack by chunk ID: +// +// i. a lookup for chunkID -> chunkDataPackID +// ii. a lookup for chunkDataPackID -> StoredChunkDataPack (only has CollectionID, no collection data) +// iii. a lookup for CollectionID -> Collection, then reconstruct the chunk data pack from the collection and the StoredChunkDataPack type ChunkDataPacks struct { - db storage.DB - collections storage.Collections - byChunkIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack] + // the protocol DB is used for storing index mappings from chunk ID to chunk data pack ID + protocolDB storage.DB + + // the actual chunk data pack is stored here, which is a separate storage from protocol DB + cdpStorage storage.StoredChunkDataPacks + + // We assume that for every chunk data pack not for a system chunk, the executed collection has + // previously been persisted in `storage.Collections`. We use this storage abstraction here only for + // retrieving collections. We assume that `storage.Collections` has its own caching already built in. 
+ collections storage.Collections + + // cache chunkID -> chunkDataPackID + chunkIDToChunkDataPackIDCache *Cache[flow.Identifier, flow.Identifier] } var _ storage.ChunkDataPacks = (*ChunkDataPacks)(nil) -func NewChunkDataPacks(collector module.CacheMetrics, db storage.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks { +func NewChunkDataPacks(collector module.CacheMetrics, db storage.DB, cdpStorage storage.StoredChunkDataPacks, collections storage.Collections, chunkIDToChunkDataPackIDCacheSize uint) *ChunkDataPacks { - storeWithLock := func(lctx lockctx.Proof, rw storage.ReaderBatchWriter, key flow.Identifier, val *storage.StoredChunkDataPack) error { - return operation.InsertChunkDataPack(lctx, rw, val) + retrieve := func(r storage.Reader, chunkID flow.Identifier) (flow.Identifier, error) { + var chunkDataPackID flow.Identifier + err := operation.RetrieveChunkDataPackID(r, chunkID, &chunkDataPackID) + return chunkDataPackID, err } - retrieve := func(r storage.Reader, key flow.Identifier) (*storage.StoredChunkDataPack, error) { - var c storage.StoredChunkDataPack - err := operation.RetrieveChunkDataPack(r, key, &c) - return &c, err + remove := func(rw storage.ReaderBatchWriter, chunkID flow.Identifier) error { + return operation.RemoveChunkDataPackID(rw.Writer(), chunkID) } - cache := newCache(collector, metrics.ResourceChunkDataPack, - withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize), - withStoreWithLock(storeWithLock), + cache := newCache(collector, metrics.ResourceChunkIDToChunkDataPackIndex, + withLimit[flow.Identifier, flow.Identifier](chunkIDToChunkDataPackIDCacheSize), + withStoreWithLock(operation.IndexChunkDataPackByChunkID), + withRemove[flow.Identifier, flow.Identifier](remove), withRetrieve(retrieve), ) ch := ChunkDataPacks{ - db: db, - byChunkIDCache: cache, - collections: collections, + protocolDB: db, + chunkIDToChunkDataPackIDCache: cache, + cdpStorage: cdpStorage, + collections: collections, } return &ch } -// Remove removes multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch. -// No errors are expected during normal operation, even if no entries are matched. -func (ch *ChunkDataPacks) Remove(chunkIDs []flow.Identifier) error { - return ch.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - for _, c := range chunkIDs { - err := ch.BatchRemove(c, rw) +// Store persists multiple ChunkDataPacks in a two-phase process: +// 1. Store chunk data packs (StoredChunkDataPack) by its hash (chunkDataPackID) in chunk data pack database. +// 2. Populate index mapping from ChunkID to chunkDataPackID in protocol database. +// +// Reasoning for two-phase approach: the chunk data pack and the other execution data are stored in different databases. +// - Chunk data pack content is stored in the chunk data pack database by its hash (ID). Conceptually, it would be possible +// to store multiple different (disagreeing) chunk data packs here. Each chunk data pack is stored using its own collision +// resistant hash as key, so different chunk data packs will be stored under different keys. So from the perspective of the +// storage layer, we _could_ in phase 1 store all known chunk data packs. However, an Execution Node may only commit to a single +// chunk data pack (or it will get slashed). This mapping from chunk ID to the ID of the chunk data pack that the Execution Node +// actually committed to is stored in the protocol database, in the following phase 2. 
+// - In the second phase, we populate the index mappings from ChunkID to one "distinguished" chunk data pack ID. This mapping +// is stored in the protocol database. Typically, an Execution Node uses this for indexing its own chunk data packs which it +// publicly committed to. +// +// ATOMICITY: +// [ChunkDataPacks.Store] executes phase 1 immediately, persisting the chunk data packs in their dedicated database. However, +// writing the index mappings in phase 2 is deferred to the caller, who must invoke the returned functor to perform phase 2. This +// approach has the following benefits: +// - Our API reflects that we are writing to two different databases here, with the chunk data pack database containing largely +// specialized data subject to pruning. In contrast, the protocol database persists the commitments a node makes (subject to +// slashing). The caller receives the ability to persist this commitment in the form of the returned functor. The functor +// may be discarded by the caller without corrupting the state (if anything, we have just stored some additional chunk data +// packs). +// - The serialization and storage of the comparatively large chunk data packs is separated from the protocol database writes. +// - The locking duration of the protocol database is reduced. +// +// The Store method returns: +// - func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error: Function for populating the index mapping from chunkID +// to chunk data pack ID in the protocol database. This mapping persists that the Execution Node committed to the result +// represented by this chunk data pack. This function returns [storage.ErrDataMismatch] when a _different_ chunk data pack +// ID for the same chunk ID has already been stored (changing which result an Execution Node committed to would be a +// slashable protocol violation). The caller must acquire [storage.LockIndexChunkDataPackByChunkID] and hold it until the database +// write has been committed. +// - error: No error should be returned during normal operation. Any error indicates a failure in the first phase. +func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) ( + func(lctx lockctx.Proof, protocolDBBatch storage.ReaderBatchWriter) error, + error, +) { + + // Phase 1: Store chunk data packs in dedicated (separate) database. This converts the + // ChunkDataPacks to the reduced StoredChunkDataPacks representation and stores them. + storedChunkDataPacks := storage.ToStoredChunkDataPacks(cs) + + // Store the chunk data packs and get back their unique IDs + chunkDataPackIDs, err := ch.cdpStorage.StoreChunkDataPacks(storedChunkDataPacks) + if err != nil { + return nil, fmt.Errorf("cannot store chunk data packs: %w", err) + } + + // Sanity check: validate that we received the expected number of IDs + if len(cs) != len(chunkDataPackIDs) { + return nil, fmt.Errorf("stored chunk data pack IDs count mismatch: expected: %d, got: %d", + len(cs), len(chunkDataPackIDs)) + } + + // Phase 2: Create the function that will index chunkID -> chunkDataPackID mappings + storeChunkDataPacksFunc := func(lctx lockctx.Proof, protocolDBBatch storage.ReaderBatchWriter) error { + protocolDBBatch.AddCallback(func(err error) { if err != nil { - return fmt.Errorf("cannot remove chunk data pack: %w", err) + log.Warn().Err(err).Msgf("indexing chunkID -> chunkDataPackID mapping failed, chunkDataPackIDs: %v", chunkDataPackIDs) } - } - - return nil - }) -} + }) -// StoreByChunkID stores multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch.
-// No errors are expected during normal operation, but it may return generic error -func (ch *ChunkDataPacks) StoreByChunkID(lctx lockctx.Proof, cs []*flow.ChunkDataPack) error { - return ch.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - for _, c := range cs { - sc := storage.ToStoredChunkDataPack(c) - err := ch.byChunkIDCache.PutWithLockTx(lctx, rw, sc.ChunkID, sc) + // Create index mappings for each chunk data pack + for i, c := range cs { + chunkDataPackID := chunkDataPackIDs[i] + // Index the stored chunk data pack ID by chunk ID for fast retrieval + err := ch.chunkIDToChunkDataPackIDCache.PutWithLockTx( + lctx, protocolDBBatch, c.ChunkID, chunkDataPackID) if err != nil { - return err + return fmt.Errorf("cannot index stored chunk data pack ID by chunk ID: %w", err) } } return nil - }) + } + + // Returned Functor: when invoked, will add the deferred storage operations to the provided ReaderBatchWriter + // NOTE: until this functor is called, only the chunk data packs are stored by their respective IDs. + return storeChunkDataPacksFunc, nil } -// BatchRemove removes ChunkDataPack c keyed by its ChunkID in provided batch +// BatchRemove removes multiple ChunkDataPacks with the given chunk IDs. +// It performs a two-phase removal: +// 1. First phase: Remove index mappings from ChunkID to chunkDataPackID in the protocol database +// 2. Second phase: Remove chunk data packs (StoredChunkDataPack) by their hashes (chunkDataPackID) from the chunk data pack database. +// This phase is deferred until the provided write batch has been successfully committed. +// Note: it does not remove the collections referenced by the chunk data packs. +// This method is useful for the rollback execution tool to batch remove chunk data packs associated with a set of blocks. // No errors are expected during normal operation, even if no entries are matched. -// If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.
-func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, rw storage.ReaderBatchWriter) error { - storage.OnCommitSucceed(rw, func() { - ch.byChunkIDCache.Remove(chunkID) - }) - return operation.RemoveChunkDataPack(rw.Writer(), chunkID) -} +func (ch *ChunkDataPacks) BatchRemove( + chunkIDs []flow.Identifier, + protocolDBBatch storage.ReaderBatchWriter, +) ([]flow.Identifier, error) { + // First, collect all stored chunk data pack IDs that need to be removed + chunkDataPackIDs := make([]flow.Identifier, 0, len(chunkIDs)) + for _, chunkID := range chunkIDs { + chunkDataPackID, err := ch.chunkIDToChunkDataPackIDCache.Get(protocolDBBatch.GlobalReader(), chunkID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + // If we can't find the stored chunk data pack ID, continue with other removals + // This handles the case where the chunk data pack was never properly stored + continue + } -func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) { - schdp, err := ch.byChunkID(chunkID) - if err != nil { - return nil, err + return nil, fmt.Errorf("cannot retrieve stored chunk data pack ID for chunk %x: %w", chunkID, err) + } + chunkDataPackIDs = append(chunkDataPackIDs, chunkDataPackID) } - chdp := &flow.ChunkDataPack{ - ChunkID: schdp.ChunkID, - StartState: schdp.StartState, - Proof: schdp.Proof, - ExecutionDataRoot: schdp.ExecutionDataRoot, + // Remove the stored chunk data packs + if len(chunkDataPackIDs) > 0 { + storage.OnCommitSucceed(protocolDBBatch, func() { + // no errors expected during normal operation, even if no entries exist with the given IDs + err := ch.cdpStorage.Remove(chunkDataPackIDs) + if err != nil { + log.Fatal().Msgf("cannot remove stored chunk data packs: %v", err) + } + }) + } + + // Remove the chunk data pack ID mappings and update cache + for _, chunkID := range chunkIDs { + err := ch.chunkIDToChunkDataPackIDCache.RemoveTx(protocolDBBatch, chunkID) + if err != nil { + return nil, err + } } + return chunkDataPackIDs, nil +} - if !schdp.SystemChunk { - collection, err := ch.collections.ByID(schdp.CollectionID) +// BatchRemoveChunkDataPacksOnly removes multiple ChunkDataPacks with the given chunk IDs from chunk data pack database only. +// It does not remove the index mappings from ChunkID to chunkDataPackID in the protocol database. +// This method is useful for the runtime chunk data pack pruner to batch remove chunk data packs associated with a set of blocks. +// CAUTION: the chunk data pack batch is for chunk data pack database only, DO NOT pass a batch writer for protocol database. +// No errors are expected during normal operation, even if no entries are matched. 
+func (ch *ChunkDataPacks) BatchRemoveChunkDataPacksOnly(chunkIDs []flow.Identifier, chunkDataPackBatch storage.ReaderBatchWriter) error { + // First, collect all stored chunk data pack IDs that need to be removed + var chunkDataPackIDs []flow.Identifier + for _, chunkID := range chunkIDs { + chunkDataPackID, err := ch.chunkIDToChunkDataPackIDCache.Get(ch.protocolDB.Reader(), chunkID) // remove from cache optimistically if err != nil { - return nil, fmt.Errorf("could not retrive collection (id: %x) for stored chunk data pack: %w", schdp.CollectionID, err) + if errors.Is(err, storage.ErrNotFound) { + // If we can't find the stored chunk data pack ID, continue with other removals + // This handles the case where the chunk data pack was never properly stored + continue + } + + return fmt.Errorf("cannot retrieve stored chunk data pack ID for chunk %x: %w", chunkID, err) } + chunkDataPackIDs = append(chunkDataPackIDs, chunkDataPackID) + } - chdp.Collection = collection + // Remove the stored chunk data packs + if len(chunkDataPackIDs) > 0 { + err := ch.cdpStorage.BatchRemove(chunkDataPackIDs, chunkDataPackBatch) + if err != nil { + return fmt.Errorf("cannot remove stored chunk data packs: %w", err) + } } - return chdp, nil + // The chunk data pack pruner only removes the stored chunk data packs (in ch.stored). + // It does not delete the corresponding index mappings from the protocol database. + // These mappings should be cleaned up by the protocol DB pruner, which will be implemented later. + return nil } -func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*storage.StoredChunkDataPack, error) { - schdp, err := ch.retrieveCHDP(chunkID) +// ByChunkID returns the chunk data for the given chunk ID. +// It returns [storage.ErrNotFound] if no entry exists for the given chunk ID. +func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) { + // First, retrieve the chunk data pack ID (using cache if available) + chunkDataPackID, err := ch.chunkIDToChunkDataPackIDCache.Get(ch.protocolDB.Reader(), chunkID) if err != nil { - return nil, fmt.Errorf("could not retrive stored chunk data pack: %w", err) + return nil, fmt.Errorf("cannot retrieve stored chunk data pack ID for chunk %x: %w", chunkID, err) } - return schdp, nil -} - -func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) (*storage.StoredChunkDataPack, error) { - val, err := ch.byChunkIDCache.Get(ch.db.Reader(), chunkID) + // Then retrieve the reduced representation of the chunk data pack via its ID. 
+ schdp, err := ch.cdpStorage.ByID(chunkDataPackID) if err != nil { - return nil, err + return nil, fmt.Errorf("cannot retrieve stored chunk data pack %x for chunk %x: %w", chunkDataPackID, chunkID, err) + } + + var collection *flow.Collection // nil by default, which only represents system chunk + if schdp.CollectionID != flow.ZeroID { + collection, err = ch.collections.ByID(schdp.CollectionID) + if err != nil { + return nil, fmt.Errorf("could not retrieve collection (id: %x) for stored chunk data pack: %w", schdp.CollectionID, err) + } } - return val, nil + + return flow.NewChunkDataPack(flow.UntrustedChunkDataPack{ + ChunkID: schdp.ChunkID, + StartState: schdp.StartState, + Proof: schdp.Proof, + Collection: collection, + ExecutionDataRoot: schdp.ExecutionDataRoot, + }) } diff --git a/storage/store/chunk_data_packs_stored.go b/storage/store/chunk_data_packs_stored.go new file mode 100644 index 00000000000..376e1cc2a77 --- /dev/null +++ b/storage/store/chunk_data_packs_stored.go @@ -0,0 +1,112 @@ +package store + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" +) + +// StoredChunkDataPacks represents persistent storage for chunk data packs. +// It works with the reduced representation `StoredChunkDataPack` for chunk data packs, +// where instead of the full collection data, only the collection's hash (ID) is contained. +type StoredChunkDataPacks struct { + db storage.DB + byIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack] +} + +var _ storage.StoredChunkDataPacks = (*StoredChunkDataPacks)(nil) + +func NewStoredChunkDataPacks(collector module.CacheMetrics, db storage.DB, byIDCacheSize uint) *StoredChunkDataPacks { + + retrieve := func(r storage.Reader, key flow.Identifier) (*storage.StoredChunkDataPack, error) { + var c storage.StoredChunkDataPack + err := operation.RetrieveChunkDataPack(r, key, &c) + return &c, err + } + + cache := newCache(collector, metrics.ResourceChunkDataPack, + withLimit[flow.Identifier, *storage.StoredChunkDataPack](byIDCacheSize), + withStore(operation.InsertChunkDataPack), + withRemove[flow.Identifier, *storage.StoredChunkDataPack](func(rw storage.ReaderBatchWriter, id flow.Identifier) error { + return operation.RemoveChunkDataPack(rw.Writer(), id) + }), + withRetrieve(retrieve), + ) + + ch := StoredChunkDataPacks{ + db: db, + byIDCache: cache, + } + return &ch +} + +// ByID returns the StoredChunkDataPack for the given ID. +// It returns [storage.ErrNotFound] if no entry exists for the given ID. +func (ch *StoredChunkDataPacks) ByID(chunkDataPackID flow.Identifier) (*storage.StoredChunkDataPack, error) { + val, err := ch.byIDCache.Get(ch.db.Reader(), chunkDataPackID) + if err != nil { + return nil, err + } + return val, nil +} + +// Remove removes multiple StoredChunkDataPacks cs keyed by their IDs in a batch. +// No error returns are expected during normal operation, even if none of the referenced objects exist in storage. +func (ch *StoredChunkDataPacks) Remove(ids []flow.Identifier) error { + return ch.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + for _, id := range ids { + err := ch.batchRemove(id, rw) + if err != nil { + return err + } + } + return nil + }) +} + +// BatchRemove removes multiple ChunkDataPacks with the given IDs from storage as part of the provided write batch. 
+// No error returns are expected during normal operation, even if no entries are matched. +func (ch *StoredChunkDataPacks) BatchRemove(chunkDataPackIDs []flow.Identifier, rw storage.ReaderBatchWriter) error { + for _, id := range chunkDataPackIDs { + err := ch.batchRemove(id, rw) + if err != nil { + return err + } + } + return nil +} + +func (ch *StoredChunkDataPacks) batchRemove(chunkDataPackID flow.Identifier, rw storage.ReaderBatchWriter) error { + return ch.byIDCache.RemoveTx(rw, chunkDataPackID) +} + +// StoreChunkDataPacks stores multiple StoredChunkDataPacks cs in a batch. +// It returns the chunk data pack IDs +// No error returns are expected during normal operation. +func (ch *StoredChunkDataPacks) StoreChunkDataPacks(cs []*storage.StoredChunkDataPack) ([]flow.Identifier, error) { + if len(cs) == 0 { + return nil, nil + } + ids := make([]flow.Identifier, 0, len(cs)) + + err := ch.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + for _, sc := range cs { + id := sc.ID() + err := ch.byIDCache.PutTx(rw, id, sc) + if err != nil { + return err + } + ids = append(ids, id) + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("cannot store chunk data packs: %w", err) + } + return ids, nil +} diff --git a/storage/store/chunk_data_packs_test.go b/storage/store/chunk_data_packs_test.go index acc5f77cb20..13055c88923 100644 --- a/storage/store/chunk_data_packs_test.go +++ b/storage/store/chunk_data_packs_test.go @@ -20,113 +20,263 @@ import ( // TestChunkDataPacks_Store evaluates correct storage and retrieval of chunk data packs in the storage. // It also evaluates that re-inserting is idempotent. func TestChunkDataPacks_Store(t *testing.T) { - WithChunkDataPacks(t, 100, func(t *testing.T, chunkDataPacks []*flow.ChunkDataPack, chunkDataPackStore *store.ChunkDataPacks, _ *pebble.DB, lockManager storage.LockManager) { - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { - require.NoError(t, chunkDataPackStore.StoreByChunkID(lctx, chunkDataPacks)) - return chunkDataPackStore.StoreByChunkID(lctx, chunkDataPacks) + WithChunkDataPacks(t, 100, func(t *testing.T, chunkDataPacks []*flow.ChunkDataPack, chunkDataPackStore *store.ChunkDataPacks, protocolDB storage.DB, chunkDataPackDB storage.DB, lockManager storage.LockManager) { + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + + storeFunc, err := chunkDataPackStore.Store(chunkDataPacks) + if err != nil { + return err + } + err = protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return storeFunc(lctx, rw) + }) + if err != nil { + return err + } + + // Verify chunk data packs are stored + for i, chunkDataPack := range chunkDataPacks { + stored, err := chunkDataPackStore.ByChunkID(chunkDataPack.ChunkID) + require.NoError(t, err) + require.Equal(t, chunkDataPack, stored, "mismatched chunk data pack at index %d", i) + } + + // Store again is idempotent + storeFunc, err = chunkDataPackStore.Store(chunkDataPacks) + if err != nil { + return err + } + err = protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return storeFunc(lctx, rw) + }) + require.NoError(t, err) + return nil })) }) } -func TestChunkDataPack_Remove(t *testing.T) { - unittest.RunWithPebbleDB(t, func(pdb *pebble.DB) { - lockManager := storage.NewTestingLockManager() - db := pebbleimpl.ToDB(pdb) - transactions := 
store.NewTransactions(&metrics.NoopCollector{}, db) - collections := store.NewCollections(db, transactions) - // keep the cache size at 1 to make sure that entries are written and read from storage itself. - chunkDataPackStore := store.NewChunkDataPacks(&metrics.NoopCollector{}, db, collections, 1) - - chunkDataPacks := unittest.ChunkDataPacksFixture(10) - for _, chunkDataPack := range chunkDataPacks { - // store collection in Collections storage (which ChunkDataPacks store uses internally) - _, err := collections.Store(chunkDataPack.Collection) - require.NoError(t, err) - } +// TestChunkDataPacks_MissingItem evaluates querying a missing item returns a storage.ErrNotFound error. +func TestChunkDataPacks_MissingItem(t *testing.T) { + unittest.RunWithPebbleDB(t, func(protocolPdb *pebble.DB) { + unittest.RunWithPebbleDB(t, func(chunkDataPackPdb *pebble.DB) { + protocolDB := pebbleimpl.ToDB(protocolPdb) + chunkDataPackDB := pebbleimpl.ToDB(chunkDataPackPdb) - chunkIDs := make([]flow.Identifier, 0, len(chunkDataPacks)) - for _, chunk := range chunkDataPacks { - chunkIDs = append(chunkIDs, chunk.ChunkID) - } + transactions := store.NewTransactions(&metrics.NoopCollector{}, protocolDB) + collections := store.NewCollections(protocolDB, transactions) + stored := store.NewStoredChunkDataPacks(&metrics.NoopCollector{}, chunkDataPackDB, 10) + store1 := store.NewChunkDataPacks(&metrics.NoopCollector{}, protocolDB, stored, collections, 1) - require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error { - return chunkDataPackStore.StoreByChunkID(lctx, chunkDataPacks) - })) - require.NoError(t, chunkDataPackStore.Remove(chunkIDs)) + // attempt to get an invalid + _, err := store1.ByChunkID(unittest.IdentifierFixture()) + assert.ErrorIs(t, err, storage.ErrNotFound) + }) + }) +} + +// TestChunkDataPacks_BatchRemove tests the BatchRemove method which removes both protocol DB mappings and chunk data pack DB content. 
+func TestChunkDataPacks_BatchRemove(t *testing.T) { + unittest.RunWithPebbleDB(t, func(protocolPdb *pebble.DB) { + unittest.RunWithPebbleDB(t, func(chunkDataPackPdb *pebble.DB) { + lockManager := storage.NewTestingLockManager() + protocolDB := pebbleimpl.ToDB(protocolPdb) + chunkDataPackDB := pebbleimpl.ToDB(chunkDataPackPdb) - // verify it has been removed - _, err := chunkDataPackStore.ByChunkID(chunkIDs[0]) - assert.ErrorIs(t, err, storage.ErrNotFound) + transactions := store.NewTransactions(&metrics.NoopCollector{}, protocolDB) + collections := store.NewCollections(protocolDB, transactions) + stored := store.NewStoredChunkDataPacks(&metrics.NoopCollector{}, chunkDataPackDB, 10) + chunkDataPackStore := store.NewChunkDataPacks(&metrics.NoopCollector{}, protocolDB, stored, collections, 1) - // Removing again should not error - require.NoError(t, chunkDataPackStore.Remove(chunkIDs)) + chunkDataPacks := unittest.ChunkDataPacksFixture(5) + for _, chunkDataPack := range chunkDataPacks { + // store collection in Collections storage + _, err := collections.Store(chunkDataPack.Collection) + require.NoError(t, err) + } + + // Store chunk data packs + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error { + storeFunc, err := chunkDataPackStore.Store(chunkDataPacks) + if err != nil { + return err + } + return protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return storeFunc(lctx, rw) + }) + })) + + // Verify chunk data packs are stored + for _, chunkDataPack := range chunkDataPacks { + _, err := chunkDataPackStore.ByChunkID(chunkDataPack.ChunkID) + require.NoError(t, err) + } + + // Prepare chunk IDs for removal + chunkIDs := make([]flow.Identifier, len(chunkDataPacks)) + for i, chunkDataPack := range chunkDataPacks { + chunkIDs[i] = chunkDataPack.ChunkID + } + + // Test BatchRemove + require.NoError(t, protocolDB.WithReaderBatchWriter(func(protocolDBBatch storage.ReaderBatchWriter) error { + _, err := chunkDataPackStore.BatchRemove(chunkIDs, protocolDBBatch) + return err + })) + + // Verify chunk data packs are removed from both protocol and chunk data pack DBs + for _, chunkID := range chunkIDs { + _, err := chunkDataPackStore.ByChunkID(chunkID) + assert.ErrorIs(t, err, storage.ErrNotFound) + } + }) }) } -// TestChunkDataPacks_MissingItem evaluates querying a missing item returns a storage.ErrNotFound error. -func TestChunkDataPacks_MissingItem(t *testing.T) { - unittest.RunWithPebbleDB(t, func(pdb *pebble.DB) { - db := pebbleimpl.ToDB(pdb) - transactions := store.NewTransactions(&metrics.NoopCollector{}, db) - collections := store.NewCollections(db, transactions) - store1 := store.NewChunkDataPacks(&metrics.NoopCollector{}, db, collections, 1) - - // attempt to get an invalid - _, err := store1.ByChunkID(unittest.IdentifierFixture()) - assert.ErrorIs(t, err, storage.ErrNotFound) +// TestChunkDataPacks_BatchRemoveChunkDataPacksOnly tests the BatchRemoveChunkDataPacksOnly method +// which removes only from chunk data pack DB, leaving protocol DB mappings intact. 
+func TestChunkDataPacks_BatchRemoveChunkDataPacksOnly(t *testing.T) {
+	unittest.RunWithPebbleDB(t, func(protocolPdb *pebble.DB) {
+		unittest.RunWithPebbleDB(t, func(chunkDataPackPdb *pebble.DB) {
+			lockManager := storage.NewTestingLockManager()
+			protocolDB := pebbleimpl.ToDB(protocolPdb)
+			chunkDataPackDB := pebbleimpl.ToDB(chunkDataPackPdb)
+
+			transactions := store.NewTransactions(&metrics.NoopCollector{}, protocolDB)
+			collections := store.NewCollections(protocolDB, transactions)
+			stored := store.NewStoredChunkDataPacks(&metrics.NoopCollector{}, chunkDataPackDB, 10)
+			chunkDataPackStore := store.NewChunkDataPacks(&metrics.NoopCollector{}, protocolDB, stored, collections, 1)
+
+			chunkDataPacks := unittest.ChunkDataPacksFixture(5)
+			for _, chunkDataPack := range chunkDataPacks {
+				// store collection in Collections storage
+				_, err := collections.Store(chunkDataPack.Collection)
+				require.NoError(t, err)
+			}
+
+			// Store chunk data packs
+			require.NoError(t, unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error {
+				storeFunc, err := chunkDataPackStore.Store(chunkDataPacks)
+				if err != nil {
+					return err
+				}
+				return protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
+					return storeFunc(lctx, rw)
+				})
+			}))
+
+			// Verify chunk data packs are stored
+			for _, chunkDataPack := range chunkDataPacks {
+				_, err := chunkDataPackStore.ByChunkID(chunkDataPack.ChunkID)
+				require.NoError(t, err)
+			}
+
+			// Prepare chunk IDs for removal and chunk data pack IDs for later verification
+			chunkIDs := make([]flow.Identifier, len(chunkDataPacks))
+			chunkDataPackIDs := make([]flow.Identifier, len(chunkDataPacks))
+			for i, chunkDataPack := range chunkDataPacks {
+				chunkIDs[i] = chunkDataPack.ChunkID
+				chunkDataPackIDs[i] = chunkDataPack.ID()
+			}
+
+			// Test BatchRemoveChunkDataPacksOnly - remove only the stored chunk data packs from the chunk data pack DB
+			require.NoError(t, chunkDataPackDB.WithReaderBatchWriter(func(chunkDataPackDBBatch storage.ReaderBatchWriter) error {
+				return chunkDataPackStore.BatchRemoveChunkDataPacksOnly(chunkIDs, chunkDataPackDBBatch)
+			}))
+
+			// Verify the chunk data packs have been removed from the chunk data pack DB
+			for _, id := range chunkDataPackIDs {
+				_, err := stored.ByID(id)
+				assert.ErrorIs(t, err, storage.ErrNotFound)
+			}
+		})
 	})
 }
 
-// TestChunkDataPacks_StoreTwice evaluates that storing the same chunk data pack twice
-// does not result in an error.
-func TestChunkDataPacks_StoreTwice(t *testing.T) {
-	WithChunkDataPacks(t, 2, func(t *testing.T, chunkDataPacks []*flow.ChunkDataPack, chunkDataPackStore *store.ChunkDataPacks, pdb *pebble.DB, lockManager storage.LockManager) {
-		db := pebbleimpl.ToDB(pdb)
-		transactions := store.NewTransactions(&metrics.NoopCollector{}, db)
-		collections := store.NewCollections(db, transactions)
-		store1 := store.NewChunkDataPacks(&metrics.NoopCollector{}, db, collections, 1)
-		require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error {
-			require.NoError(t, store1.StoreByChunkID(lctx, chunkDataPacks))
-
-			// sanity-check first that chunk data packs are stored, before attempting to store them again.
-			for _, c := range chunkDataPacks {
-				c2, err := store1.ByChunkID(c.ChunkID)
-				require.NoError(t, err)
-				require.Equal(t, c, c2)
+// TestChunkDataPacks_BatchRemoveNonExistent tests that BatchRemove handles non-existent chunk IDs gracefully.
+func TestChunkDataPacks_BatchRemoveNonExistent(t *testing.T) { + unittest.RunWithPebbleDB(t, func(protocolPdb *pebble.DB) { + unittest.RunWithPebbleDB(t, func(chunkDataPackPdb *pebble.DB) { + protocolDB := pebbleimpl.ToDB(protocolPdb) + chunkDataPackDB := pebbleimpl.ToDB(chunkDataPackPdb) + + transactions := store.NewTransactions(&metrics.NoopCollector{}, protocolDB) + collections := store.NewCollections(protocolDB, transactions) + stored := store.NewStoredChunkDataPacks(&metrics.NoopCollector{}, chunkDataPackDB, 10) + chunkDataPackStore := store.NewChunkDataPacks(&metrics.NoopCollector{}, protocolDB, stored, collections, 1) + + // Create some non-existent chunk IDs + nonExistentChunkIDs := make([]flow.Identifier, 3) + for i := range nonExistentChunkIDs { + nonExistentChunkIDs[i] = unittest.IdentifierFixture() } - return store1.StoreByChunkID(lctx, chunkDataPacks) - })) + // Test BatchRemove with non-existent chunk IDs should not error + require.NoError(t, protocolDB.WithReaderBatchWriter(func(protocolDBBatch storage.ReaderBatchWriter) error { + _, err := chunkDataPackStore.BatchRemove(nonExistentChunkIDs, protocolDBBatch) + return err + })) + }) + }) +} + +// TestChunkDataPacks_BatchRemoveChunkDataPacksOnlyNonExistent tests that BatchRemoveChunkDataPacksOnly handles non-existent chunk IDs gracefully. +func TestChunkDataPacks_BatchRemoveChunkDataPacksOnlyNonExistent(t *testing.T) { + unittest.RunWithPebbleDB(t, func(protocolPdb *pebble.DB) { + unittest.RunWithPebbleDB(t, func(chunkDataPackPdb *pebble.DB) { + protocolDB := pebbleimpl.ToDB(protocolPdb) + chunkDataPackDB := pebbleimpl.ToDB(chunkDataPackPdb) + + transactions := store.NewTransactions(&metrics.NoopCollector{}, protocolDB) + collections := store.NewCollections(protocolDB, transactions) + stored := store.NewStoredChunkDataPacks(&metrics.NoopCollector{}, chunkDataPackDB, 10) + chunkDataPackStore := store.NewChunkDataPacks(&metrics.NoopCollector{}, protocolDB, stored, collections, 1) + + // Create some non-existent chunk IDs + nonExistentChunkIDs := make([]flow.Identifier, 3) + for i := range nonExistentChunkIDs { + nonExistentChunkIDs[i] = unittest.IdentifierFixture() + } + + // Test BatchRemoveChunkDataPacksOnly with non-existent chunk IDs should not error + require.NoError(t, chunkDataPackDB.WithReaderBatchWriter(func(chunkDataPackDBBatch storage.ReaderBatchWriter) error { + return chunkDataPackStore.BatchRemoveChunkDataPacksOnly(nonExistentChunkIDs, chunkDataPackDBBatch) + })) + }) }) } // WithChunkDataPacks is a test helper that generates specified number of chunk data packs, store1 them using the storeFunc, and // then evaluates whether they are successfully retrieved from storage. -func WithChunkDataPacks(t *testing.T, chunks int, storeFunc func(*testing.T, []*flow.ChunkDataPack, *store.ChunkDataPacks, *pebble.DB, storage.LockManager)) { - unittest.RunWithPebbleDB(t, func(pdb *pebble.DB) { - lockManager := storage.NewTestingLockManager() - db := pebbleimpl.ToDB(pdb) - transactions := store.NewTransactions(&metrics.NoopCollector{}, db) - collections := store.NewCollections(db, transactions) - // keep the cache size at 1 to make sure that entries are written and read from storage itself. 
-		store1 := store.NewChunkDataPacks(&metrics.NoopCollector{}, db, collections, 1)
-
-		chunkDataPacks := unittest.ChunkDataPacksFixture(chunks)
-		for _, chunkDataPack := range chunkDataPacks {
-			// store collection in Collections storage (which ChunkDataPacks store uses internally)
-			_, err := collections.Store(chunkDataPack.Collection)
-			require.NoError(t, err)
-		}
+func WithChunkDataPacks(t *testing.T, chunks int, storeFunc func(*testing.T, []*flow.ChunkDataPack, *store.ChunkDataPacks, storage.DB, storage.DB, storage.LockManager)) {
+	unittest.RunWithPebbleDB(t, func(protocolPdb *pebble.DB) {
+		unittest.RunWithPebbleDB(t, func(chunkDataPackPdb *pebble.DB) {
+			lockManager := storage.NewTestingLockManager()
+			protocolDB := pebbleimpl.ToDB(protocolPdb)
+			chunkDataPackDB := pebbleimpl.ToDB(chunkDataPackPdb)
 
-		// store chunk data packs in the memory using provided store function.
-		storeFunc(t, chunkDataPacks, store1, pdb, lockManager)
+			transactions := store.NewTransactions(&metrics.NoopCollector{}, protocolDB)
+			collections := store.NewCollections(protocolDB, transactions)
+			// keep the ChunkDataPacks cache size at 1 to make sure that entries are written to and read from storage itself.
+			stored := store.NewStoredChunkDataPacks(&metrics.NoopCollector{}, chunkDataPackDB, 10)
+			store1 := store.NewChunkDataPacks(&metrics.NoopCollector{}, protocolDB, stored, collections, 1)
 
-		// store1d chunk data packs should be retrieved successfully.
-		for _, expected := range chunkDataPacks {
-			actual, err := store1.ByChunkID(expected.ChunkID)
-			require.NoError(t, err)
+			chunkDataPacks := unittest.ChunkDataPacksFixture(chunks)
+			for _, chunkDataPack := range chunkDataPacks {
+				// store collection in Collections storage (which ChunkDataPacks store uses internally)
+				_, err := collections.Store(chunkDataPack.Collection)
+				require.NoError(t, err)
+			}
 
-			assert.Equal(t, expected, actual)
-		}
+			// store chunk data packs using the provided store function.
+			storeFunc(t, chunkDataPacks, store1, protocolDB, chunkDataPackDB, lockManager)
-
+			// stored chunk data packs should be retrieved successfully.
+			for _, expected := range chunkDataPacks {
+				actual, err := store1.ByChunkID(expected.ChunkID)
+				require.NoError(t, err)
+
+				assert.Equal(t, expected, actual)
+			}
+		})
 	})
 }
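For reviewers, here is a minimal sketch (not part of the diff) of how the reworked two-level storage is wired and exercised, condensed from the tests above. The helper name exampleStoreAndReadBack and its parameters are illustrative only; the API calls (NewStoredChunkDataPacks, NewChunkDataPacks, the two-step Store, ByChunkID) and the lock storage.LockIndexChunkDataPackByChunkID are taken from this diff, while the import paths (in particular the lockctx module path) are assumed to match what the test file already uses.

package store_test

import (
	"testing"

	"github.com/jordanschalm/lockctx"
	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/module/metrics"
	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/store"
	"github.com/onflow/flow-go/utils/unittest"
)

// exampleStoreAndReadBack stores a few chunk data packs and reads them back.
// protocolDB holds the chunk ID -> chunk data pack ID mapping, while chunkDataPackDB
// holds the reduced StoredChunkDataPack payloads (collection ID instead of the full collection).
func exampleStoreAndReadBack(t *testing.T, protocolDB storage.DB, chunkDataPackDB storage.DB) {
	lockManager := storage.NewTestingLockManager()
	collector := &metrics.NoopCollector{}

	collections := store.NewCollections(protocolDB, store.NewTransactions(collector, protocolDB))
	stored := store.NewStoredChunkDataPacks(collector, chunkDataPackDB, 10)
	chunkDataPacks := store.NewChunkDataPacks(collector, protocolDB, stored, collections, 10)

	packs := unittest.ChunkDataPacksFixture(3)
	for _, pack := range packs {
		// ChunkDataPacks resolves collections through the Collections store,
		// so the collections must be stored first.
		_, err := collections.Store(pack.Collection)
		require.NoError(t, err)
	}

	// Store is two-step: it prepares the write and returns a function that must be executed
	// inside a protocol DB batch while holding the chunk data pack index lock
	// (mirroring TestChunkDataPacks_Store above).
	require.NoError(t, unittest.WithLock(t, lockManager, storage.LockIndexChunkDataPackByChunkID, func(lctx lockctx.Context) error {
		storeFunc, err := chunkDataPacks.Store(packs)
		if err != nil {
			return err
		}
		return protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
			return storeFunc(lctx, rw)
		})
	}))

	// ByChunkID follows the mapping and reassembles the full ChunkDataPack, including the collection.
	for _, pack := range packs {
		got, err := chunkDataPacks.ByChunkID(pack.ChunkID)
		require.NoError(t, err)
		require.Equal(t, pack, got)
	}
}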
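A companion sketch of the two removal paths covered by TestChunkDataPacks_BatchRemove and TestChunkDataPacks_BatchRemoveChunkDataPacksOnly. The helper is again illustrative, belongs to the same hypothetical file as the sketch above, and additionally needs the github.com/onflow/flow-go/model/flow import for flow.Identifier.

// exampleRemovalPaths contrasts the two removal methods exercised by the tests above.
// chunkIDs are the chunk IDs whose packs were stored earlier; in practice a caller would
// use one path or the other, not both in sequence as shown here.
func exampleRemovalPaths(t *testing.T, chunkDataPacks *store.ChunkDataPacks, protocolDB storage.DB, chunkDataPackDB storage.DB, chunkIDs []flow.Identifier) {
	// BatchRemove is driven through a protocol DB batch and removes the chunk ID mapping
	// together with the stored chunk data pack content (see TestChunkDataPacks_BatchRemove).
	require.NoError(t, protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		_, err := chunkDataPacks.BatchRemove(chunkIDs, rw)
		return err
	}))

	// BatchRemoveChunkDataPacksOnly is driven through a chunk data pack DB batch and removes
	// only the stored payloads, leaving the protocol DB mapping intact
	// (see TestChunkDataPacks_BatchRemoveChunkDataPacksOnly).
	require.NoError(t, chunkDataPackDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		return chunkDataPacks.BatchRemoveChunkDataPacksOnly(chunkIDs, rw)
	}))
}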