Skip to content

Data column custody info #7648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6835,6 +6835,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|duration| (next_digest_epoch, duration))
}

/// Update data column custody info with the slot at which cgc was changed.
pub fn update_data_column_custody_info(&self, slot: Option<Slot>) {
self.store
.put_data_column_custody_info(slot)
.unwrap_or_else(
|e| tracing::error!(error = ?e, "Failed to update data column custody info"),
);
}

/// This method serves to get a sense of the current chain health. It is used in block proposal
/// to determine whether we should outsource payload production duties.
///
Expand Down
12 changes: 12 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod migration_schema_v23;
mod migration_schema_v24;
mod migration_schema_v25;
mod migration_schema_v26;
mod migration_schema_v27;

use crate::beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -67,6 +68,17 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v26::downgrade_from_v26::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(26), SchemaVersion(27)) => {
// This migration updates the blobs db. The schema version
// is bumped inside upgrade_to_v27.
migration_schema_v27::upgrade_to_v27::<T>(db.clone())
}
(SchemaVersion(27), SchemaVersion(26)) => {
// Downgrading is essentially a no-op and is only possible
// if peer das isn't scheduled.
migration_schema_v27::downgrade_from_v27::<T>(db.clone())?;
db.store_schema_version_atomically(to, vec![])
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
26 changes: 26 additions & 0 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v27.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::BeaconChainTypes;
use std::sync::Arc;
use store::{metadata::SchemaVersion, Error, HotColdDB};

/// Add `DataColumnCustodyInfo` entry to v27.
pub fn upgrade_to_v27<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<(), Error> {
if db.spec.is_peer_das_scheduled() {
db.put_data_column_custody_info(None)?;
db.store_schema_version_atomically(SchemaVersion(27), vec![])?;
}

Ok(())
}

pub fn downgrade_from_v27<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<(), Error> {
if db.spec.is_peer_das_scheduled() {
return Err(Error::MigrationError(
"Cannot downgrade from v27 if peerDAS is scheduled".to_string(),
));
}
Ok(())
}
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/validator_custody.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ impl CustodyContext {
new_custody_group_count: updated_cgc,
sampling_count: self
.num_of_custody_groups_to_sample(Some(effective_epoch), spec),
effective_epoch,
});
}
}
Expand Down Expand Up @@ -287,6 +288,7 @@ impl CustodyContext {
pub struct CustodyCountChanged {
pub new_custody_group_count: u64,
pub sampling_count: u64,
pub effective_epoch: Epoch,
}

/// The custody information that gets persisted across runs.
Expand Down
32 changes: 27 additions & 5 deletions beacon_node/beacon_chain/tests/schema_stability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use operation_pool::PersistedOperationPool;
use ssz::Encode;
use std::sync::{Arc, LazyLock};
use store::{
database::interface::BeaconNodeBackend, hot_cold_store::Split, metadata::DataColumnInfo,
database::interface::BeaconNodeBackend,
hot_cold_store::Split,
metadata::{DataColumnCustodyInfo, DataColumnInfo},
DBColumn, HotColdDB, StoreConfig, StoreItem,
};
use strum::IntoEnumIterator;
use tempfile::{tempdir, TempDir};
use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec};
use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec, Slot};

type E = MainnetEthSpec;
type Store<E> = Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>;
Expand Down Expand Up @@ -84,11 +86,13 @@ async fn schema_stability() {

chain.persist_op_pool().unwrap();
chain.persist_custody_context().unwrap();
insert_data_column_custody_info(&store, &harness.spec);

check_db_columns();
check_metadata_sizes(&store);
check_op_pool(&store);
check_custody_context(&store, &harness.spec);
check_custody_info(&store, &harness.spec);
check_persisted_chain(&store);

// Not covered here:
Expand All @@ -100,13 +104,21 @@ async fn schema_stability() {
fn check_db_columns() {
let current_columns: Vec<&'static str> = DBColumn::iter().map(|c| c.as_str()).collect();
let expected_columns = vec![
"bma", "blk", "blb", "bdc", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs", "bst",
"exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr", "brm",
"dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
"bma", "blk", "blb", "bdc", "bdi", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs",
"bst", "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr",
"brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
];
assert_eq!(expected_columns, current_columns);
}

fn insert_data_column_custody_info(store: &Store<E>, spec: &ChainSpec) {
if spec.is_peer_das_scheduled() {
store
.put_data_column_custody_info(Some(Slot::new(0)))
.unwrap();
}
}

/// Check the SSZ sizes of known on-disk metadata.
///
/// New types can be added here as the schema evolves.
Expand All @@ -122,6 +134,7 @@ fn check_metadata_sizes(store: &Store<E>) {
}
);
assert_eq!(DataColumnInfo::default().ssz_bytes_len(), 5);
assert_eq!(DataColumnCustodyInfo::default().ssz_bytes_len(), 5);
}

