From d7ab88cf858ec90696b1dbba199af34823d20e61 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 8 Jul 2025 12:20:57 +1000 Subject: [PATCH 1/4] Remove KZG verification from local block production and blobs fetched from local EL. --- beacon_node/beacon_chain/src/beacon_chain.rs | 32 ++--------------- .../src/data_column_verification.rs | 6 ++++ .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 14 ++------ .../beacon_chain/src/fetch_blobs/mod.rs | 34 ++++++++----------- .../beacon_chain/src/fetch_blobs/tests.rs | 14 +++----- beacon_node/beacon_chain/src/metrics.rs | 9 ----- 6 files changed, 29 insertions(+), 80 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9900535b2c7..d30109728f5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -70,7 +70,7 @@ use crate::validator_monitor::{ }; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ - kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, + metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead, }; use eth2::types::{ @@ -5748,8 +5748,6 @@ impl BeaconChain { let (mut block, _) = block.deconstruct(); *block.state_root_mut() = state_root; - let blobs_verification_timer = - metrics::start_timer(&metrics::BLOCK_PRODUCTION_BLOBS_VERIFICATION_TIMES); let blob_items = match maybe_blobs_and_proofs { Some((blobs, proofs)) => { let expected_kzg_commitments = @@ -5768,37 +5766,11 @@ impl BeaconChain { ))); } - let kzg_proofs = Vec::from(proofs); - - let kzg = self.kzg.as_ref(); - if self - .spec - .is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())) - { - kzg_utils::validate_blobs_and_cell_proofs::( - kzg, - blobs.iter().collect(), - &kzg_proofs, - expected_kzg_commitments, - ) - .map_err(BlockProductionError::KzgError)?; - } else { - kzg_utils::validate_blobs::( - kzg, - expected_kzg_commitments, - blobs.iter().collect(), - &kzg_proofs, - ) - .map_err(BlockProductionError::KzgError)?; - } - - Some((kzg_proofs.into(), blobs)) + Some((proofs, blobs)) } None => None, }; - drop(blobs_verification_timer); - metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES); trace!( diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 3009522bf60..e079b5ab78c 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -266,6 +266,12 @@ impl KzgVerifiedDataColumn { verify_kzg_for_data_column(data_column, kzg) } + /// Mark a data column as KZG verified. Caller must ONLY use this on columns constructed + /// from EL blobs. + pub fn from_execution_verified(data_column: Arc>) -> Self { + Self { data: data_column } + } + /// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. pub(crate) fn __new_for_testing(data_column: Arc>) -> Self { Self { data: data_column } diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index 4a7a5aeea21..fe8af5b70ea 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -1,17 +1,16 @@ use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; -use crate::data_column_verification::KzgVerifiedDataColumn; use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError}; use crate::observed_block_producers::ProposalKey; use crate::observed_data_sidecars::DoNotObserve; use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes}; use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; -use kzg::{Error as KzgError, Kzg}; +use kzg::Kzg; #[cfg(test)] use mockall::automock; use std::collections::HashSet; use std::sync::Arc; use task_executor::TaskExecutor; -use types::{BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Hash256, Slot}; +use types::{BlobSidecar, ChainSpec, ColumnIndex, Hash256, Slot}; /// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic. pub(crate) struct FetchBlobsBeaconAdapter { @@ -77,14 +76,7 @@ impl FetchBlobsBeaconAdapter { GossipVerifiedBlob::::new(blob.clone(), blob.index, &self.chain) } - pub(crate) fn verify_data_columns_kzg( - &self, - data_columns: Vec>>, - ) -> Result>, KzgError> { - KzgVerifiedDataColumn::from_batch(data_columns, &self.chain.kzg) - } - - pub(crate) fn known_for_proposal( + pub(crate) fn data_column_known_for_proposal( &self, proposal_key: ProposalKey, ) -> Option> { diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index e02405ddbad..21a27ace119 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -14,7 +14,7 @@ mod tests; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_verification_types::AsBlock; -use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; #[cfg_attr(test, double)] use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; @@ -358,17 +358,24 @@ async fn compute_custody_columns_to_import( // `DataAvailabilityChecker` requires a strict match on custody columns count to // consider a block available. let mut custody_columns = data_columns_result - .map(|mut data_columns| { - data_columns.retain(|col| custody_columns_indices.contains(&col.index)); + .map(|data_columns| { data_columns + .into_iter() + .filter(|col| custody_columns_indices.contains(&col.index)) + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::from_execution_verified(col), + ) + }) + .collect::>() }) .map_err(FetchEngineBlobError::DataColumnSidecarError)?; // Only consider columns that are not already observed on gossip. - if let Some(observed_columns) = chain_adapter_cloned.known_for_proposal( + if let Some(observed_columns) = chain_adapter_cloned.data_column_known_for_proposal( ProposalKey::new(block.message().proposer_index(), block.slot()), ) { - custody_columns.retain(|col| !observed_columns.contains(&col.index)); + custody_columns.retain(|col| !observed_columns.contains(&col.index())); if custody_columns.is_empty() { return Ok(vec![]); } @@ -378,26 +385,13 @@ async fn compute_custody_columns_to_import( if let Some(known_columns) = chain_adapter_cloned.cached_data_column_indexes(&block_root) { - custody_columns.retain(|col| !known_columns.contains(&col.index)); + custody_columns.retain(|col| !known_columns.contains(&col.index())); if custody_columns.is_empty() { return Ok(vec![]); } } - // KZG verify data columns before publishing. This prevents blobs with invalid - // KZG proofs from the EL making it into the data availability checker. We do not - // immediately add these blobs to the observed blobs/columns cache because we want - // to allow blobs/columns to arrive on gossip and be accepted (and propagated) while - // we are waiting to publish. Just before publishing we will observe the blobs/columns - // and only proceed with publishing if they are not yet seen. - let verified = chain_adapter_cloned - .verify_data_columns_kzg(custody_columns) - .map_err(FetchEngineBlobError::KzgError)?; - - Ok(verified - .into_iter() - .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) - .collect()) + Ok(custody_columns) }, "compute_custody_columns_to_import", ) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 3178020c758..9cb597c6df9 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -1,4 +1,3 @@ -use crate::data_column_verification::KzgVerifiedDataColumn; use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter; use crate::fetch_blobs::{ fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError, @@ -156,7 +155,7 @@ mod get_blobs_v2 { mock_fork_choice_contains_block(&mut mock_adapter, vec![]); // All data columns already seen on gossip mock_adapter - .expect_known_for_proposal() + .expect_data_column_known_for_proposal() .returning(|_| Some(hashset![0, 1, 2])); // No blobs should be processed mock_adapter.expect_process_engine_blobs().times(0); @@ -192,17 +191,12 @@ mod get_blobs_v2 { // All blobs returned, fork choice doesn't contain block mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); mock_fork_choice_contains_block(&mut mock_adapter, vec![]); - mock_adapter.expect_known_for_proposal().returning(|_| None); mock_adapter - .expect_cached_data_column_indexes() + .expect_data_column_known_for_proposal() .returning(|_| None); mock_adapter - .expect_verify_data_columns_kzg() - .returning(|c| { - Ok(c.into_iter() - .map(KzgVerifiedDataColumn::__new_for_testing) - .collect()) - }); + .expect_cached_data_column_indexes() + .returning(|_| None); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 5ca764821f2..7696d386fe7 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1824,15 +1824,6 @@ pub static KZG_VERIFICATION_DATA_COLUMN_BATCH_TIMES: LazyLock> ) }); -pub static BLOCK_PRODUCTION_BLOBS_VERIFICATION_TIMES: LazyLock> = LazyLock::new( - || { - try_create_histogram( - "beacon_block_production_blobs_verification_seconds", - "Time taken to verify blobs against commitments and creating BlobSidecar objects in block production" - ) - }, -); - /* * Data Availability cache metrics */ From 47c892fcd1528bfd2871d6a5babd38ffddae2eb0 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 22 Jul 2025 17:30:30 +1000 Subject: [PATCH 2/4] Skip KZG checks on EL fetch blob sidecars. --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- .../beacon_chain/src/blob_verification.rs | 26 ++--- .../src/data_availability_checker.rs | 13 ++- .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 24 ++-- .../beacon_chain/src/fetch_blobs/mod.rs | 103 +++++++++--------- .../beacon_chain/src/fetch_blobs/tests.rs | 36 ++++-- .../src/network_beacon_processor/mod.rs | 14 ++- 7 files changed, 121 insertions(+), 97 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d30109728f5..fcce8c00efb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3664,7 +3664,7 @@ impl BeaconChain { EngineGetBlobsOutput::Blobs(blobs) => { self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?; self.data_availability_checker - .put_gossip_verified_blobs(block_root, blobs)? + .put_kzg_verified_blobs(block_root, blobs)? } EngineGetBlobsOutput::CustodyColumns(data_columns) => { self.check_columns_for_slashability( diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index a78224fb708..958416552db 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -9,7 +9,7 @@ use crate::block_verification::{ BlockSlashInfo, }; use crate::kzg_utils::{validate_blob, validate_blobs}; -use crate::observed_data_sidecars::{DoNotObserve, ObservationStrategy, Observe}; +use crate::observed_data_sidecars::{ObservationStrategy, Observe}; use crate::{metrics, BeaconChainError}; use kzg::{Error as KzgError, Kzg, KzgCommitment}; use ssz_derive::{Decode, Encode}; @@ -304,6 +304,14 @@ impl KzgVerifiedBlob { seen_timestamp: Duration::from_secs(0), } } + /// Mark a blob as KZG verified. Caller must ONLY use this on blob sidecars constructed + /// from EL blobs. + pub fn from_execution_verified(blob: Arc>, seen_timestamp: Duration) -> Self { + Self { + blob, + seen_timestamp, + } + } } /// Complete kzg verification for a `BlobSidecar`. @@ -594,21 +602,7 @@ pub fn validate_blob_sidecar_for_gossip GossipVerifiedBlob { - pub fn observe( - self, - chain: &BeaconChain, - ) -> Result, GossipBlobError> { - observe_gossip_blob(&self.blob.blob, chain)?; - Ok(GossipVerifiedBlob { - block_root: self.block_root, - blob: self.blob, - _phantom: PhantomData, - }) - } -} - -fn observe_gossip_blob( +pub fn observe_gossip_blob( blob_sidecar: &BlobSidecar, chain: &BeaconChain, ) -> Result<(), GossipBlobError> { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 54047180480..4f05e8a3b6f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -1,4 +1,6 @@ -use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList}; +use crate::blob_verification::{ + verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList, +}; use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; @@ -264,6 +266,15 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner())) } + pub fn put_kzg_verified_blobs>>( + &self, + block_root: Hash256, + blobs: I, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_kzg_verified_blobs(block_root, blobs) + } + /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the data column sidecar. diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index fe8af5b70ea..9526921da73 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -1,7 +1,5 @@ -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError}; use crate::observed_block_producers::ProposalKey; -use crate::observed_data_sidecars::DoNotObserve; use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes}; use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; use kzg::Kzg; @@ -10,7 +8,7 @@ use mockall::automock; use std::collections::HashSet; use std::sync::Arc; use task_executor::TaskExecutor; -use types::{BlobSidecar, ChainSpec, ColumnIndex, Hash256, Slot}; +use types::{ChainSpec, ColumnIndex, Hash256, Slot}; /// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic. pub(crate) struct FetchBlobsBeaconAdapter { @@ -69,11 +67,17 @@ impl FetchBlobsBeaconAdapter { .map_err(FetchEngineBlobError::RequestFailed) } - pub(crate) fn verify_blob_for_gossip( + pub(crate) fn blobs_known_for_proposal( &self, - blob: &Arc>, - ) -> Result, GossipBlobError> { - GossipVerifiedBlob::::new(blob.clone(), blob.index, &self.chain) + proposer: u64, + slot: Slot, + ) -> Option> { + let proposer_key = ProposalKey::new(proposer, slot); + self.chain + .observed_blob_sidecars + .read() + .known_for_proposal(&proposer_key) + .cloned() } pub(crate) fn data_column_known_for_proposal( @@ -87,6 +91,12 @@ impl FetchBlobsBeaconAdapter { .cloned() } + pub(crate) fn cached_blob_indexes(&self, block_root: &Hash256) -> Option> { + self.chain + .data_availability_checker + .cached_blob_indexes(block_root) + } + pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { self.chain .data_availability_checker diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index 21a27ace119..efc7854fa5f 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -12,14 +12,14 @@ mod fetch_blobs_beacon_adapter; #[cfg(test)] mod tests; -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::blob_verification::{GossipBlobError, KzgVerifiedBlob}; use crate::block_verification_types::AsBlock; use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; #[cfg_attr(test, double)] use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::ProposalKey; -use crate::observed_data_sidecars::DoNotObserve; +use crate::validator_monitor::timestamp_now; use crate::{ metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, @@ -34,11 +34,11 @@ use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_h use std::collections::HashSet; use std::sync::Arc; use tracing::{debug, warn}; -use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; +use types::blob_sidecar::BlobSidecarError; use types::data_column_sidecar::DataColumnSidecarError; use types::{ - BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256, - KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, + BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs, + SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, }; /// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the @@ -46,7 +46,7 @@ use types::{ /// be published immediately. #[derive(Debug)] pub enum EngineGetBlobsOutput { - Blobs(Vec>), + Blobs(Vec>), /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. CustodyColumns(Vec>), } @@ -186,46 +186,47 @@ async fn fetch_and_process_blobs_v1( .signed_block_header_and_kzg_commitments_proof() .map_err(FetchEngineBlobError::BeaconStateError)?; - let fixed_blob_sidecar_list = build_blob_sidecars( + let mut blob_sidecar_list = build_blob_sidecars( &block, response, signed_block_header, &kzg_commitments_proof, - chain_adapter.spec(), )?; - // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from - // the EL making it into the data availability checker. We do not immediately add these - // blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip - // and be accepted (and propagated) while we are waiting to publish. Just before publishing - // we will observe the blobs/columns and only proceed with publishing if they are not yet seen. - let blobs_to_import_and_publish = fixed_blob_sidecar_list - .into_iter() - .filter_map(|opt_blob| { - let blob = opt_blob.as_ref()?; - match chain_adapter.verify_blob_for_gossip(blob) { - Ok(verified) => Some(Ok(verified)), - // Ignore already seen blobs. - Err(GossipBlobError::RepeatBlob { .. }) => None, - Err(e) => Some(Err(e)), - } - }) - .collect::, _>>() - .map_err(FetchEngineBlobError::GossipBlob)?; + if let Some(observed_blobs) = + chain_adapter.blobs_known_for_proposal(block.message().proposer_index(), block.slot()) + { + blob_sidecar_list.retain(|blob| !observed_blobs.contains(&blob.blob_index())); + if blob_sidecar_list.is_empty() { + debug!( + info = "blobs have already been seen on gossip", + "Ignoring EL blobs response" + ); + return Ok(None); + } + } - if blobs_to_import_and_publish.is_empty() { - return Ok(None); + if let Some(known_blobs) = chain_adapter.cached_blob_indexes(&block_root) { + blob_sidecar_list.retain(|blob| !known_blobs.contains(&blob.blob_index())); + if blob_sidecar_list.is_empty() { + debug!( + info = "blobs have already been imported into data availability checker", + "Ignoring EL blobs response" + ); + return Ok(None); + } } - publish_fn(EngineGetBlobsOutput::Blobs( - blobs_to_import_and_publish.clone(), - )); + // Up until this point we have not observed the blobs in the gossip cache, which allows them to + // arrive independently while this function is running. In `publish_fn` we will observe them + // and then publish any blobs that had not already been observed. + publish_fn(EngineGetBlobsOutput::Blobs(blob_sidecar_list.clone())); let availability_processing_status = chain_adapter .process_engine_blobs( block.slot(), block_root, - EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish), + EngineGetBlobsOutput::Blobs(blob_sidecar_list), ) .await?; @@ -311,6 +312,9 @@ async fn fetch_and_process_blobs_v2( return Ok(None); } + // Up until this point we have not observed the data columns in the gossip cache, which allows + // them to arrive independently while this function is running. In publish_fn we will observe + // them and then publish any columns that had not already been observed. publish_fn(EngineGetBlobsOutput::CustodyColumns( custody_columns_to_import.clone(), )); @@ -405,37 +409,28 @@ fn build_blob_sidecars( response: Vec>>, signed_block_header: SignedBeaconBlockHeader, kzg_commitments_inclusion_proof: &FixedVector, - spec: &ChainSpec, -) -> Result, FetchEngineBlobError> { - let epoch = block.epoch(); - let mut fixed_blob_sidecar_list = - FixedBlobSidecarList::default(spec.max_blobs_per_block(epoch) as usize); +) -> Result>, FetchEngineBlobError> { + let mut sidecars = vec![]; for (index, blob_and_proof) in response .into_iter() .enumerate() - .filter_map(|(i, opt_blob)| Some((i, opt_blob?))) + .filter_map(|(index, opt_blob)| Some((index, opt_blob?))) { - match BlobSidecar::new_with_existing_proof( + let blob_sidecar = BlobSidecar::new_with_existing_proof( index, blob_and_proof.blob, block, signed_block_header.clone(), kzg_commitments_inclusion_proof, blob_and_proof.proof, - ) { - Ok(blob) => { - if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(index) { - *blob_mut = Some(Arc::new(blob)); - } else { - return Err(FetchEngineBlobError::InternalError(format!( - "Blobs from EL contains blob with invalid index {index}" - ))); - } - } - Err(e) => { - return Err(FetchEngineBlobError::BlobSidecarError(e)); - } - } + ) + .map_err(FetchEngineBlobError::BlobSidecarError)?; + + sidecars.push(KzgVerifiedBlob::from_execution_verified( + Arc::new(blob_sidecar), + timestamp_now(), + )); } - Ok(fixed_blob_sidecar_list) + + Ok(sidecars) } diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 9cb597c6df9..7b2e22160f1 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -252,6 +252,7 @@ mod get_blobs_v1 { use super::*; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_verification_types::AsBlock; + use std::collections::HashSet; const ELECTRA_FORK: ForkName = ForkName::Electra; @@ -325,10 +326,13 @@ mod get_blobs_v1 { mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); // AND block is not imported into fork choice mock_fork_choice_contains_block(&mut mock_adapter, vec![]); - // AND all blobs returned are valid + // AND all blobs have not yet been seen mock_adapter - .expect_verify_blob_for_gossip() - .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); + .expect_cached_blob_indexes() + .returning(|_| None); + mock_adapter + .expect_blobs_known_for_proposal() + .returning(|_, _| None); // Returned blobs should be processed mock_process_engine_blobs_result( &mut mock_adapter, @@ -408,17 +412,22 @@ mod get_blobs_v1 { // **GIVEN**: // All blobs returned let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::>(); + let all_blob_indices = blob_and_proof_opts + .iter() + .enumerate() + .map(|(i, _)| i as u64) + .collect::>(); + mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); // block not yet imported into fork choice mock_fork_choice_contains_block(&mut mock_adapter, vec![]); // All blobs already seen on gossip - mock_adapter.expect_verify_blob_for_gossip().returning(|b| { - Err(GossipBlobError::RepeatBlob { - proposer: b.block_proposer_index(), - slot: b.slot(), - index: b.index, - }) - }); + mock_adapter + .expect_cached_blob_indexes() + .returning(|_| None); + mock_adapter + .expect_blobs_known_for_proposal() + .returning(|_, _| Some(all_blob_indices.clone())); // **WHEN**: Trigger `fetch_blobs` on the block let custody_columns = hashset![0, 1, 2]; @@ -454,8 +463,11 @@ mod get_blobs_v1 { mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); mock_fork_choice_contains_block(&mut mock_adapter, vec![]); mock_adapter - .expect_verify_blob_for_gossip() - .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); + .expect_cached_blob_indexes() + .returning(|_| None); + mock_adapter + .expect_blobs_known_for_proposal() + .returning(|_, _| None); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f7c3a1bf8db..9e7d4693401 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,13 +1,12 @@ use crate::sync::manager::BlockProcessType; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use beacon_chain::blob_verification::{observe_gossip_blob, GossipBlobError}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; use beacon_chain::fetch_blobs::{ fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError, }; -use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, }; @@ -798,7 +797,10 @@ impl NetworkBeaconProcessor { if publish_blobs { match blobs_or_data_column { EngineGetBlobsOutput::Blobs(blobs) => { - self_cloned.publish_blobs_gradually(blobs, block_root); + self_cloned.publish_blobs_gradually( + blobs.into_iter().map(|b| b.clone_blob()).collect(), + block_root, + ); } EngineGetBlobsOutput::CustodyColumns(columns) => { self_cloned.publish_data_columns_gradually( @@ -941,7 +943,7 @@ impl NetworkBeaconProcessor { /// publisher exists for a blob, it will eventually get published here. fn publish_blobs_gradually( self: &Arc, - mut blobs: Vec>, + mut blobs: Vec>>, block_root: Hash256, ) { let self_clone = self.clone(); @@ -972,8 +974,8 @@ impl NetworkBeaconProcessor { while blobs_iter.peek().is_some() { let batch = blobs_iter.by_ref().take(batch_size); let publishable = batch - .filter_map(|unobserved| match unobserved.observe(&chain) { - Ok(observed) => Some(observed.clone_blob()), + .filter_map(|blob| match observe_gossip_blob(&blob, &chain) { + Ok(()) => Some(blob), Err(GossipBlobError::RepeatBlob { .. }) => None, Err(e) => { warn!( From 267a3d37771c99cf023150e0a96d692a1db3e85b Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 22 Jul 2025 18:00:47 +1000 Subject: [PATCH 3/4] Fix lint --- beacon_node/beacon_chain/src/fetch_blobs/tests.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 7b2e22160f1..f1ffabdd8f8 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -250,7 +250,6 @@ mod get_blobs_v2 { mod get_blobs_v1 { use super::*; - use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_verification_types::AsBlock; use std::collections::HashSet; @@ -427,7 +426,7 @@ mod get_blobs_v1 { .returning(|_| None); mock_adapter .expect_blobs_known_for_proposal() - .returning(|_, _| Some(all_blob_indices.clone())); + .returning(move |_, _| Some(all_blob_indices.clone())); // **WHEN**: Trigger `fetch_blobs` on the block let custody_columns = hashset![0, 1, 2]; From 8acbed39eeccd4b613ea06dd6b5038f68b887179 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 25 Jul 2025 13:10:17 +1000 Subject: [PATCH 4/4] Remove clone --- beacon_node/network/src/network_beacon_processor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 9e7d4693401..613a0692fb4 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -798,7 +798,7 @@ impl NetworkBeaconProcessor { match blobs_or_data_column { EngineGetBlobsOutput::Blobs(blobs) => { self_cloned.publish_blobs_gradually( - blobs.into_iter().map(|b| b.clone_blob()).collect(), + blobs.into_iter().map(|b| b.to_blob()).collect(), block_root, ); }