diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 8db432bbec3..749c690cb90 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3664,7 +3664,7 @@ impl BeaconChain { EngineGetBlobsOutput::Blobs(blobs) => { self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?; self.data_availability_checker - .put_gossip_verified_blobs(block_root, blobs)? + .put_kzg_verified_blobs(block_root, blobs)? } EngineGetBlobsOutput::CustodyColumns(data_columns) => { self.check_columns_for_slashability( diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index a78224fb708..958416552db 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -9,7 +9,7 @@ use crate::block_verification::{ BlockSlashInfo, }; use crate::kzg_utils::{validate_blob, validate_blobs}; -use crate::observed_data_sidecars::{DoNotObserve, ObservationStrategy, Observe}; +use crate::observed_data_sidecars::{ObservationStrategy, Observe}; use crate::{metrics, BeaconChainError}; use kzg::{Error as KzgError, Kzg, KzgCommitment}; use ssz_derive::{Decode, Encode}; @@ -304,6 +304,14 @@ impl KzgVerifiedBlob { seen_timestamp: Duration::from_secs(0), } } + /// Mark a blob as KZG verified. Caller must ONLY use this on blob sidecars constructed + /// from EL blobs. + pub fn from_execution_verified(blob: Arc>, seen_timestamp: Duration) -> Self { + Self { + blob, + seen_timestamp, + } + } } /// Complete kzg verification for a `BlobSidecar`. @@ -594,21 +602,7 @@ pub fn validate_blob_sidecar_for_gossip GossipVerifiedBlob { - pub fn observe( - self, - chain: &BeaconChain, - ) -> Result, GossipBlobError> { - observe_gossip_blob(&self.blob.blob, chain)?; - Ok(GossipVerifiedBlob { - block_root: self.block_root, - blob: self.blob, - _phantom: PhantomData, - }) - } -} - -fn observe_gossip_blob( +pub fn observe_gossip_blob( blob_sidecar: &BlobSidecar, chain: &BeaconChain, ) -> Result<(), GossipBlobError> { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 54047180480..4f05e8a3b6f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -1,4 +1,6 @@ -use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList}; +use crate::blob_verification::{ + verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList, +}; use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; @@ -264,6 +266,15 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner())) } + pub fn put_kzg_verified_blobs>>( + &self, + block_root: Hash256, + blobs: I, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_kzg_verified_blobs(block_root, blobs) + } + /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the data column sidecar. diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index fe8af5b70ea..9526921da73 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -1,7 +1,5 @@ -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError}; use crate::observed_block_producers::ProposalKey; -use crate::observed_data_sidecars::DoNotObserve; use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes}; use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; use kzg::Kzg; @@ -10,7 +8,7 @@ use mockall::automock; use std::collections::HashSet; use std::sync::Arc; use task_executor::TaskExecutor; -use types::{BlobSidecar, ChainSpec, ColumnIndex, Hash256, Slot}; +use types::{ChainSpec, ColumnIndex, Hash256, Slot}; /// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic. pub(crate) struct FetchBlobsBeaconAdapter { @@ -69,11 +67,17 @@ impl FetchBlobsBeaconAdapter { .map_err(FetchEngineBlobError::RequestFailed) } - pub(crate) fn verify_blob_for_gossip( + pub(crate) fn blobs_known_for_proposal( &self, - blob: &Arc>, - ) -> Result, GossipBlobError> { - GossipVerifiedBlob::::new(blob.clone(), blob.index, &self.chain) + proposer: u64, + slot: Slot, + ) -> Option> { + let proposer_key = ProposalKey::new(proposer, slot); + self.chain + .observed_blob_sidecars + .read() + .known_for_proposal(&proposer_key) + .cloned() } pub(crate) fn data_column_known_for_proposal( @@ -87,6 +91,12 @@ impl FetchBlobsBeaconAdapter { .cloned() } + pub(crate) fn cached_blob_indexes(&self, block_root: &Hash256) -> Option> { + self.chain + .data_availability_checker + .cached_blob_indexes(block_root) + } + pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { self.chain .data_availability_checker diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index bf4409fbb9c..efc7854fa5f 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -12,14 +12,14 @@ mod fetch_blobs_beacon_adapter; #[cfg(test)] mod tests; -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::blob_verification::{GossipBlobError, KzgVerifiedBlob}; use crate::block_verification_types::AsBlock; use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; #[cfg_attr(test, double)] use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::ProposalKey; -use crate::observed_data_sidecars::DoNotObserve; +use crate::validator_monitor::timestamp_now; use crate::{ metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, @@ -34,11 +34,11 @@ use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_h use std::collections::HashSet; use std::sync::Arc; use tracing::{debug, warn}; -use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; +use types::blob_sidecar::BlobSidecarError; use types::data_column_sidecar::DataColumnSidecarError; use types::{ - BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256, - KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, + BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs, + SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, }; /// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the @@ -46,7 +46,7 @@ use types::{ /// be published immediately. #[derive(Debug)] pub enum EngineGetBlobsOutput { - Blobs(Vec>), + Blobs(Vec>), /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. CustodyColumns(Vec>), } @@ -186,46 +186,47 @@ async fn fetch_and_process_blobs_v1( .signed_block_header_and_kzg_commitments_proof() .map_err(FetchEngineBlobError::BeaconStateError)?; - let fixed_blob_sidecar_list = build_blob_sidecars( + let mut blob_sidecar_list = build_blob_sidecars( &block, response, signed_block_header, &kzg_commitments_proof, - chain_adapter.spec(), )?; - // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from - // the EL making it into the data availability checker. We do not immediately add these - // blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip - // and be accepted (and propagated) while we are waiting to publish. Just before publishing - // we will observe the blobs/columns and only proceed with publishing if they are not yet seen. - let blobs_to_import_and_publish = fixed_blob_sidecar_list - .into_iter() - .filter_map(|opt_blob| { - let blob = opt_blob.as_ref()?; - match chain_adapter.verify_blob_for_gossip(blob) { - Ok(verified) => Some(Ok(verified)), - // Ignore already seen blobs. - Err(GossipBlobError::RepeatBlob { .. }) => None, - Err(e) => Some(Err(e)), - } - }) - .collect::, _>>() - .map_err(FetchEngineBlobError::GossipBlob)?; + if let Some(observed_blobs) = + chain_adapter.blobs_known_for_proposal(block.message().proposer_index(), block.slot()) + { + blob_sidecar_list.retain(|blob| !observed_blobs.contains(&blob.blob_index())); + if blob_sidecar_list.is_empty() { + debug!( + info = "blobs have already been seen on gossip", + "Ignoring EL blobs response" + ); + return Ok(None); + } + } - if blobs_to_import_and_publish.is_empty() { - return Ok(None); + if let Some(known_blobs) = chain_adapter.cached_blob_indexes(&block_root) { + blob_sidecar_list.retain(|blob| !known_blobs.contains(&blob.blob_index())); + if blob_sidecar_list.is_empty() { + debug!( + info = "blobs have already been imported into data availability checker", + "Ignoring EL blobs response" + ); + return Ok(None); + } } - publish_fn(EngineGetBlobsOutput::Blobs( - blobs_to_import_and_publish.clone(), - )); + // Up until this point we have not observed the blobs in the gossip cache, which allows them to + // arrive independently while this function is running. In `publish_fn` we will observe them + // and then publish any blobs that had not already been observed. + publish_fn(EngineGetBlobsOutput::Blobs(blob_sidecar_list.clone())); let availability_processing_status = chain_adapter .process_engine_blobs( block.slot(), block_root, - EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish), + EngineGetBlobsOutput::Blobs(blob_sidecar_list), ) .await?; @@ -408,37 +409,28 @@ fn build_blob_sidecars( response: Vec>>, signed_block_header: SignedBeaconBlockHeader, kzg_commitments_inclusion_proof: &FixedVector, - spec: &ChainSpec, -) -> Result, FetchEngineBlobError> { - let epoch = block.epoch(); - let mut fixed_blob_sidecar_list = - FixedBlobSidecarList::default(spec.max_blobs_per_block(epoch) as usize); +) -> Result>, FetchEngineBlobError> { + let mut sidecars = vec![]; for (index, blob_and_proof) in response .into_iter() .enumerate() - .filter_map(|(i, opt_blob)| Some((i, opt_blob?))) + .filter_map(|(index, opt_blob)| Some((index, opt_blob?))) { - match BlobSidecar::new_with_existing_proof( + let blob_sidecar = BlobSidecar::new_with_existing_proof( index, blob_and_proof.blob, block, signed_block_header.clone(), kzg_commitments_inclusion_proof, blob_and_proof.proof, - ) { - Ok(blob) => { - if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(index) { - *blob_mut = Some(Arc::new(blob)); - } else { - return Err(FetchEngineBlobError::InternalError(format!( - "Blobs from EL contains blob with invalid index {index}" - ))); - } - } - Err(e) => { - return Err(FetchEngineBlobError::BlobSidecarError(e)); - } - } + ) + .map_err(FetchEngineBlobError::BlobSidecarError)?; + + sidecars.push(KzgVerifiedBlob::from_execution_verified( + Arc::new(blob_sidecar), + timestamp_now(), + )); } - Ok(fixed_blob_sidecar_list) + + Ok(sidecars) } diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 9cb597c6df9..f1ffabdd8f8 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -250,8 +250,8 @@ mod get_blobs_v2 { mod get_blobs_v1 { use super::*; - use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_verification_types::AsBlock; + use std::collections::HashSet; const ELECTRA_FORK: ForkName = ForkName::Electra; @@ -325,10 +325,13 @@ mod get_blobs_v1 { mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); // AND block is not imported into fork choice mock_fork_choice_contains_block(&mut mock_adapter, vec![]); - // AND all blobs returned are valid + // AND all blobs have not yet been seen mock_adapter - .expect_verify_blob_for_gossip() - .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); + .expect_cached_blob_indexes() + .returning(|_| None); + mock_adapter + .expect_blobs_known_for_proposal() + .returning(|_, _| None); // Returned blobs should be processed mock_process_engine_blobs_result( &mut mock_adapter, @@ -408,17 +411,22 @@ mod get_blobs_v1 { // **GIVEN**: // All blobs returned let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::>(); + let all_blob_indices = blob_and_proof_opts + .iter() + .enumerate() + .map(|(i, _)| i as u64) + .collect::>(); + mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); // block not yet imported into fork choice mock_fork_choice_contains_block(&mut mock_adapter, vec![]); // All blobs already seen on gossip - mock_adapter.expect_verify_blob_for_gossip().returning(|b| { - Err(GossipBlobError::RepeatBlob { - proposer: b.block_proposer_index(), - slot: b.slot(), - index: b.index, - }) - }); + mock_adapter + .expect_cached_blob_indexes() + .returning(|_| None); + mock_adapter + .expect_blobs_known_for_proposal() + .returning(move |_, _| Some(all_blob_indices.clone())); // **WHEN**: Trigger `fetch_blobs` on the block let custody_columns = hashset![0, 1, 2]; @@ -454,8 +462,11 @@ mod get_blobs_v1 { mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); mock_fork_choice_contains_block(&mut mock_adapter, vec![]); mock_adapter - .expect_verify_blob_for_gossip() - .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); + .expect_cached_blob_indexes() + .returning(|_| None); + mock_adapter + .expect_blobs_known_for_proposal() + .returning(|_, _| None); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f7c3a1bf8db..613a0692fb4 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,13 +1,12 @@ use crate::sync::manager::BlockProcessType; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use beacon_chain::blob_verification::{observe_gossip_blob, GossipBlobError}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; use beacon_chain::fetch_blobs::{ fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError, }; -use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, }; @@ -798,7 +797,10 @@ impl NetworkBeaconProcessor { if publish_blobs { match blobs_or_data_column { EngineGetBlobsOutput::Blobs(blobs) => { - self_cloned.publish_blobs_gradually(blobs, block_root); + self_cloned.publish_blobs_gradually( + blobs.into_iter().map(|b| b.to_blob()).collect(), + block_root, + ); } EngineGetBlobsOutput::CustodyColumns(columns) => { self_cloned.publish_data_columns_gradually( @@ -941,7 +943,7 @@ impl NetworkBeaconProcessor { /// publisher exists for a blob, it will eventually get published here. fn publish_blobs_gradually( self: &Arc, - mut blobs: Vec>, + mut blobs: Vec>>, block_root: Hash256, ) { let self_clone = self.clone(); @@ -972,8 +974,8 @@ impl NetworkBeaconProcessor { while blobs_iter.peek().is_some() { let batch = blobs_iter.by_ref().take(batch_size); let publishable = batch - .filter_map(|unobserved| match unobserved.observe(&chain) { - Ok(observed) => Some(observed.clone_blob()), + .filter_map(|blob| match observe_gossip_blob(&blob, &chain) { + Ok(()) => Some(blob), Err(GossipBlobError::RepeatBlob { .. }) => None, Err(e) => { warn!(