diff --git a/Cargo.lock b/Cargo.lock index 48a39cf304d..8cb44e53d2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2206,7 +2206,10 @@ dependencies = [ "clap", "clap_utils", "environment", + "eth2", + "ethereum_ssz", "hex", + "sensitive_url", "serde", "store", "strum", @@ -2834,6 +2837,7 @@ dependencies = [ name = "eth2" version = "0.1.0" dependencies = [ + "bytes", "derivative", "either", "enr", diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 2eaa33a9648..81fed9d0ba0 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -36,9 +36,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; use crate::version::beacon_response; use beacon_chain::{ - attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, - validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, - BeaconChainTypes, WhenSlotSkipped, + attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list, + observed_operations::ObservationOutcome, validator_monitor::timestamp_now, + AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + WhenSlotSkipped, }; use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; pub use block_id::BlockId; @@ -51,7 +52,10 @@ use eth2::types::{ ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus, ValidatorsRequestBody, }; -use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; +use eth2::{ + lighthouse::BlobsVerificationData, CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, + SSZ_CONTENT_TYPE_HEADER, +}; use health_metrics::observe::Observe; use lighthouse_network::rpc::methods::MetaData; use lighthouse_network::{types::SyncState, Enr, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; @@ -66,7 +70,7 @@ pub use publish_blocks::{ use serde::{Deserialize, Serialize}; use serde_json::Value; use slot_clock::SlotClock; -use ssz::Encode; +use ssz::{Decode, Encode}; pub use state_id::StateId; use std::collections::HashSet; use std::future::Future; @@ -75,6 +79,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; +use store::BlobSidecarListFromRoot; use sysinfo::{System, SystemExt}; use system_health::{observe_nat, observe_system_health_bn}; use task_spawner::{Priority, TaskSpawner}; @@ -89,11 +94,12 @@ use tokio_stream::{ use tracing::{debug, error, info, warn}; use types::AttestationData; use types::{ - Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, - CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData, - ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, BlobSidecar, + BlobSidecarList, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, + ForkName, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, + SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedValidatorRegistrationData, 
SignedVoluntaryExit, Slot, + SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -4638,6 +4644,178 @@ pub fn serve( }, ); + // POST lighthouse/database/import_blobs_ssz + let post_lighthouse_database_import_blobs_ssz = database_path + .and(warp::path("import_blobs_ssz")) + .and(warp::path::end()) + .and(warp::query::()) + .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::ImportBlobsQuery, + body: Bytes, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let blob_lists = Vec::>>>::from_ssz_bytes( + &body, + ) + .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?; + + if blob_lists.is_empty() { + return Err(warp_utils::reject::custom_server_error( + "Blob list must not be empty".to_string(), + )); + } + + // Build `BlobSidecarList`s from the `Vec`s. + let blob_sidecar_lists: Result< + Vec>, + warp::Rejection, + > = blob_lists + .into_iter() + .map(|blob_sidecars| { + let first_blob_sidecar = blob_sidecars.first().ok_or_else(|| { + warp_utils::reject::custom_server_error( + "Blob sidecar list must not be empty".to_string(), + ) + })?; + + let max_blobs_at_epoch = + chain.spec.max_blobs_per_block(first_blob_sidecar.epoch()) as usize; + BlobSidecarList::new(blob_sidecars, max_blobs_at_epoch).map_err(|e| { + warp_utils::reject::custom_server_error(format!("{e:?}")) + }) + }) + .collect(); + let blob_sidecar_lists = blob_sidecar_lists?; + + if query.verify == Some(true) { + for blob_list in &blob_sidecar_lists { + match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { + Ok(()) => (), + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "{e:?}" + ))) + } + } + + for blob_sidecar in blob_list { + if !blob_sidecar.verify_blob_sidecar_inclusion_proof() { + return Err(warp_utils::reject::custom_server_error( + "Found an invalid blob sidecar inclusion proof".to_string(), + )); + } + } + } + } + + match chain.store.import_blobs_batch(blob_sidecar_lists) { + Ok(()) => Ok(()), + Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), + } + }) + }, + ); + + // GET lighthouse/database/verify_blobs + let get_lighthouse_database_verify_blobs = database_path + .and(warp::path("verify_blobs")) + .and(warp::path::end()) + .and(warp::query::()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::VerifyBlobsQuery, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P1, async move { + let deneb_start_slot = + if let Some(deneb_fork_epoch) = chain.spec.deneb_fork_epoch { + deneb_fork_epoch.start_slot(T::EthSpec::slots_per_epoch()) + } else { + return Err(warp_utils::reject::custom_bad_request( + "No Deneb fork scheduled".to_string(), + )); + }; + let start_slot = query.start_slot; + + if start_slot < deneb_start_slot { + return Err(warp_utils::reject::custom_bad_request(format!( + "start_slot ({}) must be >= deneb fork slot ({})", + start_slot, deneb_start_slot + ))); + } + + let end_slot = query.end_slot; + if end_slot < start_slot { + return Err(warp_utils::reject::custom_bad_request(format!( + "end_slot ({}) must be >= start_slot ({})", + end_slot, start_slot + ))); + } + + let verify = query.verify.unwrap_or(true); + + let mut blob_count = 0; + let mut blobs_missing: Vec = Vec::new(); + let mut blobs_invalid: Vec = Vec::new(); + + for slot in 
start_slot.as_u64()..=end_slot.as_u64() { + if let Ok((root, _, _)) = BlockId::from_slot(Slot::from(slot)).root(&chain) + { + match chain.store.get_blobs(&root) { + Ok(blob_list) => { + match blob_list { + BlobSidecarListFromRoot::NoBlobs => { + // This means that no blobs exist for this slot. + continue; + } + BlobSidecarListFromRoot::NoRoot => { + // This means that there are blobs missing for this slot. + blobs_missing.push(slot); + } + BlobSidecarListFromRoot::Blobs(blob_list) => { + blob_count += blob_list.len(); + // Optionally verify each blob_list. + if verify + && verify_kzg_for_blob_list( + blob_list.iter(), + &chain.kzg, + ) + .is_err() + { + blobs_invalid.push(slot); + } + } + } + } + Err(_) => { + // An error here means that we could not decode the blob list. + // This likely means a corrupted database. + blobs_invalid.push(slot); + } + } + } + // An Err here means the block does not exist. This is fine assuming the node is synced. + } + + Ok::<_, warp::reject::Rejection>( + warp::reply::json(&api_types::GenericResponse::from( + BlobsVerificationData { + blob_count, + blobs_missing, + blobs_invalid, + }, + )) + .into_response(), + ) + }) + }, + ); + // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") .and(warp::path("analysis")) @@ -4938,6 +5116,7 @@ pub fn serve( .uor(get_lighthouse_eth1_deposit_cache) .uor(get_lighthouse_staking) .uor(get_lighthouse_database_info) + .uor(get_lighthouse_database_verify_blobs) .uor(get_lighthouse_block_rewards) .uor(get_lighthouse_attestation_performance) .uor(get_beacon_light_client_optimistic_update) @@ -4988,6 +5167,7 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) + .uor(post_lighthouse_database_import_blobs_ssz) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index cff08bc6557..3dcc2887cfc 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -50,10 +50,6 @@ pub enum Error { MissingGenesisState, MissingSnapshot(Slot), BlockReplayError(BlockReplayError), - AddPayloadLogicError, - InvalidKey, - InvalidBytes, - InconsistentFork(InconsistentFork), #[cfg(feature = "leveldb")] LevelDbError(LevelDBError), #[cfg(feature = "redb")] @@ -68,6 +64,11 @@ pub enum Error { state_root: Hash256, slot: Slot, }, + AddPayloadLogicError, + InvalidKey, + InvalidBytes, + InvalidBlobImport(String), + InconsistentFork(InconsistentFork), Hdiff(hdiff::Error), ForwardsIterInvalidColumn(DBColumn), ForwardsIterGap(DBColumn, Slot, Slot), diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 1663ec7b4d4..f12d019ec83 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -841,6 +841,95 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + /// Import a batch of blobs. + /// Implements the following checks: + /// - Checks that `block_root` is consistent across each `BlobSidecarList`. + /// - Checks that `block_root` exists in the database. + /// - Checks if a `BlobSidecarList` is already stored for that `block_root`. + /// If it is, ensure it matches the `BlobSidecarList` we are attempting to store. 
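+ /// Lists that pass every check are staged as key-value store operations and written to the blobs database in a single atomic batch; lists whose blobs are already stored identically are skipped.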
+ pub fn import_blobs_batch( + &self, + historical_blob_sidecars: Vec>, + ) -> Result<(), Error> { + if historical_blob_sidecars.is_empty() { + return Ok(()); + } + + let mut total_imported = 0; + + let mut ops = vec![]; + + for blob_sidecar_list in historical_blob_sidecars { + // Ensure all block_roots in the blob list are the same. + let block_root = { + let first_block_root = blob_sidecar_list[0].block_root(); + if !blob_sidecar_list + .iter() + .all(|blob_sidecar| blob_sidecar.block_root() == first_block_root) + { + return Err(Error::InvalidBlobImport( + "Inconsistent block roots".to_string(), + )); + } + first_block_root + }; + + // Check block is stored for this block_root. + if !self.block_exists(&block_root)? { + warn!( + %block_root, + num_blob_sidecars = blob_sidecar_list.len(), + "Aborting blob import; block does not exist" + ); + return Err(Error::InvalidBlobImport("Missing block".to_string())); + } + + // Instead of the above, fully load the block. + + // Check if a `blob_sidecar_list` is already stored for this block root. + match self.get_blobs(&block_root) { + Ok(BlobSidecarListFromRoot::Blobs(existing_blob_sidecar_list)) => { + // If blobs already exist, only proceed if they match exactly. + if existing_blob_sidecar_list == blob_sidecar_list { + debug!( + block_root = ?block_root, + num_blob_sidecars = blob_sidecar_list.len(), + "Skipping blob sidecar import as identical blob exists" + ); + continue; + } else { + return Err(Error::InvalidBlobImport(format!( + "Conflicting blobs exist for block root {:?}", + block_root + ))); + } + } + Ok(BlobSidecarListFromRoot::NoRoot) => { + // This block has no existing blobs: proceed with import. + self.blobs_as_kv_store_ops(&block_root, blob_sidecar_list.clone(), &mut ops); + total_imported += blob_sidecar_list.len(); + } + Ok(BlobSidecarListFromRoot::NoBlobs) => { + // This block should not have any blobs: reject the import. + warn!( + %block_root, + "Aborting blob import; blobs should not exist for this block_root" + ); + return Err(Error::InvalidBlobImport( + "No blobs should exist for this block_root".to_string(), + )); + } + Err(e) => return Err(Error::InvalidBlobImport(format!("{e:?}"))), + } + } + + self.blobs_db.do_atomically(ops)?; + + debug!(total_imported, "Imported historical blobs"); + + Ok(()) + } + pub fn blobs_as_kv_store_ops( &self, key: &Hash256, diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index 81666a64216..ba5057d8127 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -9,6 +9,7 @@ default = ["lighthouse"] lighthouse = [] [dependencies] +bytes = { workspace = true } derivative = { workspace = true } either = { workspace = true } enr = { version = "0.13.0", features = ["ed25519"] } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 52cc91ba298..3ccaa44fbdf 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -520,6 +520,27 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } + /// Generic POST function supporting arbitrary responses and timeouts without setting Consensus Version. 
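+ /// The body is sent verbatim with the `Content-Type` header set to `application/octet-stream`.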
+ async fn post_generic_with_ssz_body, U: IntoUrl>( + &self, + url: U, + body: T, + timeout: Option, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + let mut headers = HeaderMap::new(); + + headers.insert( + "Content-Type", + HeaderValue::from_static("application/octet-stream"), + ); + let response = builder.headers(headers).body(body).send().await?; + ok_or_error(response).await + } + /// `GET beacon/genesis` /// /// ## Errors diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 9a5d9100cf5..178f9994e69 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -1,6 +1,7 @@ //! This module contains endpoints that are non-standard and only available on Lighthouse servers. mod attestation_performance; +mod blobs_verification; mod block_packing_efficiency; mod block_rewards; pub mod sync_state; @@ -13,18 +14,22 @@ use crate::{ }, BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot, }; +use bytes::Bytes; use proto_array::core::ProtoArray; use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; use ssz_derive::{Decode, Encode}; +use std::time::Duration; pub use attestation_performance::{ AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, }; +pub use blobs_verification::BlobsVerificationData; pub use block_packing_efficiency::{ BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, }; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; +use reqwest::Response; // Define "legacy" implementations of `Option` which use four bytes for encoding the union // selector. @@ -368,6 +373,57 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &()).await } + /// `POST lighthouse/database/import_blobs_ssz` + pub async fn post_lighthouse_database_import_blobs_ssz( + &self, + blobs: Bytes, + verify: Option, + ) -> Result { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("database") + .push("import_blobs_ssz"); + + if let Some(verify) = verify { + path.query_pairs_mut() + .append_pair("verify", &verify.to_string()); + } + + self.post_generic_with_ssz_body(path, blobs, None).await + } + + /// `GET lighthouse/database/verify_blobs` + pub async fn get_lighthouse_database_verify_blobs( + &self, + start_slot: Slot, + end_slot: Slot, + verify: Option, + ) -> Result { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? 
+ .push("lighthouse") + .push("database") + .push("verify_blobs"); + + path.query_pairs_mut() + .append_pair("start_slot", &start_slot.to_string()); + + path.query_pairs_mut() + .append_pair("end_slot", &end_slot.to_string()); + + if let Some(verify) = verify { + path.query_pairs_mut() + .append_pair("verify", &verify.to_string()); + } + + self.get_with_timeout(path, Duration::MAX).await + } + /// `POST lighthouse/add_peer` pub async fn post_lighthouse_add_peer(&self, req: AdminPeer) -> Result<(), Error> { let mut path = self.server.full.clone(); diff --git a/common/eth2/src/lighthouse/blobs_verification.rs b/common/eth2/src/lighthouse/blobs_verification.rs new file mode 100644 index 00000000000..ed357291e0a --- /dev/null +++ b/common/eth2/src/lighthouse/blobs_verification.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] +pub struct BlobsVerificationData { + pub blob_count: usize, + pub blobs_missing: Vec, + pub blobs_invalid: Vec, +} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b8c74d4dcdc..b9004d970ee 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -814,6 +814,18 @@ pub struct LightClientUpdateResponseChunkInner { pub payload: Vec, } +#[derive(Clone, Serialize, Deserialize)] +pub struct ImportBlobsQuery { + pub verify: Option, +} + +#[derive(Debug, Deserialize)] +pub struct VerifyBlobsQuery { + pub start_slot: Slot, + pub end_slot: Slot, + pub verify: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub struct BeaconCommitteeSubscription { #[serde(with = "serde_utils::quoted_u64")] diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index 99bef75a72c..c1ab9b2617d 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -9,7 +9,10 @@ beacon_node = { workspace = true } clap = { workspace = true } clap_utils = { workspace = true } environment = { workspace = true } +eth2 = { workspace = true } +ethereum_ssz = { workspace = true } hex = { workspace = true } +sensitive_url = { workspace = true } serde = { workspace = true } store = { workspace = true } strum = { workspace = true } diff --git a/database_manager/src/blobs_manager/cli.rs b/database_manager/src/blobs_manager/cli.rs new file mode 100644 index 00000000000..51348f4b12f --- /dev/null +++ b/database_manager/src/blobs_manager/cli.rs @@ -0,0 +1,168 @@ +use clap::{Parser, Subcommand}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Subcommand, Clone, Deserialize, Serialize, Debug)] +#[clap(rename_all = "kebab-case")] +pub enum BlobsManager { + Verify(VerifyBlobs), + Import(ImportBlobs), + Export(ExportBlobs), +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Perform KZG verification for blobs in the database.")] +pub struct VerifyBlobs { + #[clap( + long, + value_name = "URL", + help = "The beacon node to verify blobs. Defaults to http://localhost:5052", + display_order = 0 + )] + pub beacon_node: Option, + + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to begin blob verification. Defaults to the Deneb start slot", + display_order = 0 + )] + pub start_slot: Option, + + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to stop blob verification. 
Defaults to the latest slot", + display_order = 0 + )] + pub end_slot: Option, + + #[clap( + long, + help = "Perform checks even if the beacon node is not synced", + display_order = 0, + default_value = "false" + )] + pub allow_unsynced: bool, + + #[clap( + long, + help = "Skip KZG verification and only perform blob availability checks", + display_order = 0, + default_value = "false" + )] + pub skip_verification: bool, +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Import a batch of blobs in SSZ format into the database.")] +pub struct ImportBlobs { + #[clap( + long, + value_name = "URL", + help = "The beacon node to import blobs to", + display_order = 0 + )] + pub beacon_node: Option, + + #[clap( + long, + value_name = "FILE", + help = "Input file containing blobs to import", + display_order = 0 + )] + pub input_file: PathBuf, + + #[clap( + long, + value_name = "SIZE", + help = "Chunk size in slots for importing blobs", + display_order = 0, + default_value = "320" + )] + pub chunk_size_slots: usize, + + #[clap( + long, + help = "Skip verification of blobs before import", + display_order = 0, + default_value = "false" + )] + pub skip_verification: bool, + + #[clap( + long, + help = "Attempt import even if the beacon node is not synced", + display_order = 0, + default_value = "false" + )] + pub allow_unsynced: bool, +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Export a batch of blobs in SSZ format from the database.")] +pub struct ExportBlobs { + #[clap( + long, + value_name = "URL", + help = "The beacon node to export blobs from. Defaults to http://localhost:5052.", + display_order = 0 + )] + pub beacon_node: Option, + + #[clap( + long, + value_name = "DIR", + help = "Output dir to export blobs to.", + display_order = 0 + )] + pub output_dir: PathBuf, + + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to start exporting blobs from.", + display_order = 0, + requires = "end_slot", + conflicts_with_all = &["start_epoch", "end_epoch"] + )] + pub start_slot: Option, + + #[clap( + long, + value_name = "SLOT", + help = "The slot at which to stop exporting blobs to (inclusive).", + display_order = 0, + requires = "start_slot", + conflicts_with_all = &["start_epoch", "end_epoch"] + )] + pub end_slot: Option, + + #[clap( + long, + value_name = "EPOCH", + help = "The epoch at which to start exporting blobs from.", + display_order = 0, + requires = "end_epoch", + conflicts_with_all = &["start_slot", "end_slot"] + )] + pub start_epoch: Option, + + #[clap( + long, + value_name = "EPOCH", + help = "The epoch at which to stop exporting blobs to (inclusive).", + display_order = 0, + requires = "start_epoch", + conflicts_with_all = &["start_slot", "end_slot"] + )] + pub end_epoch: Option, + + #[clap( + long, + help = "Attempt export even if the beacon node is not synced", + display_order = 0, + default_value = "false" + )] + pub allow_unsynced: bool, +} diff --git a/database_manager/src/blobs_manager/export.rs b/database_manager/src/blobs_manager/export.rs new file mode 100644 index 00000000000..d375c6cfa83 --- /dev/null +++ b/database_manager/src/blobs_manager/export.rs @@ -0,0 +1,156 @@ +use crate::blobs_manager::{cli::ExportBlobs, ensure_node_synced, DEFAULT_BEACON_NODE}; +use eth2::{ + types::{BlobSidecarList, BlockId, ChainSpec, Epoch, EthSpec, Slot}, + BeaconNodeHttpClient, Timeouts, +}; +use sensitive_url::SensitiveUrl; +use ssz::Encode; +use std::time::Duration; +use tracing::{info, warn}; + +#[derive(PartialEq, 
Eq)] +enum ExportMode { + Epochs, + Slots, +} + +impl std::fmt::Display for ExportMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ExportMode::Epochs => write!(f, "epoch"), + ExportMode::Slots => write!(f, "slot"), + } + } +} + +pub async fn export_blobs( + config: &ExportBlobs, + spec: &ChainSpec, +) -> Result<(), String> { + let beacon_node = SensitiveUrl::parse( + &config + .beacon_node + .clone() + .unwrap_or(DEFAULT_BEACON_NODE.to_string()), + ) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + + let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + + if !config.allow_unsynced { + let (_, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + return Err( + "Beacon node is not synced. Use --allow-unsynced to skip this check.".to_string(), + ); + } + } + + // Ensure Deneb fork is enabled. + let deneb_fork_epoch = if let Some(deneb_fork_epoch) = spec.deneb_fork_epoch { + deneb_fork_epoch.as_u64() + } else { + return Err("Deneb fork epoch not set in chain spec".to_string()); + }; + + let mut export_mode = ExportMode::Epochs; + + // Export either epochs or slots. Defaults to epochs. + let start = if let Some(start_epoch) = config.start_epoch { + start_epoch + } else if let Some(start_slot) = config.start_slot { + // Since start_slot and start_epoch are mutually exclusive, we can safely assume that we are in slot mode. + export_mode = ExportMode::Slots; + start_slot + } else { + deneb_fork_epoch + }; + + let end = if let Some(end_epoch) = config.end_epoch { + end_epoch + } else if let Some(end_slot) = config.end_slot { + end_slot + } else { + return Err(format!("End {export_mode} not set")); + }; + + if end < start { + return Err(format!( + "End {export_mode} must be greater than or equal to start {export_mode}" + )); + } + + // Ensure start is at or after Deneb fork + if export_mode == ExportMode::Epochs { + if start < deneb_fork_epoch { + return Err(format!( + "Start epoch {} is before Deneb fork epoch {}", + start, deneb_fork_epoch + )); + } + } else { + let deneb_start_slot = Epoch::new(deneb_fork_epoch) + .start_slot(E::slots_per_epoch()) + .as_u64(); + if start < deneb_start_slot { + return Err(format!( + "Start slot {} is before Deneb fork start slot {}", + start, deneb_start_slot + )); + } + } + + if !config.output_dir.is_dir() { + return Err("Please set `--output-dir` to a valid directory.".to_string()); + } + + let filename = config.output_dir.join(format!("{start}_{end}.ssz")); + + // TODO(blob_manager): Ensure node is synced for start_slot -> end_slot. + + let mut blobs_to_export: Vec> = vec![]; + + // Generate start and end slots for each mode. + let (start_slot, end_slot) = if export_mode == ExportMode::Epochs { + info!(start_epoch = start, end_epoch = end, output_dir = ?config.output_dir, "Beginning blob export"); + ( + Epoch::new(start).start_slot(E::slots_per_epoch()).as_u64(), + Epoch::new(end).end_slot(E::slots_per_epoch()).as_u64(), + ) + } else { + info!(start_slot = start, end_slot = end, output_dir = ?config.output_dir, "Beginning blob export"); + (start, end) + }; + + for slot in start_slot..=end_slot { + if let Some(blobs) = client + .get_blobs::(BlockId::Slot(Slot::from(slot)), None, spec) + .await + .map_err(|e| format!("Failed to export blobs: {e:?}"))? + { + let blob_sidecar_list = blobs.into_data(); + if !blob_sidecar_list.is_empty() { + blobs_to_export.push(blob_sidecar_list); + } + } else { + // No blobs exist for this slot. 
+ continue; + } + } + + let ssz_bytes = blobs_to_export.as_ssz_bytes(); + + // Check blobs exist so we don't create an empty file. + if !blobs_to_export.is_empty() { + std::fs::write(&filename, ssz_bytes) + .map_err(|e| format!("Failed to write blob file: {}", e))?; + info!( + blobs_exported = blobs_to_export.len(), + "Completed blob export" + ); + } else { + warn!("No blobs were found for this {} range", export_mode); + } + + Ok(()) +} diff --git a/database_manager/src/blobs_manager/import.rs b/database_manager/src/blobs_manager/import.rs new file mode 100644 index 00000000000..2eff1c7cd19 --- /dev/null +++ b/database_manager/src/blobs_manager/import.rs @@ -0,0 +1,69 @@ +use crate::blobs_manager::{cli::ImportBlobs, ensure_node_synced, DEFAULT_BEACON_NODE}; +use eth2::{types::EthSpec, BeaconNodeHttpClient, Timeouts}; +use sensitive_url::SensitiveUrl; +use ssz::{Decode, Encode}; +use std::sync::Arc; +use std::time::Duration; +use tracing::{error, info, warn}; +use types::BlobSidecar; + +pub async fn import_blobs(config: &ImportBlobs) -> Result<(), String> { + let beacon_node = SensitiveUrl::parse( + &config + .beacon_node + .clone() + .unwrap_or(DEFAULT_BEACON_NODE.to_string()), + ) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + + let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + + if !config.allow_unsynced { + let (_, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + return Err( + "Beacon node is not synced. Use --allow-unsynced to skip this check.".to_string(), + ); + } + } + + let blobs_ssz = std::fs::read(&config.input_file) + .map_err(|e| format!("Failed to read input file: {e:?}"))?; + + info!("Beginning blob import"); + + let verify = !config.skip_verification; + + if !verify { + warn!("Skipping blob verification") + } + + let blobs_vec = Vec::>>>::from_ssz_bytes(&blobs_ssz) + .map_err(|e| format!("Failed to decode blobs: {e:?}"))?; + + let chunks = blobs_vec.chunks(config.chunk_size_slots); + + for chunk in chunks { + match client + .post_lighthouse_database_import_blobs_ssz( + chunk.iter().collect::>().as_ssz_bytes().into(), + Some(!config.skip_verification), + ) + .await + .map_err(|e| format!("Failed to import blobs: {e:?}")) + { + Ok(_) => {} + Err(e) => { + error!("Some or all blobs failed to import. Please check the input file and try again."); + return Err(e); + } + } + } + + if !config.skip_verification { + info!("All blobs successfully verified"); + } + info!("Completed blob import"); + + Ok(()) +} diff --git a/database_manager/src/blobs_manager/mod.rs b/database_manager/src/blobs_manager/mod.rs new file mode 100644 index 00000000000..f8e60638ceb --- /dev/null +++ b/database_manager/src/blobs_manager/mod.rs @@ -0,0 +1,18 @@ +pub mod cli; +pub mod export; +pub mod import; +pub mod verify; + +use eth2::{types::Slot, BeaconNodeHttpClient}; + +const DEFAULT_BEACON_NODE: &str = "http://localhost:5052"; + +async fn ensure_node_synced(client: &BeaconNodeHttpClient) -> Result<(Slot, bool), String> { + let res = client + .get_node_syncing() + .await + .map_err(|e| format!("{e:?}"))? 
+ .data; + + Ok((res.head_slot, !res.is_syncing)) +} diff --git a/database_manager/src/blobs_manager/verify.rs b/database_manager/src/blobs_manager/verify.rs new file mode 100644 index 00000000000..f1653781602 --- /dev/null +++ b/database_manager/src/blobs_manager/verify.rs @@ -0,0 +1,166 @@ +use crate::blobs_manager::{cli::VerifyBlobs, ensure_node_synced, DEFAULT_BEACON_NODE}; +use eth2::{ + types::{ChainSpec, EthSpec, Slot}, + BeaconNodeHttpClient, SensitiveUrl, Timeouts, +}; +use std::time::Duration; +use tracing::{error, info, warn}; + +const DEFAULT_SLOT_RANGE: u64 = 3200; + +#[derive(Debug, PartialEq, Eq)] +enum BlobErrorType { + Invalid, + Missing, +} + +pub async fn verify_blobs( + config: &VerifyBlobs, + spec: &ChainSpec, +) -> Result<(), String> { + let beacon_node = SensitiveUrl::parse( + &config + .beacon_node + .clone() + .unwrap_or(DEFAULT_BEACON_NODE.to_string()), + ) + .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?; + let client = BeaconNodeHttpClient::new(beacon_node, Timeouts::set_all(Duration::from_secs(12))); + + let head_slot = if !config.allow_unsynced { + let (head_slot, is_synced) = ensure_node_synced(&client).await?; + if !is_synced { + return Err( + "Beacon node is not synced. Use --allow-unsynced to skip this check.".to_string(), + ); + } + Some(head_slot) + } else { + if config.end_slot.is_none() { + warn!("Skipping sync check so no head slot can be computed. Using default range (100 epochs)."); + } + None + }; + + let deneb_start_slot = if let Some(deneb_fork_epoch) = spec.deneb_fork_epoch { + deneb_fork_epoch.start_slot(E::slots_per_epoch()) + } else { + return Err("Deneb fork not set in spec".to_string()); + }; + + // I believe this is the blob expiry window. We might want to add a mode which only checks these blobs. + let _min_epochs_for_blob_sidecars_requests = spec.min_epochs_for_blob_sidecars_requests; + + // Get start and end slots depending on config, + let start_slot = config + .start_slot + .map(Slot::from) + .unwrap_or(deneb_start_slot); + + let end_slot = config + .end_slot + .map(Slot::from) + .unwrap_or(head_slot.unwrap_or(start_slot + DEFAULT_SLOT_RANGE)); + + let verify = !config.skip_verification; + + if !verify { + warn!("Skipping verification") + } + + info!( + start_slot = start_slot.as_u64(), + end_slot = end_slot.as_u64(), + verify, + "Checking blobs in range" + ); + + let blobs_verification_data = client + .get_lighthouse_database_verify_blobs(start_slot, end_slot, Some(verify)) + .await + .map_err(|e| format!("Failed to verify blobs: {e:?}"))?; + + // Total number of individual blobs found (including invalid ones). + let blob_count = blobs_verification_data.blob_count; + // Number of slots which contain missing blobs. + let blobs_missing = blobs_verification_data.blobs_missing.len(); + // Number of slots which contain invalid blobs. 
+ let blobs_invalid = blobs_verification_data.blobs_invalid.len(); + + log_slot_ranges( + blobs_verification_data.blobs_missing, + BlobErrorType::Missing, + ); + log_slot_ranges( + blobs_verification_data.blobs_invalid, + BlobErrorType::Invalid, + ); + + info!("Checks complete."); + info!( + "Slot range: {}-{}, Number of blobs: {}", + start_slot.as_u64(), + end_slot.as_u64(), + blob_count + ); + + if verify { + info!("Verification complete."); + if blobs_invalid == 0 { + info!("All blobs verified"); + } else { + error!("Invalid blobs: {}", blobs_invalid); + } + } + + if blobs_missing > 0 { + warn!("Missing blobs: {}", blobs_missing); + } + + Ok(()) + } + + fn log_slot_ranges(mut slots: Vec<u64>, error_type: BlobErrorType) { + if slots.is_empty() { + return; + } + + // We can probably skip sorting since it _should_ arrive pre-sorted from the API. + slots.sort_unstable(); + + let mut range_start = slots[0]; + let mut range_end = slots[0]; + + for &slot in slots.iter().skip(1) { + if slot == range_end + 1 { + range_end = slot; + } else { + if range_start == range_end { + if error_type == BlobErrorType::Invalid { + error!("{:?} slot: {}", error_type, range_start); + } else { + warn!("{:?} slot: {}", error_type, range_start); + } + } else if error_type == BlobErrorType::Invalid { + error!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } else { + warn!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } + range_start = slot; + range_end = slot; + } + } + + // Log the final range. + if range_start == range_end { + if error_type == BlobErrorType::Invalid { + error!("{:?} slot: {}", error_type, range_start); + } else { + warn!("{:?} slot: {}", error_type, range_start); + } + } else if error_type == BlobErrorType::Invalid { + error!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } else { + warn!("{:?} slot range: {}-{}", error_type, range_start, range_end); + } +} diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index c62da1206f1..d6efd94ab5c 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -1,4 +1,5 @@ -pub use clap::{Arg, ArgAction, Args, Command, FromArgMatches, Parser}; +use crate::blobs_manager::cli::BlobsManager; +pub use clap::{Arg, ArgAction, Args, Command, FromArgMatches, Parser, Subcommand}; use clap_utils::get_color_style; use clap_utils::FLAG_HEADER; use serde::{Deserialize, Serialize}; @@ -80,6 +81,8 @@ pub enum DatabaseManagerSubcommand { PruneBlobs(PruneBlobs), PruneStates(PruneStates), Compact(Compact), + #[clap(subcommand)] + Blobs(BlobsManager), } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index d15a8419dfc..99731aa3cc7 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -1,4 +1,9 @@ +pub mod blobs_manager; pub mod cli; + +use crate::blobs_manager::{ + cli::BlobsManager, export::export_blobs, import::import_blobs, verify::verify_blobs, +}; use crate::cli::DatabaseManager; use crate::cli::Migrate; use crate::cli::PruneStates; @@ -455,6 +460,8 @@ pub fn run( ) -> Result<(), String> { let client_config = parse_client_config(cli_args, db_manager_config, &env)?; let context = env.core_context(); + let spec = context.eth2_config.spec.clone(); + let format_err = |e| format!("Fatal error: {:?}", e); let get_genesis_state = || { @@ -504,5 +511,16 @@ pub fn run( let compact_config = parse_compact_config(compact_config)?; compact_db::<E>(compact_config, client_config).map_err(format_err) } + 
cli::DatabaseManagerSubcommand::Blobs(blobs_manager_cmd) => match blobs_manager_cmd { + BlobsManager::Verify(verify_config) => env + .runtime() + .block_on(verify_blobs::<E>(verify_config, &spec.clone())), + BlobsManager::Export(export_config) => env + .runtime() + .block_on(export_blobs::<E>(export_config, &context.eth2_config.spec)), + BlobsManager::Import(import_config) => { + env.runtime().block_on(import_blobs::<E>(import_config)) + } + }, } }
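
For orientation, a minimal caller-side sketch of the new verification endpoint, mirroring the `BeaconNodeHttpClient` usage added in `verify.rs` above; the node URL, slot range, and response field access are illustrative assumptions taken from this diff, not part of the change itself.

    use eth2::{types::Slot, BeaconNodeHttpClient, SensitiveUrl, Timeouts};
    use std::time::Duration;

    async fn check_blob_range() -> Result<(), String> {
        // Hypothetical endpoint and post-Deneb slot range; substitute real values.
        let url = SensitiveUrl::parse("http://localhost:5052")
            .map_err(|e| format!("Unable to parse beacon node url: {e:?}"))?;
        let client = BeaconNodeHttpClient::new(url, Timeouts::set_all(Duration::from_secs(12)));

        // GET lighthouse/database/verify_blobs?start_slot=..&end_slot=..&verify=true
        let data = client
            .get_lighthouse_database_verify_blobs(Slot::new(8_626_176), Slot::new(8_629_376), Some(true))
            .await
            .map_err(|e| format!("Failed to verify blobs: {e:?}"))?;

        println!(
            "blobs found: {}, slots with missing blobs: {}, slots with invalid blobs: {}",
            data.blob_count,
            data.blobs_missing.len(),
            data.blobs_invalid.len()
        );
        Ok(())
    }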