diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 2c05df3c7fe..440388661c2 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -377,7 +377,7 @@ where .store .get_hot_state(&self.justified_state_root, update_cache) .map_err(Error::FailedToReadState)? - .ok_or_else(|| Error::MissingState(self.justified_state_root))?; + .ok_or(Error::MissingState(self.justified_state_root))?; self.justified_balances = JustifiedBalances::from_justified_state(&state)?; } diff --git a/beacon_node/beacon_chain/src/historical_data_columns.rs b/beacon_node/beacon_chain/src/historical_data_columns.rs new file mode 100644 index 00000000000..0e7b2d150c2 --- /dev/null +++ b/beacon_node/beacon_chain/src/historical_data_columns.rs @@ -0,0 +1,148 @@ +use std::collections::{HashMap, HashSet}; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, + data_column_verification::verify_kzg_for_data_column_list_with_scoring, +}; +use store::{Error as StoreError, KeyValueStore}; +use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Hash256, Slot}; + +#[derive(Debug)] +pub enum HistoricalDataColumnError { + // The provided data column sidecar pertains to a block that doesn't exist in the database. + NoBlockFound { + data_column_block_root: Hash256, + }, + + /// Logic error: should never occur. + IndexOutOfBounds, + + /// The provided data column sidecar list doesn't contain columns for the full range of slots for the given epoch. + MissingDataColumns { + missing_slots_and_data_columns: Vec<(Slot, ColumnIndex)>, + }, + + /// The provided data column sidecar list contains at least one column with an invalid kzg commitment. 
+ InvalidKzg, + + /// Internal store error + StoreError(StoreError), + + /// Internal beacon chain error + BeaconChainError(Box), +} + +impl From for HistoricalDataColumnError { + fn from(e: StoreError) -> Self { + Self::StoreError(e) + } +} + +impl BeaconChain { + /// Store a batch of historical data columns in the database. + /// + /// The data columns block roots and proposer signatures are verified with the existing + /// block stored in the DB. This function assumes that KZG proofs have already been verified. + /// + /// This function requires that the data column sidecar list contains columns for a full epoch. + /// + /// Return the number of `data_columns` successfully imported. + pub fn import_historical_data_column_batch( + &self, + epoch: Epoch, + historical_data_column_sidecar_list: DataColumnSidecarList, + ) -> Result { + let mut total_imported = 0; + let mut ops = vec![]; + + let unique_column_indices = historical_data_column_sidecar_list + .iter() + .map(|item| item.index) + .collect::>(); + + let mut slot_and_column_index_to_data_columns = historical_data_column_sidecar_list + .iter() + .map(|data_column| ((data_column.slot(), data_column.index), data_column)) + .collect::>(); + + if historical_data_column_sidecar_list.is_empty() { + return Ok(total_imported); + } + + let forward_blocks_iter = self + .forwards_iter_block_roots_until( + epoch.start_slot(T::EthSpec::slots_per_epoch()), + epoch.end_slot(T::EthSpec::slots_per_epoch()), + ) + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + for block_iter_result in forward_blocks_iter { + let (block_root, slot) = block_iter_result + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + for column_index in unique_column_indices.clone() { + if let Some(data_column) = + slot_and_column_index_to_data_columns.remove(&(slot, column_index)) + { + if self + .store + .get_data_column(&block_root, &data_column.index)? 
+ .is_none() + { + tracing::debug!( + block_root = ?block_root, + column_index = data_column.index, + "Skipping data column import as identical data column exists" + ); + continue; + } + if block_root != data_column.block_root() { + return Err(HistoricalDataColumnError::NoBlockFound { + data_column_block_root: data_column.block_root(), + }); + } + self.store.data_column_as_kv_store_ops( + &block_root, + data_column.clone(), + &mut ops, + ); + total_imported += 1; + } + } + } + + verify_kzg_for_data_column_list_with_scoring( + historical_data_column_sidecar_list.iter(), + &self.kzg, + ) + .map_err(|_| HistoricalDataColumnError::InvalidKzg)?; + + self.store.blobs_db.do_atomically(ops)?; + + if slot_and_column_index_to_data_columns.is_empty() { + self.store.put_data_column_custody_info(Some( + epoch.start_slot(T::EthSpec::slots_per_epoch()), + ))?; + } else { + tracing::warn!( + ?epoch, + missing_slots = ?slot_and_column_index_to_data_columns.keys().map(|(slot, _)| slot), + "Some data columns are missing from the batch" + ); + return Err(HistoricalDataColumnError::MissingDataColumns { + missing_slots_and_data_columns: slot_and_column_index_to_data_columns + .keys() + .cloned() + .collect::>(), + }); + } + + self.data_availability_checker + .custody_context() + .backfill_custody_count_at_epoch(epoch); + + tracing::debug!(total_imported, "Imported historical data columns"); + + Ok(total_imported) + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 9d8c3dba38f..fd2162e7d31 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -28,6 +28,7 @@ pub mod fork_choice_signal; pub mod fork_revert; pub mod graffiti_calculator; pub mod historical_blocks; +pub mod historical_data_columns; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; diff --git a/beacon_node/beacon_chain/src/validator_custody.rs 
b/beacon_node/beacon_chain/src/validator_custody.rs index 1c89624f9d7..f47933d62c4 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -30,8 +30,10 @@ struct ValidatorRegistrations { /// /// Note: Only stores the epoch value when there's a change in custody requirement. /// So if epoch 10 and 11 has the same custody requirement, only 10 is stored. - /// This map is never pruned, because currently we never decrease custody requirement, so this - /// map size is contained at 128. + /// This map is only pruned during custody backfill. If epoch 11 has custody requirements + /// that are then backfilled to epoch 10, the value at epoch 11 will be removed and epoch 10 + /// will be added to the map instead. This should keep map size constrained to a maximum + /// value of 128. epoch_validator_custody_requirements: BTreeMap, } @@ -99,6 +101,22 @@ impl ValidatorRegistrations { None } } + + /// Updates the `epoch_validator_custody_requirements` map by pruning all values on/after `effective_epoch` + /// and updating the map to store the latest validator custody requirements for the `effective_epoch`. 
+ pub fn backfill_validator_custody_requirements(&mut self, effective_epoch: Epoch) { + if let Some(latest_validator_custody) = self.latest_validator_custody_requirement() { + self.epoch_validator_custody_requirements + .retain(|&epoch, custody_requirement| { + !(epoch >= effective_epoch && *custody_requirement == latest_validator_custody) + }); + + self.epoch_validator_custody_requirements + .entry(effective_epoch) + .and_modify(|old_custody| *old_custody = latest_validator_custody) + .or_insert(latest_validator_custody); + } + } } /// Given the `validator_custody_units`, return the custody requirement based on @@ -250,6 +268,7 @@ impl CustodyContext { ); return Some(CustodyCountChanged { new_custody_group_count: updated_cgc, + old_custody_group_count: current_cgc, sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec), effective_epoch, }); @@ -324,6 +343,13 @@ impl CustodyContext { &all_columns_ordered[..num_of_columns_to_sample] } + // TODO(custody-sync) delete this once it becomes unused (after testing) + pub fn get_all_custody_columns(&self) -> &[ColumnIndex] { + self.all_custody_columns_ordered + .get() + .expect("Should be init") + } + /// Returns the ordered list of column indices that the node is assigned to custody /// (and advertised to peers) at the given epoch. If epoch is `None`, this function /// computes the custody columns at head. @@ -352,14 +378,22 @@ impl CustodyContext { .all_custody_columns_ordered .get() .expect("all_custody_columns_ordered should be initialized"); + &all_columns_ordered[..custody_group_count] } + + pub fn backfill_custody_count_at_epoch(&self, effective_epoch: Epoch) { + self.validator_registrations + .write() + .backfill_validator_custody_requirements(effective_epoch); + } } /// The custody count changed because of a change in the /// number of validators being managed. 
pub struct CustodyCountChanged { pub new_custody_group_count: u64, + pub old_custody_group_count: u64, pub sampling_count: u64, pub effective_epoch: Epoch, } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index fbb592b510f..0ff19894c34 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -4,6 +4,7 @@ use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::builder::BeaconChainBuilder; use beacon_chain::data_availability_checker::AvailableBlock; +use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::SyncCommitteeStrategy; use beacon_chain::test_utils::{ @@ -2718,6 +2719,242 @@ async fn weak_subjectivity_sync_test( assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0)); } +#[tokio::test] +async fn test_import_historical_data_columns_batch() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Epoch::new(0).start_slot(E::slots_per_epoch()) + 1; + let end_slot = Epoch::new(0).end_slot(E::slots_per_epoch()); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + for 
data_column in data_columns.unwrap() { + data_columns_list.push(data_column); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + harness + .chain + .import_historical_data_column_batch(Epoch::new(0), data_columns_list) + .unwrap(); + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()) + } +} + +// This should verify that a data column sidecar containing mismatched block roots should fail to be imported. 
+#[tokio::test] +async fn test_import_historical_data_columns_batch_mismatched_block_root() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + if data_column.index % 2 == 0 { + data_column.signed_block_header.message.body_root = Hash256::ZERO; + } + + data_columns_list.push(Arc::new(data_column)); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(Epoch::new(0), data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + 
HistoricalDataColumnError::NoBlockFound { .. } + )); +} + +// This should verify that a data column sidecar associated to a block root that doesn't exist in the store cannot +// be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_no_block_found() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + data_column.signed_block_header.message.body_root = Hash256::ZERO; + data_columns_list.push(Arc::new(data_column)); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + 
.import_historical_data_column_batch(Epoch::new(0), data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::NoBlockFound { .. } + )); +} + /// Test that blocks and attestations that refer to states around an unaligned split state are /// processed correctly. #[tokio::test] diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 1e58c210daa..8a481353339 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -80,6 +80,7 @@ pub fn spawn_notifier( // Perform post-genesis logging. let mut last_backfill_log_slot = None; + let mut last_custody_backfill_log_slot = None; loop { // Run the notifier half way through each slot. @@ -154,6 +155,25 @@ pub fn spawn_notifier( Instant::now(), ); } + SyncState::CustodyBackFillSyncing { .. } => { + // TODO(custody-sync) this is a mess, need to fix + match beacon_chain.store.get_data_column_custody_info() { + Ok(data_column_custody_info) => { + if let Some(earliest_data_column_slot) = data_column_custody_info + .and_then(|info| info.earliest_data_column_slot) + { + if let Some(da_boundary) = beacon_chain.data_availability_boundary() + { + sync_distance = earliest_data_column_slot.saturating_sub( + da_boundary.start_slot(T::EthSpec::slots_per_epoch()), + ); + } + speedo.observe(earliest_data_column_slot, Instant::now()) + } + } + Err(e) => error!(error=?e, "Unable to get data column custody info"), + } + } SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } | SyncState::SyncTransition => { @@ -190,6 +210,8 @@ pub fn spawn_notifier( // Log if we are backfilling. let is_backfilling = matches!(current_sync_state, SyncState::BackFillSyncing { .. }); + let is_custody_backfilling = + matches!(current_sync_state, SyncState::CustodyBackFillSyncing { .. 
}); if is_backfilling && last_backfill_log_slot .is_none_or(|slot| slot + BACKFILL_LOG_INTERVAL <= current_slot) @@ -234,6 +256,47 @@ pub fn spawn_notifier( info!("Historical block download complete"); } + if is_custody_backfilling + && last_custody_backfill_log_slot + .is_none_or(|slot| slot + BACKFILL_LOG_INTERVAL <= current_slot) + { + last_custody_backfill_log_slot = Some(current_slot); + + let distance = format!( + "{} slots ({})", + sync_distance.as_u64(), + slot_distance_pretty(sync_distance, slot_duration) + ); + + let speed = speedo.slots_per_second(); + let display_speed = speed.is_some_and(|speed| speed != 0.0); + + if display_speed { + info!( + distance, + speed = sync_speed_pretty(speed), + est_time = estimated_time_pretty( + speedo.estimated_time_till_slot( + original_oldest_block_slot + .saturating_sub(beacon_chain.genesis_backfill_slot) + ) + ), + "Downloading historical data columns" + ); + } else { + info!( + distance, + est_time = estimated_time_pretty( + speedo.estimated_time_till_slot( + original_oldest_block_slot + .saturating_sub(beacon_chain.genesis_backfill_slot) + ) + ), + "Downloading historical data columns" + ); + } + } + // Log if we are syncing if current_sync_state.is_syncing() { metrics::set_gauge(&metrics::IS_SYNCED, 0); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b0b4f9df56f..b6af8f78104 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -481,7 +481,8 @@ pub fn serve( } SyncState::SyncingHead { .. } | SyncState::SyncTransition - | SyncState::BackFillSyncing { .. } => Ok(()), + | SyncState::BackFillSyncing { .. } + | SyncState::CustodyBackFillSyncing { .. 
} => Ok(()), SyncState::Synced => Ok(()), SyncState::Stalled => Ok(()), } @@ -3809,6 +3810,14 @@ pub fn serve( let current_slot = chain.slot().map_err(warp_utils::reject::unhandled_error)?; + + let current_custody_columns = chain + .sampling_columns_for_epoch( + current_slot.epoch(T::EthSpec::slots_per_epoch()), + ) + .iter() + .copied() + .collect::>(); if let Some(cgc_change) = chain .data_availability_checker .custody_context() @@ -3819,10 +3828,20 @@ pub fn serve( .effective_epoch .start_slot(T::EthSpec::slots_per_epoch()), )); + let updated_custody_columns = chain + .sampling_columns_for_epoch(cgc_change.effective_epoch) + .iter() + .copied() + .collect::>(); + let new_column_indices = current_custody_columns + .symmetric_difference(&updated_custody_columns) + .copied() + .collect::>(); network_tx.send(NetworkMessage::CustodyCountChanged { new_custody_group_count: cgc_change.new_custody_group_count, sampling_count: cgc_change.sampling_count, + new_column_indices, }).unwrap_or_else(|e| { debug!(error = %e, "Could not send message to the network service. 
\ Likely shutdown") @@ -4127,6 +4146,60 @@ pub fn serve( }, ); + // POST lighthouse/custody_count + let post_lighthouse_increase_custody_count = warp::path("lighthouse") + .and(warp::path("custody_count")) + .and(warp::path::end()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let effective_epoch = chain + .canonical_head + .cached_head() + .head_slot() + .epoch(T::EthSpec::slots_per_epoch()) + - 1; + + let cgc = chain + .data_availability_checker + .custody_context() + .num_of_custody_groups_to_sample(effective_epoch, &chain.spec); + + let sampling_count = chain + .data_availability_checker + .custody_context() + .num_of_custody_groups_to_sample(effective_epoch, &chain.spec); + + let _ = chain.store.put_data_column_custody_info(Some( + (effective_epoch).end_slot(T::EthSpec::slots_per_epoch()), + )); + + let updated_custody_columns = chain + .data_availability_checker + .custody_context() + .get_all_custody_columns() + .iter() + .copied() + .collect::>(); + + publish_network_message( + &network_tx, + NetworkMessage::CustodyCountChanged { + new_custody_group_count: cgc, + sampling_count, + new_column_indices: updated_custody_columns, + }, + )?; + Ok(()) + }) + }, + ); + // POST lighthouse/compaction let post_lighthouse_compaction = warp::path("lighthouse") .and(warp::path("compaction")) @@ -4883,6 +4956,7 @@ pub fn serve( .uor(post_lighthouse_compaction) .uor(post_lighthouse_add_peer) .uor(post_lighthouse_remove_peer) + .uor(post_lighthouse_increase_custody_count) .recover(warp_utils::reject::handle_rejection), ), ) diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 0f5fd99c279..264f697da8f 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ 
b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -30,6 +30,8 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Custody sync Data column by range request + CustodySyncDataColumnsByRange(CustodySyncByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -40,6 +42,13 @@ pub struct DataColumnsByRootRequestId { pub requester: DataColumnsByRootRequester, } +/// Request ID for data_columns_by_root requests for custody sync. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodySyncDataColumnsByRootRequestId { + pub id: Id, + pub parent_id: Id, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct BlocksByRangeRequestId { /// Id to identify this attempt at a blocks_by_range request for `parent_request_id` @@ -80,6 +89,37 @@ pub struct ComponentsByRangeRequestId { pub requester: RangeRequestId, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodySyncByRangeRequestId { + /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the "parent request". + pub parent_request_id: CustodySyncBatchRequestId, + /// The peer id associated with the request. + /// + /// This is useful to penalize the peer at a later point if it returned data columns that + /// did not match with the verified block. + pub peer: PeerId, +} + +// A batch of data columns by range request for custody sync. Includes an ID for downstream consumers to +// handle retries and tie all the range requests for the given epoch together. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodySyncBatchRequestId { + /// For each `epoch` we may request the same data in a later retry. This Id identifies the + /// current attempt. 
+ pub id: Id, + pub epoch: Epoch, +} + +impl Display for CustodySyncBatchRequestId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let id = self.id; + let epoch = self.epoch; + write!(f, "CustodySync/{id}/{epoch}") + } +} + /// Range sync chain or backfill batch #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RangeRequestId { @@ -217,6 +257,7 @@ impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); impl_display!(CustodyId, "{}", requester); +impl_display!(CustodySyncByRangeRequestId, "{}/{}", id, parent_request_id); impl Display for DataColumnsByRootRequester { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index bcb4758386a..e1af032b623 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -4,6 +4,7 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId}; +use eth2::lighthouse::sync_state::CustodyBackFillState; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; @@ -28,6 +29,8 @@ pub struct NetworkGlobals { pub sync_state: RwLock, /// The current state of the backfill sync. pub backfill_state: RwLock, + /// The current state of custody sync. + pub custody_sync_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. pub sampling_subnets: RwLock>, /// Network-related configuration. Immutable after initialization. 
@@ -90,6 +93,7 @@ impl NetworkGlobals { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), + custody_sync_state: RwLock::new(CustodyBackFillState::Paused), sampling_subnets: RwLock::new(sampling_subnets), config, spec, diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 0bbbcebaf29..3f57406fc78 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -10,7 +10,7 @@ pub type EnrSyncCommitteeBitfield = BitVector<::SyncCommitteeSu pub type Enr = discv5::enr::Enr; -pub use eth2::lighthouse::sync_state::{BackFillState, SyncState}; +pub use eth2::lighthouse::sync_state::{BackFillState, CustodyBackFillState, SyncState}; pub use globals::NetworkGlobals; pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index a2b5af8b086..b80603374c3 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -212,6 +212,22 @@ pub static BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL: LazyLock, +> = LazyLock::new(|| { + try_create_int_counter( + "beacon_processor_custody_backfill_column_import_success_total", + "Total number of columns successfully processed.", + ) +}); +pub static BEACON_PROCESSOR_CUSTODY_BACKFILL_BATCH_FAILED_TOTAL: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_processor_custody_backfill_batch_failed_total", + "Total number of custody backfill batches that failed to be processed.", + ) + }); // Chain segments. 
pub static BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: LazyLock> = LazyLock::new(|| { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 73349cd4314..14c2ad02473 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -494,6 +494,27 @@ impl NetworkBeaconProcessor { }) } + pub fn send_data_columns( + self: &Arc, + process_id: Epoch, + data_columns: DataColumnSidecarList, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .process_data_columns(process_id, data_columns) + .await; + }; + let process_fn = Box::pin(process_fn); + + let work = Work::ChainSegmentBackfill(process_fn); + + self.try_send(BeaconWorkEvent { + drop_during_sync: true, + work, + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index f24495cc54c..142272a8b3d 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,6 +1,7 @@ use crate::metrics::{self, register_process_result_metrics}; use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProcessor}; use crate::sync::BatchProcessResult; +use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ ChainId, manager::{BlockProcessType, SyncMessage}, @@ -8,6 +9,7 @@ use crate::sync::{ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; +use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, 
BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, validator_monitor::get_slot_delay_ms, @@ -438,6 +440,99 @@ impl NetworkBeaconProcessor { }); } + pub async fn process_data_columns( + &self, + process_id: Epoch, + downloaded_columns: DataColumnSidecarList, + ) { + let sent_columns = downloaded_columns.len(); + let result = match self + .chain + .import_historical_data_column_batch(process_id, downloaded_columns) + { + Ok(imported_columns) => { + metrics::inc_counter_by( + &metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_COLUMN_IMPORT_SUCCESS_TOTAL, + imported_columns as u64, + ); + CustodyBatchProcessResult::Success { + batch_id: process_id, + sent_columns, + imported_columns, + } + } + Err(e) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_BATCH_FAILED_TOTAL, + ); + let peer_action: Option = match &e { + HistoricalDataColumnError::NoBlockFound { + data_column_block_root, + } => { + debug!( + error = "no_block_found", + ?data_column_block_root, + "Custody backfill batch processing error" + ); + // The peer is faulty if they send blocks with bad roots. + Some(PeerAction::LowToleranceError) + } + HistoricalDataColumnError::MissingDataColumns { .. } => { + warn!( + error = ?e, + "Custody backfill batch processing error", + ); + // The peer is faulty if they don't return data columns + // that they advertised as available. + Some(PeerAction::LowToleranceError) + } + HistoricalDataColumnError::InvalidKzg => { + warn!( + error = ?e, + "Custody backfill batch processing error", + ); + // The peer is faulty if they don't return data columns + // with valid kzg commitments. 
+ Some(PeerAction::LowToleranceError) + } + HistoricalDataColumnError::BeaconChainError(e) => { + warn!( + error = ?e, + "Custody ackfill batch processing error", + ); + + // This is an interal error, don't penalize the peer + None + } + HistoricalDataColumnError::IndexOutOfBounds => { + error!( + error = ?e, + "Custody backfill batch out of bounds error" + ); + // This should never occur, don't penalize the peer. + None + } + HistoricalDataColumnError::StoreError(e) => { + warn!(error = ?e, "Custody backfill batch processing error"); + // This is an internal error, don't penalize the peer. + None + } + }; + match peer_action { + Some(penalty) => CustodyBatchProcessResult::FaultyFailure { + imported_columns: 0, + penalty, + batch_id: process_id, + }, + None => CustodyBatchProcessResult::NonFaultyFailure { + batch_id: process_id, + }, + } + } + }; + self.send_sync_message(SyncMessage::CustodyBatchProcessed { result }); + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. #[instrument( diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 2027a525e6b..a27b6951582 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1352,6 +1352,39 @@ async fn test_backfill_sync_processing() { } } +/// Ensure that custody sync batches get rate-limited and processing is scheduled at specified intervals. 
+#[tokio::test] +async fn test_custody_sync_processing() { + let mut rig = TestRig::new(SMALL_CHAIN).await; + let slot_count = 4; + + let all_custody_columns = rig + .chain + .sampling_columns_for_epoch(rig.chain.epoch().unwrap()); + let available_columns: Vec = all_custody_columns.to_vec(); + + let requested_columns = vec![available_columns[0], available_columns[2]]; + // Note: to verify the exact event times in an integration test is not straightforward here + // (not straightforward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code) + // and makes the test very slow, hence timing calculation is unit tested separately in + // `work_reprocessing_queue`. + for _ in 0..1 { + rig.enqueue_data_columns_by_range_request(slot_count, requested_columns.clone()); + // ensure queued batch is not processed until later + rig.assert_no_events_for(Duration::from_millis(100)).await; + // A new batch should be processed within a slot. + rig.assert_event_journal_with_timeout( + &[ + WorkType::DataColumnsByRangeRequest.into(), + WORKER_FREED, + NOTHING_TO_DO, + ], + rig.chain.slot_clock.slot_duration(), + ) + .await; + } +} + /// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. #[tokio::test] async fn test_backfill_sync_processing_rate_limiting_disabled() { @@ -1378,6 +1411,39 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() { .await; } +/// Ensure that custody sync batches get processed as fast as they can when rate-limiting is disabled.
+#[tokio::test] +async fn test_custody_sync_processing_rate_limiting_disabled() { + let beacon_processor_config = BeaconProcessorConfig { + enable_backfill_rate_limiting: false, + ..Default::default() + }; + let mut rig = + TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::()).await; + let slot_count = 4; + + let all_custody_columns = rig + .chain + .sampling_columns_for_epoch(rig.chain.epoch().unwrap()); + let available_columns: Vec = all_custody_columns.to_vec(); + + let requested_columns = vec![available_columns[0], available_columns[2]]; + for _ in 0..3 { + rig.enqueue_data_columns_by_range_request(slot_count, requested_columns.clone()); + } + + // ensure all batches are processed + rig.assert_event_journal_with_timeout( + &[ + WorkType::DataColumnsByRangeRequest.into(), + WorkType::DataColumnsByRangeRequest.into(), + WorkType::DataColumnsByRangeRequest.into(), + ], + Duration::from_millis(100), + ) + .await; +} + #[tokio::test] async fn test_blobs_by_range() { if test_spec::().deneb_fork_epoch.is_none() { diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 60fe094bb7c..feced7fcc3f 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -6,7 +6,7 @@ #![allow(clippy::unit_arg)] use crate::network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor}; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, SyncServiceMessage}; use crate::status::status_message; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -78,6 +78,7 @@ pub enum RouterMessage { impl Router { /// Initializes and runs the Router. 
#[allow(clippy::too_many_arguments)] + #[allow(clippy::type_complexity)] pub fn spawn( beacon_chain: Arc>, network_globals: Arc>, @@ -86,13 +87,21 @@ impl Router { invalid_block_storage: InvalidBlockStorage, beacon_processor_send: BeaconProcessorSend, fork_context: Arc, - ) -> Result>, String> { + ) -> Result< + ( + mpsc::UnboundedSender>, + mpsc::UnboundedSender, + ), + String, + > { trace!("Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); // generate the message channel let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); + let (sync_service_send, sync_service_recv) = + mpsc::unbounded_channel::(); let network_beacon_processor = NetworkBeaconProcessor { beacon_processor_send, @@ -113,6 +122,7 @@ impl Router { network_send.clone(), network_beacon_processor.clone(), sync_recv, + sync_service_recv, fork_context, ); @@ -137,7 +147,7 @@ impl Router { "router", ); - Ok(handler_send) + Ok((handler_send, sync_service_send)) } /// Handle all messages incoming from the network service. diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index c97206ea873..16ae9a02ea9 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -36,6 +36,7 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, trace, warn}; +use types::ColumnIndex; use types::{ EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription, @@ -111,10 +112,16 @@ pub enum NetworkMessage { /// Subscribe to new subnets and update ENR metadata. CustodyCountChanged { new_custody_group_count: u64, + new_column_indices: HashSet, sampling_count: u64, }, } +#[derive(Debug)] +pub enum SyncServiceMessage { + CustodyCountChanged { columns: HashSet }, +} + /// Messages triggered by validators that may trigger a subscription to a subnet. 
/// /// These messages can be very numerous with large validator counts (hundreds of thousands per @@ -183,6 +190,8 @@ pub struct NetworkService { /// The sending channel for the network service to send messages to be routed throughout /// lighthouse. router_send: mpsc::UnboundedSender>, + /// The sending channel for the network service to send messages to sync. + sync_service_send: mpsc::UnboundedSender, /// A reference to lighthouse's database to persist the DHT. store: Arc>, /// A collection of global variables, accessible outside of the network service. @@ -308,7 +317,7 @@ impl NetworkService { // launch derived network services // router task - let router_send = Router::spawn( + let (router_send, sync_service_send) = Router::spawn( beacon_chain.clone(), network_globals.clone(), network_senders.network_send(), @@ -344,6 +353,7 @@ impl NetworkService { network_recv, validator_subscription_recv, router_send, + sync_service_send, store, network_globals: network_globals.clone(), next_digest_update, @@ -731,10 +741,24 @@ impl NetworkService { NetworkMessage::CustodyCountChanged { new_custody_group_count, sampling_count, + new_column_indices, } => { // subscribe to `sampling_count` subnets self.libp2p .subscribe_new_data_column_subnets(sampling_count); + + if let Err(e) = + self.sync_service_send + .send(SyncServiceMessage::CustodyCountChanged { + columns: new_column_indices, + }) + { + warn!( + error = ?e, + "Failed to send custody count change to the sync service" + ); + } + if self .network_globals .config diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index ffc79c1550d..2ea9bd17e0d 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -4,10 +4,14 @@ use beacon_chain::{ use lighthouse_network::{ PeerAction, PeerId, service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, 
DataColumnsByRangeRequestId, + BlobsByRangeRequestId, BlocksByRangeRequestId, CustodySyncByRangeRequestId, + DataColumnsByRangeRequestId, }, }; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tracing::Span; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, @@ -56,6 +60,160 @@ enum RangeBlockDataRequest { }, } +pub struct RangeDataColumnBatchRequest { + requests: HashMap< + CustodySyncByRangeRequestId, + ByRangeRequest>, + >, + /// The column indices corresponding to the request + column_peers: HashMap>, + expected_custody_columns: HashSet, + attempt: usize, +} + +impl RangeDataColumnBatchRequest { + pub fn new(by_range_requests: Vec<(CustodySyncByRangeRequestId, Vec)>) -> Self { + let requests = by_range_requests + .clone() + .into_iter() + .map(|(req, _)| (req, ByRangeRequest::Active(req))) + .collect::>(); + + let column_peers = by_range_requests.clone().into_iter().collect(); + + let expected_custody_columns = by_range_requests + .into_iter() + .flat_map(|(_, column_indices)| column_indices) + .collect(); + + Self { + requests, + column_peers, + expected_custody_columns, + attempt: 0, + } + } + + pub fn add_custody_columns( + &mut self, + req_id: CustodySyncByRangeRequestId, + columns: Vec>>, + ) -> Result<(), String> { + let req = self + .requests + .get_mut(&req_id) + .ok_or(format!("unknown data columns by range req_id {req_id}"))?; + req.finish(req_id, columns) + } + + pub fn responses( + &mut self, + _spec: &ChainSpec, + ) -> Option, CouplingError>> { + let mut data_columns = vec![]; + let mut column_to_peer_id: HashMap = HashMap::new(); + for req in self.requests.values() { + let Some(data) = req.to_finished() else { + return None; + }; + data_columns.extend(data.clone()) + } + + // Note: this assumes that only 1 peer is responsible for a column + // with a batch. 
+ for (id, columns) in self.column_peers.iter() { + for column in columns { + column_to_peer_id.insert(*column, id.peer); + } + } + + // An "attempt" is complete here after we have received a response for all the + // requests we made. i.e. `req.to_finished()` returns Some for all requests. + self.attempt += 1; + + let resp = Self::responses_with_custody_columns( + data_columns, + column_to_peer_id, + &self.expected_custody_columns, + self.attempt, + ); + + if let Err(CouplingError::DataColumnPeerFailure { + error: _, + faulty_peers, + action: _, + exceeded_retries: _, + }) = &resp + { + for (_, peer) in faulty_peers.iter() { + // find the req id associated with the peer and + // delete it from the entries as we are going to make + // a separate attempt for those components. + self.requests.retain(|&k, _| k.peer != *peer); + } + } + + Some(resp) + } + + fn responses_with_custody_columns( + data_columns: DataColumnSidecarList, + column_to_peer: HashMap, + expected_custody_columns: &HashSet, + attempt: usize, + ) -> Result, CouplingError> { + let mut naughty_peers = vec![]; + + let mut slots_to_column_indices = data_columns + .iter() + .map(|col| (col.slot(), expected_custody_columns.clone())) + .collect::>(); + + for data_column in data_columns.iter() { + if slots_to_column_indices + .remove(&data_column.slot()) + .is_none() + { + let Some(naughty_peer) = column_to_peer.get(&data_column.index) else { + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {}", + data_column.index + ))); + }; + + naughty_peers.push((data_column.index, naughty_peer)); + } + } + + if !slots_to_column_indices.is_empty() { + let faulty_peers = slots_to_column_indices + .iter() + .flat_map(|(_, column_indices)| { + column_indices.iter().filter_map(|column_index| { + column_to_peer + .get(column_index) + .map(|faulty_peer| (*column_index, *faulty_peer)) + }) + }) + .collect::>(); + + tracing::debug!( + missing_slots_and_columns = 
?slots_to_column_indices, + "Missing columns for some slots" + ); + + return Err(CouplingError::DataColumnPeerFailure { + error: "Missing columns for some slots".to_string(), + faulty_peers, + action: PeerAction::LowToleranceError, + exceeded_retries: attempt >= MAX_COLUMN_RETRIES, + }); + } + + Ok(data_columns) + } +} + #[derive(Debug)] pub(crate) enum CouplingError { InternalError(String), diff --git a/beacon_node/network/src/sync/custody_sync/batch.rs b/beacon_node/network/src/sync/custody_sync/batch.rs new file mode 100644 index 00000000000..4e7edd45e21 --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/batch.rs @@ -0,0 +1,338 @@ +use std::{ + collections::HashSet, + hash::{DefaultHasher, Hash, Hasher}, + ops::Sub, + time::{Duration, Instant}, +}; + +use crate::sync::BatchOperationOutcome; +use crate::sync::range_sync::BatchProcessingResult; +use lighthouse_network::{PeerId, rpc::methods::DataColumnsByRangeRequest, service::api_types::Id}; +use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Slot}; + +/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed +/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. +const MAX_BATCH_PROCESSING_ATTEMPTS: usize = 10; + +#[derive(Debug)] +pub struct WrongState(pub(crate) String); + +#[derive(Debug)] +pub struct Attempt { + /// The peer that made the attempt. + pub peer_id: PeerId, + /// The hash of the blocks of the attempt. + pub hash: u64, +} + +impl Attempt { + fn new(peer_id: PeerId, data_columns: &DataColumnSidecarList) -> Self { + let mut hasher = DefaultHasher::new(); + data_columns.hash(&mut hasher); + let hash = hasher.finish(); + Attempt { peer_id, hash } + } +} + +#[derive(Debug)] +/// A segment of a chain. +pub struct CustodyBatchInfo { + /// Start slot of the batch. + start_slot: Slot, + /// End slot of the batch. 
+ end_slot: Slot, + /// Columns to fetch + columns: HashSet, + /// The `Attempts` that have been made and failed to send us this batch. + failed_processing_attempts: Vec, + /// Number of processing attempts that have failed but we do not count. + non_faulty_processing_attempts: u8, + /// The number of download retries this batch has undergone due to a failed request. + failed_download_attempts: Vec>, + /// State of the batch. + state: CustodyBatchState, +} + +impl std::fmt::Display for CustodyBatchInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Start Slot: {}, End Slot: {}, State: {:?}", + self.start_slot, self.end_slot, self.state + ) + } +} + +impl CustodyBatchInfo { + pub fn new(start_epoch: &Epoch, num_of_epochs: u64, columns: HashSet) -> Self { + let start_slot = start_epoch.start_slot(E::slots_per_epoch()); + let end_slot = start_slot + num_of_epochs * E::slots_per_epoch(); + Self { + start_slot, + end_slot, + columns, + failed_processing_attempts: Vec::new(), + failed_download_attempts: Vec::new(), + non_faulty_processing_attempts: 0, + state: CustodyBatchState::AwaitingDownload, + } + } + + pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { + match self.state.poison() { + CustodyBatchState::AwaitingDownload => { + self.state = CustodyBatchState::Downloading(request_id); + Ok(()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Starting download for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Marks the batch as ready to be processed if the blocks are in the range. 
The number of + /// received blocks is returned, or the wrong batch end on failure + #[must_use = "Batch may have failed"] + pub fn download_completed( + &mut self, + data_columns: DataColumnSidecarList, + peer: PeerId, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Downloading(_) => { + let received = data_columns.len(); + self.state = + CustodyBatchState::AwaitingProcessing(peer, data_columns, Instant::now()); + Ok(received) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Download completed for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Mark the batch as failed and return whether we can attempt a re-download. + /// + /// This can happen if a peer disconnects or some error occurred that was not the peers fault. + /// The `peer` parameter, when set to None, does not increment the failed attempts of + /// this batch and register the peer, rather attempts a re-download. + #[must_use = "Batch may have failed"] + pub fn download_failed( + &mut self, + peer: Option, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Downloading(_) => { + // register the attempt and check if the batch can be tried again + self.failed_download_attempts.push(peer); + + self.state = if self.failed_download_attempts.len() >= MAX_BATCH_PROCESSING_ATTEMPTS + { + CustodyBatchState::Failed + } else { + // drop the columns + CustodyBatchState::AwaitingDownload + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Download failed for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Gives a list of peers from which this batch has had a failed download or processing + /// attempt. 
+ pub fn failed_peers(&self) -> HashSet { + let mut peers = HashSet::with_capacity( + self.failed_processing_attempts.len() + self.failed_download_attempts.len(), + ); + + for attempt in &self.failed_processing_attempts { + peers.insert(attempt.peer_id); + } + + for peer in self.failed_download_attempts.iter().flatten() { + peers.insert(*peer); + } + + peers + } + + /// Returns a DataColumnsByRange request associated with the batch. + pub fn to_data_columns_by_range_request(&self) -> DataColumnsByRangeRequest { + DataColumnsByRangeRequest { + start_slot: self.start_slot.into(), + count: self.end_slot.sub(self.start_slot).into(), + columns: self.columns.clone().into_iter().collect(), + } + } + + /// After different operations over a batch, this could be in a state that allows it to + /// continue, or in failed state. When the batch has failed, we check if it did mainly due to + /// processing failures. In this case the batch is considered failed and faulty. + pub fn outcome(&self) -> BatchOperationOutcome { + match self.state { + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + CustodyBatchState::Failed => BatchOperationOutcome::Failed { + blacklist: self.failed_processing_attempts.len() + > self.failed_download_attempts.len(), + }, + _ => BatchOperationOutcome::Continue, + } + } + + pub fn processing_completed( + &mut self, + procesing_result: BatchProcessingResult, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Processing(attempt) => { + self.state = match procesing_result { + BatchProcessingResult::Success => { + CustodyBatchState::AwaitingValidation(attempt) + } + BatchProcessingResult::FaultyFailure => { + // register the failed attempt + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + if self.failed_processing_attempts.len() >= MAX_BATCH_PROCESSING_ATTEMPTS { + CustodyBatchState::Failed + } else { + CustodyBatchState::AwaitingDownload + } + } + 
BatchProcessingResult::NonFaultyFailure => { + self.non_faulty_processing_attempts = + self.non_faulty_processing_attempts.saturating_add(1); + CustodyBatchState::AwaitingDownload + } + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Procesing completed for batch in wrong state: {:?}", + self.state + ))) + } + } + } + + /// Returns the peer that is currently responsible for progressing the state of the batch. + pub fn processing_peer(&self) -> Option<&PeerId> { + match &self.state { + CustodyBatchState::AwaitingDownload + | CustodyBatchState::Failed + | CustodyBatchState::Downloading(..) => None, + CustodyBatchState::AwaitingProcessing(peer_id, _, _) + | CustodyBatchState::Processing(Attempt { peer_id, .. }) + | CustodyBatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + } + } + + pub fn state(&self) -> &CustodyBatchState { + &self.state + } + + pub fn attempts(&self) -> &[Attempt] { + &self.failed_processing_attempts + } + + pub fn start_processing(&mut self) -> Result<(DataColumnSidecarList, Duration), WrongState> { + match self.state.poison() { + CustodyBatchState::AwaitingProcessing(peer, data_columns, start_instant) => { + self.state = CustodyBatchState::Processing(Attempt::new::(peer, &data_columns)); + Ok((data_columns, start_instant.elapsed())) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Starting processing batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Verifies if an incoming column belongs to this batch. 
+ pub fn is_expecting_data_column(&self, request_id: &Id) -> bool { + if let CustodyBatchState::Downloading(expected_id) = &self.state { + return expected_id == request_id; + } + false + } + + #[must_use = "Batch may have failed"] + pub fn validation_failed(&mut self) -> Result { + match self.state.poison() { + CustodyBatchState::AwaitingValidation(attempt) => { + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + self.state = + if self.failed_processing_attempts.len() >= MAX_BATCH_PROCESSING_ATTEMPTS { + CustodyBatchState::Failed + } else { + CustodyBatchState::AwaitingDownload + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Validation failed for batch in wrong state: {:?}", + self.state + ))) + } + } + } +} + +#[derive(Debug)] +/// Current state of a custody batch +pub enum CustodyBatchState { + /// The batch has failed either downloading or processing, but can be requested again. + AwaitingDownload, + /// The batch is being downloaded. + Downloading(Id), + /// The batch has been completely downloaded and is ready for processing. + AwaitingProcessing(PeerId, DataColumnSidecarList, Instant), + /// The batch is being processed. + Processing(Attempt), + /// The batch was successfully processed and is waiting to be validated. + AwaitingValidation(Attempt), + /// Intermediate state for inner state handling. + Poisoned, + /// The batch has maxed out the allowed attempts for either downloading or processing. It + /// cannot be recovered. + Failed, +} + +impl CustodyBatchState { + /// Helper function for poisoning a state. 
+ pub fn poison(&mut self) -> CustodyBatchState { + std::mem::replace(self, CustodyBatchState::::Poisoned) + } +} diff --git a/beacon_node/network/src/sync/custody_sync/mod.rs b/beacon_node/network/src/sync/custody_sync/mod.rs new file mode 100644 index 00000000000..3017bde2085 --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/mod.rs @@ -0,0 +1,1155 @@ +use std::{ + collections::{BTreeMap, HashSet, btree_map::Entry}, + sync::Arc, +}; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::{ + NetworkGlobals, PeerAction, PeerId, service::api_types::CustodySyncBatchRequestId, + types::CustodyBackFillState, +}; +use logging::crit; +use tracing::{debug, error, info, info_span, warn}; + +use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Slot}; + +use crate::sync::{ + backfill_sync::{ProcessResult, SyncStart}, + custody_sync::batch::{CustodyBatchInfo, CustodyBatchState}, + manager::CustodyBatchProcessResult, + network_context::{RpcResponseError, SyncNetworkContext}, + range_sync::{BatchId, BatchOperationOutcome, BatchProcessingResult}, +}; + +/// The maximum number of batches to queue before requesting more. +const BACKFILL_BATCH_BUFFER_SIZE: u8 = 20; + +mod batch; + +/// Columns are downloaded in batches from peers. This constant specifies how many epochs worth of +/// blocks per batch are requested _at most_. A batch may request less blocks to account for +/// already requested slots. There is a timeout for each batch request. If this value is too high, +/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which +/// case the responder will fill the response up to the max request size, assuming they have the +/// bandwidth to do so. +pub const CUSTODY_BACKFILL_EPOCHS_PER_BATCH: u64 = 1; + +/// The ways a custody backfill sync can fail. +// The info in the enum variants is displayed in logging, clippy thinks it's dead code. 
+#[derive(Debug)] +pub enum CustodyBackfillError { + /// A batch failed to be downloaded. + BatchDownloadFailed(#[allow(dead_code)] BatchId), + // /// A batch could not be processed. + BatchProcessingFailed(#[allow(dead_code)] BatchId), + /// A batch entered an invalid state. + BatchInvalidState(#[allow(dead_code)] BatchId, #[allow(dead_code)] String), + /// The sync algorithm entered an invalid state. + InvalidSyncState(#[allow(dead_code)] String), + // /// The chain became paused. + Paused, +} + +pub struct CustodySync { + /// Keeps track of the current progress of the custody backfill. + /// This only gets refreshed from the beacon chain if we enter a failed state. + current_start: BatchId, + + /// Starting epoch of the batch that needs to be processed next. + /// This is incremented as the chain advances. + processing_target: BatchId, + + /// The columns we're targeting for download. + columns: HashSet, + + /// Starting epoch of the next batch that needs to be downloaded. + to_be_downloaded: BatchId, + + /// Keeps track if we have requested the final batch. + last_batch_downloaded: bool, + + /// Sorted map of batches undergoing some kind of processing. + batches: BTreeMap>, + + /// The current processing batch, if any. + current_processing_batch: Option, + + /// Batches validated by this chain. + validated_batches: u64, + + /// We keep track of peers that are participating in the backfill sync. Unlike RangeSync, + /// BackFillSync uses all synced peers to download the chain from. If BackFillSync fails, we don't + /// want to penalize all our synced peers, so we use this variable to keep track of peers that + /// have participated and only penalize these peers if backfill sync fails. + participating_peers: HashSet, + + /// When a backfill sync fails, we keep track of whether a new fully synced peer has joined. + /// This signifies that we are able to attempt to restart a failed chain. 
+ restart_failed_sync: bool, + + /// Reference to the beacon chain to obtain initial starting points for the backfill sync. + beacon_chain: Arc>, + + /// Reference to the network globals in order to obtain valid peers to backfill blocks from + /// (i.e synced peers). + network_globals: Arc>, +} + +impl CustodySync { + pub fn new( + beacon_chain: Arc>, + network_globals: Arc>, + ) -> Self { + Self { + current_start: Epoch::new(0), + processing_target: Epoch::new(0), + columns: HashSet::new(), + to_be_downloaded: Epoch::new(0), + last_batch_downloaded: false, + batches: BTreeMap::new(), + current_processing_batch: None, + validated_batches: 0, + participating_peers: HashSet::new(), + restart_failed_sync: false, + beacon_chain, + network_globals, + } + } + + /// Pauses the custody sync if it's currently syncing. + pub fn pause(&mut self) { + if let CustodyBackFillState::Syncing = self.state() { + debug!(processed_epochs = %self.validated_batches, to_be_processed = %self.current_start,"Custody backfill sync paused"); + self.set_state(CustodyBackFillState::Paused); + } + } + + pub fn resume_pending_sync( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + match self.state() { + CustodyBackFillState::Pending => { + if self.check_completed() { + self.set_state(CustodyBackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + // If there are peers to resume with, begin the resume. + self.set_state(CustodyBackFillState::Syncing); + // Resume any previously failed batches. + self.resume_batches(network)?; + // begin requesting blocks from the peer pool, until all peers are exhausted. 
+ self.request_batches(network)?; + + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } + } + _ => return Ok(SyncStart::NotSyncing), + } + + let Some(column_da_boundary) = self.get_column_da_boundary() else { + return Ok(SyncStart::NotSyncing); + }; + + Ok(SyncStart::Syncing { + completed: (self.validated_batches + * CUSTODY_BACKFILL_EPOCHS_PER_BATCH + * T::EthSpec::slots_per_epoch()) as usize, + remaining: self + .current_start + .end_slot(T::EthSpec::slots_per_epoch()) + .saturating_sub(column_da_boundary.start_slot(T::EthSpec::slots_per_epoch())) + .as_usize(), + }) + } + + /// Starts syncing. + #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] + pub fn start( + &mut self, + column_indices: HashSet, + network: &mut SyncNetworkContext, + ) -> Result { + match self.state() { + CustodyBackFillState::Syncing => { + if self.check_completed() { + self.set_state(CustodyBackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + } // already syncing ignore. + CustodyBackFillState::Paused => { + if self.check_completed() { + self.set_state(CustodyBackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + self.columns = column_indices; + self.set_start_epoch()?; + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + // If there are peers to resume with, begin the resume. + self.set_state(CustodyBackFillState::Syncing); + // Resume any previously failed batches. + self.resume_batches(network)?; + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } + } + CustodyBackFillState::Failed => { + // Attempt to recover from a failed sync. 
All local variables should be reset and + // cleared already for a fresh start. + // We only attempt to restart a failed backfill sync if a new synced peer has been + // added. + // if !self.restart_failed_sync { + // return Ok(SyncStart::NotSyncing); + // } + + self.set_state(CustodyBackFillState::Syncing); + + debug!(start_epoch = %self.current_start, "Resuming a failed backfill sync"); + + // begin requesting data columns from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + } + CustodyBackFillState::Completed => { + if !self.check_completed() { + self.last_batch_downloaded = false; + self.validated_batches = 0; + self.current_processing_batch = None; + self.batches = BTreeMap::new(); + + self.columns = column_indices; + self.set_start_epoch()?; + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + // If there are peers to resume with, begin the resume. + self.set_state(CustodyBackFillState::Syncing); + // Resume any previously failed batches. + self.resume_batches(network)?; + // begin requesting blocks from the peer pool, until all peers are exhausted. 
+ self.request_batches(network)?; + + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } + } else { + return Ok(SyncStart::NotSyncing); + } + } + CustodyBackFillState::Pending => return Ok(SyncStart::NotSyncing), + } + + let Some(column_da_boundary) = self.get_column_da_boundary() else { + return Ok(SyncStart::NotSyncing); + }; + + Ok(SyncStart::Syncing { + completed: (self.validated_batches + * CUSTODY_BACKFILL_EPOCHS_PER_BATCH + * T::EthSpec::slots_per_epoch()) as usize, + remaining: self + .current_start + .end_slot(T::EthSpec::slots_per_epoch()) + .saturating_sub(column_da_boundary.start_slot(T::EthSpec::slots_per_epoch())) + .as_usize(), + }) + } + + fn set_start_epoch(&mut self) -> Result<(), CustodyBackfillError> { + let earliest_data_column_slot = self + .beacon_chain + .store + .get_data_column_custody_info() + .unwrap_or(None) + .and_then(|info| info.earliest_data_column_slot) + .unwrap_or(Slot::new(0)); + + self.current_start = earliest_data_column_slot.epoch(T::EthSpec::slots_per_epoch()); + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + Ok(()) + } + + /// Attempts to request the next required batches from the peer pool. It will exhaust the peer + /// pool and left over batches until the batch buffer is reached or all peers are exhausted. + fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), CustodyBackfillError> { + if !matches!(self.state(), CustodyBackFillState::Syncing) { + return Ok(()); + } + + // find the next pending batch and request it from the peer + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. 
+ while let Some(batch_id) = self.include_next_batch() { + // send the batch + self.send_batch(network, batch_id)?; + } + + // No more batches, simply stop + Ok(()) + } + + /// When resuming a chain, this function searches for batches that need to be re-downloaded and + /// transitions their state to redownload the batch. + fn resume_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), CustodyBackfillError> { + let batch_ids_to_retry = self + .batches + .iter() + .filter_map(|(batch_id, batch)| { + // In principle there should only ever be on of these, and we could terminate the + // loop early, however the processing is negligible and we continue the search + // for robustness to handle potential future modification + if matches!(batch.state(), CustodyBatchState::AwaitingDownload) { + Some(*batch_id) + } else { + None + } + }) + .collect::>(); + + for batch_id in batch_ids_to_retry { + self.send_batch(network, batch_id)?; + } + Ok(()) + } + + /// Creates the next required batch from the chain. If there are no more batches required, + /// `None` is returned. + fn include_next_batch(&mut self) -> Option { + // Don't request batches before the Fulu fork epoch + if let Some(fulu_fork_epoch) = self.beacon_chain.spec.fulu_fork_epoch + && self.to_be_downloaded < fulu_fork_epoch + { + return None; + } + + // Don't request batches beyond the DA window + if self.last_batch_downloaded { + return None; + } + + // Only request batches up to the buffer size limit + // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync + // if the current processing window is contained in a long range of skip slots. + let in_buffer = |batch: &CustodyBatchInfo| { + matches!( + batch.state(), + CustodyBatchState::Downloading(..) | CustodyBatchState::AwaitingProcessing(..) 
+ ) + }; + if self + .batches + .iter() + .filter(|&(_epoch, batch)| in_buffer(batch)) + .count() + > BACKFILL_BATCH_BUFFER_SIZE as usize + { + return None; + } + + let batch_id = self.to_be_downloaded; + // this batch could have been included already being an optimistic batch + match self.batches.entry(batch_id) { + Entry::Occupied(_) => { + // this batch doesn't need downloading, let this same function decide the next batch + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + self.include_next_batch() + } + Entry::Vacant(entry) => { + entry.insert(CustodyBatchInfo::new( + &batch_id, + CUSTODY_BACKFILL_EPOCHS_PER_BATCH, + self.columns.clone(), + )); + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + Some(batch_id) + } + } + } + + /// Processes the batch with the given id. + /// The batch must exist and be ready for processing + fn process_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result { + if self.state() != CustodyBackFillState::Syncing || self.current_processing_batch.is_some() + { + return Ok(ProcessResult::Successful); + } + + let Some(batch) = self.batches.get_mut(&batch_id) else { + return self + .fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Trying to process a batch that does not exist: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + }; + + // NOTE: We send empty batches to the processor in order to trigger the processor + // result callback. This is done, because an empty batch could end a bad batch and catching those + // bad batch states is handled in `start_processing`. 
+ let (data_columns, _) = match batch.start_processing() { + Err(e) => { + return self + .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0)) + .map(|_| ProcessResult::Successful); + } + Ok(v) => v, + }; + + self.current_processing_batch = Some(batch_id); + + if let Err(e) = network + .beacon_processor() + .send_data_columns(batch_id, data_columns) + { + crit!( + msg = "process_batch", + error = %e, + batch = ?self.processing_target, + "Failed to send data columns to processor." + ); + // This is unlikely to happen but it would stall syncing since the batch now has no + // data columns to continue, and the chain is expecting a processing result that won't + // arrive. To mitigate this, (fake) fail this processing so that the batch is + // re-downloaded. + self.on_batch_process_result( + network, + batch_id, + &CustodyBatchProcessResult::NonFaultyFailure { batch_id }, + ) + } else { + Ok(ProcessResult::Successful) + } + } + + /// a data column has been received for a batch. + /// If the column correctly completes the batch it will be processed if possible. + /// If this returns an error, the custody sync has failed and will be restarted once new peers + /// join the system. + /// The sync manager should update the global sync state on failure. + #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] + pub fn on_data_column_response( + &mut self, + network: &mut SyncNetworkContext, + custody_sync_request_id: CustodySyncBatchRequestId, + peer_id: &PeerId, + data_columns: DataColumnSidecarList, + ) -> Result { + // check if we have this batch + let Some(batch) = self.batches.get_mut(&custody_sync_request_id.epoch) else { + if !matches!(self.state(), CustodyBackFillState::Failed) { + // A batch might get removed when custody sync advances, so this is non fatal. 
+ debug!(epoch = %custody_sync_request_id.epoch, "Received a column for unknown batch"); + } + return Ok(ProcessResult::Successful); + }; + + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed for other + // reasons. Check that this column belongs to the expected peer, and that the + // request_id matches + if !batch.is_expecting_data_column(&custody_sync_request_id.id) { + return Ok(ProcessResult::Successful); + } + + match batch.download_completed(data_columns, *peer_id) { + Ok(received) => { + let awaiting_batches = self + .processing_target + .saturating_sub(custody_sync_request_id.epoch) + / CUSTODY_BACKFILL_EPOCHS_PER_BATCH; + debug!( + epoch = %custody_sync_request_id.epoch, + blocks = received, + %awaiting_batches, + "Completed batch received" + ); + + // pre-emptively request more blocks from peers whilst we process current blocks, + self.request_batches(network)?; + self.process_completed_batches(network) + } + Err(e) => { + self.fail_sync(CustodyBackfillError::BatchInvalidState( + custody_sync_request_id.epoch, + e.0, + ))?; + Ok(ProcessResult::Successful) + } + } + } + + /// The beacon processor has completed processing a batch. This function handles the result + /// of the batch processor. + /// If an error is returned the BackFill sync has failed. + #[must_use = "A failure here indicates the custody backfill sync has failed and the global sync state should be updated"] + pub fn on_batch_process_result( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + result: &CustodyBatchProcessResult, + ) -> Result { + // The first two cases are possible in regular sync, should not occur in backfill, but we + // keep this logic for handling potential processing race conditions. 
+ // result + let batch = match &self.current_processing_batch { + Some(processing_id) if *processing_id != batch_id => { + debug!( + batch_epoch = %batch_id.as_u64(), + expected_batch_epoch = processing_id.as_u64(), + "Unexpected batch result" + ); + return Ok(ProcessResult::Successful); + } + None => { + debug!(%batch_id, "Chain was not expecting a batch result"); + return Ok(ProcessResult::Successful); + } + _ => { + // batch_id matches, continue + self.current_processing_batch = None; + + match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => { + // This is an error. Fail the sync algorithm. + return self + .fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Current processing batch not found: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + } + } + } + }; + + let Some(peer) = batch.processing_peer() else { + self.fail_sync(CustodyBackfillError::BatchInvalidState( + batch_id, + String::from("Peer does not exist"), + ))?; + return Ok(ProcessResult::Successful); + }; + + debug!( + ?result, + %batch, + batch_epoch = %batch_id, + %peer, + client = %network.client_type(peer), + "Custody backfill batch processed" + ); + + match result { + CustodyBatchProcessResult::Success { + imported_columns, .. + } => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + // If the processed batch was not empty, we can validate previous un-validated + // columns. 
+ if *imported_columns > 0 { + self.advance_custody_sync(network, batch_id); + } + + let Some(fulu_fork_epoch) = self.beacon_chain.spec.fulu_fork_epoch else { + return Err(CustodyBackfillError::InvalidSyncState( + "Fulu epoch isn't schedlued".to_string(), + )); + }; + + if batch_id == self.processing_target { + // Advance processing target if we're above the Fulu fork epoch + if self.processing_target > fulu_fork_epoch { + self.processing_target = self + .processing_target + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + } + } + + // check if custody sync has completed syncing up to the DA window + if self.check_completed() { + info!( + slots_processed = self.validated_batches * T::EthSpec::slots_per_epoch(), + "Custody sync completed" + ); + self.set_state(CustodyBackFillState::Completed); + Ok(ProcessResult::SyncCompleted) + } else { + // custody sync is not completed + // attempt to request more batches + self.request_batches(network)?; + // attempt to process more batches + self.process_completed_batches(network) + } + } + CustodyBatchProcessResult::FaultyFailure { + imported_columns, + penalty, + batch_id, + } => { + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { + Err(e) => { + // Batch was in the wrong state + self.fail_sync(CustodyBackfillError::BatchInvalidState(*batch_id, e.0)) + .map(|_| ProcessResult::Successful) + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + // check that we have not exceeded the re-process retry counter + // If a batch has exceeded the invalid batch lookup attempts limit, it means + // that it is likely all peers are sending invalid batches + // repeatedly and are either malicious or faulty. We stop the backfill sync and + // report all synced peers that have participated. + warn!( + score_adjustment = %penalty, + batch_epoch = %batch_id, + "Backfill batch failed to download. 
Penalizing peers" + ); + + for peer in self.participating_peers.drain() { + // TODO(das): `participating_peers` only includes block peers. Should we + // penalize the custody column peers too? + network.report_peer(peer, *penalty, "backfill_batch_failed"); + } + self.fail_sync(CustodyBackfillError::BatchProcessingFailed(*batch_id)) + .map(|_| ProcessResult::Successful) + } + + Ok(BatchOperationOutcome::Continue) => { + // custody backfill can continue. Check if it can be progressed + if *imported_columns > 0 { + // TODO(custody-sync) is this actually true? + // At least one column was successfully verified and imported, then we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.advance_custody_sync(network, *batch_id); + } + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, *batch_id) + .map(|_| ProcessResult::Successful) + } + } + } + CustodyBatchProcessResult::NonFaultyFailure { .. } => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure) + { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + self.send_batch(network, batch_id)?; + Ok(ProcessResult::Successful) + } + } + } + + /// Processes the next ready batch. + fn process_completed_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + // Only process batches if backfill is syncing and only process one batch at a time + if self.state() != CustodyBackFillState::Syncing || self.current_processing_batch.is_some() + { + return Ok(ProcessResult::Successful); + } + + // Don't try to process batches before the Fulu fork epoch since data columns don't exist + if let Some(fulu_fork_epoch) = self.beacon_chain.spec.fulu_fork_epoch + && self.processing_target < fulu_fork_epoch + { + return Ok(ProcessResult::Successful); + } + + // Find the id of the batch we are going to process. 
+ if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + CustodyBatchState::AwaitingProcessing(..) => { + return self.process_batch(network, self.processing_target); + } + CustodyBatchState::Downloading(..) => { + // Batch is not ready, nothing to process + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + CustodyBatchState::Failed + | CustodyBatchState::AwaitingDownload + | CustodyBatchState::Processing(_) => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Chain should have been removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - Processing -> `self.current_processing_batch` is None + self.fail_sync(CustodyBackfillError::InvalidSyncState(String::from( + "Invalid expected batch state", + )))?; + return Ok(ProcessResult::Successful); + } + CustodyBatchState::AwaitingValidation(_) => { + // TODO: I don't think this state is possible, log a CRIT just in case. + // If this is not observed, add it to the failed state branch above. + crit!( + batch = ?self.processing_target, + "Custody Sync encountered a robust batch awaiting validation" + ); + + self.processing_target -= CUSTODY_BACKFILL_EPOCHS_PER_BATCH; + if self.to_be_downloaded >= self.processing_target { + self.to_be_downloaded = + self.processing_target - CUSTODY_BACKFILL_EPOCHS_PER_BATCH; + } + self.request_batches(network)?; + } + } + } else { + self.fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Batch not found for current processing target {}", + self.processing_target + )))?; + return Ok(ProcessResult::Successful); + } + Ok(ProcessResult::Successful) + } + + /// Removes any batches previous to the given `validating_epoch` + /// + /// The `validating_epoch` must align with batch boundaries. + /// + /// If a previous batch has been validated and it had been re-processed, penalize the original + /// peer. 
    fn advance_custody_sync(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        validating_epoch: Epoch,
    ) {
        let Some(column_da_boundary) = self.get_column_da_boundary() else {
            return;
        };
        // Make sure this epoch produces an advancement, unless it's at the column DA boundary.
        if validating_epoch >= self.current_start && validating_epoch > column_da_boundary {
            return;
        }

        // We can now validate batches higher than the current batch. Here we remove all
        // batches that are higher than the current batch. We add on an extra
        // `CUSTODY_BACKFILL_EPOCHS_PER_BATCH` as `split_off` is inclusive of the split point.
        let removed_batches = self
            .batches
            .split_off(&(validating_epoch + CUSTODY_BACKFILL_EPOCHS_PER_BATCH));

        for (id, batch) in removed_batches.into_iter() {
            self.validated_batches = self.validated_batches.saturating_add(1);
            // Only for batches awaiting validation can we be sure the last attempt is
            // right, and thus, that any different attempt is wrong.
            match batch.state() {
                CustodyBatchState::AwaitingValidation(processed_attempt) => {
                    for attempt in batch.attempts() {
                        // The validated batch has been re-processed.
                        if attempt.hash != processed_attempt.hash {
                            // The re-downloaded version was different.
                            if processed_attempt.peer_id != attempt.peer_id {
                                // A different peer sent the correct batch, the previous peer
                                // did not. We negatively score the original peer.
                                let action = PeerAction::LowToleranceError;
                                debug!(
                                    batch_epoch = ?id,
                                    score_adjustment = %action,
                                    original_peer = %attempt.peer_id,
                                    new_peer = %processed_attempt.peer_id,
                                    "Re-processed batch validated. Scoring original peer"
                                );
                                network.report_peer(
                                    attempt.peer_id,
                                    action,
                                    "custody_backfill_reprocessed_original_peer",
                                );
                            } else {
                                // The same peer corrected its previous mistake. There was an
                                // error, so we negatively score the original peer.
                                let action = PeerAction::MidToleranceError;
                                debug!(
                                    batch_epoch = ?id,
                                    score_adjustment = %action,
                                    original_peer = %attempt.peer_id,
                                    new_peer = %processed_attempt.peer_id,
                                    "Re-processed batch validated by the same peer"
                                );
                                network.report_peer(
                                    attempt.peer_id,
                                    action,
                                    "custody_backfill_reprocessed_same_peer",
                                );
                            }
                        }
                    }
                }
                CustodyBatchState::Downloading(..) => {}
                CustodyBatchState::Failed
                | CustodyBatchState::Poisoned
                | CustodyBatchState::AwaitingDownload => {
                    // These states should not be observed above the validated epoch.
                    crit!("batch indicates inconsistent data columns while advancing custody sync")
                }
                CustodyBatchState::AwaitingProcessing(..) => {}
                CustodyBatchState::Processing(_) => {
                    debug!(batch = %id, %batch, "Advancing custody sync while processing a batch");
                    // The in-flight batch was just validated wholesale; drop the marker so the
                    // stale processing result (if any) is ignored.
                    if let Some(processing_id) = self.current_processing_batch
                        && id >= processing_id
                    {
                        self.current_processing_batch = None;
                    }
                }
            }
        }

        // Custody backfill proceeds towards lower epochs, so clamp every target down to the
        // validated epoch.
        self.processing_target = self.processing_target.min(validating_epoch);
        self.current_start = self.current_start.min(validating_epoch);
        self.to_be_downloaded = self.to_be_downloaded.min(validating_epoch);

        if self.batches.contains_key(&self.to_be_downloaded) {
            // A batch already exists at `to_be_downloaded`, so move the download pointer past
            // it to the next epoch that still needs requesting.
            // NOTE(review): the original comment claimed the opposite ("we won't have this
            // batch, so we need to request it") — the guard here is `contains_key`, i.e. the
            // batch *does* exist. Confirm intent.
            self.to_be_downloaded -= CUSTODY_BACKFILL_EPOCHS_PER_BATCH;
        }
        debug!(?validating_epoch, processing_target = ?self.processing_target, "Custody backfill advanced");
    }

    /// An invalid batch has been received that could not be processed, but that can be retried.
    ///
    /// These events occur when a peer has successfully responded with columns, but the columns
    /// received are incorrect or invalid. This indicates the peer has not performed as
    /// intended and can result in down-voting a peer.
    fn handle_invalid_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> Result<(), CustodyBackfillError> {
        // The current batch could not be processed, indicating either the current or previous
        // batches are invalid.

        // The previous batch could be incomplete due to the block sizes being too large to fit
        // in a single RPC request, or there could be consecutive empty batches which are not
        // supposed to be there.

        // The current (sub-optimal) strategy is to simply re-request all batches that could
        // potentially be faulty. If a batch returns a different result than the original and
        // results in successful processing, we downvote the original peer that sent us the
        // batch.

        // Custody backfill advances towards lower epochs, so batches with an id *greater*
        // than `batch_id` are the previously processed ones; collect those that can still be
        // retried.
        let mut redownload_queue = Vec::new();

        for (id, batch) in self.batches.iter_mut().filter(|&(&id, _)| id > batch_id) {
            match batch
                .validation_failed()
                .map_err(|e| CustodyBackfillError::BatchInvalidState(batch_id, e.0))?
            {
                BatchOperationOutcome::Failed { blacklist: _ } => {
                    // Batch has failed and cannot be re-downloaded.
                    return self.fail_sync(CustodyBackfillError::BatchProcessingFailed(batch_id));
                }
                BatchOperationOutcome::Continue => {
                    redownload_queue.push(*id);
                }
            }
        }

        // No batch maxed out its process-retry attempts, so now the chain's volatile progress
        // must be reset.
        self.processing_target = self.current_start;

        for id in redownload_queue {
            self.send_batch(network, id)?;
        }
        // Finally, re-request the failed batch.
        self.send_batch(network, batch_id)
    }

    /// Checks with the beacon chain if custody sync has completed.
+ fn check_completed(&mut self) -> bool { + if self.would_complete(self.current_start) { + // Check that the data column custody info `earliest_available_slot` + // is in an epoch that is less than or equal to the current DA boundary + let Some(earliest_data_column_slot) = self + .beacon_chain + .store + .get_data_column_custody_info() + .unwrap_or(None) + .and_then(|info| info.earliest_data_column_slot) + else { + return false; + }; + + let Some(column_da_boundary) = self.get_column_da_boundary() else { + return false; + }; + + return earliest_data_column_slot.epoch(T::EthSpec::slots_per_epoch()) + <= column_da_boundary; + } + false + } + + /// Calculates the minimum amount of epochs a node should custody data columns. In + /// most cases this is simply the DA boundary, unless we are near the fulu fork epoch. + fn get_column_da_boundary(&self) -> Option { + match self.beacon_chain.data_availability_boundary() { + Some(da_boundary_epoch) => { + // If fulu isnt scheduled, custody backfill sync should not run. + if let Some(fulu_fork_epoch) = self.beacon_chain.spec.fulu_fork_epoch { + // If the da boundary is before fulu, only custody sync up to the fulu fork epoch + if da_boundary_epoch < fulu_fork_epoch { + Some(fulu_fork_epoch) + } else { + Some(da_boundary_epoch) + } + } else { + None + } + } + None => None, // If no DA boundary set, dont try to custody backfill + } + } + + /// Checks if custody backfill would complete by syncing to `start_epoch`. + fn would_complete(&self, start_epoch: Epoch) -> bool { + let Some(column_da_boundary) = self.get_column_da_boundary() else { + return false; + }; + start_epoch <= column_da_boundary + } + + /// Requests the batch assigned to the given id from a given peer. 
    fn send_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> Result<(), CustodyBackfillError> {
        let span = info_span!("custody_sync_batch_request");
        let _enter = span.enter();

        if let Some(batch) = self.batches.get_mut(&batch_id) {
            // Candidate peers: every fully-synced peer currently known to the network.
            let synced_peers = self
                .network_globals
                .peers
                .read()
                .synced_peers()
                .cloned()
                .collect::<Vec<_>>();

            let request = batch.to_data_columns_by_range_request();
            // Peers that previously failed this batch are passed along so the network layer
            // can avoid them.
            let failed_peers = batch.failed_peers();

            match network.custody_sync_data_columns_batch_request(
                request,
                batch_id,
                &synced_peers,
                &failed_peers,
            ) {
                Ok(request_id) => {
                    // Inform the batch about the new request.
                    if let Err(e) = batch.start_downloading(request_id) {
                        return self
                            .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0));
                    }
                    debug!(epoch = %batch_id, %batch, "Requesting batch");

                    return Ok(());
                }
                Err(e) => match e {
                    crate::sync::network_context::RpcRequestSendError::NoPeer(no_peer) => {
                        // If we are here we have no more synced peers.
                        info!(
                            "reason" = format!("insufficient_synced_peers({no_peer:?})"),
                            "Custody sync paused"
                        );
                        self.set_state(CustodyBackFillState::Paused);
                        return Err(CustodyBackfillError::Paused);
                    }
                    crate::sync::network_context::RpcRequestSendError::InternalError(e) => {
                        // NOTE: under normal conditions this shouldn't happen but we handle it anyway.
                        warn!(%batch_id, error = ?e, %batch, "Could not send batch request");
                        // Register the failed download and check if the batch can be retried.
                        // NOTE(review): `1` is presumably a dummy request id used only to move
                        // the batch into the downloading state before failing it — confirm
                        // against `start_downloading`'s contract.
                        if let Err(e) = batch.start_downloading(1) {
                            return self
                                .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0));
                        }

                        match batch.download_failed(None) {
                            Err(e) => self.fail_sync(CustodyBackfillError::BatchInvalidState(
                                batch_id, e.0,
                            ))?,
                            Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
                                self.fail_sync(CustodyBackfillError::BatchDownloadFailed(batch_id))?
                            }
                            Ok(BatchOperationOutcome::Continue) => {
                                // Retry immediately; the request may succeed on a second
                                // attempt (e.g. with a different peer).
                                return self.send_batch(network, batch_id);
                            }
                        }
                    }
                },
            }
        }

        Ok(())
    }

    /// The syncing process has failed.
    ///
    /// This resets past variables, to allow for a fresh start when resuming.
    fn fail_sync(&mut self, error: CustodyBackfillError) -> Result<(), CustodyBackfillError> {
        // Some errors shouldn't cause failure.
        if matches!(error, CustodyBackfillError::Paused) {
            return Ok(());
        }

        // Set the state.
        self.set_state(CustodyBackFillState::Failed);
        // Remove all batches and active requests and participating peers.
        self.batches.clear();
        self.participating_peers.clear();
        self.restart_failed_sync = false;

        // Reset all downloading and processing targets.
        self.processing_target = self.current_start;
        self.to_be_downloaded = self.current_start;
        self.last_batch_downloaded = false;
        self.current_processing_batch = None;

        // NOTE: Let's keep `validated_batches` for posterity.

        // Emit the log here.
        error!(?error, "Backfill sync failed");

        // Return the error; slightly unusual pattern, but it allows callers to write
        // `self.fail_sync(_)?` elsewhere in the code.
        Err(error)
    }

    /// A peer has disconnected.
    /// If the peer has active batches, those are considered failed and re-requested.
    /// NOTE(review): this implementation only removes the peer from `participating_peers`;
    /// no batches are failed or re-requested here — confirm the doc matches intent.
    #[must_use = "A failure here indicates custody sync has failed and the global sync state should be updated"]
    pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), CustodyBackfillError> {
        // Nothing to do once the sync has already failed.
        if matches!(self.state(), CustodyBackFillState::Failed) {
            return Ok(());
        }

        // Remove the peer from the participation list.
        self.participating_peers.remove(peer_id);
        Ok(())
    }

    /// Returns the current custody backfill state shared via the network globals.
    pub fn state(&self) -> CustodyBackFillState {
        self.network_globals.custody_sync_state.read().clone()
    }

    /// Updates the global network state indicating the current state of a backfill sync.
