Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,10 @@ where
));
}

store
.get_data_column_custody_info(true)
.map_err(|e| format!("Unable to fetch data column custody info {:?}", e))?;

let validator_pubkey_cache = self
.validator_pubkey_cache
.map(|mut validator_pubkey_cache| {
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/validator_custody.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ impl CustodyContext {
return Some(CustodyCountChanged {
new_custody_group_count: updated_cgc,
sampling_count: self.sampling_size(Some(effective_epoch), spec),
slot: current_slot,
});
}
}
Expand Down Expand Up @@ -258,6 +259,7 @@ impl CustodyContext {
pub struct CustodyCountChanged {
pub new_custody_group_count: u64,
pub sampling_count: u64,
pub slot: Slot,
}

/// The custody information that gets persisted across runs.
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3790,6 +3790,7 @@ pub fn serve<T: BeaconChainTypes>(
network_tx.send(NetworkMessage::CustodyCountChanged {
new_custody_group_count: cgc_change.new_custody_group_count,
sampling_count: cgc_change.sampling_count,
slot: cgc_change.slot,
}).unwrap_or_else(|e| {
debug!(error = %e, "Could not send message to the network service. \
Likely shutdown")
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub enum NetworkMessage<E: EthSpec> {
CustodyCountChanged {
new_custody_group_count: u64,
sampling_count: u64,
slot: Slot,
},
}

Expand Down Expand Up @@ -743,6 +744,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
NetworkMessage::CustodyCountChanged {
new_custody_group_count,
sampling_count,
slot,
} => {
// subscribe to `sampling_count` subnets
self.libp2p
Expand All @@ -753,6 +755,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
.advertise_false_custody_group_count
.is_none()
{
// Update data column custody info with the slot at which cgc was changed.
self.beacon_chain.store.put_data_column_custody_info(slot).unwrap_or_else(|e| {
tracing::error!(error = ?e, "Failed to update data column custody info")
});
self.libp2p.update_enr_cgc(new_custody_group_count);
}
}
Expand Down
13 changes: 12 additions & 1 deletion beacon_node/network/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,18 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
finalized_checkpoint.root = Hash256::zero();
}

let earliest_available_slot = beacon_chain.store.get_anchor_info().oldest_block_slot;
// If there is no data column custody info in the db, that indicates that
// no recent cgc changes have occurred and no cgc backfill is in progress.
let earliest_available_slot = if let Ok(Some(data_column_custody_info)) =
beacon_chain.store.get_data_column_custody_info(false)
{
std::cmp::max(
beacon_chain.store.get_anchor_info().oldest_block_slot,
data_column_custody_info.earliest_data_column_slot,
)
} else {
beacon_chain.store.get_anchor_info().oldest_block_slot
};

StatusMessage::V2(StatusMessageV2 {
fork_digest,
Expand Down
72 changes: 67 additions & 5 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::historic_state_cache::HistoricStateCache;
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion, ANCHOR_INFO_KEY,
ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
STATE_UPPER_LIMIT_NO_RETAIN,
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnCustodyInfo, DataColumnInfo,
SchemaVersion, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY,
CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{
Expand Down Expand Up @@ -91,6 +91,7 @@ struct BlockCache<E: EthSpec> {
block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
data_column_custody_info_cache: Option<DataColumnCustodyInfo>,
}

impl<E: EthSpec> BlockCache<E> {
Expand All @@ -99,6 +100,7 @@ impl<E: EthSpec> BlockCache<E> {
block_cache: LruCache::new(size),
blob_cache: LruCache::new(size),
data_column_cache: LruCache::new(size),
data_column_custody_info_cache: None,
}
}
pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
Expand All @@ -112,6 +114,12 @@ impl<E: EthSpec> BlockCache<E> {
.get_or_insert_mut(block_root, Default::default)
.insert(data_column.index, data_column);
}
pub fn put_data_column_custody_info(
&mut self,
data_column_custody_info: Option<DataColumnCustodyInfo>,
) {
self.data_column_custody_info_cache = data_column_custody_info;
}
pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
self.block_cache.get(block_root)
}
Expand All @@ -129,6 +137,9 @@ impl<E: EthSpec> BlockCache<E> {
.get(block_root)
.and_then(|map| map.get(column_index).cloned())
}
pub fn get_data_column_custody_info(&mut self) -> Option<DataColumnCustodyInfo> {
self.data_column_custody_info_cache.clone()
}
pub fn delete_block(&mut self, block_root: &Hash256) {
let _ = self.block_cache.pop(block_root);
}
Expand Down Expand Up @@ -922,6 +933,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
));
}

pub fn put_data_column_custody_info(
&self,
earliest_data_column_slot: Slot,
) -> Result<(), Error> {
let data_column_custody_info = DataColumnCustodyInfo {
earliest_data_column_slot,
};

self.blobs_db.put_bytes(
DBColumn::BeaconDataColumnCustodyInfo,
DATA_COLUMN_CUSTODY_INFO_KEY.as_slice(),
&data_column_custody_info.as_ssz_bytes(),
)?;

self.block_cache
.lock()
.put_data_column_custody_info(Some(data_column_custody_info));

Ok(())
}

pub fn put_data_columns(
&self,
block_root: &Hash256,
Expand Down Expand Up @@ -2389,6 +2421,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
}

/// Fetch custody info from the cache.
/// A `None` value indicates that we have fulfilled our custody
/// requirements up to the DA window.
pub fn get_data_column_custody_info(
&self,
query_store: bool,
) -> Result<Option<DataColumnCustodyInfo>, Error> {
// We only query on startup, when the cache isn't initialized
if query_store {
let bytes_opt = self.blobs_db.get_bytes(
DBColumn::BeaconDataColumnCustodyInfo,
DATA_COLUMN_CUSTODY_INFO_KEY.as_slice(),
)?;

let Some(bytes) = bytes_opt else {
return Ok(None);
};

let data_column_custody_info = Some(DataColumnCustodyInfo::from_ssz_bytes(&bytes)?);
// Update the cache
self.block_cache
.lock()
.put_data_column_custody_info(data_column_custody_info.clone());

Ok(data_column_custody_info)
} else {
Ok(self.block_cache.lock().get_data_column_custody_info())
}
}

/// Fetch all columns for a given block from the store.
pub fn get_data_columns(
&self,
Expand Down Expand Up @@ -3538,7 +3600,7 @@ pub fn get_ancestor_state_root<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStor
.get_cold_state_root(target_slot)
.map_err(Box::new)
.map_err(StateSummaryIteratorError::LoadStateRootError)?
.ok_or_else(|| StateSummaryIteratorError::MissingStateRoot {
.ok_or(StateSummaryIteratorError::MissingStateRoot {
target_slot,
state_upper_limit,
});
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ pub enum DBColumn {
BeaconBlob,
#[strum(serialize = "bdc")]
BeaconDataColumn,
#[strum(serialize = "bdi")]
BeaconDataColumnCustodyInfo,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
///
/// DEPRECATED.
Expand Down Expand Up @@ -424,6 +426,7 @@ impl DBColumn {
| Self::CustodyContext
| Self::OptimisticTransitionBlock => 32,
Self::BeaconBlockRoots
| Self::BeaconDataColumnCustodyInfo
| Self::BeaconBlockRootsChunked
| Self::BeaconStateRoots
| Self::BeaconStateRootsChunked
Expand Down
11 changes: 11 additions & 0 deletions beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
pub const DATA_COLUMN_CUSTODY_INFO_KEY: Hash256 = Hash256::repeat_byte(8);

/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
Expand Down Expand Up @@ -204,6 +205,16 @@ impl StoreItem for BlobInfo {
}
}

/// Database parameter relevant to data column custody sync. There is only at most a single
/// `DataColumnCustodyInfo` stored in the db. This record is added to the db when cgc
/// count changes and is updated incrementally during data column custody backfill. Once custody backfill
/// is complete the record is removed from the db.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnCustodyInfo {
/// The earliest slot for which data columns are available.
pub earliest_data_column_slot: Slot,
}

/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {
Expand Down