Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,6 @@ where
.init_blob_info(genesis.beacon_block.slot())
.map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_data_column_info(genesis.beacon_block.slot())
.map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?,
);

let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
Expand Down Expand Up @@ -623,11 +618,6 @@ where
.init_blob_info(weak_subj_block.slot())
.map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_data_column_info(weak_subj_block.slot())
.map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
);

let snapshot = BeaconSnapshot {
beacon_block_root: weak_subj_block_root,
Expand Down
18 changes: 1 addition & 17 deletions beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use state_processing::{
use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::metadata::DataColumnInfo;
use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp};
use strum::IntoStaticStr;
use tracing::debug;
Expand Down Expand Up @@ -69,7 +68,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<usize, HistoricalBlockError> {
let anchor_info = self.store.get_anchor_info();
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();

// Take all blocks with slots less than the oldest block slot.
let num_relevant = blocks.partition_point(|available_block| {
Expand All @@ -96,7 +94,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;

let mut blob_batch = Vec::<KeyValueStoreOp>::new();
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
Expand Down Expand Up @@ -133,7 +130,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
new_oldest_blob_slot = Some(block.slot());
}
AvailableBlockData::DataColumns(_) => {
new_oldest_data_column_slot = Some(block.slot());
// No need to update `new_oldest_blob_slot` for data columns
}
}

Expand Down Expand Up @@ -249,19 +246,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

// Update the data column info.
if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot {
if let Some(oldest_data_column_slot) = new_oldest_data_column_slot {
let new_data_column_info = DataColumnInfo {
oldest_data_column_slot: Some(oldest_data_column_slot),
};
anchor_and_blob_batch.push(
self.store
.compare_and_set_data_column_info(data_column_info, new_data_column_info)?,
);
}
}

// Update the anchor.
let new_anchor = AnchorInfo {
oldest_block_slot: prev_block_slot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,12 +1022,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
};

let oldest_data_column_slot = self
.chain
.store
.get_data_column_info()
.oldest_data_column_slot
.unwrap_or(data_availability_boundary_slot);
let oldest_data_column_slot = data_availability_boundary_slot;

if request_start_slot < oldest_data_column_slot {
debug!(
Expand Down
102 changes: 2 additions & 100 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use crate::historic_state_cache::HistoricStateCache;
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion, ANCHOR_INFO_KEY,
AnchorInfo, BlobInfo, CompactionTimestamp, SchemaVersion, ANCHOR_INFO_KEY,
ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
STATE_UPPER_LIMIT_NO_RETAIN,
CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{
Expand Down Expand Up @@ -58,7 +57,6 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The starting slots for the range of blobs stored in the database.
blob_info: RwLock<BlobInfo>,
/// The starting slots for the range of data columns stored in the database.
data_column_info: RwLock<DataColumnInfo>,
pub(crate) config: StoreConfig,
pub hierarchy: HierarchyModuli,
/// Cold database containing compact historical data.
Expand Down Expand Up @@ -214,7 +212,6 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(ANCHOR_UNINITIALIZED),
blob_info: RwLock::new(BlobInfo::default()),
data_column_info: RwLock::new(DataColumnInfo::default()),
cold_db: MemoryStore::open(),
blobs_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
Expand Down Expand Up @@ -266,7 +263,6 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
split: RwLock::new(Split::default()),
anchor_info,
blob_info: RwLock::new(BlobInfo::default()),
data_column_info: RwLock::new(DataColumnInfo::default()),
blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?,
cold_db: BeaconNodeBackend::open(&config, cold_path)?,
hot_db,
Expand Down Expand Up @@ -349,35 +345,9 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
};
db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;

let data_column_info = db.load_data_column_info()?;
let fulu_fork_slot = db
.spec
.fulu_fork_epoch
.map(|epoch| epoch.start_slot(E::slots_per_epoch()));
let new_data_column_info = match &data_column_info {
Some(data_column_info) => {
// Set the oldest data column slot to the fork slot if it is not yet set.
let oldest_data_column_slot =
data_column_info.oldest_data_column_slot.or(fulu_fork_slot);
DataColumnInfo {
oldest_data_column_slot,
}
}
// First start.
None => DataColumnInfo {
// Set the oldest data column slot to the fork slot if it is not yet set.
oldest_data_column_slot: fulu_fork_slot,
},
};
db.compare_and_set_data_column_info_with_write(
<_>::default(),
new_data_column_info.clone(),
)?;

info!(
path = ?blobs_db_path,
oldest_blob_slot = ?new_blob_info.oldest_blob_slot,
oldest_data_column_slot = ?new_data_column_info.oldest_data_column_slot,
"Blob DB initialized"
);

Expand Down Expand Up @@ -2633,24 +2603,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.blob_info.read_recursive().clone()
}

/// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint.
pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
let oldest_data_column_slot = self.spec.fulu_fork_epoch.map(|fork_epoch| {
std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
});
let data_column_info = DataColumnInfo {
oldest_data_column_slot,
};
self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info)
}

/// Get a clone of the store's data column info.
///
/// To do mutations, use `compare_and_set_data_column_info`.
pub fn get_data_column_info(&self) -> DataColumnInfo {
self.data_column_info.read_recursive().clone()
}

/// Atomically update the blob info from `prev_value` to `new_value`.
///
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
Expand Down Expand Up @@ -2698,56 +2650,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blob_info.as_kv_store_op(BLOB_INFO_KEY)
}

/// Atomically update the data column info from `prev_value` to `new_value`.
///
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
/// values.
///
/// Return an `DataColumnInfoConcurrentMutation` error if the `prev_value` provided
/// is not correct.
pub fn compare_and_set_data_column_info(
&self,
prev_value: DataColumnInfo,
new_value: DataColumnInfo,
) -> Result<KeyValueStoreOp, Error> {
let mut data_column_info = self.data_column_info.write();
if *data_column_info == prev_value {
let kv_op = self.store_data_column_info_in_batch(&new_value);
*data_column_info = new_value;
Ok(kv_op)
} else {
Err(Error::DataColumnInfoConcurrentMutation)
}
}

    /// As for `compare_and_set_data_column_info`, but also writes the data column info to disk immediately.
pub fn compare_and_set_data_column_info_with_write(
&self,
prev_value: DataColumnInfo,
new_value: DataColumnInfo,
) -> Result<(), Error> {
let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?;
self.hot_db.do_atomically(vec![kv_store_op])
}

    /// Load the data column info from disk, but do not set `self.data_column_info`.
fn load_data_column_info(&self) -> Result<Option<DataColumnInfo>, Error> {
self.hot_db
.get(&DATA_COLUMN_INFO_KEY)
.map_err(|e| Error::LoadDataColumnInfo(e.into()))
}

/// Store the given `data_column_info` to disk.
///
/// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues
/// with recursive locking.
fn store_data_column_info_in_batch(
&self,
data_column_info: &DataColumnInfo,
) -> KeyValueStoreOp {
data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY)
}

/// Return the slot-window describing the available historic states.
///
/// Returns `(lower_limit, upper_limit)`.
Expand Down
27 changes: 0 additions & 27 deletions beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,30 +203,3 @@ impl StoreItem for BlobInfo {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {
/// The slot after which data columns are or *will be* available (>=).
///
/// If this slot is in the future, then it is the first slot of the Fulu fork, from which
/// data columns will be available.
///
/// If the `oldest_data_column_slot` is `None` then this means that the Fulu fork epoch is
/// not yet known.
pub oldest_data_column_slot: Option<Slot>,
}

impl StoreItem for DataColumnInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}