Skip to content
Open
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

200 changes: 190 additions & 10 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::beacon_response;
use beacon_chain::{
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
BeaconChainTypes, WhenSlotSkipped,
attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list,
observed_operations::ObservationOutcome, validator_monitor::timestamp_now,
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
WhenSlotSkipped,
};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
pub use block_id::BlockId;
Expand All @@ -51,7 +52,10 @@ use eth2::types::{
ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody,
ValidatorId, ValidatorStatus, ValidatorsRequestBody,
};
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use eth2::{
lighthouse::BlobsVerificationData, CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER,
SSZ_CONTENT_TYPE_HEADER,
};
use health_metrics::observe::Observe;
use lighthouse_network::rpc::methods::MetaData;
use lighthouse_network::{types::SyncState, Enr, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
Expand All @@ -66,7 +70,7 @@ pub use publish_blocks::{
use serde::{Deserialize, Serialize};
use serde_json::Value;
use slot_clock::SlotClock;
use ssz::Encode;
use ssz::{Decode, Encode};
pub use state_id::StateId;
use std::collections::HashSet;
use std::future::Future;
Expand All @@ -75,6 +79,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use store::BlobSidecarListFromRoot;
use sysinfo::{System, SystemExt};
use system_health::{observe_nat, observe_system_health_bn};
use task_spawner::{Priority, TaskSpawner};
Expand All @@ -89,11 +94,12 @@ use tokio_stream::{
use tracing::{debug, error, info, warn};
use types::AttestationData;
use types::{
Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint,
CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData,
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, BlobSidecar,
BlobSidecarList, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ForkName, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
Expand Down Expand Up @@ -4638,6 +4644,178 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// POST lighthouse/database/import_blobs_ssz
let post_lighthouse_database_import_blobs_ssz = database_path
.and(warp::path("import_blobs_ssz"))
.and(warp::path::end())
.and(warp::query::<api_types::ImportBlobsQuery>())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|query: api_types::ImportBlobsQuery,
body: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let blob_lists = Vec::<Vec<Arc<BlobSidecar<T::EthSpec>>>>::from_ssz_bytes(
&body,
)
.map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;

if blob_lists.is_empty() {
return Err(warp_utils::reject::custom_server_error(
"Blob list must not be empty".to_string(),
));
}

// Build `BlobSidecarList`s from the `Vec<BlobSidecar>`s.
let blob_sidecar_lists: Result<
Vec<BlobSidecarList<T::EthSpec>>,
warp::Rejection,
> = blob_lists
.into_iter()
.map(|blob_sidecars| {
let first_blob_sidecar = blob_sidecars.first().ok_or_else(|| {
warp_utils::reject::custom_server_error(
"Blob sidecar list must not be empty".to_string(),
)
})?;

let max_blobs_at_epoch =
chain.spec.max_blobs_per_block(first_blob_sidecar.epoch()) as usize;
BlobSidecarList::new(blob_sidecars, max_blobs_at_epoch).map_err(|e| {
warp_utils::reject::custom_server_error(format!("{e:?}"))
})
})
.collect();
let blob_sidecar_lists = blob_sidecar_lists?;

if query.verify == Some(true) {
for blob_list in &blob_sidecar_lists {
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
Ok(()) => (),
Err(e) => {
return Err(warp_utils::reject::custom_server_error(format!(
"{e:?}"
)))
}
}

for blob_sidecar in blob_list {
if !blob_sidecar.verify_blob_sidecar_inclusion_proof() {
return Err(warp_utils::reject::custom_server_error(
"Found an invalid blob sidecar inclusion proof".to_string(),
));
}
}
}
}

match chain.store.import_blobs_batch(blob_sidecar_lists) {
Ok(()) => Ok(()),
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
}
})
},
);

// GET lighthouse/database/verify_blobs
let get_lighthouse_database_verify_blobs = database_path
.and(warp::path("verify_blobs"))
.and(warp::path::end())
.and(warp::query::<api_types::VerifyBlobsQuery>())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|query: api_types::VerifyBlobsQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.spawn_async_with_rejection(Priority::P1, async move {
let deneb_start_slot =
if let Some(deneb_fork_epoch) = chain.spec.deneb_fork_epoch {
deneb_fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
} else {
return Err(warp_utils::reject::custom_bad_request(
"No Deneb fork scheduled".to_string(),
));
};
let start_slot = query.start_slot;

if start_slot < deneb_start_slot {
return Err(warp_utils::reject::custom_bad_request(format!(
"start_slot ({}) must be >= deneb fork slot ({})",
start_slot, deneb_start_slot
)));
}

let end_slot = query.end_slot;
if end_slot < start_slot {
return Err(warp_utils::reject::custom_bad_request(format!(
"end_slot ({}) must be >= start_slot ({})",
end_slot, start_slot
)));
}

let verify = query.verify.unwrap_or(true);

let mut blob_count = 0;
let mut blobs_missing: Vec<u64> = Vec::new();
let mut blobs_invalid: Vec<u64> = Vec::new();

for slot in start_slot.as_u64()..=end_slot.as_u64() {
if let Ok((root, _, _)) = BlockId::from_slot(Slot::from(slot)).root(&chain)
{
match chain.store.get_blobs(&root) {
Ok(blob_list) => {
match blob_list {
BlobSidecarListFromRoot::NoBlobs => {
// This means that no blobs exist for this slot.
continue;
}
BlobSidecarListFromRoot::NoRoot => {
// This means that there are blobs missing for this slot.
blobs_missing.push(slot);
}
BlobSidecarListFromRoot::Blobs(blob_list) => {
blob_count += blob_list.len();
// Optionally verify each blob_list.
if verify
&& verify_kzg_for_blob_list(
blob_list.iter(),
&chain.kzg,
)
.is_err()
{
blobs_invalid.push(slot);
}
}
}
}
Err(_) => {
// An error here means that we could not decode the blob list.
// This likely means a corrupted database.
blobs_invalid.push(slot);
}
}
}
// An Err here means the block does not exist. This is fine assuming the node is synced.
}

Ok::<_, warp::reject::Rejection>(
warp::reply::json(&api_types::GenericResponse::from(
BlobsVerificationData {
blob_count,
blobs_missing,
blobs_invalid,
},
))
.into_response(),
)
})
},
);

