Skip to content

Commit db8b6be

Browse files
authored
Data column custody info (#7648)
#7647 Introduces a new record in the blobs db `DataColumnCustodyInfo` When `DataColumnCustodyInfo` exists in the db this indicates that a recent cgc change has occurred and/or that a custody backfill sync is currently in progress (custody backfill will be added as a separate PR). When a cgc change has occurred `earliest_available_slot` will be equal to the slot at which the cgc change occured. During custody backfill sync`earliest_available_slot` should be updated incrementally as it progresses. ~~Note that if `advertise_false_custody_group_count` is enabled we do not add a `DataColumnCustodyInfo` record in the db as that would affect the status v2 response.~~ (See comment #7648 (comment)) ~~If `DataColumnCustodyInfo` doesn't exist in the db this indicates that we have fulfilled our custody requirements up to the DA window.~~ (It now always exist, and the slot will be set to `None` once backfill is complete) StatusV2 now uses `DataColumnCustodyInfo` to calculate the `earliest_available_slot` if a `DataColumnCustodyInfo` record exists in the db, if it's `None`, then we return the `oldest_block_slot`.
1 parent b48879a commit db8b6be

File tree

11 files changed

+185
-13
lines changed

11 files changed

+185
-13
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6807,6 +6807,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
68076807
.map(|duration| (next_digest_epoch, duration))
68086808
}
68096809

6810+
/// Update data column custody info with the slot at which cgc was changed.
6811+
pub fn update_data_column_custody_info(&self, slot: Option<Slot>) {
6812+
self.store
6813+
.put_data_column_custody_info(slot)
6814+
.unwrap_or_else(
6815+
|e| tracing::error!(error = ?e, "Failed to update data column custody info"),
6816+
);
6817+
}
6818+
68106819
/// This method serves to get a sense of the current chain health. It is used in block proposal
68116820
/// to determine whether we should outsource payload production duties.
68126821
///

beacon_node/beacon_chain/src/schema_change.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod migration_schema_v23;
33
mod migration_schema_v24;
44
mod migration_schema_v25;
55
mod migration_schema_v26;
6+
mod migration_schema_v27;
67

78
use crate::beacon_chain::BeaconChainTypes;
89
use std::sync::Arc;
@@ -67,6 +68,17 @@ pub fn migrate_schema<T: BeaconChainTypes>(
6768
let ops = migration_schema_v26::downgrade_from_v26::<T>(db.clone())?;
6869
db.store_schema_version_atomically(to, ops)
6970
}
71+
(SchemaVersion(26), SchemaVersion(27)) => {
72+
// This migration updates the blobs db. The schema version
73+
// is bumped inside upgrade_to_v27.
74+
migration_schema_v27::upgrade_to_v27::<T>(db.clone())
75+
}
76+
(SchemaVersion(27), SchemaVersion(26)) => {
77+
// Downgrading is essentially a no-op and is only possible
78+
// if peer das isn't scheduled.
79+
migration_schema_v27::downgrade_from_v27::<T>(db.clone())?;
80+
db.store_schema_version_atomically(to, vec![])
81+
}
7082
// Anything else is an error.
7183
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
7284
target_version: to,
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use crate::BeaconChainTypes;
2+
use std::sync::Arc;
3+
use store::{metadata::SchemaVersion, Error, HotColdDB};
4+
5+
/// Add `DataColumnCustodyInfo` entry to v27.
6+
pub fn upgrade_to_v27<T: BeaconChainTypes>(
7+
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
8+
) -> Result<(), Error> {
9+
if db.spec.is_peer_das_scheduled() {
10+
db.put_data_column_custody_info(None)?;
11+
db.store_schema_version_atomically(SchemaVersion(27), vec![])?;
12+
}
13+
14+
Ok(())
15+
}
16+
17+
pub fn downgrade_from_v27<T: BeaconChainTypes>(
18+
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
19+
) -> Result<(), Error> {
20+
if db.spec.is_peer_das_scheduled() {
21+
return Err(Error::MigrationError(
22+
"Cannot downgrade from v27 if peerDAS is scheduled".to_string(),
23+
));
24+
}
25+
Ok(())
26+
}

