Make block available when we have 50% of columns on supernodes #7598

Draft · wants to merge 3 commits into unstable
33 changes: 29 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -127,6 +127,8 @@ use store::{
KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::time::sleep;
use tokio::{pin, select};
use tokio_stream::Stream;
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
@@ -1127,11 +1129,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

pub fn get_data_columns_checking_all_caches(
pub async fn get_data_columns_checking_all_caches(
&self,
block_root: Hash256,
indices: &[ColumnIndex],
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
// If this is in the DA checker, wait until all columns requested are available, up to a
// certain timeout
if let Some(mut cols) = self
.data_availability_checker
.get_data_column_watcher(block_root)
{
let timeout = sleep(Duration::from_secs(3));
pin!(timeout);
// Wait until we have one of:
// 1. time out
// 2. all needed columns in DA (wait_for returns Ok)
// 3. block no longer in DA (wait_for returns Err)
select! {
_ = &mut timeout => (),
_ = cols.wait_for(|cols| indices.iter().all(|index| cols.contains(index))) => (),
}
}

let all_cached_columns_opt = self
.data_availability_checker
.get_data_columns(block_root)
@@ -3809,6 +3829,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}

let remove_from_da = !matches!(block.data(), AvailableBlockData::PartialDataColumns(_));

// TODO(das) record custody column available timestamp

// import
@@ -3840,8 +3862,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
//
// If `import_block` errors (only errors with internal errors), the pending components will
// be pruned on data_availability_checker maintenance as finality advances.
self.data_availability_checker
.remove_pending_components(block_root);
if remove_from_da {
self.data_availability_checker
.remove_pending_components(block_root);
}

Ok(AvailabilityProcessingStatus::Imported(block_root))
}
@@ -7208,7 +7232,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
}
AvailableBlockData::DataColumns(data_columns) => {
AvailableBlockData::DataColumns(data_columns)
| AvailableBlockData::PartialDataColumns(data_columns) => {
debug!(
%block_root,
count = data_columns.len(),
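
The core of the change is the bounded wait in `get_data_columns_checking_all_caches`: the RPC path races `tokio::sync::watch::Receiver::wait_for` against a fixed timeout before falling back to whatever is cached. Below is a minimal self-contained sketch of that pattern; the `wait_for_columns` helper, plain `u64` indices, and millisecond delays are illustrative, only the tokio APIs mirror the diff:

```rust
use std::collections::HashSet;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use tokio::{pin, select};

/// Wait until every index in `needed` appears in the watched set, or until
/// `budget` elapses. Returns true if all requested columns were observed.
async fn wait_for_columns(
    mut rx: watch::Receiver<HashSet<u64>>,
    needed: &[u64],
    budget: Duration,
) -> bool {
    let timeout = sleep(budget);
    pin!(timeout);
    select! {
        // 1. Timed out: fall through and serve whatever is already cached.
        _ = &mut timeout => false,
        // 2./3. `wait_for` resolves Ok once the closure is satisfied by the latest
        // value, or Err if the sender (the pending-components entry) is dropped.
        res = rx.wait_for(|cols| needed.iter().all(|i| cols.contains(i))) => res.is_ok(),
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(HashSet::<u64>::new());
    // Simulate custody columns trickling in shortly after the RPC request arrives.
    tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        tx.send_modify(|set| {
            set.extend([3u64, 7]);
        });
    });
    let ready = wait_for_columns(rx, &[3, 7], Duration::from_secs(3)).await;
    println!("all requested columns observed: {ready}");
}
```

`wait_for` evaluates the closure against the current value before parking, so requests for columns that are already cached return immediately; the timeout only bounds the case where a supernode is still receiving or reconstructing columns.
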
29 changes: 18 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -8,17 +8,19 @@ use crate::data_availability_checker::overflow_lru_cache::{
use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext};
use kzg::Kzg;
use slot_clock::SlotClock;
use std::collections::HashSet;
use std::fmt;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::watch;
use tracing::{debug, error, info_span, Instrument};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
RuntimeVariableList, SignedBeaconBlock,
BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch,
EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
};

mod error;
@@ -199,6 +201,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_data_columns(block_root)
}

pub fn get_data_column_watcher(
&self,
block_root: Hash256,
) -> Option<watch::Receiver<HashSet<ColumnIndex>>> {
self.availability_cache.get_data_column_watcher(&block_root)
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
@@ -699,8 +708,11 @@ pub enum AvailableBlockData<E: EthSpec> {
NoData,
/// Block is post-Deneb, pre-PeerDAS and has more than zero blobs
Blobs(BlobSidecarList<E>),
/// Block is post-PeerDAS and has more than zero blobs
/// Block is post-PeerDAS and has more than zero blobs. We have all columns that we custody.
DataColumns(DataColumnSidecarList<E>),
/// Block is post-PeerDAS and has more than zero blobs. We do not have all columns yet, but are
/// able to reconstruct the rest.
PartialDataColumns(DataColumnSidecarList<E>),
}

/// A fully available block that is ready to be imported into fork choice.
@@ -745,14 +757,6 @@ impl<E: EthSpec> AvailableBlock<E> {
&self.blob_data
}

pub fn has_blobs(&self) -> bool {
match self.blob_data {
AvailableBlockData::NoData => false,
AvailableBlockData::Blobs(..) => true,
AvailableBlockData::DataColumns(_) => false,
}
}

#[allow(clippy::type_complexity)]
pub fn deconstruct(self) -> (Hash256, Arc<SignedBeaconBlock<E>>, AvailableBlockData<E>) {
let AvailableBlock {
@@ -775,6 +779,9 @@ impl<E: EthSpec> AvailableBlock<E> {
AvailableBlockData::DataColumns(data_columns) => {
AvailableBlockData::DataColumns(data_columns.clone())
}
AvailableBlockData::PartialDataColumns(data_columns) => {
AvailableBlockData::PartialDataColumns(data_columns.clone())
}
},
blobs_available_timestamp: self.blobs_available_timestamp,
spec: self.spec.clone(),
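
Every exhaustive match on `AvailableBlockData` gains a `PartialDataColumns` arm: the store and cache hunks elsewhere in this diff treat it exactly like `DataColumns`, while the import path in beacon_chain.rs skips removing the pending components so the still-missing columns can be added later. A minimal sketch of that split, using simplified stand-in types rather than Lighthouse's generics:

```rust
/// Simplified stand-ins for the Lighthouse types; payloads are just bytes here.
#[allow(dead_code)]
enum AvailableBlockData {
    NoData,
    Blobs(Vec<u8>),
    DataColumns(Vec<u8>),
    PartialDataColumns(Vec<u8>),
}

/// Persisting/serving paths: a partial custody set is as good as a full one.
fn columns_to_store(data: &AvailableBlockData) -> Option<&Vec<u8>> {
    match data {
        AvailableBlockData::DataColumns(cols)
        | AvailableBlockData::PartialDataColumns(cols) => Some(cols),
        AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => None,
    }
}

/// Import bookkeeping: only a fully available block may evict its pending
/// components, so the remaining columns can still be recorded when they arrive.
fn should_remove_pending_components(data: &AvailableBlockData) -> bool {
    !matches!(data, AvailableBlockData::PartialDataColumns(_))
}

fn main() {
    let partial = AvailableBlockData::PartialDataColumns(vec![1, 2, 3]);
    assert!(columns_to_store(&partial).is_some());
    assert!(!should_remove_pending_components(&partial));
}
```
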
beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -12,8 +12,10 @@ use crate::CustodyContext;
use lru::LruCache;
use parking_lot::RwLock;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::debug;
use types::blob_sidecar::BlobIdentifier;
use types::{
@@ -29,8 +31,10 @@ pub struct PendingComponents<E: EthSpec> {
pub block_root: Hash256,
pub verified_blobs: RuntimeFixedVector<Option<KzgVerifiedBlob<E>>>,
pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
pub verified_column_indices: watch::Sender<HashSet<ColumnIndex>>,
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
pub reconstruction_started: bool,
pub eagerly_imported: bool,
}

impl<E: EthSpec> PendingComponents<E> {
@@ -134,11 +138,17 @@ impl<E: EthSpec> PendingComponents<E> {
&mut self,
kzg_verified_data_columns: I,
) -> Result<(), AvailabilityCheckError> {
for data_column in kzg_verified_data_columns {
if self.get_cached_data_column(data_column.index()).is_none() {
self.verified_data_columns.push(data_column);
}
}
self.verified_column_indices
.send_if_modified(|set_column_indices| {
let mut modified = false;
for data_column in kzg_verified_data_columns {
if set_column_indices.insert(data_column.index()) {
self.verified_data_columns.push(data_column);
modified = true;
}
}
modified
});
Ok(())
}

@@ -196,7 +206,21 @@ impl<E: EthSpec> PendingComponents<E> {
}
Ordering::Less => {
// Not enough data columns received yet
None
// If we have at least half of all columns, we can reconstruct the rest -> make the
// block available eagerly if we haven't done so yet
if !self.eagerly_imported
&& num_received_columns >= spec.number_of_columns as usize / 2
{
self.eagerly_imported = true;
let data_columns = self
.verified_data_columns
.iter()
.map(|d| d.clone().into_inner())
.collect::<Vec<_>>();
Some(AvailableBlockData::PartialDataColumns(data_columns))
} else {
None
}
}
}
} else {
@@ -249,6 +273,7 @@ impl<E: EthSpec> PendingComponents<E> {
.max(),
// TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850
AvailableBlockData::DataColumns(_) => None,
AvailableBlockData::PartialDataColumns(_) => None,
};

let AvailabilityPendingExecutedBlock {
@@ -277,8 +302,10 @@ impl<E: EthSpec> PendingComponents<E> {
block_root,
verified_blobs: RuntimeFixedVector::new(vec![None; max_len]),
verified_data_columns: vec![],
verified_column_indices: watch::channel(HashSet::new()).0,
executed_block: None,
reconstruction_started: false,
eagerly_imported: false,
}
}

@@ -434,6 +461,16 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
f(self.critical.read().peek(block_root))
}

pub fn get_data_column_watcher(
&self,
block_root: &Hash256,
) -> Option<watch::Receiver<HashSet<ColumnIndex>>> {
self.critical
.read()
.peek(block_root)
.map(|components| components.verified_column_indices.subscribe())
}

/// Puts the KZG verified blobs into the availability cache as pending components.
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
@@ -592,6 +629,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn handle_reconstruction_failure(&self, block_root: &Hash256) {
if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) {
pending_components_mut.verified_data_columns = vec![];
let _ = pending_components_mut
.verified_column_indices
.send(HashSet::new());
pending_components_mut.reconstruction_started = false;
}
}
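
`PendingComponents` now mirrors its verified column indices into a `tokio::sync::watch::Sender`, and `send_if_modified` only notifies subscribers (the waiting by-root handlers) when an index is genuinely new. With the mainnet PeerDAS preset of 128 columns, `number_of_columns / 2` is 64, so the eager `PartialDataColumns` path fires the first time the set reaches half of all columns. A small sketch of that notification behaviour; the constant here stands in for the mainnet value rather than being read from a `ChainSpec`:

```rust
use std::collections::HashSet;
use tokio::sync::watch;

// Mainnet PeerDAS preset; in Lighthouse this comes from `spec.number_of_columns`.
const NUMBER_OF_COLUMNS: usize = 128;

fn main() {
    let (tx, rx) = watch::channel(HashSet::<u64>::new());

    // Receivers are only woken when the closure returns true, i.e. when at
    // least one genuinely new column index was inserted.
    let notify = |indices: &[u64]| {
        tx.send_if_modified(|set| {
            let mut modified = false;
            for &i in indices {
                modified |= set.insert(i);
            }
            modified
        })
    };

    assert!(notify(&[0, 1, 2])); // new indices: subscribers are notified
    assert!(!notify(&[1, 2])); // duplicates only: no wake-up

    // The 50% rule: once at least half of all columns are present, the block can
    // be made available eagerly because the remainder is reconstructible.
    let have = rx.borrow().len();
    let reconstructible = have >= NUMBER_OF_COLUMNS / 2;
    println!("{have} columns held, reconstructible: {reconstructible}");
}
```
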
5 changes: 4 additions & 1 deletion beacon_node/beacon_chain/src/early_attester_cache.rs
@@ -73,7 +73,10 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
let (blobs, data_columns) = match block.data() {
AvailableBlockData::NoData => (None, None),
AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None),
AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())),
AvailableBlockData::DataColumns(data_columns)
| AvailableBlockData::PartialDataColumns(data_columns) => {
(None, Some(data_columns.clone()))
}
};

let item = CacheItem {
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
@@ -132,7 +132,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
AvailableBlockData::Blobs(..) => {
new_oldest_blob_slot = Some(block.slot());
}
AvailableBlockData::DataColumns(_) => {
AvailableBlockData::DataColumns(_) | AvailableBlockData::PartialDataColumns(_) => {
new_oldest_data_column_slot = Some(block.slot());
}
}
10 changes: 5 additions & 5 deletions beacon_node/beacon_processor/src/lib.rs
@@ -629,7 +629,7 @@ pub enum Work<E: EthSpec> {
BlocksByRootsRequest(AsyncFn),
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
DataColumnsByRootsRequest(BlockingFn),
DataColumnsByRootsRequest(AsyncFn),
DataColumnsByRangeRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
@@ -1611,13 +1611,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
}),
Work::BlobsByRangeRequest(process_fn)
| Work::BlobsByRootsRequest(process_fn)
| Work::DataColumnsByRootsRequest(process_fn)
| Work::DataColumnsByRangeRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_async(work)
}
Work::BlocksByRangeRequest(work)
| Work::BlocksByRootsRequest(work)
| Work::DataColumnsByRootsRequest(work) => task_spawner.spawn_async(work),

Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
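
Reclassifying `DataColumnsByRootsRequest` from a `BlockingFn` to an `AsyncFn` is what lets the by-root handler `.await` the new column watcher instead of parking a blocking worker for up to the full timeout. A rough sketch of the two payload shapes and how each is driven; the type aliases are assumptions modelled on the surrounding diff, not copied from Lighthouse:

```rust
use std::future::Future;
use std::pin::Pin;

// Approximations of the beacon processor's work payloads (assumed shapes).
type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

async fn handle_columns_by_root() {
    // In the real handler this is where the column watcher / timeout is awaited.
    println!("columns served");
}

#[tokio::main]
async fn main() {
    // Old shape: a closure that must run on a blocking worker thread.
    let blocking: BlockingFn = Box::new(|| println!("blocking handler ran"));

    // New shape: the async handler is boxed and pinned (cf. `Box::pin(process_fn)`
    // in network_beacon_processor/mod.rs) and can be awaited on the async pool.
    let async_work: AsyncFn = Box::pin(handle_columns_by_root());

    tokio::task::spawn_blocking(blocking).await.unwrap();
    async_work.await;
}
```
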
7 changes: 3 additions & 4 deletions beacon_node/network/src/network_beacon_processor/mod.rs
@@ -724,13 +724,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request: DataColumnsByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || {
processor.handle_data_columns_by_root_request(peer_id, inbound_request_id, request)
};
let process_fn =
processor.handle_data_columns_by_root_request(peer_id, inbound_request_id, request);

self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::DataColumnsByRootsRequest(Box::new(process_fn)),
work: Work::DataColumnsByRootsRequest(Box::pin(process_fn)),
})
}

19 changes: 12 additions & 7 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -339,7 +339,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

/// Handle a `DataColumnsByRoot` request from the peer.
pub fn handle_data_columns_by_root_request(
pub async fn handle_data_columns_by_root_request(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
@@ -348,13 +348,14 @@
self.terminate_response_stream(
peer_id,
inbound_request_id,
self.handle_data_columns_by_root_request_inner(peer_id, inbound_request_id, request),
self.handle_data_columns_by_root_request_inner(peer_id, inbound_request_id, request)
.await,
Response::DataColumnsByRoot,
);
}

/// Handle a `DataColumnsByRoot` request from the peer.
pub fn handle_data_columns_by_root_request_inner(
pub async fn handle_data_columns_by_root_request_inner(
&self,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
@@ -363,10 +364,14 @@
let mut send_data_column_count = 0;

for data_column_ids_by_root in request.data_column_ids.as_slice() {
match self.chain.get_data_columns_checking_all_caches(
data_column_ids_by_root.block_root,
data_column_ids_by_root.columns.as_slice(),
) {
match self
.chain
.get_data_columns_checking_all_caches(
data_column_ids_by_root.block_root,
data_column_ids_by_root.columns.as_slice(),
)
.await
{
Ok(data_columns) => {
send_data_column_count += data_columns.len();
for data_column in data_columns {