Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6825,6 +6825,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|duration| (fork_name, duration))
}

/// Update data column custody info with the slot at which cgc was changed.
pub fn update_data_column_custody_info(&self, slot: Option<Slot>) {
self.store
.put_data_column_custody_info(slot)
.unwrap_or_else(
|e| tracing::error!(error = ?e, "Failed to update data column custody info"),
);
}

/// This method serves to get a sense of the current chain health. It is used in block proposal
/// to determine whether we should outsource payload production duties.
///
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,10 +828,6 @@ where
));
}

store
.get_data_column_custody_info(true)
.map_err(|e| format!("Unable to fetch data column custody info {:?}", e))?;

let validator_pubkey_cache = self
.validator_pubkey_cache
.map(|mut validator_pubkey_cache| {
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod migration_schema_v23;
mod migration_schema_v24;
mod migration_schema_v25;
mod migration_schema_v26;
mod migration_schema_v27;

use crate::beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -67,6 +68,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v26::downgrade_from_v26::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(26), SchemaVersion(27)) => {
let ops = migration_schema_v27::upgrade_to_v27::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(27), SchemaVersion(26)) => {
let ops = migration_schema_v27::downgrade_from_v27::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
47 changes: 47 additions & 0 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v27.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::BeaconChainTypes;
use ssz::Encode;
use std::sync::Arc;
use store::metadata::DataColumnCustodyInfo;
use store::metadata::DATA_COLUMN_CUSTODY_INFO_KEY;
use store::{DBColumn, Error, HotColdDB, KeyValueStoreOp};
use tracing::info;

/// Add `DataColumnCustodyInfo` entry to v27.
pub fn upgrade_to_v27<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let ops = if db.spec.is_peer_das_scheduled() {
info!("Adding `DataColumnCustodyInfo` to the db");
let data_column_custody_info = DataColumnCustodyInfo {
earliest_data_column_slot: None,
};
vec![KeyValueStoreOp::PutKeyValue(
DBColumn::BeaconDataColumnCustodyInfo,
DATA_COLUMN_CUSTODY_INFO_KEY.as_slice().to_vec(),
data_column_custody_info.as_ssz_bytes(),
)]
} else {
// Delete it from the db if PeerDAS hasn't been scheduled
vec![KeyValueStoreOp::DeleteKey(
DBColumn::BeaconDataColumnCustodyInfo,
DATA_COLUMN_CUSTODY_INFO_KEY.as_slice().to_vec(),
)]
};

Ok(ops)
}

pub fn downgrade_from_v27<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
if db.spec.is_peer_das_scheduled() {
return Err(Error::MigrationError(
"Cannot downgrade from v27 if peerDAS is scheduled".to_string(),
));
}
let ops = vec![KeyValueStoreOp::DeleteKey(
DBColumn::BeaconDataColumnCustodyInfo,
DATA_COLUMN_CUSTODY_INFO_KEY.as_slice().to_vec(),
)];
Ok(ops)
}
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/validator_custody.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ impl CustodyContext {
);
return Some(CustodyCountChanged {
new_custody_group_count: updated_cgc,
sampling_count: self.sampling_size(Some(effective_epoch), spec),
sampling_count: self
.num_of_custody_groups_to_sample(Some(effective_epoch), spec),
slot: current_slot,
});
}
Expand Down
11 changes: 7 additions & 4 deletions beacon_node/beacon_chain/tests/schema_stability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use operation_pool::PersistedOperationPool;
use ssz::Encode;
use std::sync::{Arc, LazyLock};
use store::{
database::interface::BeaconNodeBackend, hot_cold_store::Split, metadata::DataColumnInfo,
database::interface::BeaconNodeBackend,
hot_cold_store::Split,
metadata::{DataColumnCustodyInfo, DataColumnInfo},
DBColumn, HotColdDB, StoreConfig, StoreItem,
};
use strum::IntoEnumIterator;
Expand Down Expand Up @@ -100,9 +102,9 @@ async fn schema_stability() {
fn check_db_columns() {
let current_columns: Vec<&'static str> = DBColumn::iter().map(|c| c.as_str()).collect();
let expected_columns = vec![
"bma", "blk", "blb", "bdc", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs", "bst",
"exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr", "brm",
"dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
"bma", "blk", "blb", "bdc", "bdi", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs",
"bst", "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr",
"brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
];
assert_eq!(expected_columns, current_columns);
}
Expand All @@ -122,6 +124,7 @@ fn check_metadata_sizes(store: &Store<E>) {
}
);
assert_eq!(DataColumnInfo::default().ssz_bytes_len(), 5);
assert_eq!(DataColumnCustodyInfo::default().ssz_bytes_len(), 5);
}

fn check_op_pool(store: &Store<E>) {
Expand Down
49 changes: 11 additions & 38 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3760,44 +3760,6 @@ pub fn serve<T: BeaconChainTypes>(
.to_string(),
));
}