beacon_node/beacon_chain/src/validator_custody.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ impl CustodyContext {
217217
new_custody_group_count: updated_cgc,
218218
sampling_count: self
219219
.num_of_custody_groups_to_sample(Some(effective_epoch), spec),
220+
effective_epoch,
220221
});
221222
}
222223
}
@@ -287,6 +288,7 @@ impl CustodyContext {
287288
pub struct CustodyCountChanged {
288289
pub new_custody_group_count: u64,
289290
pub sampling_count: u64,
291+
pub effective_epoch: Epoch,
290292
}
291293

292294
/// The custody information that gets persisted across runs.

beacon_node/beacon_chain/tests/schema_stability.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ use operation_pool::PersistedOperationPool;
99
use ssz::Encode;
1010
use std::sync::{Arc, LazyLock};
1111
use store::{
12-
database::interface::BeaconNodeBackend, hot_cold_store::Split, metadata::DataColumnInfo,
12+
database::interface::BeaconNodeBackend,
13+
hot_cold_store::Split,
14+
metadata::{DataColumnCustodyInfo, DataColumnInfo},
1315
DBColumn, HotColdDB, StoreConfig, StoreItem,
1416
};
1517
use strum::IntoEnumIterator;
1618
use tempfile::{tempdir, TempDir};
17-
use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec};
19+
use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec, Slot};
1820

1921
type E = MainnetEthSpec;
2022
type Store<E> = Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>;
@@ -84,11 +86,13 @@ async fn schema_stability() {
8486

8587
chain.persist_op_pool().unwrap();
8688
chain.persist_custody_context().unwrap();
89+
insert_data_column_custody_info(&store, &harness.spec);
8790

8891
check_db_columns();
8992
check_metadata_sizes(&store);
9093
check_op_pool(&store);
9194
check_custody_context(&store, &harness.spec);
95+
check_custody_info(&store, &harness.spec);
9296
check_persisted_chain(&store);
9397

9498
// Not covered here:
@@ -100,13 +104,21 @@ async fn schema_stability() {
100104
fn check_db_columns() {
101105
let current_columns: Vec<&'static str> = DBColumn::iter().map(|c| c.as_str()).collect();
102106
let expected_columns = vec![
103-
"bma", "blk", "blb", "bdc", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs", "bst",
104-
"exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr", "brm",
105-
"dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
107+
"bma", "blk", "blb", "bdc", "bdi", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs",
108+
"bst", "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr",
109+
"brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
106110
];
107111
assert_eq!(expected_columns, current_columns);
108112
}
109113

114+
fn insert_data_column_custody_info(store: &Store<E>, spec: &ChainSpec) {
115+
if spec.is_peer_das_scheduled() {
116+
store
117+
.put_data_column_custody_info(Some(Slot::new(0)))
118+
.unwrap();
119+
}
120+
}
121+
110122
/// Check the SSZ sizes of known on-disk metadata.
111123
///
112124
/// New types can be added here as the schema evolves.
@@ -122,6 +134,7 @@ fn check_metadata_sizes(store: &Store<E>) {
122134
}
123135
);
124136
assert_eq!(DataColumnInfo::default().ssz_bytes_len(), 5);
137+
assert_eq!(DataColumnCustodyInfo::default().ssz_bytes_len(), 5);
125138
}
126139

127140
fn check_op_pool(store: &Store<E>) {
@@ -143,6 +156,15 @@ fn check_custody_context(store: &Store<E>, spec: &ChainSpec) {
143156
}
144157
}
145158

