
Commit ca73541

Merge pull request #7939 from onflow/leo/refactor-insert-chunk-data-pack
[Storage] Refactor insert chunk data pack
2 parents 97f6643 + c05cee2

File tree

14 files changed: +246 −118 lines


engine/execution/pruner/core_test.go

Lines changed: 3 additions & 1 deletion
@@ -75,7 +75,9 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) {
 		require.NoError(t, err)
 		require.NoError(t, results.Store(chunk.Result))
 		require.NoError(t, results.Index(chunk.Result.BlockID, chunk.Result.ID()))
-		require.NoError(t, chunkDataPacks.Store([]*flow.ChunkDataPack{chunk.ChunkDataPack}))
+		require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error {
+			return chunkDataPacks.StoreByChunkID(lctx, []*flow.ChunkDataPack{chunk.ChunkDataPack})
+		}))
 		_, storeErr := collections.Store(chunk.ChunkDataPack.Collection)
 		require.NoError(t, storeErr)
 		// verify that chunk data pack fixture can be found by the result
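Note: the updated test wraps the chunk data pack write in unittest.WithLock. That helper is not part of this excerpt; the call sites imply roughly the following shape (a sketch only — the real helper in utils/unittest may differ, and everything except the lockctx types and the storage lock name is assumed):

package unittest

import (
	"testing"

	"github.com/jordanschalm/lockctx"
)

// WithLock (sketch): acquire the named lock on a fresh context, run fn, release the lock
// on return, and pass fn's error through so tests can wrap the call in require.NoError.
func WithLock(t *testing.T, manager lockctx.Manager, lockID string, fn func(lctx lockctx.Context) error) error {
	t.Helper()
	lctx := manager.NewContext()
	defer lctx.Release()
	if err := lctx.AcquireLock(lockID); err != nil {
		return err
	}
	return fn(lctx)
}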

engine/execution/state/state.go

Lines changed: 58 additions & 57 deletions
@@ -397,6 +397,8 @@ func (s *state) SaveExecutionResults(
 	return nil
 }
 
+// saveExecutionResults saves all data related to the execution of a block.
+// It is safe for concurrent calls.
 func (s *state) saveExecutionResults(
 	ctx context.Context,
 	result *execution.ComputationResult,
@@ -408,73 +410,72 @@ func (s *state) saveExecutionResults(
 		return fmt.Errorf("can not retrieve chunk data packs: %w", err)
 	}
 
-	err = s.chunkDataPacks.Store(chunks)
-	if err != nil {
-		return fmt.Errorf("can not store multiple chunk data pack: %w", err)
-	}
-
-	lctx := s.lockManager.NewContext()
-	defer lctx.Release()
-	err = lctx.AcquireLock(storage.LockInsertOwnReceipt)
-	if err != nil {
-		return err
-	}
-
-	// Save entire execution result (including all chunk data packs) within one batch to minimize
-	// the number of database interactions. This is a large batch of data, which might not be
-	// committed within a single operation (e.g. if using Badger DB as storage backend, which has
-	// a size limit for its transactions).
-	return s.db.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
-		batch.AddCallback(func(err error) {
-			// Rollback if an error occurs during batch operations
-			if err != nil {
-				chunkIDs := make([]flow.Identifier, 0, len(chunks))
-				for _, chunk := range chunks {
-					chunkIDs = append(chunkIDs, chunk.ChunkID)
-				}
-				_ = s.chunkDataPacks.Remove(chunkIDs)
-			}
-		})
-
-		err = s.events.BatchStore(blockID, []flow.EventsList{result.AllEvents()}, batch)
-		if err != nil {
-			return fmt.Errorf("cannot store events: %w", err)
-		}
-
-		err = s.serviceEvents.BatchStore(blockID, result.AllServiceEvents(), batch)
-		if err != nil {
-			return fmt.Errorf("cannot store service events: %w", err)
-		}
-
-		err = s.transactionResults.BatchStore(
-			blockID,
-			result.AllTransactionResults(),
-			batch)
-		if err != nil {
-			return fmt.Errorf("cannot store transaction result: %w", err)
-		}
-
-		executionResult := &result.ExecutionReceipt.ExecutionResult
-		// saving my receipts will also save the execution result
-		err = s.myReceipts.BatchStoreMyReceipt(lctx, result.ExecutionReceipt, batch)
-		if err != nil {
-			return fmt.Errorf("could not persist execution result: %w", err)
-		}
-
-		err = s.results.BatchIndex(blockID, executionResult.ID(), batch)
-		if err != nil {
-			return fmt.Errorf("cannot index execution result: %w", err)
-		}
-
-		// the state commitment is the last data item to be stored, so that
-		// IsBlockExecuted can be implemented by checking whether state commitment exists
-		// in the database
-		err = s.commits.BatchStore(lctx, blockID, result.CurrentEndState(), batch)
-		if err != nil {
-			return fmt.Errorf("cannot store state commitment: %w", err)
-		}
-
-		return nil
+	// Acquire both locks to ensure concurrency safety when inserting the execution results and chunk data packs.
+	return storage.WithLocks(s.lockManager, []string{storage.LockInsertOwnReceipt, storage.LockInsertChunkDataPack}, func(lctx lockctx.Context) error {
+		err := s.chunkDataPacks.StoreByChunkID(lctx, chunks)
+		if err != nil {
+			return fmt.Errorf("can not store multiple chunk data pack: %w", err)
+		}
+
+		// Save entire execution result (including all chunk data packs) within one batch to minimize
+		// the number of database interactions.
+		return s.db.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
+			batch.AddCallback(func(err error) {
+				// Rollback if an error occurs during batch operations.
+				// Chunk data packs are saved in a separate database, so there is a chance that
+				// the execution result failed to save while the chunk data packs were saved
+				// and did not get removed.
+				// TODO(leo): when retrieving chunk data packs, we need to add a check to ensure the block
+				// has been executed before returning chunk data packs
+				if err != nil {
+					chunkIDs := make([]flow.Identifier, 0, len(chunks))
+					for _, chunk := range chunks {
+						chunkIDs = append(chunkIDs, chunk.ChunkID)
+					}
+					_ = s.chunkDataPacks.Remove(chunkIDs)
+				}
+			})
+
+			err = s.events.BatchStore(blockID, []flow.EventsList{result.AllEvents()}, batch)
+			if err != nil {
+				return fmt.Errorf("cannot store events: %w", err)
+			}
+
+			err = s.serviceEvents.BatchStore(blockID, result.AllServiceEvents(), batch)
+			if err != nil {
+				return fmt.Errorf("cannot store service events: %w", err)
+			}
+
+			err = s.transactionResults.BatchStore(
+				blockID,
+				result.AllTransactionResults(),
+				batch)
+			if err != nil {
+				return fmt.Errorf("cannot store transaction result: %w", err)
+			}
+
+			executionResult := &result.ExecutionReceipt.ExecutionResult
+			// saving my receipts will also save the execution result
+			err = s.myReceipts.BatchStoreMyReceipt(lctx, result.ExecutionReceipt, batch)
+			if err != nil {
+				return fmt.Errorf("could not persist execution result: %w", err)
+			}
+
+			err = s.results.BatchIndex(blockID, executionResult.ID(), batch)
+			if err != nil {
+				return fmt.Errorf("cannot index execution result: %w", err)
+			}
+
+			// the state commitment is the last data item to be stored, so that
+			// IsBlockExecuted can be implemented by checking whether state commitment exists
+			// in the database
+			err = s.commits.BatchStore(lctx, blockID, result.CurrentEndState(), batch)
+			if err != nil {
+				return fmt.Errorf("cannot store state commitment: %w", err)
+			}
+
+			return nil
+		})
 	})
 }
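Worth noting in this hunk: chunk data packs live in a database separate from the main batch, which is why the batch callback performs a best-effort cleanup when the batch fails. Distilled into a standalone form, the pattern looks roughly like the sketch below (illustrative only, built from the interfaces shown in this diff; the function name and the storage.DB parameter type are assumptions, not code from the commit):

package example

import (
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// writeWithChunkDataPackRollback (sketch): if the main batch fails to commit, remove the
// chunk data packs that were already written to their separate database so no orphaned
// packs remain.
func writeWithChunkDataPackRollback(
	db storage.DB, // assumed: the interface behind s.db
	chunkDataPacks storage.ChunkDataPacks,
	chunks []*flow.ChunkDataPack,
	writes func(batch storage.ReaderBatchWriter) error,
) error {
	return db.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
		batch.AddCallback(func(err error) {
			if err == nil {
				return // batch committed; keep the chunk data packs
			}
			chunkIDs := make([]flow.Identifier, 0, len(chunks))
			for _, c := range chunks {
				chunkIDs = append(chunkIDs, c.ChunkID)
			}
			_ = chunkDataPacks.Remove(chunkIDs) // best-effort cleanup
		})
		return writes(batch)
	})
}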

engine/execution/state/state_storehouse_test.go

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ func prepareStorehouseTest(f func(t *testing.T, es state.ExecutionState, l *ledg
 	txResults := storagemock.NewTransactionResults(t)
 	txResults.On("BatchStore", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	chunkDataPacks := storagemock.NewChunkDataPacks(t)
-	chunkDataPacks.On("Store", mock.Anything).Return(nil)
+	chunkDataPacks.On("StoreByChunkID", mock.Anything, mock.Anything).Return(nil)
 	results := storagemock.NewExecutionResults(t)
 	results.On("BatchIndex", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	myReceipts := storagemock.NewMyExecutionReceipts(t)

model/flow/chunk.go

Lines changed: 20 additions & 0 deletions
@@ -362,3 +362,23 @@ func stringsToCids(strs []string) ([]cid.Cid, error) {
 	}
 	return cids, nil
 }
+
+// Equals returns true if and only if the receiver BlockExecutionDataRoot is equal to `other`.
+func (b BlockExecutionDataRoot) Equals(other BlockExecutionDataRoot) bool {
+	// Compare BlockID fields
+	if b.BlockID != other.BlockID {
+		return false
+	}
+
+	// Compare ChunkExecutionDataIDs slices
+	if len(b.ChunkExecutionDataIDs) != len(other.ChunkExecutionDataIDs) {
+		return false
+	}
+	for i, cid := range b.ChunkExecutionDataIDs {
+		if !cid.Equals(other.ChunkExecutionDataIDs[i]) {
+			return false
+		}
+	}
+
+	return true
+}
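Equality here is value-based: the BlockID and the ordered list of chunk execution data CIDs must both match, so reordering or truncating the CID list breaks equality. A small illustration (the wrapper function and its parameters are assumed test scaffolding, not part of the commit):

package flow_test

import (
	"github.com/ipfs/go-cid"

	"github.com/onflow/flow-go/model/flow"
)

// rootsEqual (illustration only): two roots compare equal exactly when the block ID
// and the CID sequence, in order, are identical.
func rootsEqual(blockID flow.Identifier, cids []cid.Cid) bool {
	a := flow.BlockExecutionDataRoot{BlockID: blockID, ChunkExecutionDataIDs: cids}
	b := flow.BlockExecutionDataRoot{BlockID: blockID, ChunkExecutionDataIDs: cids}
	return a.Equals(b) // true: same BlockID, same CIDs in the same order
}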

module/block_iterator/executor/executor_test.go

Lines changed: 11 additions & 4 deletions
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/pebble/v2"
+	"github.com/jordanschalm/lockctx"
 	"github.com/stretchr/testify/require"
 
 	"github.com/onflow/flow-go/model/flow"
@@ -23,6 +24,7 @@ import (
 // verify the executor is able to iterate through all blocks from the iterator.
 func TestExecute(t *testing.T) {
 	unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
+		lockManager := storage.NewTestingLockManager()
 		blockCount := 10
 
 		// prepare data
@@ -39,8 +41,10 @@ func TestExecute(t *testing.T) {
 		// store the chunk data packs to be pruned later
 		for _, cdp := range cdps {
 			sc := storage.ToStoredChunkDataPack(cdp)
-			require.NoError(t, pdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
-				return operation.InsertChunkDataPack(rw.Writer(), sc)
+			require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error {
+				return pdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
+					return operation.InsertChunkDataPack(lctx, rw, sc)
+				})
 			}))
 		}
 
@@ -72,6 +76,7 @@ func TestExecute(t *testing.T) {
 // verify the pruning can be interrupted and resumed
 func TestExecuteCanBeResumed(t *testing.T) {
 	unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
+		lockManager := storage.NewTestingLockManager()
 		blockCount := 10
 
 		cdps := make([]*flow.ChunkDataPack, 0, blockCount)
@@ -87,8 +92,10 @@ func TestExecuteCanBeResumed(t *testing.T) {
 		// store the chunk data packs to be pruned later
 		for _, cdp := range cdps {
 			sc := storage.ToStoredChunkDataPack(cdp)
-			require.NoError(t, pdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
-				return operation.InsertChunkDataPack(rw.Writer(), sc)
+			require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error {
+				return pdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
+					return operation.InsertChunkDataPack(lctx, rw, sc)
+				})
 			}))
 		}

module/pruner/pruners/chunk_data_pack_test.go

Lines changed: 5 additions & 1 deletion
@@ -6,6 +6,7 @@ import (
 	"testing"
 
 	"github.com/cockroachdb/pebble/v2"
+	"github.com/jordanschalm/lockctx"
 	"github.com/stretchr/testify/require"
 
 	"github.com/onflow/flow-go/module/metrics"
@@ -18,6 +19,7 @@ import (
 func TestChunkDataPackPruner(t *testing.T) {
 
 	unittest.RunWithPebbleDB(t, func(pebbleDB *pebble.DB) {
+		lockManager := storage.NewTestingLockManager()
 		m := metrics.NewNoopCollector()
 		db := pebbleimpl.ToDB(pebbleDB)
 		results := store.NewExecutionResults(m, db)
@@ -29,7 +31,9 @@ func TestChunkDataPackPruner(t *testing.T) {
 		// store the chunks
 		cdp1, result1 := unittest.ChunkDataPacksFixtureAndResult()
 		require.NoError(t, results.Store(result1))
-		require.NoError(t, chunks.Store(cdp1))
+		require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error {
+			return chunks.StoreByChunkID(lctx, cdp1)
+		}))
 
 		pruner := NewChunkDataPackPruner(chunks, results)

storage/chunk_data_packs.go

Lines changed: 27 additions & 1 deletion
@@ -1,6 +1,10 @@
 package storage
 
 import (
+	"bytes"
+
+	"github.com/jordanschalm/lockctx"
+
 	"github.com/onflow/flow-go/model/flow"
 )
 
@@ -9,7 +13,7 @@ type ChunkDataPacks interface {
 
 	// Store stores multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch.
 	// No errors are expected during normal operation, but it may return generic error
-	Store(cs []*flow.ChunkDataPack) error
+	StoreByChunkID(lctx lockctx.Proof, cs []*flow.ChunkDataPack) error
 
 	// Remove removes multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch.
 	// No errors are expected during normal operation, but it may return generic error
@@ -54,3 +58,25 @@ func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack {
 
 	return sc
 }
+
+func (c StoredChunkDataPack) Equals(other StoredChunkDataPack) error {
+	if c.ChunkID != other.ChunkID {
+		return ErrDataMismatch
+	}
+	if c.StartState != other.StartState {
+		return ErrDataMismatch
+	}
+	if !c.ExecutionDataRoot.Equals(other.ExecutionDataRoot) {
+		return ErrDataMismatch
+	}
+	if c.SystemChunk != other.SystemChunk {
+		return ErrDataMismatch
+	}
+	if !bytes.Equal(c.Proof, other.Proof) {
+		return ErrDataMismatch
+	}
+	if c.CollectionID != other.CollectionID {
+		return ErrDataMismatch
+	}
+	return nil
+}
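The Equals helper on StoredChunkDataPack, together with the ErrDataMismatch sentinel it returns, presumably lets the insert path treat a repeated write of an identical pack as benign while flagging a conflicting write for the same chunk ID (the updated operation.InsertChunkDataPack is among the changed files but not shown here). A minimal usage sketch; the wrapper function is illustrative and cdp is assumed to be a *flow.ChunkDataPack fixture:

package example

import (
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// sameStoredPack (sketch): converting the same pack twice yields identical stored
// representations, so Equals returns nil; any differing field would return ErrDataMismatch.
func sameStoredPack(cdp *flow.ChunkDataPack) error {
	a := storage.ToStoredChunkDataPack(cdp)
	b := storage.ToStoredChunkDataPack(cdp)
	return a.Equals(*b)
}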

storage/locks.go

Lines changed: 25 additions & 0 deletions
@@ -26,6 +26,8 @@ const (
 	LockInsertCollection = "lock_insert_collection"
 	// LockBootstrapping protects data that is *exclusively* written during bootstrapping.
 	LockBootstrapping = "lock_bootstrapping"
+	// LockInsertChunkDataPack protects the insertion of chunk data packs.
+	LockInsertChunkDataPack = "lock_insert_chunk_data_pack"
 )
 
 // Locks returns a list of all named locks used by the storage layer.
@@ -38,6 +40,7 @@ func Locks() []string {
 		LockInsertOwnReceipt,
 		LockInsertCollection,
 		LockBootstrapping,
+		LockInsertChunkDataPack,
 	}
 }
 
@@ -61,6 +64,7 @@ func makeLockPolicy() lockctx.Policy {
 	return lockctx.NewDAGPolicyBuilder().
 		Add(LockInsertBlock, LockFinalizeBlock).
 		Add(LockFinalizeBlock, LockBootstrapping).
+		Add(LockInsertOwnReceipt, LockInsertChunkDataPack).
 		Build()
 }
 
@@ -110,3 +114,24 @@ func MakeSingletonLockManager() lockctx.Manager {
 func NewTestingLockManager() lockctx.Manager {
 	return lockctx.NewManager(Locks(), makeLockPolicy())
 }
+
+// WithLock is a helper function that creates a new lock context, acquires the specified lock,
+// and executes the provided function within that context.
+// This function passes through any errors returned by fn.
+func WithLock(manager lockctx.Manager, lockID string, fn func(lctx lockctx.Context) error) error {
+	return WithLocks(manager, []string{lockID}, fn)
+}
+
+// WithLocks is a helper function that creates a new lock context, acquires the specified locks,
+// and executes the provided function within that context.
+// This function passes through any errors returned by fn.
+func WithLocks(manager lockctx.Manager, lockIDs []string, fn func(lctx lockctx.Context) error) error {
+	lctx := manager.NewContext()
+	defer lctx.Release()
+	for _, lockID := range lockIDs {
+		if err := lctx.AcquireLock(lockID); err != nil {
+			return err
+		}
+	}
+	return fn(lctx)
+}
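The new helpers remove the NewContext/AcquireLock/Release boilerplate from call sites, as seen in state.go above. A minimal usage sketch of the single-lock variant (the storePacks wrapper and its parameters are illustrative, not part of the commit):

package example

import (
	"github.com/jordanschalm/lockctx"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// storePacks holds LockInsertChunkDataPack for the duration of the write, so concurrent
// writers of the same chunk IDs are serialized; the lock is released when WithLock returns.
func storePacks(lockManager lockctx.Manager, chunkDataPacks storage.ChunkDataPacks, packs []*flow.ChunkDataPack) error {
	return storage.WithLock(lockManager, storage.LockInsertChunkDataPack, func(lctx lockctx.Context) error {
		return chunkDataPacks.StoreByChunkID(lctx, packs)
	})
}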

storage/mock/chunk_data_packs.go

Lines changed: 8 additions & 6 deletions
Some generated files are not rendered by default.