if chain.spec.is_peer_das_scheduled() {
let (finalized_beacon_state, _, _) =
StateId(CoreStateId::Finalized).state(&chain)?;
let validators_and_balances = committee_subscriptions
.iter()
.filter_map(|subscription| {
if let Ok(effective_balance) = finalized_beacon_state
.get_effective_balance(subscription.validator_index as usize)
{
Some((subscription.validator_index as usize, effective_balance))
} else {
None
}
})
.collect::<Vec<_>>();

let current_slot =
chain.slot().map_err(warp_utils::reject::unhandled_error)?;
if let Some(cgc_change) = chain
.data_availability_checker
.custody_context()
.register_validators::<T::EthSpec>(
validators_and_balances,
current_slot,
&chain.spec,
) {
network_tx.send(NetworkMessage::CustodyCountChanged {
new_custody_group_count: cgc_change.new_custody_group_count,
sampling_count: cgc_change.sampling_count,
slot: cgc_change.slot,
}).unwrap_or_else(|e| {
debug!(error = %e, "Could not send message to the network service. \
Likely shutdown")
});
}
}

Ok(())
})
},
Expand All @@ -3812,12 +3774,14 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_globals.clone())
.and(warp_utils::json::json())
.then(
|not_synced_filter: Result<(), Rejection>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
preparation_data: Vec<ProposerPreparationData>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
not_synced_filter?;
Expand Down Expand Up @@ -3878,6 +3842,15 @@ pub fn serve<T: BeaconChainTypes>(
current_slot,
&chain.spec,
) {
// Don't update custody info if we're advertising a
// false custody group count.
if network_globals
.config
.advertise_false_custody_group_count
.is_none()
{
chain.update_data_column_custody_info(Some(cgc_change.slot))
}
network_tx.send(NetworkMessage::CustodyCountChanged {
new_custody_group_count: cgc_change.new_custody_group_count,
sampling_count: cgc_change.sampling_count,
Expand Down
6 changes: 0 additions & 6 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ pub enum NetworkMessage<E: EthSpec> {
CustodyCountChanged {
new_custody_group_count: u64,
sampling_count: u64,
slot: Slot,
},
}

Expand Down Expand Up @@ -744,7 +743,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
NetworkMessage::CustodyCountChanged {
new_custody_group_count,
sampling_count,
slot,
} => {
// subscribe to `sampling_count` subnets
self.libp2p
Expand All @@ -755,10 +753,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
.advertise_false_custody_group_count
.is_none()
{
// Update data column custody info with the slot at which cgc was changed.
self.beacon_chain.store.put_data_column_custody_info(slot).unwrap_or_else(|e| {
tracing::error!(error = ?e, "Failed to update data column custody info")
});
self.libp2p.update_enr_cgc(new_custody_group_count);
}
}
Expand Down
8 changes: 5 additions & 3 deletions beacon_node/network/src/status.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use types::{EthSpec, FixedBytesExtended, Hash256};
use types::{EthSpec, FixedBytesExtended, Hash256, Slot};

use lighthouse_network::rpc::{methods::StatusMessageV2, StatusMessage};
/// Trait to produce a `StatusMessage` representing the state of the given `beacon_chain`.
Expand Down Expand Up @@ -32,11 +32,13 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
// If there is no data column custody info in the db, that indicates that
// no recent cgc changes have occurred and no cgc backfill is in progress.
let earliest_available_slot = if let Ok(Some(data_column_custody_info)) =
beacon_chain.store.get_data_column_custody_info(false)
beacon_chain.store.get_data_column_custody_info()
{
std::cmp::max(
beacon_chain.store.get_anchor_info().oldest_block_slot,
data_column_custody_info.earliest_data_column_slot,
data_column_custody_info
.earliest_data_column_slot
.unwrap_or(Slot::new(0)),
)
} else {
beacon_chain.store.get_anchor_info().oldest_block_slot
Expand Down
23 changes: 10 additions & 13 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

pub fn put_data_column_custody_info(
&self,
earliest_data_column_slot: Slot,
earliest_data_column_slot: Option<Slot>,
) -> Result<(), Error> {
let data_column_custody_info = DataColumnCustodyInfo {
earliest_data_column_slot,
Expand Down Expand Up @@ -2422,14 +2422,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

/// Fetch custody info from the cache.
/// A `None` value indicates that we have fulfilled our custody
/// requirements up to the DA window.
pub fn get_data_column_custody_info(
&self,
query_store: bool,
) -> Result<Option<DataColumnCustodyInfo>, Error> {
// We only query on startup, when the cache isn't initialized
if query_store {
/// If custody info doesn't exist in the cache,
/// try to fetch from the DB and prime the cache.
pub fn get_data_column_custody_info(&self) -> Result<Option<DataColumnCustodyInfo>, Error> {
let Some(data_column_custody_info) = self.block_cache.lock().get_data_column_custody_info()
else {
let bytes_opt = self.blobs_db.get_bytes(
DBColumn::BeaconDataColumnCustodyInfo,
DATA_COLUMN_CUSTODY_INFO_KEY.as_slice(),
Expand All @@ -2445,10 +2442,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.lock()
.put_data_column_custody_info(data_column_custody_info.clone());

Ok(data_column_custody_info)
} else {
Ok(self.block_cache.lock().get_data_column_custody_info())
}
return Ok(data_column_custody_info);
};

Ok(Some(data_column_custody_info))
}

/// Fetch all columns for a given block from the store.
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl StoreItem for BlobInfo {
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnCustodyInfo {
/// The earliest slot for which data columns are available.
pub earliest_data_column_slot: Slot,
pub earliest_data_column_slot: Option<Slot>,
}

/// Database parameters relevant to data column sync.
Expand Down