159+
fn check_custody_info(store: &Store<E>, spec: &ChainSpec) {
160+
let data_column_custody_info = store.get_data_column_custody_info().unwrap();
161+
if spec.is_peer_das_scheduled() {
162+
assert_eq!(data_column_custody_info.unwrap().as_ssz_bytes().len(), 13);
163+
} else {
164+
assert!(data_column_custody_info.is_none());
165+
}
166+
}
167+
146168
fn check_persisted_chain(store: &Store<E>) {
147169
let chain = store
148170
.get_item::<PersistedBeaconChain>(&Hash256::ZERO)

beacon_node/beacon_chain/tests/store_tests.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3157,7 +3157,11 @@ async fn schema_downgrade_to_min_version(
31573157
)
31583158
.await;
31593159

3160-
let min_version = SchemaVersion(22);
3160+
let min_version = if spec.is_fulu_scheduled() {
3161+
SchemaVersion(27)
3162+
} else {
3163+
SchemaVersion(22)
3164+
};
31613165

31623166
// Save the slot clock so that the new harness doesn't revert in time.
31633167
let slot_clock = harness.chain.slot_clock.clone();

beacon_node/http_api/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3760,7 +3760,6 @@ pub fn serve<T: BeaconChainTypes>(
37603760
.to_string(),
37613761
));
37623762
}
3763-
37643763
Ok(())
37653764
})
37663765
},
@@ -3845,6 +3844,12 @@ pub fn serve<T: BeaconChainTypes>(
38453844
current_slot,
38463845
&chain.spec,
38473846
) {
3847+
chain.update_data_column_custody_info(Some(
3848+
cgc_change
3849+
.effective_epoch
3850+
.start_slot(T::EthSpec::slots_per_epoch()),
3851+
));
3852+
38483853
network_tx.send(NetworkMessage::CustodyCountChanged {
38493854
new_custody_group_count: cgc_change.new_custody_group_count,
38503855
sampling_count: cgc_change.sampling_count,

beacon_node/network/src/status.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,22 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
2929
finalized_checkpoint.root = Hash256::zero();
3030
}
3131

32-
let earliest_available_slot = beacon_chain.store.get_anchor_info().oldest_block_slot;
32+
// NOTE: We are making an assumption that `get_data_column_custody_info` wont fail.
33+
let earliest_available_data_column_slot = beacon_chain
34+
.store
35+
.get_data_column_custody_info()
36+
.ok()
37+
.flatten()
38+
.and_then(|info| info.earliest_data_column_slot);
3339

40+
// If data_column_custody_info.earliest_data_column_slot is `None`,
41+
// no recent cgc changes have occurred and no cgc backfill is in progress.
42+
let earliest_available_slot =
43+
if let Some(earliest_available_data_column_slot) = earliest_available_data_column_slot {
44+
earliest_available_data_column_slot
45+
} else {
46+
beacon_chain.store.get_anchor_info().oldest_block_slot
47+
};
3448
StatusMessage::V2(StatusMessageV2 {
3549
fork_digest,
3650
finalized_root: finalized_checkpoint.root,

beacon_node/store/src/hot_cold_store.rs

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use crate::historic_state_cache::HistoricStateCache;
66
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
77
use crate::memory_store::MemoryStore;
88
use crate::metadata::{
9-
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion, ANCHOR_INFO_KEY,
10-
ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
11-
CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
12-
STATE_UPPER_LIMIT_NO_RETAIN,
9+
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnCustodyInfo, DataColumnInfo,
10+
SchemaVersion, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY,
11+
CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY,
12+
SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
1313
};
1414
use crate::state_cache::{PutStateOutcome, StateCache};
1515
use crate::{
@@ -91,6 +91,7 @@ struct BlockCache<E: EthSpec> {
9191
block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
9292
blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
9393
data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
94+
data_column_custody_info_cache: Option<DataColumnCustodyInfo>,
9495
}
9596

9697
impl<E: EthSpec> BlockCache<E> {
@@ -99,6 +100,7 @@ impl<E: EthSpec> BlockCache<E> {
99100
block_cache: LruCache::new(size),
100101
blob_cache: LruCache::new(size),
101102
data_column_cache: LruCache::new(size),
103+
data_column_custody_info_cache: None,
102104
}
103105
}
104106
pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
@@ -112,6 +114,12 @@ impl<E: EthSpec> BlockCache<E> {
112114
.get_or_insert_mut(block_root, Default::default)
113115
.insert(data_column.index, data_column);
114116
}
117+
pub fn put_data_column_custody_info(
118+
&mut self,
119+
data_column_custody_info: Option<DataColumnCustodyInfo>,
120+
) {
121+
self.data_column_custody_info_cache = data_column_custody_info;
122+
}
115123
pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
116124
self.block_cache.get(block_root)
117125
}
@@ -129,6 +137,9 @@ impl<E: EthSpec> BlockCache<E> {
129137
.get(block_root)
130138
.and_then(|map| map.get(column_index).cloned())
131139
}
140+
pub fn get_data_column_custody_info(&self) -> Option<DataColumnCustodyInfo> {
141+
self.data_column_custody_info_cache.clone()
142+
}
132143
pub fn delete_block(&mut self, block_root: &Hash256) {
133144
let _ = self.block_cache.pop(block_root);
134145
}
@@ -922,6 +933,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
922933
));
923934
}
924935

936+
pub fn put_data_column_custody_info(
937+
&self,
938+
earliest_data_column_slot: Option<Slot>,
939+
) -> Result<(), Error> {
940+
let data_column_custody_info = DataColumnCustodyInfo {
941+
earliest_data_column_slot,
942+
};
943+
944+
self.blobs_db
945+
.put(&DATA_COLUMN_CUSTODY_INFO_KEY, &data_column_custody_info)?;
946+
947+
self.block_cache
948+
.lock()
949+
.put_data_column_custody_info(Some(data_column_custody_info));
950+
951+
Ok(())
952+
}
953+
925954
pub fn put_data_columns(
926955
&self,
927956
block_root: &Hash256,
@@ -2389,6 +2418,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
23892418
})
23902419
}
23912420

