[Storage] Refactor stored chunk data pack #7983
Conversation
Force-pushed `…stored-chunk-data-pack` from `e3a3b6b` to `d22661a`
```go
}

if len(chunkIDs) > 0 {
	err := chunks.BatchRemove(chunkIDs, writeBatch, chunkBatch)
```
`BatchRemove` now takes two batches: one for batch operations on the protocol DB, the other on the chunk data pack DB.
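A minimal sketch of the call shape this implies; the batch parameter names (`protocolDBBatch`, `chunkDataPackDBBatch`) are illustrative, not taken from the PR:

```go
// Each database gets its own staged batch; neither is committed here.
if len(chunkIDs) > 0 {
	err := chunks.BatchRemove(
		chunkIDs,
		protocolDBBatch,      // stages removal of the ChunkID → stored-CDP-ID index entries
		chunkDataPackDBBatch, // stages removal of the StoredChunkDataPack payloads
	)
	if err != nil {
		return fmt.Errorf("cannot batch remove chunk data packs: %w", err)
	}
}
```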
```go
}

// SaveExecutionResults saves all data related to the execution of a block.
// It is concurrent-safe
```
It is concurrent-safe because the chunk data pack store is conflict-free (it stores data by hash), while storing protocol data requires a lock, which synchronizes the writes.
Suggested change:
```diff
-// It is concurrent-safe
+// It is concurrent safe because chunk data packs store is conflict-free (storing data by hash), and protocol data requires a lock to store, which will be synchronized.
```
```diff
 //
 // The return value of this function determines whether the chunk request can be dispatched to the network.
-type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) bool
+type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) (bool, string)
```
This change is a minor improvement for debugging why a chunk data pack is not received on the VN.
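For illustration, a qualifier under the new signature could report why it withholds a request; this sketch assumes a max-attempts qualifier like the one in this package (the reason strings are made up):

```go
// MaxAttemptQualifier refuses to dispatch once the chunk has been requested
// maxAttempts times, and now also returns a human-readable reason for logging.
func MaxAttemptQualifier(maxAttempts uint64) RequestQualifierFunc {
	return func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) (bool, string) {
		if attempts >= maxAttempts {
			return false, fmt.Sprintf("max attempts reached (%d/%d)", attempts, maxAttempts)
		}
		return true, ""
	}
}
```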
```diff
-// prepare data
-cdps := make([]*flow.ChunkDataPack, 0, blockCount)
+// prepare data - create execution receipts for blocks
+receipts := make([]*flow.ExecutionReceipt, 0, blockCount)
```
Previously this test used chunk data packs to exercise the iterator (pruner). Now that chunk data packs are more complex to store, I switched it to execution receipts, which are simpler and likewise indexed by block ID.
```go
// remove all chunk data packs in a single batch operation
if len(chunkIDs) > 0 {
	err := p.chunkDataPacks.BatchRemoveStoredChunkDataPacksOnly(chunkIDs, batchWriter)
```
The chunk data pack pruner only prunes the stored chunk data pack in the chunk data pack database; it does not remove the ChunkID → CDP ID mapping in the protocol database.
```diff
 // legacy codes (should be cleaned up)
-codeChunkDataPack = 100
+_ = 100 // codeChunkDataPack (100) is deprecated
```
We are deprecating the old table; it remains backward compatible if we roll out carefully.
We need to roll out to ENs one by one, so that an updated node won't be required to serve the old chunks, since they have already been sealed by the other ENs.
Under the same assumption that this is backward-compatible, I think modifying the `StoredChunkDataPack.ID()` function would also be backward compatible. In either case, we are changing the way chunk data packs are stored such that we cannot retrieve pre-upgrade chunk data packs.
```go
codeChunkDataPackID     = 112
codeStoredChunkDataPack = 113
```
112 and 113 are the new tables that replace the deprecated 100 (`codeChunkDataPack`).
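For context, a sketch of how such prefix codes typically namespace keys; the `makePrefix`-style helper is an assumption about this package, not verified against the PR:

```go
// Key layouts assumed for illustration:
//   codeChunkDataPackID     (112): chunkID → storedChunkDataPackID            (protocol DB index)
//   codeStoredChunkDataPack (113): storedChunkDataPackID → StoredChunkDataPack (chunk data pack DB)
indexKey := makePrefix(codeChunkDataPackID, chunkID)
dataKey := makePrefix(codeStoredChunkDataPack, storedChunkDataPackID)
```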
I focused my first review pass on the core logic. I think there is a good number of comments and suggestions for a first round of revisions, after which I would continue with the review.
My impression is that there is a subtle footgun and an opportunity to significantly simplify the ID computation of `ChunkDataPack` and `StoredChunkDataPack` (specifically, making their IDs consistent).
```go
// StoredChunkDataPack is an in-storage representation of chunk data pack.
// Its prime difference is instead of an actual collection, it keeps a collection ID hence relying on maintaining
// the collection on a secondary storage.
type StoredChunkDataPack struct {
```
I think, conceptually, this represents data that the EN made external commitments to, so it should not be modified (?). Hence, we might want to declare it as an immutable object for our linter to protect it (line 183 in 469d93b):
`//structwrite:immutable - mutations allowed only within the constructor`
👍 I think both this and `ChunkDataPack` should be marked immutable.
```go
CollectionID flow.Identifier
SystemChunk  bool
```
I would change this to a pointer `*flow.Identifier`. This mirrors the behaviour of `ChunkDataPack.Collection`, which is nil. Alternatively, you could use `flow.ZeroID` to represent unset specifically, i.e. the system chunk.
```go
func (c StoredChunkDataPack) ID() flow.Identifier {
	return flow.MakeID(c)
}
```
In my opinion, we are introducing redundant identifiers here that hash exactly the same data differently. I think that is confusing.
Suggestion:

- When computing the ID of a regular chunk data pack, we should compute the collection ID first and hash that, instead of the collection's full object structure. The current implementation (lines 206 to 209 in af1d4da) is:

  ```go
  // ID returns a collision-resistant hash of the ChunkDataPack struct.
  func (c *ChunkDataPack) ID() Identifier {
  	return MakeID(c)
  }
  ```

  For the pattern of hashing a nested ID, compare line 267 in 469d93b: `LastViewTCID: h.LastViewTC.ID(),`. As a result, no matter whether you have the data structure with the full collection, or the reduced representation with only the collection hash/ID, in both cases you get the same ID for the overall chunk data pack. In a way, we are saying: it represents the same data, so it should have the same hash.

  I think there is a variety of ways we can implement this. I think the following would be very clean:
- We could introduce a common data structure, `ChunkDataPackHeader`:

  ```go
  type ChunkDataPackHeader struct {
  	ChunkID      Identifier      // ID of the chunk this data pack is for
  	StartState   StateCommitment // commitment for starting state
  	ProofHash    Identifier      // proof for all registers touched (read or written) during the chunk execution
  	CollectionID Identifier      // collection executed in this chunk; nil for system chunk

  	// ExecutionDataRoot is the root data structure of an execution_data.BlockExecutionData.
  	// It contains the necessary information for a verification node to validate that the
  	// BlockExecutionData produced is valid.
  	ExecutionDataRoot BlockExecutionDataRoot
  }
  ```
`ProofHash` and `CollectionID` have different types compared to `Proof` and `Collection` in the original chunk data pack. `ChunkDataPack.ID` would then compute the ID via `ChunkDataPackHeader.ID()`. Similarly, `StoredChunkDataPack` could also compute its ID via `ChunkDataPackHeader`. Thereby, it would be totally clear that both representations generate the same ID (ideally we also add a couple of tests).
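To make the intended consistency concrete, here is a rough sketch of both ID methods delegating to the shared header; it assumes the `ChunkDataPackHeader` above plus the field names shown elsewhere in this PR, and is not actual PR code:

```go
// ID of the full chunk data pack: reduce to the header first, then hash it.
func (c *ChunkDataPack) ID() Identifier {
	collectionID := ZeroID // system chunk: no collection
	if c.Collection != nil {
		collectionID = c.Collection.ID()
	}
	return ChunkDataPackHeader{
		ChunkID:           c.ChunkID,
		StartState:        c.StartState,
		ProofHash:         MakeID(c.Proof),
		CollectionID:      collectionID,
		ExecutionDataRoot: c.ExecutionDataRoot,
	}.ID()
}

// ID of the reduced representation: same header, hence the same hash.
func (c StoredChunkDataPack) ID() Identifier {
	return ChunkDataPackHeader{
		ChunkID:           c.ChunkID,
		StartState:        c.StartState,
		ProofHash:         MakeID(c.Proof),
		CollectionID:      c.CollectionID,
		ExecutionDataRoot: c.ExecutionDataRoot,
	}.ID()
}
```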
@jordanschalm would love your thoughts here, since I feel my proposal is very close to the malleability approach / design (uniform pattern throughout the code base)
100% agree with making `ChunkDataPack` and `StoredChunkDataPack` have the same ID.
We just need to be careful that changing the ID definition here will be safe to deploy as a rolling upgrade, since Testnet has already been sporked.
```go
// Equals compares two StoredChunkDataPack for equality.
// The second return value is a string describing the first found mismatch, or empty if they are equal.
func (c StoredChunkDataPack) Equals(other StoredChunkDataPack) (bool, string) {
```
I find this an odd method signature. How about:

```diff
-// Equals compares two StoredChunkDataPack for equality.
-// The second return value is a string describing the first found mismatch, or empty if they are equal.
-func (c StoredChunkDataPack) Equals(other StoredChunkDataPack) (bool, string) {
+// EnsureEquals compares two StoredChunkDataPack for equality. It returns a [storage.ErrDataMismatch] if
+// and only if they are not equal.
+func (c StoredChunkDataPack) EnsureEquals(other StoredChunkDataPack) error {
```
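A minimal sketch of how the suggested `EnsureEquals` could look, assuming `ErrDataMismatch` is a sentinel error in the `storage` package and comparing a few representative fields (the full field list is elided):

```go
// EnsureEquals compares two StoredChunkDataPack for equality. It returns a
// [storage.ErrDataMismatch] if and only if they are not equal.
func (c StoredChunkDataPack) EnsureEquals(other StoredChunkDataPack) error {
	if c.ChunkID != other.ChunkID {
		return fmt.Errorf("chunk ID mismatch (%v != %v): %w", c.ChunkID, other.ChunkID, ErrDataMismatch)
	}
	if c.CollectionID != other.CollectionID {
		return fmt.Errorf("collection ID mismatch (%v != %v): %w", c.CollectionID, other.CollectionID, ErrDataMismatch)
	}
	if c.SystemChunk != other.SystemChunk {
		return fmt.Errorf("system chunk flag mismatch: %w", ErrDataMismatch)
	}
	// ... remaining fields compared analogously
	return nil
}
```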
```go
	return sc
}

func ToStoredChunkDataPacks(cs []*flow.ChunkDataPack) []*StoredChunkDataPack {
```
Suggested change:
```diff
 func ToStoredChunkDataPacks(cs []*flow.ChunkDataPack) []*StoredChunkDataPack {

+// ToStoredChunkDataPack converts the given ChunkDataPacks to their reduced representation,
+// by replacing the full collection data with its ID (collision-resistant hash).
 func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack {
```
return nil, fmt.Errorf("cannot retrieve stored chunk data pack ID for chunk %x: %w", chunkID, err) | ||
} | ||
|
||
// Then retrieve the actual stored chunk data pack using the ID |
Suggested change:
```diff
-// Then retrieve the actual stored chunk data pack using the ID
+// Then retrieve the reduced representation of the chunk data pack via its ID.
```
return nil, fmt.Errorf("cannot retrieve stored chunk data pack %x for chunk %x: %w", storedChunkDataPackID, chunkID, err) | ||
} | ||
|
||
chdp := &flow.ChunkDataPack{ |
Please prefer to use the constructor. Constructors are really helpful for enforcing immutability and basic data consistency (a lot of constructors with basic consistency checks have been added during the malleability work).
Suggestion (a sketch follows below):

- First inspect the `StoredChunkDataPack` to see whether it represents the system chunk.
- If it is not the system chunk, retrieve the collection data.
- Construct the `flow.ChunkDataPack` (you probably have to go through an `UntrustedChunkDataPack`). Sorry, it's a bit more code, but it helps to guarantee immutability throughout the code base.

fyi @jordanschalm for thoughts and recommendations.
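A rough sketch of those three steps; the constructor form `flow.NewChunkDataPack(flow.UntrustedChunkDataPack)` and the field names are assumptions modelled on the malleability-style constructors, not verified against this PR:

```go
// Steps 1 & 2: the system chunk carries no collection; otherwise fetch it.
var collection *flow.Collection
if !stored.SystemChunk {
	var err error
	collection, err = collections.ByID(stored.CollectionID)
	if err != nil {
		return nil, fmt.Errorf("cannot retrieve collection %v: %w", stored.CollectionID, err)
	}
}

// Step 3: construct the chunk data pack via the validating constructor,
// which enforces immutability and basic consistency checks.
chdp, err := flow.NewChunkDataPack(flow.UntrustedChunkDataPack{
	ChunkID:           stored.ChunkID,
	StartState:        stored.StartState,
	Proof:             stored.Proof,
	Collection:        collection,
	ExecutionDataRoot: stored.ExecutionDataRoot,
})
if err != nil {
	return nil, fmt.Errorf("could not construct chunk data pack: %w", err)
}
```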
```go
cache := newCache(collector, metrics.ResourceStoredChunkDataPack,
	withLimit[flow.Identifier, *storage.StoredChunkDataPack](byIDCacheSize),
	withStore(operation.InsertStoredChunkDataPack),
```
Suggested change:
```diff
 withStore(operation.InsertStoredChunkDataPack),
+withRemove[flow.Identifier, *storage.StoredChunkDataPack](operation.RemoveStoredChunkDataPack),
```

and then you can implement `BatchRemove` below as:

```go
func (ch *StoredChunkDataPacks) BatchRemove(ids []flow.Identifier, rw storage.ReaderBatchWriter) error {
	for _, id := range ids {
		err := ch.byIDCache.RemoveTx(rw, id)
		if err != nil {
			return fmt.Errorf("cannot remove chunk data pack: %w", err)
		}
	}
	return nil
}
```

(the logic you implemented in `batchRemove` is essentially already integrated into the cache)
```go
// BatchRemove removes multiple StoredChunkDataPacks cs keyed by their IDs in a batch using the provided
// No error returns are expected during normal operation, even if none of the referenced objects exist in storage.
func (ch *StoredChunkDataPacks) BatchRemove(ids []flow.Identifier, rw storage.ReaderBatchWriter) error {
```
⚠️ ❓
I am not sure if I am understanding this correctly. I am concerned that this function is extremely error-prone. My thinking:

- we have the chunk data pack database (which I like to call the "execution database")
- and then we have the protocol database

Here, we assume that the `ReaderBatchWriter` is for the "execution database". Here are the two usages of this function:

- `flow-go/storage/store/chunk_data_packs.go`, line 158 in 8dbfdc1:
  `err := ch.stored.BatchRemove(storedChunkDataPackIDs, chunkDataPackDBBatch)`
  I think the usage here is correct.
- `flow-go/storage/store/chunk_data_packs.go`, line 200 in 8dbfdc1:
  `err := ch.stored.BatchRemove(storedChunkDataPackIDs, chunkDataPackDBBatch)`
  But here, I think the `ReaderBatchWriter` instance it is receiving is for the protocol database (well, it should be, despite the misleading name).

I hope this illustrates that the instances of `ReaderBatchWriter` are very easily mixed up.
My gut feeling: I think `StoredChunkDataPacks` can operate entirely without `ReaderBatchWriter`. Reasoning:

- We can't guarantee atomic writes across two different databases anyway, so we might as well focus on atomic writes only for the protocol database.
- For storing chunks in the "execution database", we just use operations that create their own write batch internally; to the outside, no write batches are exposed. (See the sketch after this list.)
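As a sketch of that direction (method names are illustrative; it assumes the store holds its own handle to the chunk data pack database exposing a `WithReaderBatchWriter`-style helper):

```go
// Remove deletes the given stored chunk data packs. The write batch is created
// and committed internally, so no ReaderBatchWriter crosses the API boundary
// and it cannot be confused with a protocol-database batch.
func (ch *StoredChunkDataPacks) Remove(ids []flow.Identifier) error {
	return ch.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		for _, id := range ids {
			if err := ch.byIDCache.RemoveTx(rw, id); err != nil {
				return fmt.Errorf("cannot remove stored chunk data pack %v: %w", id, err)
			}
		}
		return nil
	})
}
```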
```diff
-	ch.byChunkIDCache.Remove(chunkID)
-})
-return operation.RemoveChunkDataPack(rw.Writer(), chunkID)
+func (ch *ChunkDataPacks) BatchRemoveStoredChunkDataPacksOnly(chunkIDs []flow.Identifier, chunkDataPackDBBatch storage.ReaderBatchWriter) error {
```
⚠️
I think this usage of `chunkDataPackDBBatch` is incorrect. If I am not mistaken, we are using the same batch to operate on the protocol database (here) and on the chunk data pack database (here).
In my opinion, it would be doable to have `StoredChunkDataPacks` work without `ReaderBatchWriter` instances (see my previous comment). Then, for `ChunkDataPacks`, you would only need a single `storage.ReaderBatchWriter` instance for the protocol database (this also applies to `ChunkDataPacks.BatchRemove`).
```go
}

if len(chunkIDs) > 0 {
	err := chunks.BatchRemove(chunkIDs, writeBatch, chunkBatch)
```
Suggested change:
```diff
-err := chunks.BatchRemove(chunkIDs, writeBatch, chunkBatch)
+err := chunks.BatchRemove(chunkIDs, protocolDBatch, chunkDBBatch)
```
```diff
 //
 // The return value of this function determines whether the chunk request can be dispatched to the network.
-type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) bool
+type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) (bool, string)
```
Suggested change:
```diff
-type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) (bool, string)
+type RequestQualifierFunc func(attempts uint64, lastRequested time.Time, retryAfter time.Duration) (requestQualifies bool, reasonMsg string)
```
```diff
 // canDispatchRequest returns whether chunk data request for this chunk ID can be dispatched.
-func (e *Engine) canDispatchRequest(chunkID flow.Identifier) bool {
+func (e *Engine) canDispatchRequest(chunkID flow.Identifier) (bool, string) {
```
Suggested change:
```diff
-func (e *Engine) canDispatchRequest(chunkID flow.Identifier) (bool, string) {
+func (e *Engine) canDispatchRequest(chunkID flow.Identifier) (canDispatch bool, reasonMsg string) {
```
ResourceChunkDataPack = "chunk_data_pack" // execution node | ||
ResourceStoredChunkDataPack = "chunk_data_pack_stored" // execution node |
Suggested change:
```diff
-ResourceChunkDataPack       = "chunk_data_pack"        // execution node
-ResourceStoredChunkDataPack = "chunk_data_pack_stored" // execution node
+ResourceChunkIDToChunkDataPackIndex = "chunk_data_pack_index" // execution node
+ResourceChunkDataPack               = "chunk_data_pack"       // execution node
```

I would invert the naming here.
```go
// 1. First phase: Store chunk data packs (StoredChunkDataPack) by its hash (storedChunkDataPackID) in chunk data pack database.
// 2. Second phase: Create index mappings from ChunkID to storedChunkDataPackID in protocol database
```
Suggested change:
```diff
 // 1. First phase: Store chunk data packs (StoredChunkDataPack) by its hash (storedChunkDataPackID) in chunk data pack database.
+//    This phase occurs immediately when Store is called
 // 2. Second phase: Create index mappings from ChunkID to storedChunkDataPackID in protocol database
+//    This phase is deferred until the caller of Store invokes the returned functor.
```
[copied from implementation doc]
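A hedged sketch of the two-phase shape this godoc describes; the signature, `ToStoredChunkDataPacks`, and `operation.IndexChunkDataPackID` are illustrative stand-ins, not the PR's exact API:

```go
// Store persists the reduced chunk data packs immediately (phase 1) and returns
// a functor that creates the ChunkID → storedChunkDataPackID index entries when
// the caller invokes it as part of a protocol-database batch (phase 2).
func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) (func(storage.ReaderBatchWriter) error, error) {
	stored := storage.ToStoredChunkDataPacks(cs)

	// Phase 1: committed now, against the chunk data pack database only.
	if err := ch.stored.Store(stored); err != nil {
		return nil, fmt.Errorf("cannot store chunk data packs: %w", err)
	}

	// Phase 2: deferred; runs inside the caller's protocol-DB batch.
	return func(protocolDBBatch storage.ReaderBatchWriter) error {
		for i, s := range stored {
			err := operation.IndexChunkDataPackID(protocolDBBatch.Writer(), cs[i].ChunkID, s.ID())
			if err != nil {
				return fmt.Errorf("cannot index chunk data pack ID: %w", err)
			}
		}
		return nil
	}, nil
}
```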
```go
// The two-phase approach ensures that:
// - Chunk data pack content is stored atomically in the chunk data pack database
// - Index mappings are created within the same atomic batch update in protocol database
```
I think the most important aspect to understand about atomicity in this function is that the two phases are NOT committed atomically w.r.t. one another. I think this section is trying to say that each write batch is independently committed atomically, but I'm worried it makes it sound like the whole operation is atomic, especially "within the same atomic batch".

Suggested change:
```diff
-// The two-phase approach ensures that:
-// - Chunk data pack content is stored atomically in the chunk data pack database
-// - Index mappings are created within the same atomic batch update in protocol database
+// CAUTION: Chunk data pack content is stored immediately, and index mappings are stored later.
+// This means that the operations performed by Store as a whole are NOT atomic.
```
[copied from implementation godoc]
```go
}

return storage.WithLock(s.lockManager, storage.LockInsertOwnReceipt, func(lctx lockctx.Context) error {
	// The batch update writes all execution result data in a single atomic operation.
```
Suggested change:
```diff
-// The batch update writes all execution result data in a single atomic operation.
+// The batch update writes all execution result data (except chunk data pack!) atomically.
```

I think the most important detail to get across in the docs is that this operation as a whole is NOT atomic.
```go
codeChunkDataPackID     = 112
codeStoredChunkDataPack = 113
```
Suggested change:
```diff
-codeChunkDataPackID     = 112
-codeStoredChunkDataPack = 113
+codeIndexChunkDataPackByChunkID = 112
+codeChunkDataPack               = 113
```

Addressing #7939 (comment)
This PR:

- Adds a `StoredChunkDataPacks` store and refactors `NewChunkDataPacks` to depend on it, wiring it through node startup and CLI tools (execution builder, read-badger, rollback cmd). This splits storage of chunk data packs from the protocol DB.
- Indexes the `ChunkID → StoredChunkDataPack.ID` mapping inside the protocol DB batch; only `LockInsertOwnReceipt` is held. Improves atomicity and clarifies failure modes.