+ fn set_state(&self, state: CustodyBackFillState) { + *self.network_globals.custody_sync_state.write() = state; + } + + /// A fully synced peer has joined us. + /// If we are in a failed state, update a local variable to indicate we are able to restart + /// the failed sync on the next attempt. + pub fn fully_synced_peer_joined(&mut self) { + if matches!(self.state(), CustodyBackFillState::Failed) { + self.restart_failed_sync = true; + } + } + + pub fn set_status_to_pending( + &mut self, + columns: HashSet, + ) -> Result<(), CustodyBackfillError> { + self.columns = columns; + self.set_start_epoch()?; + self.set_state(CustodyBackFillState::Pending); + + Ok(()) + } + + /// An RPC error has occurred. + /// + /// If the batch exists it is re-requested. + #[must_use = "A failure here indicates the custody sync has failed and the global sync state should be updated"] + pub fn inject_error( + &mut self, + network: &mut SyncNetworkContext, + request: CustodySyncBatchRequestId, + peer_id: &PeerId, + err: RpcResponseError, + ) -> Result<(), CustodyBackfillError> { + if let Some(batch) = self.batches.get_mut(&request.epoch) { + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed from the chain for other + // reasons. 
Check that this data column belongs to the expected peer + if !batch.is_expecting_data_column(&request.id) { + return Ok(()); + } + debug!(batch_epoch = %request.epoch, error = ?err, "Batch download failed"); + match batch.download_failed(Some(*peer_id)) { + Err(e) => { + self.fail_sync(CustodyBackfillError::BatchInvalidState(request.epoch, e.0)) + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(CustodyBackfillError::BatchDownloadFailed(request.epoch)) + } + Ok(BatchOperationOutcome::Continue) => self.send_batch(network, request.epoch), + } + } else { + // this could be an error for an old batch, removed when the chain advances + Ok(()) + } + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 448e784ab6d..c361432adb3 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,12 +41,13 @@ use super::network_context::{ use super::peer_sync_info::{PeerSyncType, remote_sync_type}; use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, SyncServiceMessage}; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; -use crate::sync::network_context::PeerGroup; +use crate::sync::custody_sync::CustodySync; +use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ @@ -57,8 +58,8 @@ use lighthouse_network::SyncInfo; use lighthouse_network::rpc::RPCError; use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester, - DataColumnsByRangeRequestId, 
DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, - SingleLookupReqId, SyncRequestId, + CustodySyncByRangeRequestId, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, + DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -70,7 +71,7 @@ use std::time::Duration; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, Epoch, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -158,6 +159,9 @@ pub enum SyncMessage { result: BatchProcessResult, }, + /// A custody batch has been processed by the processor thread. + CustodyBatchProcessed { result: CustodyBatchProcessResult }, + /// Block processed BlockComponentProcessed { process_type: BlockProcessType, @@ -209,6 +213,37 @@ pub enum BatchProcessResult { NonFaultyFailure, } +/// The result of processing multiple blocks (a chain segment). +#[derive(Debug)] +pub enum CustodyBatchProcessResult { + /// The custody batch was completed successfully. It carries whether the sent batch contained blocks. + Success { + batch_id: Epoch, + #[allow(dead_code)] + sent_columns: usize, + imported_columns: usize, + }, + /// The custody batch processing failed. It carries whether the processing imported any block. + FaultyFailure { + batch_id: Epoch, + imported_columns: usize, + penalty: PeerAction, + }, + NonFaultyFailure { + batch_id: Epoch, + }, +} + +impl CustodyBatchProcessResult { + fn batch_id(&self) -> Epoch { + match self { + CustodyBatchProcessResult::Success { batch_id, .. } => *batch_id, + CustodyBatchProcessResult::FaultyFailure { batch_id, .. 
} => *batch_id, + CustodyBatchProcessResult::NonFaultyFailure { batch_id } => *batch_id, + } + } +} + /// The primary object for handling and driving all the current syncing logic. It maintains the /// current state of the syncing process, the number of useful peers, downloaded blocks and /// controls the logic behind both the long-range (batch) sync and the on-going potential parent @@ -220,6 +255,9 @@ pub struct SyncManager { /// A receiving channel sent by the message processor thread. input_channel: mpsc::UnboundedReceiver>, + /// A receiving channel sent by the network service. + sync_service_recv: mpsc::UnboundedReceiver, + /// A network context to contact the network service. network: SyncNetworkContext, @@ -229,6 +267,9 @@ pub struct SyncManager { /// Backfill syncing. backfill_sync: BackFillSync, + /// Custody syncing. + custody_sync: CustodySync, + block_lookups: BlockLookups, /// debounce duplicated `UnknownBlockHashFromAttestation` for the same root peer tuple. A peer /// may forward us thousands of a attestations, each one triggering an individual event. 
Only @@ -245,6 +286,7 @@ pub fn spawn( network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, + sync_service_recv: mpsc::UnboundedReceiver, fork_context: Arc, ) { assert!( @@ -261,6 +303,7 @@ pub fn spawn( network_send, beacon_processor, sync_recv, + sync_service_recv, fork_context, ); @@ -275,12 +318,14 @@ impl SyncManager { network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, + sync_service_recv: mpsc::UnboundedReceiver, fork_context: Arc, ) -> Self { let network_globals = beacon_processor.network_globals.clone(); Self { chain: beacon_chain.clone(), input_channel: sync_recv, + sync_service_recv, network: SyncNetworkContext::new( network_send, beacon_processor.clone(), @@ -288,7 +333,8 @@ impl SyncManager { fork_context.clone(), ), range_sync: RangeSync::new(beacon_chain.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals), + backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals.clone()), + custody_sync: CustodySync::new(beacon_chain.clone(), network_globals), block_lookups: BlockLookups::new(), notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, @@ -477,6 +523,12 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::CustodySyncDataColumnsByRange(req_id) => self + .on_custody_sync_data_columns_by_range_response( + req_id, + peer_id, + RpcEvent::RPCError(error), + ), } } @@ -495,6 +547,7 @@ impl SyncManager { // Remove peer from all data structures self.range_sync.peer_disconnect(&mut self.network, peer_id); let _ = self.backfill_sync.peer_disconnected(peer_id); + let _ = self.custody_sync.peer_disconnected(peer_id); self.block_lookups.peer_disconnected(peer_id); // Regardless of the outcome, we update the sync status. 
@@ -549,6 +602,7 @@ impl SyncManager { // inform the backfill sync that a new synced peer has joined us. if new_state.is_synced() { self.backfill_sync.fully_synced_peer_joined(); + self.custody_sync.fully_synced_peer_joined(); } } is_connected @@ -558,17 +612,18 @@ impl SyncManager { } } - /// Updates the global sync state, optionally instigating or pausing a backfill sync as well as + /// Updates the global sync state, optionally instigating or pausing a backfill or custody sync as well as /// logging any changes. /// /// The logic for which sync should be running is as follows: - /// - If there is a range-sync running (or required) pause any backfill and let range-sync + /// - If there is a range-sync running (or required) pause any backfill/custody sync and let range-sync /// complete. /// - If there is no current range sync, check for any requirement to backfill and either /// start/resume a backfill sync if required. The global state will be BackFillSync if a /// backfill sync is running. /// - If there is no range sync and no required backfill and we have synced up to the currently /// known peers, we consider ourselves synced. + /// - If there is no range sync and no required backfill we check if we need to execute a custody sync. fn update_sync_state(&mut self) { let new_state: SyncState = match self.range_sync.state() { Err(e) => { @@ -624,15 +679,38 @@ impl SyncManager { error!(error = ?e, "Backfill sync failed to start"); } } + + // If backfill is complete, check if we have a pending custody backfill to complete + let anchor_info = self.chain.store.get_anchor_info(); + if anchor_info.block_backfill_complete(self.chain.genesis_backfill_slot) { + match self.custody_sync.resume_pending_sync(&mut self.network) { + Ok(SyncStart::Syncing { + completed, + remaining, + }) => { + sync_state = SyncState::CustodyBackFillSyncing { + completed, + remaining, + }; + } + Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if custody sync state didn't start. 
+ Err(e) => { + error!(error = ?e, "Custody backfill sync failed to resume"); + } + } + } } + // TODO(custody-sync) this comment seems wrong // Return the sync state if backfilling is not required. sync_state } Some((RangeSyncType::Finalized, start_slot, target_slot)) => { - // If there is a backfill sync in progress pause it. + // If there is a backfill or custody sync in progress pause it. + // custody sync #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + self.custody_sync.pause(); SyncState::SyncingFinalized { start_slot, @@ -640,9 +718,11 @@ impl SyncManager { } } Some((RangeSyncType::Head, start_slot, target_slot)) => { - // If there is a backfill sync in progress pause it. + // If there is a backfill or custody sync in progress pause it. + // custody sync #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + self.custody_sync.pause(); SyncState::SyncingHead { start_slot, @@ -699,6 +779,9 @@ impl SyncManager { Some(sync_message) = self.input_channel.recv() => { self.handle_message(sync_message); }, + Some(sync_service_message) = self.sync_service_recv.recv() => { + self.handle_sync_service_message(sync_service_message); + } Some(engine_state) = check_ee_stream.next(), if check_ee => { self.handle_new_execution_engine_state(engine_state); } @@ -865,6 +948,78 @@ impl SyncManager { } } }, + SyncMessage::CustodyBatchProcessed { result } => { + match self.custody_sync.on_batch_process_result( + &mut self.network, + result.batch_id(), + &result, + ) { + Ok(ProcessResult::Successful) => {} + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Err(error) => { + error!(error = ?error, "Custody sync failed"); + // Update the global status + self.update_sync_state(); + } + } + } + } + } + + pub(crate) fn handle_sync_service_message(&mut self, sync_service_message: SyncServiceMessage) { + let sync_state = { + let head = self.chain.best_slot(); + let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0)); + + 
let peers = self.network_globals().peers.read(); + if current_slot >= head + && current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64) + && head > 0 + { + SyncState::Synced + } else if peers.advanced_peers().next().is_some() { + SyncState::SyncTransition + } else if peers.synced_peers().next().is_none() { + SyncState::Stalled + } else { + // There are no peers that require syncing and we have at least one synced + // peer + SyncState::Synced + } + }; + + match sync_service_message { + SyncServiceMessage::CustodyCountChanged { columns } => { + match sync_state { + SyncState::Synced => { + let anchor_info = self.chain.store.get_anchor_info(); + if !anchor_info.block_backfill_complete(self.chain.genesis_backfill_slot) { + if let Err(e) = self.custody_sync.set_status_to_pending(columns) { + tracing::warn!(error = ?e, "Failed to set custody backfill state to pending"); + } + return; + } + + match self.custody_sync.start(columns, &mut self.network) { + Ok(SyncStart::Syncing { + completed, + remaining, + }) => { + info!(?completed, ?remaining, "Starting Custody Sync"); + } + Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if the custody backfill sync state didn't start. 
+ Err(e) => { + error!(error = ?e, "Custody backfill sync failed to start"); + } + } + } + _ => { + if let Err(e) = self.custody_sync.set_status_to_pending(columns) { + tracing::warn!(error = ?e, "Failed to set custody backfill state to pending"); + } + } + } + } } } @@ -1086,6 +1241,12 @@ impl SyncManager { peer_id, RpcEvent::from_chunk(data_column, seen_timestamp), ), + SyncRequestId::CustodySyncDataColumnsByRange(id) => self + .on_custody_sync_data_columns_by_range_response( + id, + peer_id, + RpcEvent::from_chunk(data_column, seen_timestamp), + ), _ => { crit!(%peer_id, "bad request id for data_column"); } @@ -1181,6 +1342,20 @@ impl SyncManager { } } + fn on_custody_sync_data_columns_by_range_response( + &mut self, + id: CustodySyncByRangeRequestId, + peer_id: PeerId, + data_columns: RpcEvent>>, + ) { + if let Some(resp) = + self.network + .on_custody_sync_data_columns_by_range_response(id, peer_id, data_columns) + { + self.on_custody_sync_data_columns_batch_response(id, peer_id, resp) + } + } + fn on_custody_by_root_result( &mut self, requester: CustodyRequester, @@ -1267,6 +1442,49 @@ impl SyncManager { } } } + + /// Handles receiving a response for a custody range sync request that has columns. + fn on_custody_sync_data_columns_batch_response( + &mut self, + custody_sync_request_id: CustodySyncByRangeRequestId, + peer_id: PeerId, + data_columns: RpcResponseResult>>>, + ) { + if let Some(resp) = self + .network + .custody_data_columns_batch_response(custody_sync_request_id, data_columns) + { + match resp { + Ok(data_columns) => { + match self.custody_sync.on_data_column_response( + &mut self.network, + custody_sync_request_id.parent_request_id, + &peer_id, + data_columns, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_e) => { + // The custody sync has failed, errors are reported + // within. 
+ self.update_sync_state(); + } + } + } + Err(e) => { + match self.custody_sync.inject_error( + &mut self.network, + custody_sync_request_id.parent_request_id, + &peer_id, + e, + ) { + Ok(_) => {} + Err(_) => self.update_sync_state(), + } + } + } + } + } } impl From> for BlockProcessingResult { diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 4dab2e17d3f..e04fe62acf4 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -4,6 +4,7 @@ mod backfill_sync; mod block_lookups; mod block_sidecar_coupling; +mod custody_sync; pub mod manager; mod network_context; mod peer_sync_info; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 07462a01fe0..5585439f0c0 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -14,7 +14,7 @@ use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; -use crate::sync::block_sidecar_coupling::CouplingError; +use crate::sync::block_sidecar_coupling::{CouplingError, RangeDataColumnBatchRequest}; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; @@ -25,8 +25,9 @@ use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, Req pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + CustodyId, CustodyRequester, CustodySyncBatchRequestId, 
CustodySyncByRangeRequestId, + DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, + SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use lighthouse_tracing::SPAN_OUTGOING_RANGE_REQUEST; @@ -49,8 +50,8 @@ use tokio::sync::mpsc; use tracing::{Span, debug, debug_span, error, warn}; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext, - Hash256, SignedBeaconBlock, Slot, + BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, + ForkContext, Hash256, SignedBeaconBlock, Slot, }; pub mod custody; @@ -212,6 +213,9 @@ pub struct SyncNetworkContext { data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active DataColumnsByRange requests for custody sync + custody_sync_data_columns_by_range_requests: + ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -219,6 +223,10 @@ pub struct SyncNetworkContext { components_by_range_requests: FnvHashMap>, + /// A batch of data columns by range request for custody sync + custody_sync_data_column_batch_requests: + FnvHashMap>, + /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. 
execution_engine_state: EngineState, @@ -293,8 +301,12 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + custody_sync_data_columns_by_range_requests: ActiveRequests::new( + "custody_sync_data_columns_by_range_request", + ), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), + custody_sync_data_column_batch_requests: FnvHashMap::default(), network_beacon_processor, chain, fork_context, @@ -320,10 +332,12 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + custody_sync_data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, + custody_sync_data_column_batch_requests: _, execution_engine_state: _, network_beacon_processor: _, chain: _, @@ -354,6 +368,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let custody_sync_data_column_by_range_ids = custody_sync_data_columns_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::CustodySyncDataColumnsByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) @@ -361,6 +379,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(custody_sync_data_column_by_range_ids) .collect() } @@ -419,8 +438,10 @@ impl SyncNetworkContext { data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, + custody_sync_data_columns_by_range_requests, // 
components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, + custody_sync_data_column_batch_requests: _, execution_engine_state: _, network_beacon_processor: _, chain: _, @@ -438,6 +459,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(custody_sync_data_columns_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -1662,6 +1684,168 @@ impl SyncNetworkContext { }) } + /// data column by range requests sent by the custody sync algorithm + pub fn custody_sync_data_columns_batch_request( + &mut self, + request: DataColumnsByRangeRequest, + epoch: Epoch, + peers: &HashSet, + peers_to_deprioritize: &HashSet, + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + // Attempt to find all required custody peers before sending any request or creating an ID + let columns_by_range_peers_to_request = { + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) 
+ }; + + // Create the overall `custody_by_range` request id + let id = CustodySyncBatchRequestId { + id: self.next_id(), + epoch, + }; + + let result = columns_by_range_peers_to_request + .map(|columns_by_range_peers_to_request| { + columns_by_range_peers_to_request + .keys() + .map(|peer_id| { + self.send_custody_sync_data_columns_by_range_request( + *peer_id, + request.clone(), + id, + ) + }) + .collect::, _>>() + }) + .transpose()?; + + let range_data_column_batch_request = if let Some(result) = result { + RangeDataColumnBatchRequest::new(result) + } else { + return Err(RpcRequestSendError::InternalError( + "Unable to send a custody sync data column by range rpc request".to_string(), + )); + }; + + self.custody_sync_data_column_batch_requests + .insert(id, range_data_column_batch_request); + + Ok(id.id) + } + + /// Received a data columns by range response from a custody sync request which batches them. + pub fn custody_data_columns_batch_response( + &mut self, + custody_sync_request_id: CustodySyncByRangeRequestId, + data_columns: RpcResponseResult>, + ) -> Option, RpcResponseError>> { + let Entry::Occupied(mut entry) = self + .custody_sync_data_column_batch_requests + .entry(custody_sync_request_id.parent_request_id) + else { + metrics::inc_counter_vec( + &metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, + &["range_data_columns"], + ); + return None; + }; + + if let Err(e) = { + let request = entry.get_mut(); + data_columns.and_then(|(data_columns, _)| { + request + .add_custody_columns(custody_sync_request_id, data_columns.clone()) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError( + e, + )) + }) + }) + } { + entry.remove(); + return Some(Err(e)); + } + + if let Some(data_column_result) = entry.get_mut().responses(&self.chain.spec) { + if data_column_result.is_ok() { + // remove the entry only if it coupled successfully with + // no errors + entry.remove(); + } + // If the request is finished, dequeue everything + 
Some(data_column_result.map_err(RpcResponseError::BlockComponentCouplingError)) + } else { + None + } + } + + fn send_custody_sync_data_columns_by_range_request( + &mut self, + peer_id: PeerId, + request: DataColumnsByRangeRequest, + parent_request_id: CustodySyncBatchRequestId, + ) -> Result<(CustodySyncByRangeRequestId, Vec), RpcRequestSendError> { + let requested_columns = request.columns.clone(); + let id = CustodySyncByRangeRequestId { + id: self.next_id(), + parent_request_id, + peer: peer_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::DataColumnsByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::CustodySyncDataColumnsByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "CustodySync::DataColumnsByRange", + slots = request.count, + epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + columns = ?request.columns, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.custody_sync_data_columns_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. 
+ false, + DataColumnsByRangeRequestItems::new(request), + Span::none(), + ); + Ok((id, requested_columns)) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_custody_sync_data_columns_by_range_response( + &mut self, + id: CustodySyncByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>> { + let resp = self + .custody_sync_data_columns_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) + } + pub(crate) fn register_metrics(&self) { for (id, count) in [ ("blocks_by_root", self.blocks_by_root_requests.len()), diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index b5bc10851dc..a477d1c9069 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -124,6 +124,7 @@ impl TestRig { beacon_processor.into(), // Pass empty recv not tied to any tx mpsc::unbounded_channel().1, + mpsc::unbounded_channel().1, fork_context, ), harness, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 7b390b39f33..a7a625e7742 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -933,6 +933,19 @@ impl, Cold: ItemStore> HotColdDB )); } + pub fn data_column_as_kv_store_ops( + &self, + block_root: &Hash256, + data_column: Arc>, + ops: &mut Vec, + ) { + ops.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconDataColumn, + get_data_column_key(block_root, &data_column.index), + data_column.as_ssz_bytes(), + )); + } + pub fn put_data_column_custody_info( &self, earliest_data_column_slot: Option, diff --git a/common/eth2/src/lighthouse/sync_state.rs b/common/eth2/src/lighthouse/sync_state.rs index 0327f7073fa..070868a9b52 100644 --- a/common/eth2/src/lighthouse/sync_state.rs +++ b/common/eth2/src/lighthouse/sync_state.rs @@ -15,6 +15,10 @@ pub enum SyncState { /// specified by its 
peers. Once completed, the node enters this sync state and attempts to /// download all required historical blocks. BackFillSyncing { completed: usize, remaining: usize }, + /// The node is undertaking a custody backfill sync. This occurs for a node that has completed forward and + /// backfill sync and has undergone a custody count change. During custody backfill sync the node attempts + /// to backfill its new column custody requirements up to the data availability window. + CustodyBackFillSyncing { completed: usize, remaining: usize }, /// The node has completed syncing a finalized chain and is in the process of re-evaluating /// which sync state to progress to. SyncTransition, @@ -39,6 +43,22 @@ pub enum BackFillState { Failed, } +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +/// The state of the custody backfill sync. +pub enum CustodyBackFillState { + /// The sync is partially completed and currently paused. + Paused, + /// We are currently backfilling custody columns. + Syncing, + /// A custody backfill sync has completed. + Completed, + /// Too many failed attempts at backfilling. Consider it failed. + Failed, + /// A custody sync is set to Pending if the node is undergoing range/backfill syncing. + /// It should resume syncing after the node is fully synced. + Pending, +} + impl PartialEq for SyncState { fn eq(&self, other: &Self) -> bool { matches!( @@ -65,8 +85,8 @@ impl SyncState { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { .. } => true, SyncState::SyncTransition => true, - // Backfill doesn't effect any logic, we consider this state, not syncing. - SyncState::BackFillSyncing { .. } => false, + // Both backfill and custody backfill don't affect any logic, we consider this state, not syncing. + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } } @@ -77,7 +97,7 @@ impl SyncState { SyncState::SyncingFinalized { .. 
} => true, SyncState::SyncingHead { .. } => false, SyncState::SyncTransition => false, - SyncState::BackFillSyncing { .. } => false, + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } @@ -108,6 +128,9 @@ impl std::fmt::Display for SyncState { SyncState::Stalled => write!(f, "Stalled"), SyncState::SyncTransition => write!(f, "Evaluating known peers"), SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"), + SyncState::CustodyBackFillSyncing { .. } => { + write!(f, "Syncing Historical Data Columns") + } } } } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 169551e35bc..26147de96d4 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1496,6 +1496,12 @@ pub struct ManualFinalizationRequestData { pub block_root: Hash256, } +#[derive(Debug, Serialize, Deserialize)] +pub struct IncreaseCustodyCount { + #[serde(with = "serde_utils::quoted_u64")] + pub count: u64, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AdminPeer { pub enr: String,