Commit 5ede901

First try
1 parent 1fd86f8 commit 5ede901

3 files changed (+64, -0 lines)


beacon_node/http_api/src/lib.rs

Lines changed: 19 additions & 0 deletions
@@ -4395,6 +4395,24 @@ pub fn serve<T: BeaconChainTypes>(
             },
         );
 
+    // POST lighthouse/database/import_blobs
+    let post_lighthouse_database_import_blobs = database_path
+        .and(warp::path("import_blobs"))
+        .and(warp::path::end())
+        .and(warp_utils::json::json())
+        .and(task_spawner_filter.clone())
+        .and(chain_filter.clone())
+        .then(
+            |blobs, task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
+                task_spawner.blocking_json_task(Priority::P1, move || {
+                    match chain.store.import_historical_blobs(blobs) {
+                        Ok(()) => Ok(()),
+                        Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
+                    }
+                })
+            },
+        );
+
     // GET lighthouse/analysis/block_rewards
     let get_lighthouse_block_rewards = warp::path("lighthouse")
         .and(warp::path("analysis"))
@@ -4752,6 +4770,7 @@ pub fn serve<T: BeaconChainTypes>(
         .uor(post_validator_liveness_epoch)
         .uor(post_lighthouse_liveness)
         .uor(post_lighthouse_database_reconstruct)
+        .uor(post_lighthouse_database_import_blobs)
         .uor(post_lighthouse_block_rewards)
         .uor(post_lighthouse_ui_validator_metrics)
         .uor(post_lighthouse_ui_validator_info)
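
The new route hangs off the existing `lighthouse/database` path, deserialises a JSON body, and runs `HotColdDB::import_historical_blobs` as a blocking `Priority::P1` task; any store error is surfaced as a custom server error. As a rough usage sketch only (not part of this commit), the endpoint could be exercised from Rust as below; the default HTTP API port (5052), the `reqwest`/`serde_json` usage, and the placeholder payload shape are assumptions:

```rust
// Illustrative sketch, not code from this repository. Assumes the handler's JSON
// body deserialises into Vec<(Hash256, BlobSidecarList<E>)> and that the beacon
// node's HTTP API is listening on the default port.
use reqwest::blocking::Client;
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical payload: a single (block_root, blob_sidecar_list) pair.
    let body = json!([[
        "0x0000000000000000000000000000000000000000000000000000000000000000",
        []
    ]]);

    let resp = Client::new()
        .post("http://localhost:5052/lighthouse/database/import_blobs")
        .json(&body)
        .send()?;

    println!("import_blobs status: {}", resp.status());
    Ok(())
}
```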

beacon_node/store/src/errors.rs

Lines changed: 1 addition & 0 deletions
@@ -59,6 +59,7 @@ pub enum Error {
     AddPayloadLogicError,
     InvalidKey,
     InvalidBytes,
+    InvalidBlobImport(String),
     InconsistentFork(InconsistentFork),
     Hdiff(hdiff::Error),
     CacheBuildError(EpochCacheError),
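
The new `InvalidBlobImport(String)` variant gives blob-import code a way to report a descriptive failure; it is not constructed in any of the hunks in this commit. A minimal sketch of how it might be used, where the guard and the message are invented for illustration and `Error` is this crate's error enum:

```rust
// Hypothetical helper, not from this commit: reject an import whose blob count
// is out of range, using the new InvalidBlobImport variant.
fn check_blob_count(num_blobs: usize, max_blobs_per_block: usize) -> Result<(), Error> {
    if num_blobs > max_blobs_per_block {
        return Err(Error::InvalidBlobImport(format!(
            "{num_blobs} blobs exceeds the maximum of {max_blobs_per_block}"
        )));
    }
    Ok(())
}
```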

beacon_node/store/src/hot_cold_store.rs

Lines changed: 44 additions & 0 deletions
@@ -43,6 +43,8 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec
 use types::*;
 use zstd::{Decoder, Encoder};
 
+const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000;
+
 /// On-disk database that stores finalized states efficiently.
 ///
 /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
@@ -866,6 +868,48 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         Ok(())
     }
 
+    /// Import historical blobs.
+    pub fn import_historical_blobs(
+        &self,
+        historical_blobs: Vec<(Hash256, BlobSidecarList<E>)>,
+    ) -> Result<(), Error> {
+        if historical_blobs.is_empty() {
+            return Ok(());
+        }
+
+        let mut total_imported = 0;
+
+        for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) {
+            let mut ops = Vec::with_capacity(chunk.len());
+
+            for (block_root, blobs) in chunk {
+                // Verify block exists.
+                if !self.block_exists(block_root)? {
+                    warn!(
+                        self.log,
+                        "Skipping import of blobs; block root does not exist.";
+                        "block_root" => ?block_root,
+                        "num_blobs" => blobs.len(),
+                    );
+                    continue;
+                }
+
+                self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops);
+                total_imported += blobs.len();
+            }
+
+            self.blobs_db.do_atomically(ops)?;
+        }
+
+        debug!(
+            self.log,
+            "Imported historical blobs.";
+            "total_imported" => total_imported,
+        );
+
+        Ok(())
+    }
+
     pub fn blobs_as_kv_store_ops(
         &self,
         key: &Hash256,
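
`import_historical_blobs` skips any block root that is not already in the store (logging a warning), converts each remaining blob list into KV ops via `blobs_as_kv_store_ops`, and flushes them to the blobs DB in atomic batches of `HISTORICAL_BLOB_BATCH_SIZE` (1000) roots. A minimal calling sketch, assuming a `store` handle to the `HotColdDB` plus a `block_root` and its `sidecars` already in hand (those names are illustrative, not from the commit):

```rust
// Sketch only: `store`, `block_root`, and `sidecars` are assumed to exist already.
// Unknown block roots are skipped with a warning rather than failing the call;
// the rest are persisted in atomic batches of up to HISTORICAL_BLOB_BATCH_SIZE roots.
let historical: Vec<(Hash256, BlobSidecarList<E>)> = vec![(block_root, sidecars)];
store.import_historical_blobs(historical)?;
```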