2421+
/// Fetch custody info from the cache.
2422+
/// If custody info doesn't exist in the cache,
2423+
/// try to fetch from the DB and prime the cache.
2424+
pub fn get_data_column_custody_info(&self) -> Result<Option<DataColumnCustodyInfo>, Error> {
2425+
let Some(data_column_custody_info) = self.block_cache.lock().get_data_column_custody_info()
2426+
else {
2427+
let data_column_custody_info = self
2428+
.blobs_db
2429+
.get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;
2430+
2431+
// Update the cache
2432+
self.block_cache
2433+
.lock()
2434+
.put_data_column_custody_info(data_column_custody_info.clone());
2435+
2436+
return Ok(data_column_custody_info);
2437+
};
2438+
2439+
Ok(Some(data_column_custody_info))
2440+
}
2441+
23922442
/// Fetch all columns for a given block from the store.
23932443
pub fn get_data_columns(
23942444
&self,

beacon_node/store/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,8 @@ pub enum DBColumn {
264264
BeaconBlob,
265265
#[strum(serialize = "bdc")]
266266
BeaconDataColumn,
267+
#[strum(serialize = "bdi")]
268+
BeaconDataColumnCustodyInfo,
267269
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
268270
///
269271
/// DEPRECATED.
@@ -424,6 +426,7 @@ impl DBColumn {
424426
| Self::CustodyContext
425427
| Self::OptimisticTransitionBlock => 32,
426428
Self::BeaconBlockRoots
429+
| Self::BeaconDataColumnCustodyInfo
427430
| Self::BeaconBlockRootsChunked
428431
| Self::BeaconStateRoots
429432
| Self::BeaconStateRootsChunked

0 commit comments

Comments
 (0)