From f72df2c00f0ce0bd692fdb804b6b058a8ddb993f Mon Sep 17 00:00:00 2001 From: Mac L Date: Thu, 5 Dec 2024 00:14:39 +1100 Subject: [PATCH 1/8] First try --- beacon_node/http_api/src/lib.rs | 19 +++++++++++ beacon_node/store/src/errors.rs | 9 ++--- beacon_node/store/src/hot_cold_store.rs | 44 +++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 77c9bcc34f3..343b619e2b2 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4446,6 +4446,24 @@ pub fn serve( }, ); + // POST lighthouse/database/import_blobs + let post_lighthouse_database_import_blobs = database_path + .and(warp::path("import_blobs")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |blobs, task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + match chain.store.import_historical_blobs(blobs) { + Ok(()) => Ok(()), + Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), + } + }) + }, + ); + // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") .and(warp::path("analysis")) @@ -4807,6 +4825,7 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) + .uor(post_lighthouse_database_import_blobs) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 41fd17ef437..97e99a12bf6 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -50,10 +50,6 @@ pub enum Error { MissingGenesisState, MissingSnapshot(Slot), BlockReplayError(BlockReplayError), - AddPayloadLogicError, - InvalidKey, - InvalidBytes, - InconsistentFork(InconsistentFork), #[cfg(feature = "leveldb")] LevelDbError(LevelDBError), #[cfg(feature = "redb")] @@ -68,6 +64,11 @@ pub enum Error { state_root: Hash256, slot: Slot, }, + AddPayloadLogicError, + InvalidKey, + InvalidBytes, + InvalidBlobImport(String), + InconsistentFork(InconsistentFork), Hdiff(hdiff::Error), ForwardsIterInvalidColumn(DBColumn), ForwardsIterGap(DBColumn, Slot, Slot), diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 128c03f771b..286af3739b5 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -41,6 +41,8 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec use types::*; use zstd::{Decoder, Encoder}; +const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000; + /// On-disk database that stores finalized states efficiently. /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores @@ -852,6 +854,48 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + /// Import historical blobs. + pub fn import_historical_blobs( + &self, + historical_blobs: Vec<(Hash256, BlobSidecarList)>, + ) -> Result<(), Error> { + if historical_blobs.is_empty() { + return Ok(()); + } + + let mut total_imported = 0; + + for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) { + let mut ops = Vec::with_capacity(chunk.len()); + + for (block_root, blobs) in chunk { + // Verify block exists. + if !self.block_exists(block_root)? 
{ + warn!( + self.log, + "Skipping import of blobs; block root does not exist."; + "block_root" => ?block_root, + "num_blobs" => blobs.len(), + ); + continue; + } + + self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops); + total_imported += blobs.len(); + } + + self.blobs_db.do_atomically(ops)?; + } + + debug!( + self.log, + "Imported historical blobs."; + "total_imported" => total_imported, + ); + + Ok(()) + } + pub fn blobs_as_kv_store_ops( &self, key: &Hash256, From 054f0d71d5a614c73adc8e51c85a67f99bbc3fdd Mon Sep 17 00:00:00 2001 From: Mac L Date: Fri, 6 Dec 2024 17:48:52 +1100 Subject: [PATCH 2/8] Additional checks and support SSZ --- beacon_node/http_api/src/lib.rs | 80 +++++++++++++++++++++---- beacon_node/store/src/hot_cold_store.rs | 65 +++++++++++++++----- common/eth2/src/lighthouse.rs | 17 ++++++ common/eth2/src/types.rs | 5 ++ 4 files changed, 140 insertions(+), 27 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 343b619e2b2..b751071cd2e 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -34,9 +34,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; use crate::version::fork_versioned_response; use beacon_chain::{ - attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, - validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, - BeaconChainTypes, WhenSlotSkipped, + attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list, + observed_operations::ObservationOutcome, validator_monitor::timestamp_now, + AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + WhenSlotSkipped, }; use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; pub use block_id::BlockId; @@ -65,7 +66,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; -use ssz::Encode; +use ssz::{Decode, Encode}; pub use state_id::StateId; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -85,11 +86,12 @@ use tokio_stream::{ }; use types::{ fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, - ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, - RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, - SyncCommitteeMessage, SyncContributionData, + AttesterSlashing, BeaconStateError, BlobSidecarList, ChainSpec, CommitteeCache, + ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, + ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, + SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, + SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, + SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -4450,13 +4452,68 @@ pub fn serve( let post_lighthouse_database_import_blobs = database_path .and(warp::path("import_blobs")) .and(warp::path::end()) + .and(warp::query::()) .and(warp_utils::json::json()) 
.and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |blobs, task_spawner: TaskSpawner, chain: Arc>| { + |query: api_types::ImportBlobsQuery, + blob_lists: Vec>, + task_spawner: TaskSpawner, + chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { - match chain.store.import_historical_blobs(blobs) { + if query.verify { + for blob_list in &blob_lists { + match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { + Ok(()) => (), + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "{e:?}" + ))) + } + } + } + } + + match chain.store.import_blobs_batch(blob_lists) { + Ok(()) => Ok(()), + Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), + } + }) + }, + ); + + // POST lighthouse/database/import_blobs_ssz + let post_lighthouse_database_import_blobs_ssz = database_path + .and(warp::path("import_blobs_ssz")) + .and(warp::path::end()) + .and(warp::query::()) + .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::ImportBlobsQuery, + body: Bytes, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let blob_lists = Vec::>::from_ssz_bytes(&body) + .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?; + + if query.verify { + for blob_list in &blob_lists { + match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { + Ok(()) => (), + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "{e:?}" + ))) + } + } + } + } + + match chain.store.import_blobs_batch(blob_lists) { Ok(()) => Ok(()), Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), } @@ -4826,6 +4883,7 @@ pub fn serve( .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) .uor(post_lighthouse_database_import_blobs) + .uor(post_lighthouse_database_import_blobs_ssz) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 286af3739b5..c6aaa7ffefe 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -41,8 +41,6 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec use types::*; use zstd::{Decoder, Encoder}; -const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000; - /// On-disk database that stores finalized states efficiently. /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores @@ -854,10 +852,15 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Import historical blobs. - pub fn import_historical_blobs( + /// Import a batch of blobs. + /// Implements the following checks: + /// - Checks that `block_root` is consistent across each `BlobSidecarList`. + /// - Checks that `block_root` exists in the database. + /// - Checks if a `BlobSidecarList` is already stored for that `block_root`. + /// If it is, ensure it matches the `BlobSidecarList` we are attempting to store. 
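+    ///
+    /// Illustrative usage (a sketch only; `blob_sidecar_lists` is a
+    /// hypothetical `Vec<BlobSidecarList<E>>` in which each inner list holds
+    /// the sidecars for exactly one block root):
+    ///
+    /// ```ignore
+    /// store.import_blobs_batch(blob_sidecar_lists)?;
+    /// ```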
+ pub fn import_blobs_batch( &self, - historical_blobs: Vec<(Hash256, BlobSidecarList)>, + historical_blobs: Vec>, ) -> Result<(), Error> { if historical_blobs.is_empty() { return Ok(()); @@ -865,28 +868,58 @@ impl, Cold: ItemStore> HotColdDB let mut total_imported = 0; - for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) { - let mut ops = Vec::with_capacity(chunk.len()); + let mut ops = vec![]; - for (block_root, blobs) in chunk { - // Verify block exists. - if !self.block_exists(block_root)? { - warn!( + for blob_list in historical_blobs { + // Ensure all block_roots in the blob list are the same. + let block_root = { + let first_block_root = blob_list[0].block_root(); + if !blob_list + .iter() + .all(|blob| blob.block_root() == first_block_root) + { + return Err(Error::InvalidBlobImport( + "Inconsistent block roots".to_string(), + )); + } + first_block_root + }; + + // Verify block exists. + if !self.block_exists(&block_root)? { + warn!( + self.log, + "Aborting blob import; block root does not exist."; + "block_root" => ?block_root, + "num_blobs" => blob_list.len(), + ); + return Err(Error::InvalidBlobImport("Missing block root".to_string())); + } + + // Check if a blob_list is already stored for this block root. + if let Some(existing_blob_list) = self.get_blobs(&block_root)? { + if existing_blob_list == blob_list { + debug!( self.log, - "Skipping import of blobs; block root does not exist."; + "Skipping blob import as identical blob exists"; "block_root" => ?block_root, - "num_blobs" => blobs.len(), + "num_blobs" => blob_list.len(), ); continue; } - self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops); - total_imported += blobs.len(); + return Err(Error::InvalidBlobImport(format!( + "Conflicting blobs exist for block root {:?}", + block_root + ))); } - self.blobs_db.do_atomically(ops)?; + self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops); + total_imported += blob_list.len(); } + self.blobs_db.do_atomically(ops)?; + debug!( self.log, "Imported historical blobs."; diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index badc4857c4c..8a460aade9e 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -13,6 +13,7 @@ use crate::{ }, BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot, }; +use bytes::Bytes; use proto_array::core::ProtoArray; use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; @@ -406,6 +407,22 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &()).await } + /// `POST lighthouse/database/import_blobs_ssz` + pub async fn post_lighthouse_database_import_blobs_ssz( + &self, + blobs: Bytes, + ) -> Result { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("database") + .push("import_blobs_ssz"); + + self.post_with_response(path, &blobs).await + } + /* Analysis endpoints. 
*/ diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 59374f629d6..3035c953bd1 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -811,6 +811,11 @@ pub struct LightClientUpdateResponseChunk { pub payload: Vec, } +#[derive(Clone, Serialize, Deserialize)] +pub struct ImportBlobsQuery { + pub verify: bool, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub struct BeaconCommitteeSubscription { #[serde(with = "serde_utils::quoted_u64")] From 3c0f7be8f1979346f0de620e9b5e14271097f117 Mon Sep 17 00:00:00 2001 From: Mac L Date: Thu, 20 Feb 2025 02:29:35 +1100 Subject: [PATCH 3/8] Update blobs_manager --- Cargo.lock | 4 + beacon_node/http_api/src/lib.rs | 75 ++++++++++++-- beacon_node/store/src/hot_cold_store.rs | 67 ++++++++----- common/eth2/Cargo.toml | 1 + common/eth2/src/lib.rs | 21 ++++ common/eth2/src/lighthouse.rs | 33 +++++- common/eth2/src/types.rs | 8 +- database_manager/Cargo.toml | 3 + database_manager/src/blobs_manager/cli.rs | 100 +++++++++++++++++++ database_manager/src/blobs_manager/export.rs | 63 ++++++++++++ database_manager/src/blobs_manager/import.rs | 36 +++++++ database_manager/src/blobs_manager/mod.rs | 6 ++ database_manager/src/blobs_manager/verify.rs | 51 ++++++++++ database_manager/src/cli.rs | 5 +- database_manager/src/lib.rs | 18 ++++ 15 files changed, 456 insertions(+), 35 deletions(-) create mode 100644 database_manager/src/blobs_manager/cli.rs create mode 100644 database_manager/src/blobs_manager/export.rs create mode 100644 database_manager/src/blobs_manager/import.rs create mode 100644 database_manager/src/blobs_manager/mod.rs create mode 100644 database_manager/src/blobs_manager/verify.rs diff --git a/Cargo.lock b/Cargo.lock index 4581fb9ce05..501c9b58ac8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1975,7 +1975,10 @@ dependencies = [ "clap", "clap_utils", "environment", + "eth2", + "ethereum_ssz", "hex", + "sensitive_url", "serde", "slog", "store", @@ -2603,6 +2606,7 @@ dependencies = [ name = "eth2" version = "0.1.0" dependencies = [ + "bytes", "derivative", "either", "eth2_keystore", diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b751071cd2e..f4a8740bed4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -86,7 +86,7 @@ use tokio_stream::{ }; use types::{ fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, BlobSidecarList, ChainSpec, CommitteeCache, + AttesterSlashing, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, @@ -4448,6 +4448,8 @@ pub fn serve( }, ); + // POST lighthouse/database/verify_blobs + // POST lighthouse/database/import_blobs let post_lighthouse_database_import_blobs = database_path .and(warp::path("import_blobs")) @@ -4462,7 +4464,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { - if query.verify { + if query.verify == Some(true) { for blob_list in &blob_lists { match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { Ok(()) => (), @@ -4497,11 +4499,30 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { - let blob_lists = 
Vec::>::from_ssz_bytes(&body) + let blob_lists = Vec::>>>::from_ssz_bytes( + &body, + ) + .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?; + + if blob_lists.is_empty() { + return Err(warp_utils::reject::custom_server_error( + "Blob list must not be empty".to_string(), + )); + } + + // Build `BlobSidecarList`s from the `Vec`s. + let blob_sidecar_lists: Vec> = blob_lists + .into_iter() + .map(|blob_sidecars| { + let max_blobs_at_epoch = + chain.spec.max_blobs_per_block(blob_sidecars[0].epoch()) as usize; + BlobSidecarList::new(blob_sidecars, max_blobs_at_epoch) + }) + .collect::, _>>() .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?; - if query.verify { - for blob_list in &blob_lists { + if query.verify == Some(true) { + for blob_list in &blob_sidecar_lists { match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { Ok(()) => (), Err(e) => { @@ -4513,7 +4534,7 @@ pub fn serve( } } - match chain.store.import_blobs_batch(blob_lists) { + match chain.store.import_blobs_batch(blob_sidecar_lists) { Ok(()) => Ok(()), Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), } @@ -4521,6 +4542,47 @@ pub fn serve( }, ); + // GET lighthouse/database/verify_blobs + let get_lighthouse_database_verify_blobs = database_path + .and(warp::path("verify_blobs")) + .and(warp::path::end()) + .and(warp::query::()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::VerifyBlobsQuery, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let mut results = Vec::new(); + for slot in query.start_slot.as_u64()..=query.end_slot.as_u64() { + if let Ok((root, _, _)) = BlockId::from_slot(Slot::from(slot)).root(&chain) + { + if let Ok(blob_list_res) = chain.store.get_blobs(&root) { + if let Some(blob_list) = blob_list_res.blobs() { + if let Err(e) = + verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) + { + results.push(format!( + "slot: {slot}, block_root: {root:?}, error: {e:?}" + )); + } + } + } + } + } + + if results.is_empty() { + Ok(api_types::GenericResponse::from( + "All blobs verified successfully".to_string(), + )) + } else { + Ok(api_types::GenericResponse::from(results.join("\n"))) + } + }) + }, + ); + // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") .and(warp::path("analysis")) @@ -4820,6 +4882,7 @@ pub fn serve( .uor(get_lighthouse_eth1_deposit_cache) .uor(get_lighthouse_staking) .uor(get_lighthouse_database_info) + .uor(get_lighthouse_database_verify_blobs) .uor(get_lighthouse_block_rewards) .uor(get_lighthouse_attestation_performance) .uor( diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c6aaa7ffefe..06b7b69c72e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -860,9 +860,9 @@ impl, Cold: ItemStore> HotColdDB /// If it is, ensure it matches the `BlobSidecarList` we are attempting to store. pub fn import_blobs_batch( &self, - historical_blobs: Vec>, + historical_blob_sidecars: Vec>, ) -> Result<(), Error> { - if historical_blobs.is_empty() { + if historical_blob_sidecars.is_empty() { return Ok(()); } @@ -870,13 +870,13 @@ impl, Cold: ItemStore> HotColdDB let mut ops = vec![]; - for blob_list in historical_blobs { + for blob_sidecar_list in historical_blob_sidecars { // Ensure all block_roots in the blob list are the same. 
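            // Mixing roots would key blobs under the wrong block, so the
            // whole import is rejected rather than guessing which root is
            // correct.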
let block_root = { - let first_block_root = blob_list[0].block_root(); - if !blob_list + let first_block_root = blob_sidecar_list[0].block_root(); + if !blob_sidecar_list .iter() - .all(|blob| blob.block_root() == first_block_root) + .all(|blob_sidecar| blob_sidecar.block_root() == first_block_root) { return Err(Error::InvalidBlobImport( "Inconsistent block roots".to_string(), @@ -885,37 +885,54 @@ impl, Cold: ItemStore> HotColdDB first_block_root }; - // Verify block exists. + // Check block is stored for this block_root. if !self.block_exists(&block_root)? { warn!( self.log, - "Aborting blob import; block root does not exist."; + "Aborting blob import; block does not exist."; "block_root" => ?block_root, - "num_blobs" => blob_list.len(), + "num_blob_sidecars" => blob_sidecar_list.len(), ); - return Err(Error::InvalidBlobImport("Missing block root".to_string())); + return Err(Error::InvalidBlobImport("Missing block".to_string())); } - // Check if a blob_list is already stored for this block root. - if let Some(existing_blob_list) = self.get_blobs(&block_root)? { - if existing_blob_list == blob_list { - debug!( + // Check if a `blob_sidecar_list` is already stored for this block root. + match self.get_blobs(&block_root) { + Ok(BlobSidecarListFromRoot::Blobs(existing_blob_sidecar_list)) => { + // If blobs already exist, only proceed if they match exactly. + if existing_blob_sidecar_list == blob_sidecar_list { + debug!( + self.log, + "Skipping blob sidecar import as identical blob exists"; + "block_root" => ?block_root, + "num_blob_sidecars" => blob_sidecar_list.len(), + ); + continue; + } else { + return Err(Error::InvalidBlobImport(format!( + "Conflicting blobs exist for block root {:?}", + block_root + ))); + } + } + Ok(BlobSidecarListFromRoot::NoRoot) => { + // This block has no existing blobs: proceed with import. + self.blobs_as_kv_store_ops(&block_root, blob_sidecar_list.clone(), &mut ops); + total_imported += blob_sidecar_list.len(); + } + Ok(BlobSidecarListFromRoot::NoBlobs) => { + // This block should not have any blobs: reject the import. + warn!( self.log, - "Skipping blob import as identical blob exists"; + "Aborting blob import; blobs should not exist for this block_root."; "block_root" => ?block_root, - "num_blobs" => blob_list.len(), ); - continue; + return Err(Error::InvalidBlobImport( + "No blobs should exist for this block_root".to_string(), + )); } - - return Err(Error::InvalidBlobImport(format!( - "Conflicting blobs exist for block root {:?}", - block_root - ))); + Err(e) => return Err(Error::InvalidBlobImport(format!("{e:?}"))), } - - self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops); - total_imported += blob_list.len(); } self.blobs_db.do_atomically(ops)?; diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index a1bc9d025bd..bd60d695df7 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -6,6 +6,7 @@ edition = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bytes = { workspace = true } derivative = { workspace = true } either = { workspace = true } eth2_keystore = { workspace = true } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 73e9d57abc0..ce3a4d9cac5 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -470,6 +470,27 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } + /// Generic POST function supporting arbitrary responses and timeouts without setting Consensus Version. 
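+    ///
+    /// Sketch of the expected call shape (illustrative only; `path` and
+    /// `body` are hypothetical values):
+    ///
+    /// ```ignore
+    /// let response = self.post_generic_with_ssz_body(path, body, None).await?;
+    /// ```
+    ///
+    /// Unlike the JSON helpers, the body is sent verbatim with an
+    /// `application/octet-stream` content type.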
+ async fn post_generic_with_ssz_body, U: IntoUrl>( + &self, + url: U, + body: T, + timeout: Option, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + let mut headers = HeaderMap::new(); + + headers.insert( + "Content-Type", + HeaderValue::from_static("application/octet-stream"), + ); + let response = builder.headers(headers).body(body).send().await?; + ok_or_error(response).await + } + /// `GET beacon/genesis` /// /// ## Errors diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 8a460aade9e..e8285dcf38e 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -29,6 +29,7 @@ pub use block_packing_efficiency::{ }; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use lighthouse_network::{types::SyncState, PeerInfo}; +use reqwest::Response; pub use standard_block_rewards::StandardBlockReward; pub use sync_committee_rewards::SyncCommitteeReward; @@ -411,8 +412,10 @@ impl BeaconNodeHttpClient { pub async fn post_lighthouse_database_import_blobs_ssz( &self, blobs: Bytes, - ) -> Result { + skip_verification: bool, + ) -> Result { let mut path = self.server.full.clone(); + let verify = !skip_verification; path.path_segments_mut() .map_err(|()| Error::InvalidUrl(self.server.clone()))? @@ -420,7 +423,33 @@ impl BeaconNodeHttpClient { .push("database") .push("import_blobs_ssz"); - self.post_with_response(path, &blobs).await + if skip_verification { + path.query_pairs_mut() + .append_pair("verify", &verify.to_string()); + } + + self.post_generic_with_ssz_body(path, blobs, None).await + } + + /// `POST lighthouse/database/verify_blobs` + pub async fn get_lighthouse_database_verify_blobs( + &self, + start_slot: Slot, + end_slot: Slot, + ) -> Result { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? 
+ .push("lighthouse") + .push("database") + .push("verify_blobs"); + + path.query_pairs_mut() + .append_pair("start_slot", &start_slot.to_string()) + .append_pair("end_slot", &end_slot.to_string()); + + self.get(path).await } /* diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 3035c953bd1..69e68010e69 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -813,7 +813,13 @@ pub struct LightClientUpdateResponseChunk { #[derive(Clone, Serialize, Deserialize)] pub struct ImportBlobsQuery { - pub verify: bool, + pub verify: Option, +} + +#[derive(Debug, Deserialize)] +pub struct VerifyBlobsQuery { + pub start_slot: Slot, + pub end_slot: Slot, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index a7a54b1416c..4b9e5f8afc6 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -9,7 +9,10 @@ beacon_node = { workspace = true } clap = { workspace = true } clap_utils = { workspace = true } environment = { workspace = true } +eth2 = { workspace = true } +ethereum_ssz = { workspace = true } hex = { workspace = true } +sensitive_url = { workspace = true } serde = { workspace = true } slog = { workspace = true } store = { workspace = true } diff --git a/database_manager/src/blobs_manager/cli.rs b/database_manager/src/blobs_manager/cli.rs new file mode 100644 index 00000000000..410ac7e7b41 --- /dev/null +++ b/database_manager/src/blobs_manager/cli.rs @@ -0,0 +1,100 @@ +use clap::{Parser, Subcommand}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Subcommand, Clone, Deserialize, Serialize, Debug)] +#[clap(rename_all = "kebab-case")] +pub enum BlobsManager { + Verify(VerifyBlobs), + Import(ImportBlobs), + Export(ExportBlobs), +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Perform KZG verification for blobs in the database.")] +pub struct VerifyBlobs { + #[clap( + long, + value_name = "URL", + help = "The beacon node to verify blobs. Defaults to http://localhost:5052", + display_order = 0 + )] + pub beacon_node: Option, + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to begin blob verification. Defaults to the Deneb start slot", + display_order = 0 + )] + pub start_slot: Option, + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to stop blob verification. 
Defaults to the latest slot", + display_order = 0 + )] + pub end_slot: Option, +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Import a batch of blobs in SSZ format into the database.")] +pub struct ImportBlobs { + #[clap( + long, + value_name = "URL", + help = "The beacon node to import blobs to", + display_order = 0 + )] + pub beacon_node: String, + + #[clap( + long, + value_name = "FILE", + help = "Input file containing blobs to import", + display_order = 0 + )] + pub input_file: PathBuf, + + #[clap( + long, + help = "Skip verification of blobs before import", + default_value = "false" + )] + pub skip_verification: bool, +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Export a batch of blobs in SSZ format from the database.")] +pub struct ExportBlobs { + #[clap( + long, + value_name = "URL", + help = "The beacon node to export blobs from.", + display_order = 0 + )] + pub beacon_node: String, + + #[clap( + long, + value_name = "DIR", + help = "Output dir to export blobs to.", + display_order = 0 + )] + pub output_dir: PathBuf, + + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to start exporting blobs from.", + display_order = 0 + )] + pub start_slot: u64, + + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to stop exporting blobs to (inclusive).", + display_order = 0 + )] + pub end_slot: u64, +} diff --git a/database_manager/src/blobs_manager/export.rs b/database_manager/src/blobs_manager/export.rs new file mode 100644 index 00000000000..67ad83fbabb --- /dev/null +++ b/database_manager/src/blobs_manager/export.rs @@ -0,0 +1,63 @@ +use crate::blobs_manager::cli::ExportBlobs; +use eth2::{ + types::{BlobSidecar, BlobSidecarList, BlockId, EthSpec, Slot}, + BeaconNodeHttpClient, Timeouts, +}; +use sensitive_url::SensitiveUrl; +use slog::{info, warn, Logger}; +use ssz::{Decode, Encode}; +use std::sync::Arc; +use std::time::Duration; + +pub async fn export_blobs(config: &ExportBlobs, log: Logger) -> Result<(), String> { + let beacon_node = SensitiveUrl::parse(&config.beacon_node) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + + let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + + let start_slot = config.start_slot; + let end_slot = config.end_slot; + + if !config.output_dir.is_dir() { + return Err("Please set `--output-dir` to a valid directory.".to_string()); + } + + let filename = config + .output_dir + .join(format!("{start_slot}_{end_slot}.ssz")); + + // TODO(blob_manager): Ensure node is synced for start_slot -> end_slot. + + let mut blobs_to_export: Vec> = vec![]; + + info!(log, "Beginning blob export"; "end_slot" => end_slot, "start_slot" => start_slot, "output_dir" => ?config.output_dir); + + for slot in start_slot..=end_slot { + if let Some(blobs) = client + .get_blobs::(BlockId::Slot(Slot::from(slot)), None) + .await + .map_err(|e| format!("Failed to export blobs: {e:?}"))? + { + let blob_sidecar_list = blobs.data; + if !blob_sidecar_list.is_empty() { + blobs_to_export.push(blob_sidecar_list); + } + } else { + // No blobs exist for this slot. + continue; + } + } + + let ssz_bytes = blobs_to_export.as_ssz_bytes(); + + // Check blobs exist so we don't create an empty file. 
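+    // (The resulting `{start_slot}_{end_slot}.ssz` file holds an SSZ-encoded
+    // `Vec<BlobSidecarList<E>>`, one inner list per block, which matches the
+    // shape the `import_blobs_ssz` endpoint decodes on the other side.)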
+ if !blobs_to_export.is_empty() { + std::fs::write(&filename, ssz_bytes) + .map_err(|e| format!("Failed to write blob file: {}", e))?; + info!(log, "Completed blob export"; "blobs_exported" => blobs_to_export.len()); + } else { + warn!(log, "No blobs were found for this slot range"); + } + + Ok(()) +} diff --git a/database_manager/src/blobs_manager/import.rs b/database_manager/src/blobs_manager/import.rs new file mode 100644 index 00000000000..98c076500a3 --- /dev/null +++ b/database_manager/src/blobs_manager/import.rs @@ -0,0 +1,36 @@ +use crate::blobs_manager::cli::ImportBlobs; +use eth2::{types::EthSpec, BeaconNodeHttpClient, Timeouts}; +use sensitive_url::SensitiveUrl; +use slog::{info, warn, Logger}; +use std::time::Duration; + +pub async fn import_blobs(config: &ImportBlobs, log: Logger) -> Result<(), String> { + let beacon_node = SensitiveUrl::parse(&config.beacon_node) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + + let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + + let blobs_ssz = std::fs::read(&config.input_file) + .map_err(|e| format!("Failed to read input file: {e:?}"))?; + + // TODO(blob_manager): We could _technically_ parse the slot numbers from the SSZ file + // generated during export. + + info!(log, "Beginning blob import"); + + if config.skip_verification { + warn!(log, "Skipping blob verification"); + } + + client + .post_lighthouse_database_import_blobs_ssz(blobs_ssz.into(), config.skip_verification) + .await + .map_err(|e| format!("Failed to import blobs: {e:?}"))?; + + if !config.skip_verification { + info!(log, "All blobs successfully verified"); + } + info!(log, "Completed blob import"); + + Ok(()) +} diff --git a/database_manager/src/blobs_manager/mod.rs b/database_manager/src/blobs_manager/mod.rs new file mode 100644 index 00000000000..8359d7a5244 --- /dev/null +++ b/database_manager/src/blobs_manager/mod.rs @@ -0,0 +1,6 @@ +pub mod cli; +pub mod export; +pub mod import; +pub mod verify; + +const DEFAULT_BEACON_NODE: &str = "http://localhost:5052"; diff --git a/database_manager/src/blobs_manager/verify.rs b/database_manager/src/blobs_manager/verify.rs new file mode 100644 index 00000000000..4a6b1ac40b6 --- /dev/null +++ b/database_manager/src/blobs_manager/verify.rs @@ -0,0 +1,51 @@ +use crate::blobs_manager::{cli::VerifyBlobs, DEFAULT_BEACON_NODE}; +use beacon_node::ClientConfig; +use eth2::{ + types::{ChainSpec, EthSpec, Slot}, + BeaconNodeHttpClient, SensitiveUrl, Timeouts, +}; +use slog::Logger; +use std::time::Duration; + +pub async fn verify_blobs( + config: &VerifyBlobs, + spec: &ChainSpec, + log: Logger, +) -> Result<(), String> { + let beacon_node = SensitiveUrl::parse( + &config + .beacon_node + .clone() + .unwrap_or(DEFAULT_BEACON_NODE.to_string()), + ) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + + let deneb_start_slot = spec + .deneb_fork_epoch + .unwrap() // todo + .start_slot(E::slots_per_epoch()) + .as_u64(); + let start_slot = config.start_slot.unwrap_or(deneb_start_slot); + let end_slot = config.end_slot.unwrap(); + + if start_slot < deneb_start_slot { + return Err("Start slot cannot be pre-Deneb".to_string()); + } + if end_slot < start_slot { + return Err("End slot cannot be earlier than start slot".to_string()); + } + + let min_epochs_for_blob_sidecars_requests = spec.min_epochs_for_blob_sidecars_requests; + + let slots_verified = 0; + + let 
response = client + .get_lighthouse_database_verify_blobs(Slot::from(start_slot), Slot::from(end_slot)) + .await + .map_err(|e| format!("Failed to verify blobs: {e:?}"))?; + + eprintln!("Response: {:?}", response); + + Ok(()) +} diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index c62da1206f1..d6efd94ab5c 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -1,4 +1,5 @@ -pub use clap::{Arg, ArgAction, Args, Command, FromArgMatches, Parser}; +use crate::blobs_manager::cli::BlobsManager; +pub use clap::{Arg, ArgAction, Args, Command, FromArgMatches, Parser, Subcommand}; use clap_utils::get_color_style; use clap_utils::FLAG_HEADER; use serde::{Deserialize, Serialize}; @@ -80,6 +81,8 @@ pub enum DatabaseManagerSubcommand { PruneBlobs(PruneBlobs), PruneStates(PruneStates), Compact(Compact), + #[clap(subcommand)] + Blobs(BlobsManager), } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index bed90df9df0..5eb618ba962 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -1,4 +1,9 @@ +pub mod blobs_manager; pub mod cli; + +use crate::blobs_manager::{ + cli::BlobsManager, export::export_blobs, import::import_blobs, verify::verify_blobs, +}; use crate::cli::DatabaseManager; use crate::cli::Migrate; use crate::cli::PruneStates; @@ -483,6 +488,7 @@ pub fn run( ) -> Result<(), String> { let client_config = parse_client_config(cli_args, db_manager_config, &env)?; let context = env.core_context(); + let spec = context.eth2_config.spec.clone(); let log = context.log().clone(); let format_err = |e| format!("Fatal error: {:?}", e); @@ -536,5 +542,17 @@ pub fn run( let compact_config = parse_compact_config(compact_config)?; compact_db::(compact_config, client_config, log).map_err(format_err) } + cli::DatabaseManagerSubcommand::Blobs(blobs_manager_cmd) => match blobs_manager_cmd { + BlobsManager::Verify(verify_config) => { + env.runtime() + .block_on(verify_blobs::(verify_config, &spec.clone(), log)) + } + BlobsManager::Export(export_config) => env + .runtime() + .block_on(export_blobs::(export_config, log)), + BlobsManager::Import(import_config) => env + .runtime() + .block_on(import_blobs::(import_config, log)), + }, } } From 66a41bc2494e97b58ccbb6cd972f9c38570fa23e Mon Sep 17 00:00:00 2001 From: Mac L Date: Fri, 14 Mar 2025 05:47:08 +1100 Subject: [PATCH 4/8] Add verify draft --- beacon_node/http_api/src/lib.rs | 65 ++++++++++++++----- common/eth2/src/lighthouse.rs | 20 ++++-- .../eth2/src/lighthouse/blobs_verification.rs | 11 ++++ common/eth2/src/types.rs | 4 +- database_manager/src/blobs_manager/cli.rs | 21 ++++++ database_manager/src/blobs_manager/export.rs | 16 +++-- database_manager/src/blobs_manager/import.rs | 11 +++- database_manager/src/blobs_manager/mod.rs | 12 ++++ database_manager/src/blobs_manager/verify.rs | 46 +++++++------ 9 files changed, 155 insertions(+), 51 deletions(-) create mode 100644 common/eth2/src/lighthouse/blobs_verification.rs diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index daf5dcef5e4..6516dfc5dae 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -51,7 +51,10 @@ use eth2::types::{ LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus, ValidatorsRequestBody, }; -use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; +use eth2::{ + lighthouse::BlobsVerificationData, 
CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, + SSZ_CONTENT_TYPE_HEADER, +}; use health_metrics::observe::Observe; use lighthouse_network::rpc::methods::MetaData; use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; @@ -4424,8 +4427,6 @@ pub fn serve( }, ); - // POST lighthouse/database/verify_blobs - // POST lighthouse/database/import_blobs let post_lighthouse_database_import_blobs = database_path .and(warp::path("import_blobs")) @@ -4529,32 +4530,62 @@ pub fn serve( |query: api_types::VerifyBlobsQuery, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { + task_spawner.spawn_async_with_rejection(Priority::P1, async move { let mut results = Vec::new(); - for slot in query.start_slot.as_u64()..=query.end_slot.as_u64() { + let deneb_start_slot = + if let Some(deneb_fork_epoch) = chain.spec.deneb_fork_epoch { + deneb_fork_epoch.start_slot(T::EthSpec::slots_per_epoch()) + } else { + return Err(warp_utils::reject::custom_bad_request( + "No Deneb fork scheduled".to_string(), + )); + }; + let start_slot = query.start_slot.unwrap_or(deneb_start_slot); + if start_slot < deneb_start_slot { + return Err(warp_utils::reject::custom_bad_request(format!( + "start_slot ({}) must be >= deneb fork slot ({})", + start_slot, deneb_start_slot + ))); + } + // Maybe use chain.canonical_head.cached_head().head_slot()?? + let Ok(current_head_slot) = chain.slot() else { + return Err(warp_utils::reject::custom_bad_request( + "Failed to get current head slot".to_string(), + )); + }; + + let end_slot = query.end_slot.unwrap_or(current_head_slot); + if end_slot < start_slot { + return Err(warp_utils::reject::custom_bad_request(format!( + "end_slot ({}) must be >= start_slot ({})", + end_slot, start_slot + ))); + } + + for slot in start_slot.as_u64()..=end_slot.as_u64() { if let Ok((root, _, _)) = BlockId::from_slot(Slot::from(slot)).root(&chain) { if let Ok(blob_list_res) = chain.store.get_blobs(&root) { if let Some(blob_list) = blob_list_res.blobs() { - if let Err(e) = + if let Err(_e) = verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { - results.push(format!( - "slot: {slot}, block_root: {root:?}, error: {e:?}" - )); + results.push(BlobsVerificationData { + block_root: root, + slot: slot.into(), + blobs_exist: true, + blobs_stored: false, + blobs_verified: true, + }); } } } } } - - if results.is_empty() { - Ok(api_types::GenericResponse::from( - "All blobs verified successfully".to_string(), - )) - } else { - Ok(api_types::GenericResponse::from(results.join("\n"))) - } + Ok::<_, warp::reject::Rejection>( + warp::reply::json(&api_types::GenericResponse::from(results)) + .into_response(), + ) }) }, ); diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index e8285dcf38e..45454ce1c0c 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -2,6 +2,7 @@ mod attestation_performance; pub mod attestation_rewards; +mod blobs_verification; mod block_packing_efficiency; mod block_rewards; mod standard_block_rewards; @@ -24,6 +25,7 @@ pub use attestation_performance::{ AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, }; pub use attestation_rewards::StandardAttestationRewards; +pub use blobs_verification::BlobsVerificationData; pub use block_packing_efficiency::{ BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, }; @@ -434,9 +436,9 @@ impl BeaconNodeHttpClient { /// `POST lighthouse/database/verify_blobs` pub async 
fn get_lighthouse_database_verify_blobs( &self, - start_slot: Slot, - end_slot: Slot, - ) -> Result { + start_slot: Option, + end_slot: Option, + ) -> Result, Error> { let mut path = self.server.full.clone(); path.path_segments_mut() @@ -445,9 +447,15 @@ impl BeaconNodeHttpClient { .push("database") .push("verify_blobs"); - path.query_pairs_mut() - .append_pair("start_slot", &start_slot.to_string()) - .append_pair("end_slot", &end_slot.to_string()); + if let Some(start_slot) = start_slot { + path.query_pairs_mut() + .append_pair("start_slot", &start_slot.to_string()); + } + + if let Some(end_slot) = end_slot { + path.query_pairs_mut() + .append_pair("end_slot", &end_slot.to_string()); + } self.get(path).await } diff --git a/common/eth2/src/lighthouse/blobs_verification.rs b/common/eth2/src/lighthouse/blobs_verification.rs new file mode 100644 index 00000000000..4acd2c38c6d --- /dev/null +++ b/common/eth2/src/lighthouse/blobs_verification.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; +use types::{Hash256, Slot}; + +#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] +pub struct BlobsVerificationData { + pub block_root: Hash256, + pub slot: Slot, + pub blobs_exist: bool, + pub blobs_stored: bool, + pub blobs_verified: bool, +} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 69e68010e69..553035baa5e 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -818,8 +818,8 @@ pub struct ImportBlobsQuery { #[derive(Debug, Deserialize)] pub struct VerifyBlobsQuery { - pub start_slot: Slot, - pub end_slot: Slot, + pub start_slot: Option, + pub end_slot: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] diff --git a/database_manager/src/blobs_manager/cli.rs b/database_manager/src/blobs_manager/cli.rs index 410ac7e7b41..201953c34b0 100644 --- a/database_manager/src/blobs_manager/cli.rs +++ b/database_manager/src/blobs_manager/cli.rs @@ -34,6 +34,13 @@ pub struct VerifyBlobs { display_order = 0 )] pub end_slot: Option, + #[clap( + long, + help = "Perform verification even if the beacon node is not synced", + display_order = 0, + default_value = "false" + )] + pub allow_unsynced: bool, } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -61,6 +68,13 @@ pub struct ImportBlobs { default_value = "false" )] pub skip_verification: bool, + #[clap( + long, + help = "Attempt import even if the beacon node is not synced", + display_order = 0, + default_value = "false" + )] + pub allow_unsynced: bool, } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -97,4 +111,11 @@ pub struct ExportBlobs { display_order = 0 )] pub end_slot: u64, + #[clap( + long, + help = "Attempt export even if the beacon node is not synced", + display_order = 0, + default_value = "false" + )] + pub allow_unsynced: bool, } diff --git a/database_manager/src/blobs_manager/export.rs b/database_manager/src/blobs_manager/export.rs index 67ad83fbabb..168102af57a 100644 --- a/database_manager/src/blobs_manager/export.rs +++ b/database_manager/src/blobs_manager/export.rs @@ -1,12 +1,11 @@ -use crate::blobs_manager::cli::ExportBlobs; +use crate::blobs_manager::{cli::ExportBlobs, ensure_node_synced}; use eth2::{ - types::{BlobSidecar, BlobSidecarList, BlockId, EthSpec, Slot}, + types::{BlobSidecarList, BlockId, EthSpec, Slot}, BeaconNodeHttpClient, Timeouts, }; use sensitive_url::SensitiveUrl; use slog::{info, warn, Logger}; -use ssz::{Decode, Encode}; -use std::sync::Arc; +use ssz::Encode; use std::time::Duration; pub async fn 
export_blobs(config: &ExportBlobs, log: Logger) -> Result<(), String> { @@ -15,6 +14,15 @@ pub async fn export_blobs(config: &ExportBlobs, log: Logger) -> Resu let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + let (_, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + if config.allow_unsynced { + warn!(log, "Beacon node is not synced"); + } else { + return Err("Beacon node is not synced".to_string()); + } + } + let start_slot = config.start_slot; let end_slot = config.end_slot; diff --git a/database_manager/src/blobs_manager/import.rs b/database_manager/src/blobs_manager/import.rs index 98c076500a3..741e2c88b70 100644 --- a/database_manager/src/blobs_manager/import.rs +++ b/database_manager/src/blobs_manager/import.rs @@ -1,4 +1,4 @@ -use crate::blobs_manager::cli::ImportBlobs; +use crate::blobs_manager::{cli::ImportBlobs, ensure_node_synced}; use eth2::{types::EthSpec, BeaconNodeHttpClient, Timeouts}; use sensitive_url::SensitiveUrl; use slog::{info, warn, Logger}; @@ -10,6 +10,15 @@ pub async fn import_blobs(config: &ImportBlobs, log: Logger) -> Resu let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + let (_, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + if config.allow_unsynced { + warn!(log, "Beacon node is not synced"); + } else { + return Err("Beacon node is not synced".to_string()); + } + } + let blobs_ssz = std::fs::read(&config.input_file) .map_err(|e| format!("Failed to read input file: {e:?}"))?; diff --git a/database_manager/src/blobs_manager/mod.rs b/database_manager/src/blobs_manager/mod.rs index 8359d7a5244..f8e60638ceb 100644 --- a/database_manager/src/blobs_manager/mod.rs +++ b/database_manager/src/blobs_manager/mod.rs @@ -3,4 +3,16 @@ pub mod export; pub mod import; pub mod verify; +use eth2::{types::Slot, BeaconNodeHttpClient}; + const DEFAULT_BEACON_NODE: &str = "http://localhost:5052"; + +async fn ensure_node_synced(client: &BeaconNodeHttpClient) -> Result<(Slot, bool), String> { + let res = client + .get_node_syncing() + .await + .map_err(|e| format!("{e:?}"))? 
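+        // The syncing response's `data` carries the node's head slot and its
+        // `is_syncing` flag; this helper surfaces them as (head_slot, is_synced).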
+ .data; + + Ok((res.head_slot, !res.is_syncing)) +} diff --git a/database_manager/src/blobs_manager/verify.rs b/database_manager/src/blobs_manager/verify.rs index 4a6b1ac40b6..9f9e9898df0 100644 --- a/database_manager/src/blobs_manager/verify.rs +++ b/database_manager/src/blobs_manager/verify.rs @@ -1,10 +1,10 @@ -use crate::blobs_manager::{cli::VerifyBlobs, DEFAULT_BEACON_NODE}; -use beacon_node::ClientConfig; +use crate::blobs_manager::{cli::VerifyBlobs, ensure_node_synced, DEFAULT_BEACON_NODE}; use eth2::{ + lighthouse::BlobsVerificationData, types::{ChainSpec, EthSpec, Slot}, BeaconNodeHttpClient, SensitiveUrl, Timeouts, }; -use slog::Logger; +use slog::{info, warn, Logger}; use std::time::Duration; pub async fn verify_blobs( @@ -21,31 +21,35 @@ pub async fn verify_blobs( .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); - let deneb_start_slot = spec - .deneb_fork_epoch - .unwrap() // todo - .start_slot(E::slots_per_epoch()) - .as_u64(); - let start_slot = config.start_slot.unwrap_or(deneb_start_slot); - let end_slot = config.end_slot.unwrap(); - - if start_slot < deneb_start_slot { - return Err("Start slot cannot be pre-Deneb".to_string()); - } - if end_slot < start_slot { - return Err("End slot cannot be earlier than start slot".to_string()); + let (_head_slot, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + if config.allow_unsynced { + warn!(log, "Beacon node is not synced"); + } else { + return Err("Beacon node is not synced".to_string()); + } } - let min_epochs_for_blob_sidecars_requests = spec.min_epochs_for_blob_sidecars_requests; + let _min_epochs_for_blob_sidecars_requests = spec.min_epochs_for_blob_sidecars_requests; - let slots_verified = 0; + let _slots_verified = 0; - let response = client - .get_lighthouse_database_verify_blobs(Slot::from(start_slot), Slot::from(end_slot)) + let verification_data: Vec = client + .get_lighthouse_database_verify_blobs( + config.start_slot.map(Slot::from), + config.end_slot.map(Slot::from), + ) .await .map_err(|e| format!("Failed to verify blobs: {e:?}"))?; - eprintln!("Response: {:?}", response); + let mut missing_slots = vec![]; + for data in verification_data { + if data.blobs_exist && !data.blobs_stored { + missing_slots.push(data.slot); + } + } + + info!(log, "Missing slots: {}", missing_slots.len()); Ok(()) } From 4559e2aab58fef81d20cb7d591487f99177cf08c Mon Sep 17 00:00:00 2001 From: Mac L Date: Tue, 3 Jun 2025 23:56:29 +1000 Subject: [PATCH 5/8] Update exporter --- database_manager/src/blobs_manager/cli.rs | 30 +++++-- database_manager/src/blobs_manager/export.rs | 90 ++++++++++++++++++-- 2 files changed, 107 insertions(+), 13 deletions(-) diff --git a/database_manager/src/blobs_manager/cli.rs b/database_manager/src/blobs_manager/cli.rs index 201953c34b0..da8c2d5422b 100644 --- a/database_manager/src/blobs_manager/cli.rs +++ b/database_manager/src/blobs_manager/cli.rs @@ -95,22 +95,42 @@ pub struct ExportBlobs { display_order = 0 )] pub output_dir: PathBuf, - #[clap( long, value_name = "SLOT", help = "The slot at which to start exporting blobs from.", - display_order = 0 + display_order = 0, + conflicts_with_all = &["start_epoch", "end_epoch"] )] - pub start_slot: u64, + pub start_slot: Option, #[clap( long, value_name = "SLOT", help = "The slot at which to stop exporting blobs to (inclusive).", - display_order = 0 + display_order = 0, + conflicts_with_all = &["start_epoch", "end_epoch"] + )] + pub 
end_slot: Option, + + #[clap( + long, + value_name = "EPOCH", + help = "The epoch at which to start exporting blobs from.", + display_order = 0, + conflicts_with_all = &["start_slot", "end_slot"] )] - pub end_slot: u64, + pub start_epoch: Option, + + #[clap( + long, + value_name = "EPOCH", + help = "The epoch at which to stop exporting blobs to (inclusive).", + display_order = 0, + conflicts_with_all = &["start_slot", "end_slot"] + )] + pub end_epoch: Option, + #[clap( long, help = "Attempt export even if the beacon node is not synced", diff --git a/database_manager/src/blobs_manager/export.rs b/database_manager/src/blobs_manager/export.rs index aa2a5045d19..c8a16c11f51 100644 --- a/database_manager/src/blobs_manager/export.rs +++ b/database_manager/src/blobs_manager/export.rs @@ -1,6 +1,6 @@ use crate::blobs_manager::{cli::ExportBlobs, ensure_node_synced}; use eth2::{ - types::{BlobSidecarList, BlockId, ChainSpec, EthSpec, Slot}, + types::{BlobSidecarList, BlockId, ChainSpec, Epoch, EthSpec, Slot}, BeaconNodeHttpClient, Timeouts, }; use sensitive_url::SensitiveUrl; @@ -8,6 +8,21 @@ use ssz::Encode; use std::time::Duration; use tracing::{info, warn}; +#[derive(PartialEq, Eq)] +enum ExportMode { + Epochs, + Slots, +} + +impl std::fmt::Display for ExportMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ExportMode::Epochs => write!(f, "epoch"), + ExportMode::Slots => write!(f, "slot"), + } + } +} + pub async fn export_blobs( config: &ExportBlobs, spec: &ChainSpec, @@ -26,22 +41,81 @@ pub async fn export_blobs( } } - let start_slot = config.start_slot; - let end_slot = config.end_slot; + // Ensure Deneb fork is enabled. + let deneb_fork_epoch = if let Some(deneb_fork_epoch) = spec.deneb_fork_epoch { + deneb_fork_epoch.as_u64() + } else { + return Err("Deneb fork epoch not set in chain spec".to_string()); + }; + + let mut export_mode = ExportMode::Epochs; + + // Export either epochs or slots. Defaults to epochs. + let start = if let Some(start_epoch) = config.start_epoch { + start_epoch + } else if let Some(start_slot) = config.start_slot { + // Since start_slot and start_epoch are mutually exclusive, we can safely assume that we are in slot mode. + export_mode = ExportMode::Slots; + start_slot + } else { + deneb_fork_epoch + }; + + let end = if let Some(end_epoch) = config.end_epoch { + end_epoch + } else if let Some(end_slot) = config.end_slot { + end_slot + } else { + return Err(format!("End {export_mode} not set")); + }; + + if end <= start { + return Err(format!( + "End {export_mode} must be greater than start {export_mode}" + )); + } + + // Ensure start is at or after Deneb fork + if export_mode == ExportMode::Epochs { + if start < deneb_fork_epoch { + return Err(format!( + "Start epoch {} is before Deneb fork epoch {}", + start, deneb_fork_epoch + )); + } + } else { + let deneb_start_slot = Epoch::new(deneb_fork_epoch) + .start_slot(E::slots_per_epoch()) + .as_u64(); + if start < deneb_start_slot { + return Err(format!( + "Start slot {} is before Deneb fork start slot {}", + start, deneb_start_slot + )); + } + } if !config.output_dir.is_dir() { return Err("Please set `--output-dir` to a valid directory.".to_string()); } - let filename = config - .output_dir - .join(format!("{start_slot}_{end_slot}.ssz")); + let filename = config.output_dir.join(format!("{start}_{end}.ssz")); // TODO(blob_manager): Ensure node is synced for start_slot -> end_slot. 
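    // A node that reports itself synced at the head may still be back-filling
    // historical blobs, so for now an empty response in this range cannot be
    // distinguished from a slot that genuinely had no blobs.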
let mut blobs_to_export: Vec> = vec![]; - info!(end_slot, start_slot, output_dir = ?config.output_dir, "Beginning blob export"); + // Generate start and end slots for each mode. + let (start_slot, end_slot) = if export_mode == ExportMode::Epochs { + info!(start_epoch = start, end_epoch = end, output_dir = ?config.output_dir, "Beginning blob export"); + ( + Epoch::new(start).start_slot(E::slots_per_epoch()).as_u64(), + Epoch::new(end).end_slot(E::slots_per_epoch()).as_u64(), + ) + } else { + info!(start_slot = start, end_slot = end, output_dir = ?config.output_dir, "Beginning blob export"); + (start, end) + }; for slot in start_slot..=end_slot { if let Some(blobs) = client @@ -70,7 +144,7 @@ pub async fn export_blobs( "Completed blob export" ); } else { - warn!("No blobs were found for this slot range"); + warn!("No blobs were found for this {} range", export_mode); } Ok(()) From 7d1ae6e60c402fdabb31d76796df590602d5f680 Mon Sep 17 00:00:00 2001 From: Mac L Date: Wed, 4 Jun 2025 02:48:26 +1000 Subject: [PATCH 6/8] Add blob database verifier --- beacon_node/http_api/src/lib.rs | 72 ++++++---- common/eth2/src/lighthouse.rs | 22 +-- .../eth2/src/lighthouse/blobs_verification.rs | 9 +- common/eth2/src/types.rs | 5 +- database_manager/src/blobs_manager/cli.rs | 15 +- database_manager/src/blobs_manager/verify.rs | 129 ++++++++++++++++-- 6 files changed, 196 insertions(+), 56 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b3422da520f..e52648dd138 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -79,6 +79,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; +use store::BlobSidecarListFromRoot; use sysinfo::{System, SystemExt}; use system_health::{observe_nat, observe_system_health_bn}; use task_spawner::{Priority, TaskSpawner}; @@ -4747,7 +4748,6 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>| { task_spawner.spawn_async_with_rejection(Priority::P1, async move { - let mut results = Vec::new(); let deneb_start_slot = if let Some(deneb_fork_epoch) = chain.spec.deneb_fork_epoch { deneb_fork_epoch.start_slot(T::EthSpec::slots_per_epoch()) @@ -4756,21 +4756,16 @@ pub fn serve( "No Deneb fork scheduled".to_string(), )); }; - let start_slot = query.start_slot.unwrap_or(deneb_start_slot); + let start_slot = query.start_slot; + if start_slot < deneb_start_slot { return Err(warp_utils::reject::custom_bad_request(format!( "start_slot ({}) must be >= deneb fork slot ({})", start_slot, deneb_start_slot ))); } - // Maybe use chain.canonical_head.cached_head().head_slot()?? 
-                        let Ok(current_head_slot) = chain.slot() else {
-                            return Err(warp_utils::reject::custom_bad_request(
-                                "Failed to get current head slot".to_string(),
-                            ));
-                        };
-                        let end_slot = query.end_slot.unwrap_or(current_head_slot);
+                        let end_slot = query.end_slot;
                         if end_slot < start_slot {
                             return Err(warp_utils::reject::custom_bad_request(format!(
                                 "end_slot ({}) must be >= start_slot ({})",
@@ -4778,29 +4773,60 @@ pub fn serve(
                             )));
                         }

+                        let verify = query.verify.unwrap_or(true);
+
+                        let mut blob_count = 0;
+                        let mut blobs_missing: Vec<u64> = Vec::new();
+                        let mut blobs_invalid: Vec<u64> = Vec::new();
+
                         for slot in start_slot.as_u64()..=end_slot.as_u64() {
                             if let Ok((root, _, _)) =
                                 BlockId::from_slot(Slot::from(slot)).root(&chain)
                             {
-                                if let Ok(blob_list_res) = chain.store.get_blobs(&root) {
-                                    if let Some(blob_list) = blob_list_res.blobs() {
-                                        if let Err(_e) =
-                                            verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg)
-                                        {
-                                            results.push(BlobsVerificationData {
-                                                block_root: root,
-                                                slot: slot.into(),
-                                                blobs_exist: true,
-                                                blobs_stored: false,
-                                                blobs_verified: true,
-                                            });
+                                match chain.store.get_blobs(&root) {
+                                    Ok(blob_list) => {
+                                        match blob_list {
+                                            BlobSidecarListFromRoot::NoBlobs => {
+                                                // This means that no blobs exist for this slot.
+                                                continue;
+                                            }
+                                            BlobSidecarListFromRoot::NoRoot => {
+                                                // This means that there are blobs missing for this slot.
+                                                blobs_missing.push(slot);
+                                            }
+                                            BlobSidecarListFromRoot::Blobs(blob_list) => {
+                                                blob_count += blob_list.len();
+                                                // Optionally verify each blob_list.
+                                                if verify
+                                                    && verify_kzg_for_blob_list(
+                                                        blob_list.iter(),
+                                                        &chain.kzg,
+                                                    )
+                                                    .is_err()
+                                                {
+                                                    blobs_invalid.push(slot);
+                                                }
+                                            }
+                                        }
+                                    }
+                                    Err(_) => {
+                                        // An error here means that we could not decode the blob list.
+                                        // This likely means a corrupted database.
+                                        blobs_invalid.push(slot);
+                                    }
                                 }
                             }
+                            // An Err here means the block does not exist. This is fine assuming the node is synced.
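                            // To summarise the classification above:
                            //   NoBlobs     -> the block has no blobs; nothing to check.
                            //   NoRoot      -> no blob entry on disk; counted as missing.
                            //   Blobs(list) -> counted, and KZG-verified when `verify` is set.
                            //   Err(_)      -> undecodable entry; counted as invalid.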
} + Ok::<_, warp::reject::Rejection>( - warp::reply::json(&api_types::GenericResponse::from(results)) - .into_response(), + warp::reply::json(&api_types::GenericResponse::from( + BlobsVerificationData { + blob_count, + blobs_missing, + blobs_invalid, + }, + )) + .into_response(), ) }) }, diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 1e882e5aa75..f2cb0161b90 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -395,12 +395,13 @@ impl BeaconNodeHttpClient { self.post_generic_with_ssz_body(path, blobs, None).await } - /// `POST lighthouse/database/verify_blobs` + /// `GET lighthouse/database/verify_blobs` pub async fn get_lighthouse_database_verify_blobs( &self, - start_slot: Option, - end_slot: Option, - ) -> Result, Error> { + start_slot: Slot, + end_slot: Slot, + verify: Option, + ) -> Result { let mut path = self.server.full.clone(); path.path_segments_mut() @@ -409,14 +410,15 @@ impl BeaconNodeHttpClient { .push("database") .push("verify_blobs"); - if let Some(start_slot) = start_slot { - path.query_pairs_mut() - .append_pair("start_slot", &start_slot.to_string()); - } + path.query_pairs_mut() + .append_pair("start_slot", &start_slot.to_string()); - if let Some(end_slot) = end_slot { + path.query_pairs_mut() + .append_pair("end_slot", &end_slot.to_string()); + + if let Some(verify) = verify { path.query_pairs_mut() - .append_pair("end_slot", &end_slot.to_string()); + .append_pair("verify", &verify.to_string()); } self.get(path).await diff --git a/common/eth2/src/lighthouse/blobs_verification.rs b/common/eth2/src/lighthouse/blobs_verification.rs index 4acd2c38c6d..ed357291e0a 100644 --- a/common/eth2/src/lighthouse/blobs_verification.rs +++ b/common/eth2/src/lighthouse/blobs_verification.rs @@ -1,11 +1,8 @@ use serde::{Deserialize, Serialize}; -use types::{Hash256, Slot}; #[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] pub struct BlobsVerificationData { - pub block_root: Hash256, - pub slot: Slot, - pub blobs_exist: bool, - pub blobs_stored: bool, - pub blobs_verified: bool, + pub blob_count: usize, + pub blobs_missing: Vec, + pub blobs_invalid: Vec, } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 36f6da194ff..b9004d970ee 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -821,8 +821,9 @@ pub struct ImportBlobsQuery { #[derive(Debug, Deserialize)] pub struct VerifyBlobsQuery { - pub start_slot: Option, - pub end_slot: Option, + pub start_slot: Slot, + pub end_slot: Slot, + pub verify: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] diff --git a/database_manager/src/blobs_manager/cli.rs b/database_manager/src/blobs_manager/cli.rs index da8c2d5422b..2673262f876 100644 --- a/database_manager/src/blobs_manager/cli.rs +++ b/database_manager/src/blobs_manager/cli.rs @@ -20,6 +20,7 @@ pub struct VerifyBlobs { display_order = 0 )] pub beacon_node: Option, + #[clap( long, value_name = "SLOT", @@ -27,6 +28,7 @@ pub struct VerifyBlobs { display_order = 0 )] pub start_slot: Option, + #[clap( long, value_name = "SLOT", @@ -34,13 +36,22 @@ pub struct VerifyBlobs { display_order = 0 )] pub end_slot: Option, + #[clap( long, - help = "Perform verification even if the beacon node is not synced", + help = "Perform checks even if the beacon node is not synced", display_order = 0, default_value = "false" )] pub allow_unsynced: bool, + + #[clap( + long, + help = "Skip KZG verification and only perform blob availability checks", + display_order = 0, + 
default_value = "false" + )] + pub skip_verification: bool, } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -68,6 +79,7 @@ pub struct ImportBlobs { default_value = "false" )] pub skip_verification: bool, + #[clap( long, help = "Attempt import even if the beacon node is not synced", @@ -95,6 +107,7 @@ pub struct ExportBlobs { display_order = 0 )] pub output_dir: PathBuf, + #[clap( long, value_name = "SLOT", diff --git a/database_manager/src/blobs_manager/verify.rs b/database_manager/src/blobs_manager/verify.rs index 4b1b3a7d35f..2265d1e690b 100644 --- a/database_manager/src/blobs_manager/verify.rs +++ b/database_manager/src/blobs_manager/verify.rs @@ -1,11 +1,16 @@ use crate::blobs_manager::{cli::VerifyBlobs, ensure_node_synced, DEFAULT_BEACON_NODE}; use eth2::{ - lighthouse::BlobsVerificationData, types::{ChainSpec, EthSpec, Slot}, BeaconNodeHttpClient, SensitiveUrl, Timeouts, }; use std::time::Duration; -use tracing::{info, warn}; +use tracing::{error, info, warn}; + +#[derive(Debug, PartialEq, Eq)] +enum BlobErrorType { + Invalid, + Missing, +} pub async fn verify_blobs( config: &VerifyBlobs, @@ -20,7 +25,7 @@ pub async fn verify_blobs( .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); - let (_head_slot, is_synced) = ensure_node_synced(&client).await?; + let (head_slot, is_synced) = ensure_node_synced(&client).await?; if !is_synced { if config.allow_unsynced { warn!("Beacon node is not synced"); @@ -29,26 +34,122 @@ pub async fn verify_blobs( } } + let deneb_start_slot = if let Some(deneb_fork_epoch) = spec.deneb_fork_epoch { + deneb_fork_epoch.start_slot(E::slots_per_epoch()) + } else { + return Err("Deneb fork not set in spec".to_string()); + }; + + // I believe this is the blob expiry window. let _min_epochs_for_blob_sidecars_requests = spec.min_epochs_for_blob_sidecars_requests; - let _slots_verified = 0; + // Get start and end slots depending on config, + let start_slot = config + .start_slot + .map(Slot::from) + .unwrap_or(deneb_start_slot); + let end_slot = config.end_slot.map(Slot::from).unwrap_or(head_slot); + + let verify = !config.skip_verification; + + if !verify { + warn!("Skipping verification") + } + + info!( + start_slot = start_slot.as_u64(), + end_slot = end_slot.as_u64(), + verify, + "Checking blobs in range" + ); - let verification_data: Vec = client - .get_lighthouse_database_verify_blobs( - config.start_slot.map(Slot::from), - config.end_slot.map(Slot::from), - ) + let blobs_verification_data = client + .get_lighthouse_database_verify_blobs(start_slot, end_slot, Some(verify)) .await .map_err(|e| format!("Failed to verify blobs: {e:?}"))?; - let mut missing_slots = vec![]; - for data in verification_data { - if data.blobs_exist && !data.blobs_stored { - missing_slots.push(data.slot); + // Total number of individual blobs found (including invalid ones). + let blob_count = blobs_verification_data.blob_count; + // Number of slots which contain missing blobs. + let blobs_missing = blobs_verification_data.blobs_missing.len(); + // Number of slots which contain invalid blobs. 
+ let blobs_invalid = blobs_verification_data.blobs_invalid.len(); + + log_slot_ranges( + blobs_verification_data.blobs_missing, + BlobErrorType::Missing, + ); + log_slot_ranges( + blobs_verification_data.blobs_invalid, + BlobErrorType::Invalid, + ); + + info!("Checks complete."); + info!( + "Slot range: {}-{}, Number of blobs: {}", + start_slot.as_u64(), + end_slot.as_u64(), + blob_count + ); + + if verify { + info!("Verification complete."); + if blobs_invalid == 0 { + info!("All blobs verified"); + } else { + error!("Invalid blobs: {}", blobs_invalid); } } - info!(missing_slots = missing_slots.len(), "Slots missing"); + if blobs_missing > 0 { + warn!("Missing blobs: {}", blobs_missing); + } Ok(()) } + +fn log_slot_ranges(slots: Vec, error_type: BlobErrorType) { + if slots.is_empty() { + return; + } + + // This may be unnecessary. + let mut sorted_slots = slots; + sorted_slots.sort(); + + let mut range_start = sorted_slots[0]; + let mut range_end = sorted_slots[0]; + + for &slot in sorted_slots.iter().skip(1) { + if slot == range_end + 1 { + range_end = slot; + } else { + if range_start == range_end { + if error_type == BlobErrorType::Invalid { + error!("{:?} slot: {}", error_type, range_start); + } else { + warn!("{:?} slot: {}", error_type, range_start); + } + } else if error_type == BlobErrorType::Invalid { + error!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } else { + warn!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } + range_start = slot; + range_end = slot; + } + } + + // Log the final range. + if range_start == range_end { + if error_type == BlobErrorType::Invalid { + error!("{:?} slot: {}", error_type, range_start); + } else { + warn!("{:?} slot: {}", error_type, range_start); + } + } else if error_type == BlobErrorType::Invalid { + error!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } else { + warn!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } +} From 8c506db0bd499e2b24be2403beb1377d31021663 Mon Sep 17 00:00:00 2001 From: Mac L Date: Wed, 4 Jun 2025 03:19:27 +1000 Subject: [PATCH 7/8] Handle empty blobsidecarlists --- beacon_node/http_api/src/lib.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index e52648dd138..412b758ccc4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4705,15 +4705,26 @@ pub fn serve( } // Build `BlobSidecarList`s from the `Vec`s. 
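                        // Each inner `Vec` is expected to hold the sidecars for a single
                        // block, so the first sidecar's epoch pins the max-blobs bound
                        // for the whole list. A minimal illustration of the conversion,
                        // with a made-up epoch and bound (both are assumptions):
                        //
                        //     let max = chain.spec.max_blobs_per_block(epoch) as usize; // e.g. 6
                        //     let list = BlobSidecarList::new(sidecars, max)?;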
- let blob_sidecar_lists: Vec> = blob_lists + let blob_sidecar_lists: Result< + Vec>, + warp::Rejection, + > = blob_lists .into_iter() .map(|blob_sidecars| { + let first_blob_sidecar = blob_sidecars.first().ok_or_else(|| { + warp_utils::reject::custom_server_error( + "Blob sidecar list must not be empty".to_string(), + ) + })?; + let max_blobs_at_epoch = - chain.spec.max_blobs_per_block(blob_sidecars[0].epoch()) as usize; - BlobSidecarList::new(blob_sidecars, max_blobs_at_epoch) + chain.spec.max_blobs_per_block(first_blob_sidecar.epoch()) as usize; + BlobSidecarList::new(blob_sidecars, max_blobs_at_epoch).map_err(|e| { + warp_utils::reject::custom_server_error(format!("{e:?}")) + }) }) - .collect::, _>>() - .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?; + .collect(); + let blob_sidecar_lists = blob_sidecar_lists?; if query.verify == Some(true) { for blob_list in &blob_sidecar_lists { From 66da335f50e32d7e6254b56386b4341561cb5c8e Mon Sep 17 00:00:00 2001 From: Mac L Date: Wed, 11 Jun 2025 04:18:44 +1000 Subject: [PATCH 8/8] Cleanup --- beacon_node/http_api/src/lib.rs | 44 +++----------- beacon_node/store/src/hot_cold_store.rs | 2 + common/eth2/src/lighthouse.rs | 8 +-- database_manager/src/blobs_manager/cli.rs | 20 ++++++- database_manager/src/blobs_manager/export.rs | 27 +++++---- database_manager/src/blobs_manager/import.rs | 62 ++++++++++++++------ database_manager/src/blobs_manager/verify.rs | 43 +++++++++----- 7 files changed, 117 insertions(+), 89 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 412b758ccc4..81fed9d0ba0 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4644,41 +4644,6 @@ pub fn serve( }, ); - // POST lighthouse/database/import_blobs - let post_lighthouse_database_import_blobs = database_path - .and(warp::path("import_blobs")) - .and(warp::path::end()) - .and(warp::query::()) - .and(warp_utils::json::json()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |query: api_types::ImportBlobsQuery, - blob_lists: Vec>, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { - if query.verify == Some(true) { - for blob_list in &blob_lists { - match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { - Ok(()) => (), - Err(e) => { - return Err(warp_utils::reject::custom_server_error(format!( - "{e:?}" - ))) - } - } - } - } - - match chain.store.import_blobs_batch(blob_lists) { - Ok(()) => Ok(()), - Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), - } - }) - }, - ); - // POST lighthouse/database/import_blobs_ssz let post_lighthouse_database_import_blobs_ssz = database_path .and(warp::path("import_blobs_ssz")) @@ -4736,6 +4701,14 @@ pub fn serve( ))) } } + + for blob_sidecar in blob_list { + if !blob_sidecar.verify_blob_sidecar_inclusion_proof() { + return Err(warp_utils::reject::custom_server_error( + "Found an invalid blob sidecar inclusion proof".to_string(), + )); + } + } } } @@ -5194,7 +5167,6 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) - .uor(post_lighthouse_database_import_blobs) .uor(post_lighthouse_database_import_blobs_ssz) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 5bc3c5a3daa..f12d019ec83 100644 --- 
a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -884,6 +884,8 @@ impl, Cold: ItemStore> HotColdDB return Err(Error::InvalidBlobImport("Missing block".to_string())); } + // Instead of the above, fully load the block. + // Check if a `blob_sidecar_list` is already stored for this block root. match self.get_blobs(&block_root) { Ok(BlobSidecarListFromRoot::Blobs(existing_blob_sidecar_list)) => { diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index f2cb0161b90..178f9994e69 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -19,6 +19,7 @@ use proto_array::core::ProtoArray; use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; use ssz_derive::{Decode, Encode}; +use std::time::Duration; pub use attestation_performance::{ AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, @@ -376,10 +377,9 @@ impl BeaconNodeHttpClient { pub async fn post_lighthouse_database_import_blobs_ssz( &self, blobs: Bytes, - skip_verification: bool, + verify: Option, ) -> Result { let mut path = self.server.full.clone(); - let verify = !skip_verification; path.path_segments_mut() .map_err(|()| Error::InvalidUrl(self.server.clone()))? @@ -387,7 +387,7 @@ impl BeaconNodeHttpClient { .push("database") .push("import_blobs_ssz"); - if skip_verification { + if let Some(verify) = verify { path.query_pairs_mut() .append_pair("verify", &verify.to_string()); } @@ -421,7 +421,7 @@ impl BeaconNodeHttpClient { .append_pair("verify", &verify.to_string()); } - self.get(path).await + self.get_with_timeout(path, Duration::MAX).await } /// `POST lighthouse/add_peer` diff --git a/database_manager/src/blobs_manager/cli.rs b/database_manager/src/blobs_manager/cli.rs index 2673262f876..51348f4b12f 100644 --- a/database_manager/src/blobs_manager/cli.rs +++ b/database_manager/src/blobs_manager/cli.rs @@ -63,7 +63,7 @@ pub struct ImportBlobs { help = "The beacon node to import blobs to", display_order = 0 )] - pub beacon_node: String, + pub beacon_node: Option, #[clap( long, @@ -73,9 +73,19 @@ pub struct ImportBlobs { )] pub input_file: PathBuf, + #[clap( + long, + value_name = "SIZE", + help = "Chunk size in slots for importing blobs", + display_order = 0, + default_value = "320" + )] + pub chunk_size_slots: usize, + #[clap( long, help = "Skip verification of blobs before import", + display_order = 0, default_value = "false" )] pub skip_verification: bool, @@ -95,10 +105,10 @@ pub struct ExportBlobs { #[clap( long, value_name = "URL", - help = "The beacon node to export blobs from.", + help = "The beacon node to export blobs from. 
Defaults to http://localhost:5052.", display_order = 0 )] - pub beacon_node: String, + pub beacon_node: Option, #[clap( long, @@ -113,6 +123,7 @@ pub struct ExportBlobs { value_name = "SLOT", help = "The slot at which to start exporting blobs from.", display_order = 0, + requires = "end_slot", conflicts_with_all = &["start_epoch", "end_epoch"] )] pub start_slot: Option, @@ -122,6 +133,7 @@ pub struct ExportBlobs { value_name = "SLOT", help = "The slot at which to stop exporting blobs to (inclusive).", display_order = 0, + requires = "start_slot", conflicts_with_all = &["start_epoch", "end_epoch"] )] pub end_slot: Option, @@ -131,6 +143,7 @@ pub struct ExportBlobs { value_name = "EPOCH", help = "The epoch at which to start exporting blobs from.", display_order = 0, + requires = "end_epoch", conflicts_with_all = &["start_slot", "end_slot"] )] pub start_epoch: Option, @@ -140,6 +153,7 @@ pub struct ExportBlobs { value_name = "EPOCH", help = "The epoch at which to stop exporting blobs to (inclusive).", display_order = 0, + requires = "start_epoch", conflicts_with_all = &["start_slot", "end_slot"] )] pub end_epoch: Option, diff --git a/database_manager/src/blobs_manager/export.rs b/database_manager/src/blobs_manager/export.rs index c8a16c11f51..d375c6cfa83 100644 --- a/database_manager/src/blobs_manager/export.rs +++ b/database_manager/src/blobs_manager/export.rs @@ -1,4 +1,4 @@ -use crate::blobs_manager::{cli::ExportBlobs, ensure_node_synced}; +use crate::blobs_manager::{cli::ExportBlobs, ensure_node_synced, DEFAULT_BEACON_NODE}; use eth2::{ types::{BlobSidecarList, BlockId, ChainSpec, Epoch, EthSpec, Slot}, BeaconNodeHttpClient, Timeouts, @@ -27,17 +27,22 @@ pub async fn export_blobs( config: &ExportBlobs, spec: &ChainSpec, ) -> Result<(), String> { - let beacon_node = SensitiveUrl::parse(&config.beacon_node) - .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + let beacon_node = SensitiveUrl::parse( + &config + .beacon_node + .clone() + .unwrap_or(DEFAULT_BEACON_NODE.to_string()), + ) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); - let (_, is_synced) = ensure_node_synced(&client).await?; - if !is_synced { - if config.allow_unsynced { - warn!("Beacon node is not synced"); - } else { - return Err("Beacon node is not synced".to_string()); + if !config.allow_unsynced { + let (_, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + return Err( + "Beacon node is not synced. 
Use --allow-unsynced to skip this check.".to_string(), + ); } } @@ -69,9 +74,9 @@ pub async fn export_blobs( return Err(format!("End {export_mode} not set")); }; - if end <= start { + if end < start { return Err(format!( - "End {export_mode} must be greater than start {export_mode}" + "End {export_mode} must be greater than or equal to start {export_mode}" )); } diff --git a/database_manager/src/blobs_manager/import.rs b/database_manager/src/blobs_manager/import.rs index b8bb097d4c9..2eff1c7cd19 100644 --- a/database_manager/src/blobs_manager/import.rs +++ b/database_manager/src/blobs_manager/import.rs @@ -1,40 +1,64 @@ -use crate::blobs_manager::{cli::ImportBlobs, ensure_node_synced}; +use crate::blobs_manager::{cli::ImportBlobs, ensure_node_synced, DEFAULT_BEACON_NODE}; use eth2::{types::EthSpec, BeaconNodeHttpClient, Timeouts}; use sensitive_url::SensitiveUrl; +use ssz::{Decode, Encode}; +use std::sync::Arc; use std::time::Duration; -use tracing::{info, warn}; +use tracing::{error, info, warn}; +use types::BlobSidecar; pub async fn import_blobs(config: &ImportBlobs) -> Result<(), String> { - let beacon_node = SensitiveUrl::parse(&config.beacon_node) - .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + let beacon_node = SensitiveUrl::parse( + &config + .beacon_node + .clone() + .unwrap_or(DEFAULT_BEACON_NODE.to_string()), + ) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); - let (_, is_synced) = ensure_node_synced(&client).await?; - if !is_synced { - if config.allow_unsynced { - warn!("Beacon node is not synced"); - } else { - return Err("Beacon node is not synced".to_string()); + if !config.allow_unsynced { + let (_, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + return Err( + "Beacon node is not synced. Use --allow-unsynced to skip this check.".to_string(), + ); } } let blobs_ssz = std::fs::read(&config.input_file) .map_err(|e| format!("Failed to read input file: {e:?}"))?; - // TODO(blob_manager): We could _technically_ parse the slot numbers from the SSZ file - // generated during export. - info!("Beginning blob import"); - if config.skip_verification { - warn!("Skipping blob verification"); + let verify = !config.skip_verification; + + if !verify { + warn!("Skipping blob verification") } - client - .post_lighthouse_database_import_blobs_ssz(blobs_ssz.into(), config.skip_verification) - .await - .map_err(|e| format!("Failed to import blobs: {e:?}"))?; + let blobs_vec = Vec::>>>::from_ssz_bytes(&blobs_ssz) + .map_err(|e| format!("Failed to decode blobs: {e:?}"))?; + + let chunks = blobs_vec.chunks(config.chunk_size_slots); + + for chunk in chunks { + match client + .post_lighthouse_database_import_blobs_ssz( + chunk.iter().collect::>().as_ssz_bytes().into(), + Some(!config.skip_verification), + ) + .await + .map_err(|e| format!("Failed to import blobs: {e:?}")) + { + Ok(_) => {} + Err(e) => { + error!("Some or all blobs failed to import. 
Please check the input file and try again.");
+                return Err(e);
+            }
+        }
+    }

     if !config.skip_verification {
         info!("All blobs successfully verified");
diff --git a/database_manager/src/blobs_manager/verify.rs b/database_manager/src/blobs_manager/verify.rs
index 2265d1e690b..f1653781602 100644
--- a/database_manager/src/blobs_manager/verify.rs
+++ b/database_manager/src/blobs_manager/verify.rs
@@ -6,6 +6,8 @@ use eth2::{
 use std::time::Duration;
 use tracing::{error, info, warn};

+const DEFAULT_SLOT_RANGE: u64 = 3200;
+
 #[derive(Debug, PartialEq, Eq)]
 enum BlobErrorType {
     Invalid,
@@ -25,14 +27,20 @@ pub async fn verify_blobs(
         .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?;
     let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12)));

-    let (head_slot, is_synced) = ensure_node_synced(&client).await?;
-    if !is_synced {
-        if config.allow_unsynced {
-            warn!("Beacon node is not synced");
-        } else {
-            return Err("Beacon node is not synced".to_string());
+    let head_slot = if !config.allow_unsynced {
+        let (head_slot, is_synced) = ensure_node_synced(&client).await?;
+        if !is_synced {
+            return Err(
+                "Beacon node is not synced. Use --allow-unsynced to skip this check.".to_string(),
+            );
         }
-    }
+        Some(head_slot)
+    } else {
+        if config.end_slot.is_none() {
+            warn!("Skipping sync check so no head slot can be computed. Using default range (100 epochs).");
+        }
+        None
+    };

     let deneb_start_slot = if let Some(deneb_fork_epoch) = spec.deneb_fork_epoch {
         deneb_fork_epoch.start_slot(E::slots_per_epoch())
@@ -40,7 +48,7 @@ pub async fn verify_blobs(
         return Err("Deneb fork not set in spec".to_string());
     };

-    // I believe this is the blob expiry window.
+    // I believe this is the blob expiry window. We might want to add a mode which only checks these blobs.
     let _min_epochs_for_blob_sidecars_requests = spec.min_epochs_for_blob_sidecars_requests;

     // Get start and end slots depending on config,
@@ -48,7 +56,11 @@ pub async fn verify_blobs(
         .start_slot
         .map(Slot::from)
         .unwrap_or(deneb_start_slot);
-    let end_slot = config.end_slot.map(Slot::from).unwrap_or(head_slot);
+
+    let end_slot = config
+        .end_slot
+        .map(Slot::from)
+        .unwrap_or(head_slot.unwrap_or(start_slot + DEFAULT_SLOT_RANGE));

     let verify = !config.skip_verification;

@@ -108,19 +120,18 @@ pub async fn verify_blobs(
     Ok(())
 }

-fn log_slot_ranges(slots: Vec<u64>, error_type: BlobErrorType) {
+fn log_slot_ranges(mut slots: Vec<u64>, error_type: BlobErrorType) {
     if slots.is_empty() {
         return;
     }

-    // This may be unnecessary.
-    let mut sorted_slots = slots;
-    sorted_slots.sort();
+    // We can probably skip sorting, since the list _should_ arrive pre-sorted from the API.
+    slots.sort_unstable();

-    let mut range_start = sorted_slots[0];
-    let mut range_end = sorted_slots[0];
+    let mut range_start = slots[0];
+    let mut range_end = slots[0];

-    for &slot in sorted_slots.iter().skip(1) {
+    for &slot in slots.iter().skip(1) {
         if slot == range_end + 1 {
             range_end = slot;
         } else {
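            // Worked example of the range compression implemented here: the
            // slots [100, 101, 102, 200, 202] collapse to three log lines,
            //
            //     Missing slot range: 100-102
            //     Missing slot: 200
            //     Missing slot: 202
            //
            // so long stretches of missing or invalid blobs stay readable.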