diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 117377c9245..130ca994d05 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -268,21 +268,51 @@ impl NetworkBeaconProcessor { inbound_request_id: InboundRequestId, request: BlobsByRootRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { - let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root) - else { - // No blob ids requested. - return Ok(()); - }; - let requested_indices = request - .blob_ids - .as_slice() - .iter() - .map(|id| id.index) - .collect::>(); let mut send_blob_count = 0; + let fulu_start_slot = self + .chain + .spec + .fulu_fork_epoch + .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())); + let mut blob_list_results = HashMap::new(); + let mut slots_by_block_root = HashMap::new(); + // For logging purpose, to display one log per block root + let mut logging_index_by_block_root = HashMap::new(); for id in request.blob_ids.as_slice() { + let BlobIdentifier { + block_root: root, + index, + } = id; + + // Get the slot for where the blob belongs to from the HashMap or cache without touching the database + let slot = if let Some(slot) = slots_by_block_root.get(root) { + Some(*slot) + } else { + // Try to get block from caches to extract slot + if let Some(block) = self + .chain + .data_availability_checker + .get_execution_valid_block(root) + .or_else(|| self.chain.early_attester_cache.get_block(*root)) + { + let slot = block.slot(); + slots_by_block_root.insert(*root, slot); + Some(slot) + } else { + // Block not found in cache, returning None to query blobs from the database later + None + } + }; + + // Skip if slot is >= fulu_start_slot + if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot) + && slot >= fulu_slot + { + continue; + } + // First attempt to get the blobs from the RPC cache. if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { self.send_response( @@ -291,12 +321,11 @@ impl NetworkBeaconProcessor { Response::BlobsByRoot(Some(blob)), ); send_blob_count += 1; + logging_index_by_block_root + .entry(*root) + .or_insert(Vec::new()) + .push(*index); } else { - let BlobIdentifier { - block_root: root, - index, - } = id; - let blob_list_result = match blob_list_results.entry(root) { Entry::Vacant(entry) => { entry.insert(self.chain.get_blobs_checking_early_attester_cache(root)) @@ -314,6 +343,10 @@ impl NetworkBeaconProcessor { Response::BlobsByRoot(Some(blob_sidecar.clone())), ); send_blob_count += 1; + logging_index_by_block_root + .entry(*root) + .or_insert(Vec::new()) + .push(*index); break 'inner; } } @@ -329,10 +362,10 @@ impl NetworkBeaconProcessor { } } } + debug!( %peer_id, - %requested_root, - ?requested_indices, + block_root = ?logging_index_by_block_root.keys(), returned = send_blob_count, "BlobsByRoot outgoing response processed" ); @@ -893,6 +926,34 @@ impl NetworkBeaconProcessor { ); let request_start_slot = Slot::from(req.start_slot); + let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch()); + let fork_name = self.chain.spec.fork_name_at_epoch(request_start_epoch); + // Should not send more than max request blob sidecars + if req.max_blobs_requested(request_start_epoch, &self.chain.spec) + > self.chain.spec.max_request_blob_sidecars(fork_name) as u64 + { + return Err(( + RpcErrorResponse::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS_ELECTRA`", + )); + } + + let effective_count = if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { + let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let request_end_slot = request_start_slot.saturating_add(req.count) - 1; + + // If the request_start_slot is at or after a Fulu slot, return an empty response + if request_start_slot >= fulu_start_slot { + return Ok(()); + // For the case that the request slots spans across the Fulu fork slot + } else if request_end_slot >= fulu_start_slot { + (fulu_start_slot - request_start_slot).as_u64() + } else { + req.count + } + } else { + req.count + }; let data_availability_boundary_slot = match self.chain.data_availability_boundary() { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), @@ -930,7 +991,7 @@ impl NetworkBeaconProcessor { } let block_roots = - self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; + self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?; let current_slot = self .chain @@ -957,7 +1018,7 @@ impl NetworkBeaconProcessor { // Due to skip slots, blobs could be out of the range, we ensure they // are in the range before sending if blob_sidecar.slot() >= request_start_slot - && blob_sidecar.slot() < request_start_slot + req.count + && blob_sidecar.slot() < request_start_slot + effective_count { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -1024,7 +1085,7 @@ impl NetworkBeaconProcessor { if req.max_requested::() > self.chain.spec.max_request_data_column_sidecars { return Err(( RpcErrorResponse::InvalidRequest, - "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", + "Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`", )); } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 557f9a29141..8c6e76c44b3 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -21,7 +21,7 @@ use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; use lighthouse_network::rpc::InboundRequestId; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3}; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, discv5::enr::{self, CombinedKey}, @@ -34,12 +34,12 @@ use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::FixedBlobSidecarList; +use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{ AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, - SubnetId, + RuntimeVariableList, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, + SingleAttestation, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -419,15 +419,22 @@ impl TestRig { } } - pub fn enqueue_blobs_by_range_request(&self, count: u64) { + pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( PeerId::random(), InboundRequestId::new_unchecked(42, 24), - BlobsByRangeRequest { - start_slot: 0, - count, - }, + BlobsByRangeRequest { start_slot, count }, + ) + .unwrap(); + } + + pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList) { + self.network_beacon_processor + .send_blobs_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + BlobsByRootRequest { blob_ids }, ) .unwrap(); } @@ -1328,8 +1335,9 @@ async fn test_blobs_by_range() { return; }; let mut rig = TestRig::new(64).await; + let start_slot = 0; let slot_count = 32; - rig.enqueue_blobs_by_range_request(slot_count); + rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; for slot in 0..slot_count { @@ -1365,3 +1373,200 @@ async fn test_blobs_by_range() { } assert_eq!(blob_count, actual_count); } + +#[tokio::test] +async fn test_blobs_by_range_post_fulu_should_return_empty() { + // Only test for Fulu fork + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + let mut rig = TestRig::new(64).await; + let start_slot = 0; + let slot_count = 32; + rig.enqueue_blobs_by_range_request(start_slot, slot_count); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + // Post-Fulu should return 0 blobs + assert_eq!(0, actual_count); +} + +#[tokio::test] +async fn test_blobs_by_range_spans_fulu_fork() { + // Only test for Electra & Fulu fork transition + if test_spec::().electra_fork_epoch.is_none() { + return; + }; + let mut spec = test_spec::(); + spec.fulu_fork_epoch = Some(Epoch::new(1)); + + let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), spec).await; + + let start_slot = 16; + // This will span from epoch 0 (Electra) to epoch 1 (Fulu) + let slot_count = 32; + + rig.enqueue_blobs_by_range_request(start_slot, slot_count); + + let mut blob_count = 0; + for slot in start_slot..slot_count { + let root = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + } + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(blob_count, actual_count); +} + +#[tokio::test] +async fn test_blobs_by_root() { + if test_spec::().deneb_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Get the block root of a sample slot, e.g., slot 1 + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + let blobs = rig.chain.get_blobs(&block_root).unwrap(); + let blob_count = blobs.len(); + + let blob_ids: Vec = (0..blob_count) + .map(|index| BlobIdentifier { + block_root, + index: index as u64, + }) + .collect(); + + let blob_ids_list = RuntimeVariableList::new(blob_ids, blob_count).unwrap(); + + rig.enqueue_blobs_by_root_request(blob_ids_list); + + let mut blob_count = 0; + let root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRoot(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(blob_count, actual_count); +} + +#[tokio::test] +async fn test_blobs_by_root_post_fulu_should_return_empty() { + // Only test for Fulu fork + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + let blob_ids = vec![BlobIdentifier { + block_root, + index: 0, + }]; + + let blob_ids_list = RuntimeVariableList::new(blob_ids, 1).unwrap(); + + rig.enqueue_blobs_by_root_request(blob_ids_list); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRoot(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + // Post-Fulu should return 0 blobs + assert_eq!(0, actual_count); +}