// GET lighthouse/analysis/block_rewards
let get_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
Expand Down Expand Up @@ -4938,6 +5116,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_lighthouse_eth1_deposit_cache)
.uor(get_lighthouse_staking)
.uor(get_lighthouse_database_info)
.uor(get_lighthouse_database_verify_blobs)
.uor(get_lighthouse_block_rewards)
.uor(get_lighthouse_attestation_performance)
.uor(get_beacon_light_client_optimistic_update)
Expand Down Expand Up @@ -4988,6 +5167,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_validator_liveness_epoch)
.uor(post_lighthouse_liveness)
.uor(post_lighthouse_database_reconstruct)
.uor(post_lighthouse_database_import_blobs_ssz)
.uor(post_lighthouse_block_rewards)
.uor(post_lighthouse_ui_validator_metrics)
.uor(post_lighthouse_ui_validator_info)
Expand Down
9 changes: 5 additions & 4 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ pub enum Error {
MissingGenesisState,
MissingSnapshot(Slot),
BlockReplayError(BlockReplayError),
AddPayloadLogicError,
InvalidKey,
InvalidBytes,
InconsistentFork(InconsistentFork),
#[cfg(feature = "leveldb")]
LevelDbError(LevelDBError),
#[cfg(feature = "redb")]
Expand All @@ -68,6 +64,11 @@ pub enum Error {
state_root: Hash256,
slot: Slot,
},
AddPayloadLogicError,
InvalidKey,
InvalidBytes,
InvalidBlobImport(String),
InconsistentFork(InconsistentFork),
Hdiff(hdiff::Error),
ForwardsIterInvalidColumn(DBColumn),
ForwardsIterGap(DBColumn, Slot, Slot),
Expand Down
89 changes: 89 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,95 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}

/// Import a batch of blobs.
/// Implements the following checks:
/// - Checks that `block_root` is consistent across each `BlobSidecarList`.
/// - Checks that `block_root` exists in the database.
/// - Checks if a `BlobSidecarList` is already stored for that `block_root`.
/// If it is, ensure it matches the `BlobSidecarList` we are attempting to store.
pub fn import_blobs_batch(
&self,
historical_blob_sidecars: Vec<BlobSidecarList<E>>,
) -> Result<(), Error> {
if historical_blob_sidecars.is_empty() {
return Ok(());
}

let mut total_imported = 0;

let mut ops = vec![];

for blob_sidecar_list in historical_blob_sidecars {
// Ensure all block_roots in the blob list are the same.
let block_root = {
let first_block_root = blob_sidecar_list[0].block_root();
if !blob_sidecar_list
.iter()
.all(|blob_sidecar| blob_sidecar.block_root() == first_block_root)
{
return Err(Error::InvalidBlobImport(
"Inconsistent block roots".to_string(),
));
}
first_block_root
};

// Check block is stored for this block_root.
if !self.block_exists(&block_root)? {
warn!(
%block_root,
num_blob_sidecars = blob_sidecar_list.len(),
"Aborting blob import; block does not exist"
);
return Err(Error::InvalidBlobImport("Missing block".to_string()));
}

// Instead of the above, fully load the block.

// Check if a `blob_sidecar_list` is already stored for this block root.
match self.get_blobs(&block_root) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets try adding a log with the time that this step takes

Ok(BlobSidecarListFromRoot::Blobs(existing_blob_sidecar_list)) => {
// If blobs already exist, only proceed if they match exactly.
if existing_blob_sidecar_list == blob_sidecar_list {
debug!(
block_root = ?block_root,
num_blob_sidecars = blob_sidecar_list.len(),
"Skipping blob sidecar import as identical blob exists"
);
continue;
} else {
return Err(Error::InvalidBlobImport(format!(
"Conflicting blobs exist for block root {:?}",
block_root
)));
}
}
Ok(BlobSidecarListFromRoot::NoRoot) => {
// This block has no existing blobs: proceed with import.
self.blobs_as_kv_store_ops(&block_root, blob_sidecar_list.clone(), &mut ops);
total_imported += blob_sidecar_list.len();
}
Ok(BlobSidecarListFromRoot::NoBlobs) => {
// This block should not have any blobs: reject the import.
warn!(
%block_root,
"Aborting blob import; blobs should not exist for this block_root"
);
return Err(Error::InvalidBlobImport(
"No blobs should exist for this block_root".to_string(),
));
}
Err(e) => return Err(Error::InvalidBlobImport(format!("{e:?}"))),
}
}

self.blobs_db.do_atomically(ops)?;

debug!(total_imported, "Imported historical blobs");

Ok(())
}

pub fn blobs_as_kv_store_ops(
&self,
key: &Hash256,
Expand Down
Loading