-
Notifications
You must be signed in to change notification settings - Fork 197
[Storage] Refactor stored chunk data pack #7983
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
05faf10
a75653e
82f14f0
e690d56
70f3cc6
c998e28
9ab5e0a
6a4b59e
17821a5
848c9e7
fe584bd
cbc5873
936f38b
7ad596e
e875957
044325d
d8e884d
24744c5
9d287d6
be6f1ce
5e0e6a1
c37af9c
7fa1ef6
d22661a
58cd332
8dbfdc1
9d7cddd
792c69f
d7a2241
506dd4c
64a1cbd
4a4f529
af1d4da
6389c09
944e733
f8abc95
bd59da0
b83837d
82022dc
bb7a347
c585f6b
be3df47
9467928
944cfc6
c4e9b15
fb47317
8f75eaa
74ceef6
050ac83
704e30b
49cc4e9
c4ab85d
5d63b5f
ed453bb
96ebd52
c4cd113
8da12cf
d0a810b
d9abbbc
fbf4612
903d24f
08fee4f
50fd2d9
0d924c1
713f6c9
8ebaa82
72259e4
1960afc
22f5655
613f49c
06a47ba
ed315a9
94a3825
ae9aaad
36a9e08
a6ccf29
2a8093e
39ff4a5
52b05be
28a3fae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -87,16 +87,14 @@ func runE(*cobra.Command, []string) error { | |||||||
return fmt.Errorf("could not open chunk data pack DB at %v: %w", flagChunkDataPackDir, err) | ||||||||
} | ||||||||
chunkDataPacksDB := pebbleimpl.ToDB(chunkDataPacksPebbleDB) | ||||||||
chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, collections, 1000) | ||||||||
chunkBatch := chunkDataPacksDB.NewBatch() | ||||||||
defer chunkBatch.Close() | ||||||||
|
||||||||
writeBatch := db.NewBatch() | ||||||||
defer writeBatch.Close() | ||||||||
|
||||||||
err = removeExecutionResultsFromHeight( | ||||||||
writeBatch, | ||||||||
chunkBatch, | ||||||||
storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, chunkDataPacksDB, 1000) | ||||||||
chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, storedChunkDataPacks, collections, 1000) | ||||||||
protocolDBBatch := db.NewBatch() | ||||||||
defer protocolDBBatch.Close() | ||||||||
|
||||||||
// collect chunk IDs to be removed | ||||||||
chunkIDs, err := removeExecutionResultsFromHeight( | ||||||||
protocolDBBatch, | ||||||||
state, | ||||||||
transactionResults, | ||||||||
commits, | ||||||||
|
@@ -112,12 +110,19 @@ func runE(*cobra.Command, []string) error { | |||||||
} | ||||||||
|
||||||||
// remove chunk data packs first, because otherwise the index to find chunk data pack will be removed. | ||||||||
err = chunkBatch.Commit() | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not commit chunk batch at %v: %w", flagHeight, err) | ||||||||
if len(chunkIDs) > 0 { | ||||||||
chunkDataPackIDs, err := chunkDataPacks.BatchRemove(chunkIDs, protocolDBBatch) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not remove chunk data packs at %v: %w", flagHeight, err) | ||||||||
} | ||||||||
|
||||||||
err = storedChunkDataPacks.Remove(chunkDataPackIDs) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not commit chunk batch at %v: %w", flagHeight, err) | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
err = writeBatch.Commit() | ||||||||
err = protocolDBBatch.Commit() | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not flush write batch at %v: %w", flagHeight, err) | ||||||||
} | ||||||||
|
@@ -141,8 +146,7 @@ func runE(*cobra.Command, []string) error { | |||||||
// use badger instances directly instead of storage interfaces so that the interfaces don't | ||||||||
// need to include the Remove methods | ||||||||
Comment on lines
146
to
147
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This doc could use an update, please. |
||||||||
func removeExecutionResultsFromHeight( | ||||||||
writeBatch storage.Batch, | ||||||||
chunkBatch storage.Batch, | ||||||||
protocolDBBatch storage.Batch, | ||||||||
protoState protocol.State, | ||||||||
transactionResults storage.TransactionResults, | ||||||||
commits storage.Commits, | ||||||||
|
@@ -151,40 +155,42 @@ func removeExecutionResultsFromHeight( | |||||||
myReceipts storage.MyExecutionReceipts, | ||||||||
events storage.Events, | ||||||||
serviceEvents storage.ServiceEvents, | ||||||||
fromHeight uint64) error { | ||||||||
fromHeight uint64) ([]flow.Identifier, error) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I prefer to put the return value on the next line. Thereby, the function body is visually separated from the header via a line without indentation. It helps human brains to more quickly structure the visually seen data according to broad structural information (color, indentation, etc.)
Suggested change
|
||||||||
log.Info().Msgf("removing results for blocks from height: %v", fromHeight) | ||||||||
|
||||||||
root := protoState.Params().FinalizedRoot() | ||||||||
|
||||||||
if fromHeight <= root.Height { | ||||||||
return fmt.Errorf("can only remove results for block above root block. fromHeight: %v, rootHeight: %v", fromHeight, root.Height) | ||||||||
return nil, fmt.Errorf("can only remove results for block above root block. fromHeight: %v, rootHeight: %v", fromHeight, root.Height) | ||||||||
} | ||||||||
|
||||||||
final, err := protoState.Final().Head() | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could get not finalized height: %w", err) | ||||||||
return nil, fmt.Errorf("could get not finalized height: %w", err) | ||||||||
} | ||||||||
|
||||||||
if fromHeight > final.Height { | ||||||||
return fmt.Errorf("could not remove results for unfinalized height: %v, finalized height: %v", fromHeight, final.Height) | ||||||||
return nil, fmt.Errorf("could not remove results for unfinalized height: %v, finalized height: %v", fromHeight, final.Height) | ||||||||
} | ||||||||
|
||||||||
finalRemoved := 0 | ||||||||
total := int(final.Height-fromHeight) + 1 | ||||||||
var allChunkIDs []flow.Identifier | ||||||||
|
||||||||
// removing for finalized blocks | ||||||||
for height := fromHeight; height <= final.Height; height++ { | ||||||||
head, err := protoState.AtHeight(height).Head() | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not get header at height: %w", err) | ||||||||
return nil, fmt.Errorf("could not get header at height: %w", err) | ||||||||
} | ||||||||
|
||||||||
blockID := head.ID() | ||||||||
|
||||||||
err = removeForBlockID(writeBatch, chunkBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID) | ||||||||
chunkIDs, err := removeForBlockID(protocolDBBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err) | ||||||||
} | ||||||||
allChunkIDs = append(allChunkIDs, chunkIDs...) | ||||||||
|
||||||||
finalRemoved++ | ||||||||
log.Info().Msgf("result at height %v has been removed. progress (%v/%v)", height, finalRemoved, total) | ||||||||
|
@@ -193,18 +199,18 @@ func removeExecutionResultsFromHeight( | |||||||
// removing for pending blocks | ||||||||
pendings, err := protoState.Final().Descendants() | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not get pending block: %w", err) | ||||||||
return nil, fmt.Errorf("could not get pending block: %w", err) | ||||||||
} | ||||||||
|
||||||||
pendingRemoved := 0 | ||||||||
total = len(pendings) | ||||||||
|
||||||||
for _, pending := range pendings { | ||||||||
err = removeForBlockID(writeBatch, chunkBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending) | ||||||||
|
||||||||
chunkIDs, err := removeForBlockID(protocolDBBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not remove result for pending block %v: %w", pending, err) | ||||||||
return nil, fmt.Errorf("could not remove result for pending block %v: %w", pending, err) | ||||||||
} | ||||||||
allChunkIDs = append(allChunkIDs, chunkIDs...) | ||||||||
|
||||||||
pendingRemoved++ | ||||||||
log.Info().Msgf("result for pending block %v has been removed. progress (%v/%v) ", pending, pendingRemoved, total) | ||||||||
|
@@ -213,15 +219,14 @@ func removeExecutionResultsFromHeight( | |||||||
log.Info().Msgf("removed height from %v. removed for %v finalized blocks, and %v pending blocks", | ||||||||
fromHeight, finalRemoved, pendingRemoved) | ||||||||
|
||||||||
return nil | ||||||||
return allChunkIDs, nil | ||||||||
} | ||||||||
|
||||||||
// removeForBlockID remove block execution related data for a given block. | ||||||||
// All data to be removed will be removed in a batch write. | ||||||||
// It bubbles up any error encountered | ||||||||
func removeForBlockID( | ||||||||
writeBatch storage.Batch, | ||||||||
chunkBatch storage.Batch, | ||||||||
protocolDBBatch storage.Batch, | ||||||||
commits storage.Commits, | ||||||||
transactionResults storage.TransactionResults, | ||||||||
results storage.ExecutionResults, | ||||||||
|
@@ -230,74 +235,70 @@ func removeForBlockID( | |||||||
events storage.Events, | ||||||||
serviceEvents storage.ServiceEvents, | ||||||||
blockID flow.Identifier, | ||||||||
) error { | ||||||||
) ([]flow.Identifier, error) { | ||||||||
result, err := results.ByBlockID(blockID) | ||||||||
if errors.Is(err, storage.ErrNotFound) { | ||||||||
log.Info().Msgf("result not found for block %v", blockID) | ||||||||
return nil | ||||||||
return nil, nil | ||||||||
} | ||||||||
|
||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not find result for block %v: %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not find result for block %v: %w", blockID, err) | ||||||||
} | ||||||||
|
||||||||
chunkIDs := make([]flow.Identifier, 0, len(result.Chunks)) | ||||||||
for _, chunk := range result.Chunks { | ||||||||
chunkID := chunk.ID() | ||||||||
// remove chunk data pack | ||||||||
err := chunks.BatchRemove(chunkID, chunkBatch) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err) | ||||||||
} | ||||||||
|
||||||||
chunkIDs = append(chunkIDs, chunkID) | ||||||||
} | ||||||||
|
||||||||
// remove commits | ||||||||
err = commits.BatchRemoveByBlockID(blockID, writeBatch) | ||||||||
err = commits.BatchRemoveByBlockID(blockID, protocolDBBatch) | ||||||||
if err != nil { | ||||||||
if errors.Is(err, storage.ErrNotFound) { | ||||||||
return fmt.Errorf("could not remove by block ID %v: %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not remove by block ID %v: %w", blockID, err) | ||||||||
} | ||||||||
|
||||||||
log.Warn().Msgf("statecommitment not found for block %v", blockID) | ||||||||
} | ||||||||
|
||||||||
// remove transaction results | ||||||||
err = transactionResults.BatchRemoveByBlockID(blockID, writeBatch) | ||||||||
err = transactionResults.BatchRemoveByBlockID(blockID, protocolDBBatch) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not remove transaction results by BlockID %v: %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not remove transaction results by BlockID %v: %w", blockID, err) | ||||||||
} | ||||||||
|
||||||||
// remove own execution results index | ||||||||
err = myReceipts.BatchRemoveIndexByBlockID(blockID, writeBatch) | ||||||||
err = myReceipts.BatchRemoveIndexByBlockID(blockID, protocolDBBatch) | ||||||||
if err != nil { | ||||||||
if !errors.Is(err, storage.ErrNotFound) { | ||||||||
return fmt.Errorf("could not remove own receipt by BlockID %v: %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not remove own receipt by BlockID %v: %w", blockID, err) | ||||||||
} | ||||||||
|
||||||||
log.Warn().Msgf("own receipt not found for block %v", blockID) | ||||||||
} | ||||||||
|
||||||||
// remove events | ||||||||
err = events.BatchRemoveByBlockID(blockID, writeBatch) | ||||||||
err = events.BatchRemoveByBlockID(blockID, protocolDBBatch) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not remove events by BlockID %v: %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not remove events by BlockID %v: %w", blockID, err) | ||||||||
} | ||||||||
|
||||||||
// remove service events | ||||||||
err = serviceEvents.BatchRemoveByBlockID(blockID, writeBatch) | ||||||||
err = serviceEvents.BatchRemoveByBlockID(blockID, protocolDBBatch) | ||||||||
if err != nil { | ||||||||
return fmt.Errorf("could not remove service events by blockID %v: %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not remove service events by blockID %v: %w", blockID, err) | ||||||||
} | ||||||||
|
||||||||
// remove execution result index | ||||||||
err = results.BatchRemoveIndexByBlockID(blockID, writeBatch) | ||||||||
err = results.BatchRemoveIndexByBlockID(blockID, protocolDBBatch) | ||||||||
if err != nil { | ||||||||
if !errors.Is(err, storage.ErrNotFound) { | ||||||||
return fmt.Errorf("could not remove result by BlockID %v: %w", blockID, err) | ||||||||
return nil, fmt.Errorf("could not remove result by BlockID %v: %w", blockID, err) | ||||||||
} | ||||||||
|
||||||||
log.Warn().Msgf("result not found for block %v", blockID) | ||||||||
} | ||||||||
|
||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [This might be more of a suggestion for the follow-up PR you mentioned above.] I would include the chunk data pack index here:
|
||||||||
return nil | ||||||||
return chunkIDs, nil | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
chunkDataPacks.BatchRemove
internally also calls storedChunkDataPacks.Remove