diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 01075ae4a4c..88437835917 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -6835,6 +6835,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .map(|duration| (next_digest_epoch, duration))
     }
 
+    /// Update the data column custody info with the slot at which the cgc was changed.
+    pub fn update_data_column_custody_info(&self, slot: Option<Slot>) {
+        self.store
+            .put_data_column_custody_info(slot)
+            .unwrap_or_else(
+                |e| tracing::error!(error = ?e, "Failed to update data column custody info"),
+            );
+    }
+
     /// This method serves to get a sense of the current chain health. It is used in block proposal
     /// to determine whether we should outsource payload production duties.
     ///
diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs
index 317b89cbdd4..15c9498e1c1 100644
--- a/beacon_node/beacon_chain/src/schema_change.rs
+++ b/beacon_node/beacon_chain/src/schema_change.rs
@@ -3,6 +3,7 @@
 mod migration_schema_v23;
 mod migration_schema_v24;
 mod migration_schema_v25;
 mod migration_schema_v26;
+mod migration_schema_v27;
 use crate::beacon_chain::BeaconChainTypes;
 use std::sync::Arc;
@@ -67,6 +68,17 @@ pub fn migrate_schema<T: BeaconChainTypes>(
             let ops = migration_schema_v26::downgrade_from_v26::<T>(db.clone())?;
             db.store_schema_version_atomically(to, ops)
         }
+        (SchemaVersion(26), SchemaVersion(27)) => {
+            // This migration updates the blobs db. The schema version
+            // is bumped inside `upgrade_to_v27`.
+            migration_schema_v27::upgrade_to_v27::<T>(db.clone())
+        }
+        (SchemaVersion(27), SchemaVersion(26)) => {
+            // Downgrading is essentially a no-op and is only possible
+            // if PeerDAS isn't scheduled.
+            migration_schema_v27::downgrade_from_v27::<T>(db.clone())?;
+            db.store_schema_version_atomically(to, vec![])
+        }
         // Anything else is an error.
         (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
             target_version: to,
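Note: for readers unfamiliar with how these version-pair match arms are driven, the sketch below models the adjacent-step dispatch in miniature. `Db`, `SchemaVersion`, and the two migration functions here are hypothetical stand-ins, not Lighthouse's actual types; the sketch only mirrors the control flow described in the comments above, where the 26→27 arm bumps the version inside the upgrade itself and the 27→26 arm bumps it after the downgrade check.

```rust
// Minimal stand-in for the adjacent-pair schema migration dispatch.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SchemaVersion(u64);

struct Db {
    version: SchemaVersion,
    peer_das_scheduled: bool,
}

fn upgrade_to_v27(db: &mut Db) -> Result<(), String> {
    if db.peer_das_scheduled {
        // Write the new custody info entry and bump the version in the same
        // step (mirroring "the schema version is bumped inside upgrade_to_v27").
        db.version = SchemaVersion(27);
    }
    Ok(())
}

fn downgrade_from_v27(db: &Db) -> Result<(), String> {
    if db.peer_das_scheduled {
        return Err("cannot downgrade from v27 if PeerDAS is scheduled".into());
    }
    Ok(())
}

fn migrate_one_step(db: &mut Db, from: SchemaVersion, to: SchemaVersion) -> Result<(), String> {
    match (from.0, to.0) {
        (26, 27) => upgrade_to_v27(db),
        (27, 26) => {
            // Downgrade is a no-op apart from the version bump.
            downgrade_from_v27(db)?;
            db.version = to;
            Ok(())
        }
        _ => Err(format!("unsupported migration {:?} -> {:?}", from, to)),
    }
}

fn main() {
    let mut db = Db { version: SchemaVersion(26), peer_das_scheduled: true };
    migrate_one_step(&mut db, SchemaVersion(26), SchemaVersion(27)).unwrap();
    assert_eq!(db.version, SchemaVersion(27));
}
```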
diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v27.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v27.rs
new file mode 100644
index 00000000000..6275b1c5bea
--- /dev/null
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v27.rs
@@ -0,0 +1,26 @@
+use crate::BeaconChainTypes;
+use std::sync::Arc;
+use store::{metadata::SchemaVersion, Error, HotColdDB};
+
+/// Add `DataColumnCustodyInfo` entry to v27.
+pub fn upgrade_to_v27<T: BeaconChainTypes>(
+    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+) -> Result<(), Error> {
+    if db.spec.is_peer_das_scheduled() {
+        db.put_data_column_custody_info(None)?;
+        db.store_schema_version_atomically(SchemaVersion(27), vec![])?;
+    }
+
+    Ok(())
+}
+
+pub fn downgrade_from_v27<T: BeaconChainTypes>(
+    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+) -> Result<(), Error> {
+    if db.spec.is_peer_das_scheduled() {
+        return Err(Error::MigrationError(
+            "Cannot downgrade from v27 if peerDAS is scheduled".to_string(),
+        ));
+    }
+    Ok(())
+}
diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs
index 7dc5b18ae43..4224125a2ac 100644
--- a/beacon_node/beacon_chain/src/validator_custody.rs
+++ b/beacon_node/beacon_chain/src/validator_custody.rs
@@ -217,6 +217,7 @@ impl CustodyContext {
                 new_custody_group_count: updated_cgc,
                 sampling_count: self
                     .num_of_custody_groups_to_sample(Some(effective_epoch), spec),
+                effective_epoch,
             });
         }
     }
@@ -287,6 +288,7 @@ impl CustodyContext {
 pub struct CustodyCountChanged {
     pub new_custody_group_count: u64,
     pub sampling_count: u64,
+    pub effective_epoch: Epoch,
 }
 
 /// The custody information that gets persisted across runs.
diff --git a/beacon_node/beacon_chain/tests/schema_stability.rs b/beacon_node/beacon_chain/tests/schema_stability.rs
index fc37a1159bc..1d12fc878e7 100644
--- a/beacon_node/beacon_chain/tests/schema_stability.rs
+++ b/beacon_node/beacon_chain/tests/schema_stability.rs
@@ -9,12 +9,14 @@ use operation_pool::PersistedOperationPool;
 use ssz::Encode;
 use std::sync::{Arc, LazyLock};
 use store::{
-    database::interface::BeaconNodeBackend, hot_cold_store::Split, metadata::DataColumnInfo,
+    database::interface::BeaconNodeBackend,
+    hot_cold_store::Split,
+    metadata::{DataColumnCustodyInfo, DataColumnInfo},
     DBColumn, HotColdDB, StoreConfig, StoreItem,
 };
 use strum::IntoEnumIterator;
 use tempfile::{tempdir, TempDir};
-use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec};
+use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec, Slot};
 
 type E = MainnetEthSpec;
 type Store = Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>;
@@ -84,11 +86,13 @@ async fn schema_stability() {
 
     chain.persist_op_pool().unwrap();
     chain.persist_custody_context().unwrap();
+    insert_data_column_custody_info(&store, &harness.spec);
 
     check_db_columns();
     check_metadata_sizes(&store);
     check_op_pool(&store);
     check_custody_context(&store, &harness.spec);
+    check_custody_info(&store, &harness.spec);
     check_persisted_chain(&store);
 
     // Not covered here:
@@ -100,13 +104,21 @@ async fn schema_stability() {
 fn check_db_columns() {
     let current_columns: Vec<&'static str> = DBColumn::iter().map(|c| c.as_str()).collect();
     let expected_columns = vec![
-        "bma", "blk", "blb", "bdc", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs", "bst",
-        "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr", "brm",
-        "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
+        "bma", "blk", "blb", "bdc", "bdi", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs",
+        "bst", "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr",
+        "brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
     ];
     assert_eq!(expected_columns, current_columns);
 }
 
+fn insert_data_column_custody_info(store: &Store, spec: &ChainSpec) {
+    if spec.is_peer_das_scheduled() {
+        store
+            .put_data_column_custody_info(Some(Slot::new(0)))
+            .unwrap();
+    }
+}
+
 /// Check the SSZ sizes of known on-disk metadata.
 ///
 /// New types can be added here as the schema evolves.
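Note: the new `effective_epoch` field on `CustodyCountChanged` is consumed later in this diff as `effective_epoch.start_slot(T::EthSpec::slots_per_epoch())`. The start-slot conversion is just epoch × slots-per-epoch; here is a minimal sketch using plain `u64` stand-ins for the real `Epoch`/`Slot` wrappers.

```rust
// Epoch -> start-slot conversion, with plain integers standing in for Epoch/Slot.
fn start_slot(epoch: u64, slots_per_epoch: u64) -> u64 {
    epoch * slots_per_epoch
}

fn main() {
    // Mainnet has 32 slots per epoch, so epoch 100 starts at slot 3200.
    assert_eq!(start_slot(100, 32), 3200);
    assert_eq!(start_slot(0, 32), 0);
}
```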
@@ -122,6 +134,7 @@ fn check_metadata_sizes(store: &Store) {
         }
     );
     assert_eq!(DataColumnInfo::default().ssz_bytes_len(), 5);
+    assert_eq!(DataColumnCustodyInfo::default().ssz_bytes_len(), 5);
 }
 
 fn check_op_pool(store: &Store) {
@@ -143,6 +156,15 @@ fn check_custody_context(store: &Store, spec: &ChainSpec) {
     }
 }
 
+fn check_custody_info(store: &Store, spec: &ChainSpec) {
+    let data_column_custody_info = store.get_data_column_custody_info().unwrap();
+    if spec.is_peer_das_scheduled() {
+        assert_eq!(data_column_custody_info.unwrap().as_ssz_bytes().len(), 13);
+    } else {
+        assert!(data_column_custody_info.is_none());
+    }
+}
+
 fn check_persisted_chain(store: &Store) {
     let chain = store
         .get_item::<PersistedBeaconChain>(&Hash256::ZERO)
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index e9b19ee6e0f..691ec003179 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -3157,7 +3157,11 @@ async fn schema_downgrade_to_min_version(
     )
     .await;
 
-    let min_version = SchemaVersion(22);
+    let min_version = if spec.is_fulu_scheduled() {
+        SchemaVersion(27)
+    } else {
+        SchemaVersion(22)
+    };
 
     // Save the slot clock so that the new harness doesn't revert in time.
     let slot_clock = harness.chain.slot_clock.clone();
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index cacdd4a44c5..83422090caa 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -3760,7 +3760,6 @@ pub fn serve<T: BeaconChainTypes>(
                                 .to_string(),
                             ));
                         }
-
                         Ok(())
                     })
                 },
@@ -3845,6 +3844,12 @@ pub fn serve<T: BeaconChainTypes>(
                         current_slot,
                         &chain.spec,
                     ) {
+                        chain.update_data_column_custody_info(Some(
+                            cgc_change
+                                .effective_epoch
+                                .start_slot(T::EthSpec::slots_per_epoch()),
+                        ));
+
                         network_tx.send(NetworkMessage::CustodyCountChanged {
                             new_custody_group_count: cgc_change.new_custody_group_count,
                             sampling_count: cgc_change.sampling_count,
diff --git a/beacon_node/network/src/status.rs b/beacon_node/network/src/status.rs
index be0d7c063be..6c2ada447d2 100644
--- a/beacon_node/network/src/status.rs
+++ b/beacon_node/network/src/status.rs
@@ -29,8 +29,22 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
         finalized_checkpoint.root = Hash256::zero();
     }
 
-    let earliest_available_slot = beacon_chain.store.get_anchor_info().oldest_block_slot;
+    // NOTE: We are making an assumption that `get_data_column_custody_info` won't fail.
+    let earliest_available_data_column_slot = beacon_chain
+        .store
+        .get_data_column_custody_info()
+        .ok()
+        .flatten()
+        .and_then(|info| info.earliest_data_column_slot);
+    // If `data_column_custody_info.earliest_data_column_slot` is `None`,
+    // no recent cgc changes have occurred and no cgc backfill is in progress.
+    let earliest_available_slot =
+        if let Some(earliest_available_data_column_slot) = earliest_available_data_column_slot {
+            earliest_available_data_column_slot
+        } else {
+            beacon_chain.store.get_anchor_info().oldest_block_slot
+        };
 
     StatusMessage::V2(StatusMessageV2 {
         fork_digest,
         finalized_root: finalized_checkpoint.root,
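Note: the `status.rs` change above boils down to a simple fallback rule: advertise the custody-info slot while a cgc change/backfill is in flight, otherwise advertise the anchor's `oldest_block_slot`. Below is a minimal sketch of that rule with a plain `u64` standing in for the real `Slot` type; the `if let`/`else` in the diff is equivalent to the `unwrap_or` used here.

```rust
// Earliest-available-slot selection, as described by the comments in status.rs.
type Slot = u64;

fn earliest_available_slot(
    custody_info_slot: Option<Slot>, // `earliest_data_column_slot`, if any
    oldest_block_slot: Slot,         // from the store's anchor info
) -> Slot {
    // Prefer the custody-info slot; fall back to the anchor when it is `None`
    // (i.e. no recent cgc change and no custody backfill in progress).
    custody_info_slot.unwrap_or(oldest_block_slot)
}

fn main() {
    assert_eq!(earliest_available_slot(Some(1000), 0), 1000);
    assert_eq!(earliest_available_slot(None, 32), 32);
}
```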
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 0c230494b87..9c9374e7fe5 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -6,10 +6,10 @@ use crate::historic_state_cache::HistoricStateCache;
 use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion, ANCHOR_INFO_KEY,
-    ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
-    CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
-    STATE_UPPER_LIMIT_NO_RETAIN,
+    AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnCustodyInfo, DataColumnInfo,
+    SchemaVersion, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY,
+    CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY,
+    SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
 };
 use crate::state_cache::{PutStateOutcome, StateCache};
 use crate::{
@@ -91,6 +91,7 @@ struct BlockCache<E: EthSpec> {
     block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
     blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
     data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
+    data_column_custody_info_cache: Option<DataColumnCustodyInfo>,
 }
 
 impl<E: EthSpec> BlockCache<E> {
@@ -99,6 +100,7 @@ impl<E: EthSpec> BlockCache<E> {
             block_cache: LruCache::new(size),
             blob_cache: LruCache::new(size),
             data_column_cache: LruCache::new(size),
+            data_column_custody_info_cache: None,
         }
     }
     pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
@@ -112,6 +114,12 @@ impl<E: EthSpec> BlockCache<E> {
             .get_or_insert_mut(block_root, Default::default)
             .insert(data_column.index, data_column);
     }
+    pub fn put_data_column_custody_info(
+        &mut self,
+        data_column_custody_info: Option<DataColumnCustodyInfo>,
+    ) {
+        self.data_column_custody_info_cache = data_column_custody_info;
+    }
     pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
         self.block_cache.get(block_root)
     }
@@ -129,6 +137,9 @@ impl<E: EthSpec> BlockCache<E> {
             .get(block_root)
             .and_then(|map| map.get(column_index).cloned())
     }
+    pub fn get_data_column_custody_info(&self) -> Option<DataColumnCustodyInfo> {
+        self.data_column_custody_info_cache.clone()
+    }
     pub fn delete_block(&mut self, block_root: &Hash256) {
         let _ = self.block_cache.pop(block_root);
     }
@@ -922,6 +933,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         ));
     }
 
+    pub fn put_data_column_custody_info(
+        &self,
+        earliest_data_column_slot: Option<Slot>,
+    ) -> Result<(), Error> {
+        let data_column_custody_info = DataColumnCustodyInfo {
+            earliest_data_column_slot,
+        };
+
+        self.blobs_db
+            .put(&DATA_COLUMN_CUSTODY_INFO_KEY, &data_column_custody_info)?;
+
+        self.block_cache
+            .lock()
+            .put_data_column_custody_info(Some(data_column_custody_info));
+
+        Ok(())
+    }
+
     pub fn put_data_columns(
         &self,
         block_root: &Hash256,
@@ -2389,6 +2418,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         })
     }
 
+    /// Fetch custody info from the cache.
+    /// If custody info doesn't exist in the cache,
+    /// try to fetch from the DB and prime the cache.
+    pub fn get_data_column_custody_info(&self) -> Result<Option<DataColumnCustodyInfo>, Error> {
+        let Some(data_column_custody_info) = self.block_cache.lock().get_data_column_custody_info()
+        else {
+            let data_column_custody_info = self
+                .blobs_db
+                .get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;
+
+            // Update the cache
+            self.block_cache
+                .lock()
+                .put_data_column_custody_info(data_column_custody_info.clone());
+
+            return Ok(data_column_custody_info);
+        };
+
+        Ok(Some(data_column_custody_info))
+    }
+
     /// Fetch all columns for a given block from the store.
     pub fn get_data_columns(
         &self,
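Note: `get_data_column_custody_info` follows a read-through caching pattern: consult the in-memory cache first, fall back to the blobs DB on a miss, and prime the cache with whatever the DB returned. Below is a compact, self-contained sketch of that pattern; the `CustodyInfo`/`Store` types and the `Mutex<Option<_>>` fields are hypothetical stand-ins for the real `BlockCache` and blobs DB.

```rust
// Read-through cache sketch: cache hit returns immediately, a miss reads the
// backing store and primes the cache for subsequent reads.
use std::sync::Mutex;

#[derive(Clone, Debug, PartialEq)]
struct CustodyInfo {
    earliest_slot: Option<u64>,
}

struct Store {
    cache: Mutex<Option<CustodyInfo>>, // stands in for the in-memory block cache
    disk: Mutex<Option<CustodyInfo>>,  // stands in for the blobs DB
}

impl Store {
    fn get_custody_info(&self) -> Option<CustodyInfo> {
        if let Some(cached) = self.cache.lock().unwrap().clone() {
            return Some(cached);
        }
        // Cache miss: read from "disk" and prime the cache.
        let from_disk = self.disk.lock().unwrap().clone();
        *self.cache.lock().unwrap() = from_disk.clone();
        from_disk
    }
}

fn main() {
    let store = Store {
        cache: Mutex::new(None),
        disk: Mutex::new(Some(CustodyInfo { earliest_slot: Some(0) })),
    };
    // The first read misses the cache and primes it; the second is served from the cache.
    assert_eq!(store.get_custody_info(), store.get_custody_info());
}
```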
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index e996b47b723..a3d4e4a8cea 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -264,6 +264,8 @@ pub enum DBColumn {
     BeaconBlob,
     #[strum(serialize = "bdc")]
     BeaconDataColumn,
+    #[strum(serialize = "bdi")]
+    BeaconDataColumnCustodyInfo,
     /// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
     ///
     /// DEPRECATED.
@@ -424,6 +426,7 @@ impl DBColumn {
             | Self::CustodyContext
             | Self::OptimisticTransitionBlock => 32,
             Self::BeaconBlockRoots
+            | Self::BeaconDataColumnCustodyInfo
             | Self::BeaconBlockRootsChunked
             | Self::BeaconStateRoots
             | Self::BeaconStateRootsChunked
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index 39a46451fcb..b6091087efc 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use types::{Hash256, Slot};
 
-pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(26);
+pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(27);
 
 // All the keys that get stored under the `BeaconMeta` column.
 //
@@ -18,6 +18,7 @@ pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
 pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
 pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
 pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
+pub const DATA_COLUMN_CUSTODY_INFO_KEY: Hash256 = Hash256::repeat_byte(8);
 
 /// State upper limit value used to indicate that a node is not storing historic states.
 pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -204,6 +205,30 @@ impl StoreItem for BlobInfo {
     }
 }
 
+/// Database parameter relevant to data column custody sync. There is at most a single
+/// `DataColumnCustodyInfo` stored in the db. `earliest_data_column_slot` is set when the cgc
+/// changes and is updated incrementally during data column custody backfill. Once custody
+/// backfill is complete, `earliest_data_column_slot` is set to `None`.
+#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
+pub struct DataColumnCustodyInfo {
+    /// The earliest slot for which data columns are available.
+    pub earliest_data_column_slot: Option<Slot>,
+}
+
+impl StoreItem for DataColumnCustodyInfo {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconDataColumnCustodyInfo
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(DataColumnCustodyInfo::from_ssz_bytes(bytes)?)
+    }
+}
+
 /// Database parameters relevant to data column sync.
 #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
 pub struct DataColumnInfo {
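Note: the 5- and 13-byte sizes asserted in `schema_stability.rs` are consistent with `DataColumnCustodyInfo` being an SSZ container whose only field, `Option<Slot>`, is variable-length: a 4-byte offset plus a 1-byte selector, plus an 8-byte slot when the value is `Some`. The decomposition below is an assumption made for illustration; the concrete figures themselves come from the tests above.

```rust
// Back-of-the-envelope check of the asserted SSZ sizes, assuming the layout is
// [4-byte offset][1-byte Option selector][8-byte slot if Some].
fn main() {
    let offset = 4;   // offset to the single variable-length field
    let selector = 1; // Option/union selector byte
    let slot = 8;     // u64 slot payload

    let none_size = offset + selector;        // DataColumnCustodyInfo::default()
    let some_size = offset + selector + slot; // earliest_data_column_slot = Some(slot)

    assert_eq!(none_size, 5);
    assert_eq!(some_size, 13);
}
```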