From fa801b8413acf2a1617cbc36e1fe68adc5de38a6 Mon Sep 17 00:00:00 2001
From: Daniel Knopik
Date: Thu, 12 Jun 2025 10:51:24 +0200
Subject: [PATCH 1/3] first version embracing double imports

---
 beacon_node/beacon_chain/src/beacon_chain.rs         | 11 ++++++++---
 .../src/data_availability_checker.rs                 | 16 +++++++---------
 .../overflow_lru_cache.rs                            | 19 ++++++++++++++++++-
 beacon_node/beacon_chain/src/early_attester_cache.rs |  2 +-
 beacon_node/beacon_chain/src/historical_blocks.rs    |  2 +-
 5 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index ef741f7b5ba..9f9656a46eb 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3809,6 +3809,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             );
         }
 
+        let remove_from_da = !matches!(block.data(), AvailableBlockData::PartialDataColumns(_));
+
         // TODO(das) record custody column available timestamp
 
         // import
@@ -3840,8 +3842,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         //
         // If `import_block` errors (only errors with internal errors), the pending components will
         // be pruned on data_availability_checker maintenance as finality advances.
-        self.data_availability_checker
-            .remove_pending_components(block_root);
+        if remove_from_da {
+            self.data_availability_checker
+                .remove_pending_components(block_root);
+        }
 
         Ok(AvailabilityProcessingStatus::Imported(block_root))
     }
@@ -7208,7 +7212,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 );
                 Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
             }
-            AvailableBlockData::DataColumns(data_columns) => {
+            AvailableBlockData::DataColumns(data_columns)
+            | AvailableBlockData::PartialDataColumns(data_columns) => {
                 debug!(
                     %block_root,
                     count = data_columns.len(),
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 1bc95c22ac7..c10e47546bd 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -699,8 +699,11 @@ pub enum AvailableBlockData<E: EthSpec> {
     NoData,
     /// Block is post-Deneb, pre-PeerDAS and has more than zero blobs
     Blobs(BlobSidecarList<E>),
-    /// Block is post-PeerDAS and has more than zero blobs
+    /// Block is post-PeerDAS and has more than zero blobs. We have all columns that we custody.
     DataColumns(DataColumnSidecarList<E>),
+    /// Block is post-PeerDAS and has more than zero blobs. We do not have all columns yet, but are
+    /// able to reconstruct the rest.
+    PartialDataColumns(DataColumnSidecarList<E>),
 }
 
 /// A fully available block that is ready to be imported into fork choice.
@@ -745,14 +748,6 @@ impl<E: EthSpec> AvailableBlock<E> {
         &self.blob_data
     }
 
-    pub fn has_blobs(&self) -> bool {
-        match self.blob_data {
-            AvailableBlockData::NoData => false,
-            AvailableBlockData::Blobs(..) => true,
-            AvailableBlockData::DataColumns(_) => false,
-        }
-    }
-
     #[allow(clippy::type_complexity)]
     pub fn deconstruct(self) -> (Hash256, Arc<SignedBeaconBlock<E>>, AvailableBlockData<E>) {
         let AvailableBlock {
@@ -775,6 +770,9 @@ impl<E: EthSpec> AvailableBlock<E> {
                 AvailableBlockData::DataColumns(data_columns) => {
                     AvailableBlockData::DataColumns(data_columns.clone())
                 }
+                AvailableBlockData::PartialDataColumns(data_columns) => {
+                    AvailableBlockData::PartialDataColumns(data_columns.clone())
+                }
             },
             blobs_available_timestamp: self.blobs_available_timestamp,
             spec: self.spec.clone(),
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index 36c4f2cdc1e..41c52d448bd 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -31,6 +31,7 @@ pub struct PendingComponents<E: EthSpec> {
     pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
     pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
     pub reconstruction_started: bool,
+    pub eagerly_imported: bool,
 }
 
 impl<E: EthSpec> PendingComponents<E> {
@@ -196,7 +197,21 @@ impl<E: EthSpec> PendingComponents<E> {
             }
             Ordering::Less => {
                 // Not enough data columns received yet
-                None
+                // If we have at least half of all columns, we can reconstruct all -> make
+                // available eagerly if we haven't done so yet
+                if !self.eagerly_imported
+                    && num_received_columns >= spec.number_of_columns as usize / 2
+                {
+                    self.eagerly_imported = true;
+                    let data_columns = self
+                        .verified_data_columns
+                        .iter()
+                        .map(|d| d.clone().into_inner())
+                        .collect::<Vec<_>>();
+                    Some(AvailableBlockData::PartialDataColumns(data_columns))
+                } else {
+                    None
+                }
             }
         }
     } else {
@@ -249,6 +264,7 @@
             .max(),
             // TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850
             AvailableBlockData::DataColumns(_) => None,
+            AvailableBlockData::PartialDataColumns(_) => None,
         };
 
         let AvailabilityPendingExecutedBlock {
@@ -279,6 +295,7 @@
             block_root,
             verified_blobs: RuntimeFixedVector::new(vec![None; max_len]),
             verified_data_columns: vec![],
             executed_block: None,
             reconstruction_started: false,
+            eagerly_imported: false,
         }
     }
diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs
index 5665ef3775c..b5625ba1fa6 100644
--- a/beacon_node/beacon_chain/src/early_attester_cache.rs
+++ b/beacon_node/beacon_chain/src/early_attester_cache.rs
@@ -73,7 +73,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
         let (blobs, data_columns) = match block.data() {
             AvailableBlockData::NoData => (None, None),
             AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None),
-            AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())),
+            AvailableBlockData::DataColumns(data_columns) | AvailableBlockData::PartialDataColumns(data_columns) => (None, Some(data_columns.clone())),
         };
 
         let item = CacheItem {
diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs
index 348e6d52a64..ee91e221eed 100644
--- a/beacon_node/beacon_chain/src/historical_blocks.rs
+++ b/beacon_node/beacon_chain/src/historical_blocks.rs
@@ -132,7 +132,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 AvailableBlockData::Blobs(..) => {
                     new_oldest_blob_slot = Some(block.slot());
                 }
-                AvailableBlockData::DataColumns(_) => {
+                AvailableBlockData::DataColumns(_) | AvailableBlockData::PartialDataColumns(_) => {
                     new_oldest_data_column_slot = Some(block.slot());
                 }
             }

From 8c1693e0bd15e07be42d1f5cb959aeb61713784e Mon Sep 17 00:00:00 2001
From: Daniel Knopik
Date: Fri, 13 Jun 2025 15:00:29 +0200
Subject: [PATCH 2/3] stall early data column requests

---
 beacon_node/beacon_chain/src/beacon_chain.rs | 20 +++++++++++++++++++-
 .../src/data_availability_checker.rs         | 14 ++++++++++----
 .../overflow_lru_cache.rs                    | 22 ++++++++++++++++++----
 beacon_node/beacon_processor/src/lib.rs      | 10 +++++-----
 .../src/network_beacon_processor/mod.rs      |  7 +++----
 .../network_beacon_processor/rpc_methods.rs  |  8 ++++----
 6 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 9f9656a46eb..1e43e81f405 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -121,6 +121,8 @@ use std::io::prelude::*;
 use std::marker::PhantomData;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::{pin, select};
+use tokio::time::sleep;
 use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
 use store::{
     BlobSidecarListFromRoot, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary,
@@ -1127,11 +1129,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .map_or_else(|| self.get_blobs(block_root), Ok)
     }
 
-    pub fn get_data_columns_checking_all_caches(
+    pub async fn get_data_columns_checking_all_caches(
         &self,
         block_root: Hash256,
         indices: &[ColumnIndex],
     ) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
+        // If this is in the DA checker, wait until all columns requested are available, up to a
+        // certain timeout
+        if let Some(mut cols) = self.data_availability_checker.get_data_column_watcher(block_root) {
+            let timeout = sleep(Duration::from_secs(3));
+            pin!(timeout);
+            // Wait until we have one of:
+            // 1. time out
+            // 2. all needed columns in DA (wait_for returns Ok)
+            // 3. block no longer in DA (wait_for returns Err)
+            select! {
+                _ = &mut timeout => (),
+                _ = cols.wait_for(|cols| indices.iter().all(|index| cols.contains(index))) => (),
+            }
+        }
+
         let all_cached_columns_opt = self
             .data_availability_checker
             .get_data_columns(block_root)
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index c10e47546bd..5a6ef51f745 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList};
 use crate::block_verification_types::{
     AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
@@ -13,13 +14,11 @@ use std::fmt::Debug;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::sync::watch;
 use task_executor::TaskExecutor;
 use tracing::{debug, error, info_span, Instrument};
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
-use types::{
-    BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
-    RuntimeVariableList, SignedBeaconBlock,
-};
+use types::{BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock};
 
 mod error;
 mod overflow_lru_cache;
@@ -199,6 +198,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         self.availability_cache.peek_data_columns(block_root)
     }
 
+    pub fn get_data_column_watcher(
+        &self,
+        block_root: Hash256,
+    ) -> Option<watch::Receiver<HashSet<ColumnIndex>>> {
+        self.availability_cache.get_data_column_watcher(&block_root)
+    }
+
     /// Put a list of blobs received via RPC into the availability cache. This performs KZG
     /// verification on the blobs in the list.
     pub fn put_rpc_blobs(
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index 41c52d448bd..da0a33a4682 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -12,8 +12,10 @@ use crate::CustodyContext;
 use lru::LruCache;
 use parking_lot::RwLock;
 use std::cmp::Ordering;
+use std::collections::HashSet;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
+use tokio::sync::watch;
 use tracing::debug;
 use types::blob_sidecar::BlobIdentifier;
 use types::{
@@ -29,6 +31,7 @@ pub struct PendingComponents<E: EthSpec> {
     pub block_root: Hash256,
     pub verified_blobs: RuntimeFixedVector<Option<KzgVerifiedBlob<E>>>,
     pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
+    pub verified_column_indices: watch::Sender<HashSet<ColumnIndex>>,
     pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
     pub reconstruction_started: bool,
     pub eagerly_imported: bool,
@@ -135,11 +138,16 @@ impl<E: EthSpec> PendingComponents<E> {
         &mut self,
         kzg_verified_data_columns: I,
     ) -> Result<(), AvailabilityCheckError> {
-        for data_column in kzg_verified_data_columns {
-            if self.get_cached_data_column(data_column.index()).is_none() {
-                self.verified_data_columns.push(data_column);
+        self.verified_column_indices.send_if_modified(|set_column_indices| {
+            let mut modified = false;
+            for data_column in kzg_verified_data_columns {
+                if set_column_indices.insert(data_column.index()) {
+                    self.verified_data_columns.push(data_column);
+                    modified = true;
+                }
             }
-        }
+            modified
+        });
 
         Ok(())
     }
@@ -293,6 +301,7 @@
             block_root,
             verified_blobs: RuntimeFixedVector::new(vec![None; max_len]),
             verified_data_columns: vec![],
+            verified_column_indices: watch::channel(HashSet::new()).0,
             executed_block: None,
             reconstruction_started: false,
             eagerly_imported: false,
@@ -451,6 +460,10 @@
         f(self.critical.read().peek(block_root))
     }
 
+    pub fn get_data_column_watcher(&self, block_root: &Hash256) -> Option<watch::Receiver<HashSet<ColumnIndex>>> {
+        self.critical.read().peek(block_root).map(|components| components.verified_column_indices.subscribe())
+    }
+
     /// Puts the KZG verified blobs into the availability cache as pending components.
     pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
         &self,
@@ -609,6 +622,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
     pub fn handle_reconstruction_failure(&self, block_root: &Hash256) {
         if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) {
             pending_components_mut.verified_data_columns = vec![];
+            let _ = pending_components_mut.verified_column_indices.send(HashSet::new());
             pending_components_mut.reconstruction_started = false;
         }
     }
diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs
index e864cb1fd91..a449809dbc1 100644
--- a/beacon_node/beacon_processor/src/lib.rs
+++ b/beacon_node/beacon_processor/src/lib.rs
@@ -629,7 +629,7 @@ pub enum Work<E: EthSpec> {
     BlocksByRootsRequest(AsyncFn),
     BlobsByRangeRequest(BlockingFn),
     BlobsByRootsRequest(BlockingFn),
-    DataColumnsByRootsRequest(BlockingFn),
+    DataColumnsByRootsRequest(AsyncFn),
     DataColumnsByRangeRequest(BlockingFn),
     GossipBlsToExecutionChange(BlockingFn),
     LightClientBootstrapRequest(BlockingFn),
@@ -1611,13 +1611,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
                 }),
                 Work::BlobsByRangeRequest(process_fn)
                 | Work::BlobsByRootsRequest(process_fn)
-                | Work::DataColumnsByRootsRequest(process_fn)
                 | Work::DataColumnsByRangeRequest(process_fn) => {
                     task_spawner.spawn_blocking(process_fn)
                 }
-                Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
-                    task_spawner.spawn_async(work)
-                }
+                Work::BlocksByRangeRequest(work)
+                | Work::BlocksByRootsRequest(work)
+                | Work::DataColumnsByRootsRequest(work) => task_spawner.spawn_async(work),
+
                 Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
                 Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
                     BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index 0b89058ba9d..7a248c16cb6 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -724,13 +724,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         request: DataColumnsByRootRequest,
     ) -> Result<(), Error> {
         let processor = self.clone();
-        let process_fn = move || {
-            processor.handle_data_columns_by_root_request(peer_id, inbound_request_id, request)
-        };
+        let process_fn = processor
+            .handle_data_columns_by_root_request(peer_id, inbound_request_id, request);
 
         self.try_send(BeaconWorkEvent {
             drop_during_sync: false,
-            work: Work::DataColumnsByRootsRequest(Box::new(process_fn)),
+            work: Work::DataColumnsByRootsRequest(Box::pin(process_fn)),
         })
     }
 
diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
index eca80af998d..c1788787535 100644
--- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -339,7 +339,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     }
 
     /// Handle a `DataColumnsByRoot` request from the peer.
-    pub fn handle_data_columns_by_root_request(
+    pub async fn handle_data_columns_by_root_request(
         self: Arc<Self>,
         peer_id: PeerId,
         inbound_request_id: InboundRequestId,
         request: DataColumnsByRootRequest,
     ) {
         self.terminate_response_stream(
             peer_id,
             inbound_request_id,
-            self.handle_data_columns_by_root_request_inner(peer_id, inbound_request_id, request),
+            self.handle_data_columns_by_root_request_inner(peer_id, inbound_request_id, request).await,
             Response::DataColumnsByRoot,
         );
     }
 
     /// Handle a `DataColumnsByRoot` request from the peer.
-    pub fn handle_data_columns_by_root_request_inner(
+    pub async fn handle_data_columns_by_root_request_inner(
         &self,
         peer_id: PeerId,
         inbound_request_id: InboundRequestId,
         request: DataColumnsByRootRequest,
     ) -> Result<(), (RpcErrorResponse, &'static str)> {
         let mut send_data_column_count = 0;
 
         for data_column_ids_by_root in request.data_column_ids.as_slice() {
             match self.chain.get_data_columns_checking_all_caches(
                 data_column_ids_by_root.block_root,
                 data_column_ids_by_root.columns.as_slice(),
-            ) {
+            ).await {
                 Ok(data_columns) => {
                     send_data_column_count += data_columns.len();
                     for data_column in data_columns {

From 654188a54dc21901e066942085eec7127d9a24d2 Mon Sep 17 00:00:00 2001
From: Daniel Knopik
Date: Fri, 13 Jun 2025 15:02:46 +0200
Subject: [PATCH 3/3] fmt

---
 beacon_node/beacon_chain/src/beacon_chain.rs         | 10 +++---
 .../src/data_availability_checker.rs                 |  9 +++--
 .../overflow_lru_cache.rs                            | 33 ++++++++++++-------
 beacon_node/beacon_chain/src/early_attester_cache.rs |  5 ++-
 .../src/network_beacon_processor/mod.rs              |  4 +--
 .../network_beacon_processor/rpc_methods.rs          | 15 ++++++---
 6 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 1e43e81f405..1ca05434e33 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -121,14 +121,14 @@ use std::io::prelude::*;
 use std::marker::PhantomData;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::{pin, select};
-use tokio::time::sleep;
 use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
 use store::{
     BlobSidecarListFromRoot, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary,
     KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
 };
 use task_executor::{ShutdownReason, TaskExecutor};
+use tokio::time::sleep;
+use tokio::{pin, select};
 use tokio_stream::Stream;
 use tracing::{debug, error, info, trace, warn};
 use tree_hash::TreeHash;
@@ -1136,7 +1136,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     ) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
         // If this is in the DA checker, wait until all columns requested are available, up to a
         // certain timeout
-        if let Some(mut cols) = self.data_availability_checker.get_data_column_watcher(block_root) {
+        if let Some(mut cols) = self
+            .data_availability_checker
+            .get_data_column_watcher(block_root)
+        {
             let timeout = sleep(Duration::from_secs(3));
             pin!(timeout);
             // Wait until we have one of:
@@ -1148,7 +1151,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 _ = cols.wait_for(|cols| indices.iter().all(|index| cols.contains(index))) => (),
             }
         }
-
         let all_cached_columns_opt = self
             .data_availability_checker
             .get_data_columns(block_root)
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 5a6ef51f745..cd4cf4c4a3e 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -1,4 +1,3 @@
-use std::collections::HashSet;
 use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList};
 use crate::block_verification_types::{
     AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
@@ -9,16 +8,20 @@ use crate::data_availability_checker::overflow_lru_cache::{
 use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext};
 use kzg::Kzg;
 use slot_clock::SlotClock;
+use std::collections::HashSet;
 use std::fmt;
 use std::fmt::Debug;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::watch;
 use task_executor::TaskExecutor;
+use tokio::sync::watch;
 use tracing::{debug, error, info_span, Instrument};
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
-use types::{BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock};
+use types::{
+    BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch,
+    EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
+};
 
 mod error;
 mod overflow_lru_cache;
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index da0a33a4682..c28b657cbf8 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -138,16 +138,17 @@ impl<E: EthSpec> PendingComponents<E> {
         &mut self,
         kzg_verified_data_columns: I,
     ) -> Result<(), AvailabilityCheckError> {
-        self.verified_column_indices.send_if_modified(|set_column_indices| {
-            let mut modified = false;
-            for data_column in kzg_verified_data_columns {
-                if set_column_indices.insert(data_column.index()) {
-                    self.verified_data_columns.push(data_column);
-                    modified = true;
+        self.verified_column_indices
+            .send_if_modified(|set_column_indices| {
+                let mut modified = false;
+                for data_column in kzg_verified_data_columns {
+                    if set_column_indices.insert(data_column.index()) {
+                        self.verified_data_columns.push(data_column);
+                        modified = true;
+                    }
                 }
-            }
-            modified
-        });
+                modified
+            });
 
         Ok(())
     }
@@ -460,8 +461,14 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
         f(self.critical.read().peek(block_root))
     }
 
-    pub fn get_data_column_watcher(&self, block_root: &Hash256) -> Option<watch::Receiver<HashSet<ColumnIndex>>> {
-        self.critical.read().peek(block_root).map(|components| components.verified_column_indices.subscribe())
+    pub fn get_data_column_watcher(
+        &self,
+        block_root: &Hash256,
+    ) -> Option<watch::Receiver<HashSet<ColumnIndex>>> {
+        self.critical
+            .read()
+            .peek(block_root)
+            .map(|components| components.verified_column_indices.subscribe())
     }
 
     /// Puts the KZG verified blobs into the availability cache as pending components.
@@ -622,7 +629,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
     pub fn handle_reconstruction_failure(&self, block_root: &Hash256) {
         if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) {
             pending_components_mut.verified_data_columns = vec![];
-            let _ = pending_components_mut.verified_column_indices.send(HashSet::new());
+            let _ = pending_components_mut
+                .verified_column_indices
+                .send(HashSet::new());
             pending_components_mut.reconstruction_started = false;
         }
     }
diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs
index b5625ba1fa6..dd1e7edeaee 100644
--- a/beacon_node/beacon_chain/src/early_attester_cache.rs
+++ b/beacon_node/beacon_chain/src/early_attester_cache.rs
@@ -73,7 +73,10 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
         let (blobs, data_columns) = match block.data() {
             AvailableBlockData::NoData => (None, None),
             AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None),
-            AvailableBlockData::DataColumns(data_columns) | AvailableBlockData::PartialDataColumns(data_columns) => (None, Some(data_columns.clone())),
+            AvailableBlockData::DataColumns(data_columns)
+            | AvailableBlockData::PartialDataColumns(data_columns) => {
+                (None, Some(data_columns.clone()))
+            }
         };
 
         let item = CacheItem {
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index 7a248c16cb6..3b2eb9fa455 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -724,8 +724,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         request: DataColumnsByRootRequest,
     ) -> Result<(), Error> {
         let processor = self.clone();
-        let process_fn = processor
-            .handle_data_columns_by_root_request(peer_id, inbound_request_id, request);
+        let process_fn =
+            processor.handle_data_columns_by_root_request(peer_id, inbound_request_id, request);
 
         self.try_send(BeaconWorkEvent {
             drop_during_sync: false,
diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
index c1788787535..3acbbf04a2f 100644
--- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -348,7 +348,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         self.terminate_response_stream(
             peer_id,
             inbound_request_id,
-            self.handle_data_columns_by_root_request_inner(peer_id, inbound_request_id, request).await,
+            self.handle_data_columns_by_root_request_inner(peer_id, inbound_request_id, request)
+                .await,
             Response::DataColumnsByRoot,
         );
     }
@@ -363,10 +364,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         let mut send_data_column_count = 0;
 
         for data_column_ids_by_root in request.data_column_ids.as_slice() {
-            match self.chain.get_data_columns_checking_all_caches(
-                data_column_ids_by_root.block_root,
-                data_column_ids_by_root.columns.as_slice(),
-            ).await {
+            match self
+                .chain
+                .get_data_columns_checking_all_caches(
+                    data_column_ids_by_root.block_root,
+                    data_column_ids_by_root.columns.as_slice(),
+                )
+                .await
+            {
                 Ok(data_columns) => {
                     send_data_column_count += data_columns.len();
                     for data_column in data_columns {
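
Reviewer sketch for patch 1 (an editor's illustration, not part of the series): the eager-import branch relies on the PeerDAS erasure-coding property that any half of the extended columns is enough to reconstruct all of them, so a block may be made available as soon as `num_received_columns >= spec.number_of_columns as usize / 2`, tagged `PartialDataColumns` so the `remove_from_da` guard keeps its pending components alive. Below is a minimal, self-contained model of that decision; `NUMBER_OF_COLUMNS`, `ColumnData` and `classify` are made-up names, not Lighthouse types.

// Hypothetical stand-ins for the real `PendingComponents` logic.
const NUMBER_OF_COLUMNS: usize = 128; // assumed value of `spec.number_of_columns`

#[derive(Debug, PartialEq)]
enum ColumnData {
    Full,    // all expected custody columns verified
    Partial, // at least half of all columns: the rest can be reconstructed
    Pending, // too few columns, keep waiting
}

fn classify(num_received: usize, num_expected: usize, eagerly_imported: &mut bool) -> ColumnData {
    if num_received >= num_expected {
        ColumnData::Full
    } else if !*eagerly_imported && num_received >= NUMBER_OF_COLUMNS / 2 {
        // Fire the eager (partial) import at most once per block.
        *eagerly_imported = true;
        ColumnData::Partial
    } else {
        ColumnData::Pending
    }
}

fn main() {
    let mut eager = false;
    assert_eq!(classify(63, 128, &mut eager), ColumnData::Pending);
    assert_eq!(classify(64, 128, &mut eager), ColumnData::Partial);
    // A second look at the same block does not trigger another partial import.
    assert_eq!(classify(100, 128, &mut eager), ColumnData::Pending);
    assert_eq!(classify(128, 128, &mut eager), ColumnData::Full);
}

The "double imports" of the subject line fall out of this rule: a block imported once via the partial path is imported again when the remaining columns arrive, which is exactly why `remove_pending_components` is skipped for `PartialDataColumns` in patch 1.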
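
Reviewer sketch for patch 2 (again an editor's illustration under stated assumptions, not the real Lighthouse types): the stall works by subscribing to the `tokio::sync::watch` channel that `PendingComponents` now maintains over its set of verified column indices, then racing `watch::Receiver::wait_for` against a pinned 3-second `sleep`. The miniature below is runnable on its own; the column indices and timings are arbitrary.

use std::collections::HashSet;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use tokio::{pin, select};

#[tokio::main]
async fn main() {
    // The sender side lives with the pending block; receivers are handed out
    // to RPC handlers (cf. `get_data_column_watcher`).
    let (tx, mut rx) = watch::channel(HashSet::<u64>::new());

    // Simulated gossip: columns trickle in. `send_if_modified` notifies
    // watchers only when a genuinely new index is inserted, mirroring the
    // dedup in `PendingComponents`.
    tokio::spawn(async move {
        for index in [3u64, 17, 42] {
            sleep(Duration::from_millis(100)).await;
            tx.send_if_modified(|set| set.insert(index));
        }
        // Dropping `tx` here models the block leaving the DA checker.
    });

    let wanted = [3u64, 42];
    let timeout = sleep(Duration::from_secs(3));
    pin!(timeout);

    select! {
        _ = &mut timeout => println!("timed out; respond with whatever is cached"),
        res = rx.wait_for(|set| wanted.iter().all(|i| set.contains(i))) => match res {
            Ok(_) => println!("all requested columns verified"),
            Err(_) => println!("sender dropped; block no longer pending"),
        },
    }
}

Because the handler now awaits this watcher, `Work::DataColumnsByRootsRequest` changes from `BlockingFn` to `AsyncFn` in patch 2 and is dispatched with `spawn_async` instead of `spawn_blocking`.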