fn check_op_pool(store: &Store<E>) {
Expand All @@ -143,6 +156,15 @@ fn check_custody_context(store: &Store<E>, spec: &ChainSpec) {
}
}

fn check_custody_info(store: &Store<E>, spec: &ChainSpec) {
let data_column_custody_info = store.get_data_column_custody_info().unwrap();
if spec.is_peer_das_scheduled() {
assert_eq!(data_column_custody_info.unwrap().as_ssz_bytes().len(), 13);
} else {
assert!(data_column_custody_info.is_none());
}
}

fn check_persisted_chain(store: &Store<E>) {
let chain = store
.get_item::<PersistedBeaconChain>(&Hash256::ZERO)
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3157,7 +3157,11 @@ async fn schema_downgrade_to_min_version(
)
.await;

let min_version = SchemaVersion(22);
let min_version = if spec.is_fulu_scheduled() {
SchemaVersion(27)
} else {
SchemaVersion(22)
};

// Save the slot clock so that the new harness doesn't revert in time.
let slot_clock = harness.chain.slot_clock.clone();
Expand Down
7 changes: 6 additions & 1 deletion beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3760,7 +3760,6 @@ pub fn serve<T: BeaconChainTypes>(
.to_string(),
));
}

Ok(())
})
},
Expand Down Expand Up @@ -3845,6 +3844,12 @@ pub fn serve<T: BeaconChainTypes>(
current_slot,
&chain.spec,
) {
chain.update_data_column_custody_info(Some(
cgc_change
.effective_epoch
.start_slot(T::EthSpec::slots_per_epoch()),
));

network_tx.send(NetworkMessage::CustodyCountChanged {
new_custody_group_count: cgc_change.new_custody_group_count,
sampling_count: cgc_change.sampling_count,
Expand Down
16 changes: 15 additions & 1 deletion beacon_node/network/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,22 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
finalized_checkpoint.root = Hash256::zero();
}

let earliest_available_slot = beacon_chain.store.get_anchor_info().oldest_block_slot;
// NOTE: We are making an assumption that `get_data_column_custody_info` wont fail.
let earliest_available_data_column_slot = beacon_chain
.store
.get_data_column_custody_info()
.ok()
.flatten()
.and_then(|info| info.earliest_data_column_slot);

// If data_column_custody_info.earliest_data_column_slot is `None`,
// no recent cgc changes have occurred and no cgc backfill is in progress.
let earliest_available_slot =
if let Some(earliest_available_data_column_slot) = earliest_available_data_column_slot {
earliest_available_data_column_slot
} else {
beacon_chain.store.get_anchor_info().oldest_block_slot
};
StatusMessage::V2(StatusMessageV2 {
fork_digest,
finalized_root: finalized_checkpoint.root,
Expand Down
58 changes: 54 additions & 4 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::historic_state_cache::HistoricStateCache;
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion, ANCHOR_INFO_KEY,
ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
STATE_UPPER_LIMIT_NO_RETAIN,
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnCustodyInfo, DataColumnInfo,
SchemaVersion, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY,
CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{
Expand Down Expand Up @@ -91,6 +91,7 @@ struct BlockCache<E: EthSpec> {
block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
data_column_custody_info_cache: Option<DataColumnCustodyInfo>,
}

impl<E: EthSpec> BlockCache<E> {
Expand All @@ -99,6 +100,7 @@ impl<E: EthSpec> BlockCache<E> {
block_cache: LruCache::new(size),
blob_cache: LruCache::new(size),
data_column_cache: LruCache::new(size),
data_column_custody_info_cache: None,
}
}
pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
Expand All @@ -112,6 +114,12 @@ impl<E: EthSpec> BlockCache<E> {
.get_or_insert_mut(block_root, Default::default)
.insert(data_column.index, data_column);
}
pub fn put_data_column_custody_info(
&mut self,
data_column_custody_info: Option<DataColumnCustodyInfo>,
) {
self.data_column_custody_info_cache = data_column_custody_info;
}
pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
self.block_cache.get(block_root)
}
Expand All @@ -129,6 +137,9 @@ impl<E: EthSpec> BlockCache<E> {
.get(block_root)
.and_then(|map| map.get(column_index).cloned())
}
pub fn get_data_column_custody_info(&self) -> Option<DataColumnCustodyInfo> {
self.data_column_custody_info_cache.clone()
}
pub fn delete_block(&mut self, block_root: &Hash256) {
let _ = self.block_cache.pop(block_root);
}
Expand Down Expand Up @@ -922,6 +933,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
));
}

pub fn put_data_column_custody_info(
&self,
earliest_data_column_slot: Option<Slot>,
) -> Result<(), Error> {
let data_column_custody_info = DataColumnCustodyInfo {
earliest_data_column_slot,
};

self.blobs_db
.put(&DATA_COLUMN_CUSTODY_INFO_KEY, &data_column_custody_info)?;

self.block_cache
.lock()
.put_data_column_custody_info(Some(data_column_custody_info));

Ok(())
}

pub fn put_data_columns(
&self,
block_root: &Hash256,
Expand Down Expand Up @@ -2389,6 +2418,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
}

/// Fetch custody info from the cache.
/// If custody info doesn't exist in the cache,
/// try to fetch from the DB and prime the cache.
pub fn get_data_column_custody_info(&self) -> Result<Option<DataColumnCustodyInfo>, Error> {
let Some(data_column_custody_info) = self.block_cache.lock().get_data_column_custody_info()
else {
let data_column_custody_info = self
.blobs_db
.get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;

// Update the cache
self.block_cache
.lock()
.put_data_column_custody_info(data_column_custody_info.clone());

return Ok(data_column_custody_info);
};

Ok(Some(data_column_custody_info))
}

/// Fetch all columns for a given block from the store.
pub fn get_data_columns(
&self,
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ pub enum DBColumn {
BeaconBlob,
#[strum(serialize = "bdc")]
BeaconDataColumn,
#[strum(serialize = "bdi")]
BeaconDataColumnCustodyInfo,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
///
/// DEPRECATED.
Expand Down Expand Up @@ -424,6 +426,7 @@ impl DBColumn {
| Self::CustodyContext
| Self::OptimisticTransitionBlock => 32,
Self::BeaconBlockRoots
| Self::BeaconDataColumnCustodyInfo
| Self::BeaconBlockRootsChunked
| Self::BeaconStateRoots
| Self::BeaconStateRootsChunked
Expand Down
27 changes: 26 additions & 1 deletion beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{Hash256, Slot};

pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(26);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(27);

// All the keys that get stored under the `BeaconMeta` column.
//
Expand All @@ -18,6 +18,7 @@ pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
pub const DATA_COLUMN_CUSTODY_INFO_KEY: Hash256 = Hash256::repeat_byte(8);

/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
Expand Down Expand Up @@ -204,6 +205,30 @@ impl StoreItem for BlobInfo {
}
}

/// Database parameter relevant to data column custody sync. There is only at most a single
/// `DataColumnCustodyInfo` stored in the db. `earliest_data_column_slot` is updated when cgc
/// count changes and is updated incrementally during data column custody backfill. Once custody backfill
/// is complete `earliest_data_column_slot` is set to `None`.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnCustodyInfo {
/// The earliest slot for which data columns are available.
pub earliest_data_column_slot: Option<Slot>,
}

impl StoreItem for DataColumnCustodyInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconDataColumnCustodyInfo
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(DataColumnCustodyInfo::from_ssz_bytes(bytes)?)
}
}

/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {
Expand Down