Closed

41 commits
651567d  Add release helper script. (jimmygchen, Jul 11, 2025)
07f80fa  Add experimental complete-blob-backfill flag (michaelsproul, Jul 17, 2025)
3af44b4  Merge branch 'unstable' into blob-backfill (michaelsproul, Jul 22, 2025)
515ef25  Fix CLI flag!! (michaelsproul, Jul 22, 2025)
47a80e5  Use scoped rayon pool for chain segment backfill. (jimmygchen, Aug 22, 2025)
a9da143  Merge branch 'unstable' into blob-backfill (michaelsproul, Aug 27, 2025)
3de41b3  Fix test (michaelsproul, Aug 27, 2025)
5cbfdb5  Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba… (eserilev, Aug 28, 2025)
cc1b870  Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba… (eserilev, Aug 29, 2025)
28f6ad2  Create RayonManager struct (eserilev, Sep 1, 2025)
583aaba  Linting (eserilev, Sep 1, 2025)
16a1b48  Fix comment (eserilev, Sep 1, 2025)
f81f5ea  Introduce a high prio threadpool for use when backfill rate limiting … (eserilev, Sep 1, 2025)
067b599  Fix test (eserilev, Sep 2, 2025)
23bd8bf  Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba… (eserilev, Sep 2, 2025)
4e884bb  Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba… (eserilev, Sep 2, 2025)
f1144e4  fix test (eserilev, Sep 2, 2025)
ba65b98  Merge branch 'unstable' into backfill-verify-kzg-use-scoped-rayon (eserilev, Sep 3, 2025)
8263ada  Merge branch 'unstable' into blob-backfill (michaelsproul, Sep 3, 2025)
d798a1e  Merge branch 'unstable' into blob-backfill (michaelsproul, Sep 10, 2025)
09433eb  Add default impl (eserilev, Sep 15, 2025)
b5ff451  Remove scoped high prio thread pool in favor of the global thread pool (eserilev, Sep 15, 2025)
0debde9  Merge branch 'backfill-verify-kzg-use-scoped-rayon' of https://github… (eserilev, Sep 16, 2025)
1694020  USe the global rayon pool (eserilev, Sep 16, 2025)
a66e158  Fix reprocess queue memory leak (michaelsproul, Sep 17, 2025)
2d749ae  Add regression tests (michaelsproul, Sep 17, 2025)
456af43  FMT (eserilev, Sep 17, 2025)
b289f49  Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba… (eserilev, Sep 17, 2025)
9b57acc  fix test (eserilev, Sep 17, 2025)
dabbdc7  Fix (eserilev, Sep 17, 2025)
3f57327  ugh (eserilev, Sep 17, 2025)
091aafb  fix (eserilev, Sep 17, 2025)
6a525cb  low prio (eserilev, Sep 17, 2025)
2d7e57a  fix comment (eserilev, Sep 17, 2025)
c0467f4  update minimum thread count to 1 (eserilev, Sep 17, 2025)
1f3d1da  Disable blob pruning when set (michaelsproul, Sep 18, 2025)
55dd21a  Merge remote-tracking branch 'origin/unstable' into blob-backfill (michaelsproul, Sep 18, 2025)
9f4c65d  Merge of #8065 (mergify[bot], Sep 18, 2025)
f6a9465  Merge of #7751 (mergify[bot], Sep 18, 2025)
5bf9d92  Merge of #7924 (mergify[bot], Sep 18, 2025)
4fa501c  Merge of #7737 (mergify[bot], Sep 18, 2025)
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
@@ -899,6 +899,7 @@ where
         let genesis_time = head_snapshot.beacon_state.genesis_time();
         let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
         let shuffling_cache_size = self.chain_config.shuffling_cache_size;
+        let complete_blob_backfill = self.chain_config.complete_blob_backfill;
 
         // Calculate the weak subjectivity point in which to backfill blocks to.
         let genesis_backfill_slot = if self.chain_config.genesis_backfill {
@@ -1013,6 +1014,7 @@ where
             genesis_backfill_slot,
             data_availability_checker: Arc::new(
                 DataAvailabilityChecker::new(
+                    complete_blob_backfill,
                     slot_clock,
                     self.kzg.clone(),
                     store,
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
@@ -86,6 +86,8 @@ pub struct ChainConfig {
     /// If using a weak-subjectivity sync, whether we should download blocks all the way back to
     /// genesis.
     pub genesis_backfill: bool,
+    /// EXPERIMENTAL: backfill blobs and data columns beyond the data availability window.
+    pub complete_blob_backfill: bool,
     /// Whether to send payload attributes every slot, regardless of connected proposers.
     ///
     /// This is useful for block builders and testing.
@@ -144,6 +146,7 @@ impl Default for ChainConfig {
             optimistic_finalized_sync: true,
             shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
             genesis_backfill: false,
+            complete_blob_backfill: false,
             always_prepare_payload: false,
             epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
             enable_light_client_server: true,
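
Note: per the commit history ("Add experimental complete-blob-backfill flag", "Fix CLI flag!!"), this config field is driven by an experimental `--complete-blob-backfill` beacon-node CLI flag; the default of `false` keeps existing behaviour unless the flag is passed.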
27 changes: 22 additions & 5 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -78,6 +78,7 @@ pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();
 /// proposer. Having a capacity > 1 is an optimization to prevent sync lookup from having re-fetch
 /// data during moments of unstable network conditions.
 pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
+    complete_blob_backfill: bool,
     availability_cache: Arc<DataAvailabilityCheckerInner<T>>,
     slot_clock: T::SlotClock,
     kzg: Arc<Kzg>,
@@ -116,6 +117,7 @@ impl<E: EthSpec> Debug for Availability<E> {
 
 impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
     pub fn new(
+        complete_blob_backfill: bool,
         slot_clock: T::SlotClock,
         kzg: Arc<Kzg>,
         store: BeaconStore<T>,
@@ -129,6 +131,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             spec.clone(),
         )?;
         Ok(Self {
+            complete_blob_backfill,
             availability_cache: Arc::new(inner),
             slot_clock,
             kzg,
@@ -518,9 +521,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
     /// The epoch at which we require a data availability check in block processing.
     /// `None` if the `Deneb` fork is disabled.
     pub fn data_availability_boundary(&self) -> Option<Epoch> {
-        let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch());
-        self.spec
-            .min_epoch_data_availability_boundary(current_epoch)
+        let fork_epoch = self.spec.deneb_fork_epoch?;
+
+        if self.complete_blob_backfill {
+            Some(fork_epoch)
+        } else {
+            let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch());
+            self.spec
+                .min_epoch_data_availability_boundary(current_epoch)
+        }
     }
 
     /// Returns true if the given epoch lies within the da boundary and false otherwise.
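
Note: the intent of the new branch, as a hedged sketch (free-standing function for illustration; the 4096-epoch MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS window and the clamp to the Deneb fork epoch are assumed behaviour of min_epoch_data_availability_boundary, not shown in this diff):

    // With the flag on, the data availability boundary pins to the Deneb fork
    // epoch, so blobs are backfilled (and, per a later commit, not pruned) all
    // the way back. With it off, the usual rolling retention window applies.
    fn boundary(complete_blob_backfill: bool, current_epoch: u64, deneb_fork_epoch: u64) -> u64 {
        const MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: u64 = 4096; // mainnet value
        if complete_blob_backfill {
            deneb_fork_epoch
        } else {
            current_epoch
                .saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
                .max(deneb_fork_epoch)
        }
    }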
@@ -1076,7 +1085,15 @@ mod test {
         let kzg = get_kzg(&spec);
         let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap());
         let custody_context = Arc::new(CustodyContext::new(false));
-        DataAvailabilityChecker::new(slot_clock, kzg, store, custody_context, spec)
-            .expect("should initialise data availability checker")
+        let complete_blob_backfill = false;
+        DataAvailabilityChecker::new(
+            complete_blob_backfill,
+            slot_clock,
+            kzg,
+            store,
+            custody_context,
+            spec,
+        )
+        .expect("should initialise data availability checker")
     }
 }
1 change: 1 addition & 0 deletions beacon_node/beacon_processor/Cargo.toml
@@ -12,6 +12,7 @@ logging = { workspace = true }
 metrics = { workspace = true }
 num_cpus = { workspace = true }
 parking_lot = { workspace = true }
+rayon = { workspace = true }
 serde = { workspace = true }
 slot_clock = { workspace = true }
 strum = { workspace = true }
34 changes: 32 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
@@ -38,6 +38,7 @@
 //! checks the queues to see if there are more parcels of work that can be spawned in a new worker
 //! task.
 
+use crate::rayon_manager::RayonManager;
 use crate::work_reprocessing_queue::{
     QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
 };
@@ -47,6 +48,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
 use logging::TimeLatch;
 use logging::crit;
 use parking_lot::Mutex;
+use rayon::ThreadPool;
 pub use scheduler::work_reprocessing_queue;
 use serde::{Deserialize, Serialize};
 use slot_clock::SlotClock;
@@ -74,6 +76,7 @@ use work_reprocessing_queue::{
 };
 
 mod metrics;
+pub mod rayon_manager;
 pub mod scheduler;
 
 /// The maximum size of the channel for work events to the `BeaconProcessor`.
@@ -603,7 +606,7 @@ pub enum Work<E: EthSpec> {
         process_fn: BlockingFn,
     },
     ChainSegment(AsyncFn),
-    ChainSegmentBackfill(AsyncFn),
+    ChainSegmentBackfill(BlockingFn),
     Status(BlockingFn),
     BlocksByRangeRequest(AsyncFn),
     BlocksByRootsRequest(AsyncFn),
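
Note: switching from `AsyncFn` to `BlockingFn` turns the backfill work item into a synchronous closure that can be handed to a rayon pool. For context, the two aliases are presumably along these lines (definitions assumed, not part of this diff):

    pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
    pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;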
@@ -807,6 +810,7 @@ pub struct BeaconProcessor<E: EthSpec> {
     pub network_globals: Arc<NetworkGlobals<E>>,
     pub executor: TaskExecutor,
     pub current_workers: usize,
+    pub rayon_manager: RayonManager,
     pub config: BeaconProcessorConfig,
 }

@@ -1603,7 +1607,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
             Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
                 task_spawner.spawn_async(work)
             }
-            Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
+            Work::ChainSegmentBackfill(process_fn) => {
+                if self.config.enable_backfill_rate_limiting {
+                    task_spawner.spawn_blocking_with_rayon(
+                        self.rayon_manager.low_priority_threadpool.clone(),
+                        process_fn,
+                    )
+                } else {
+                    // use the global rayon thread pool if backfill rate limiting is disabled.
+                    task_spawner.spawn_blocking(process_fn)
+                }
+            }
             Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
                 BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
                 BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
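
Note: with this routing, a backfill batch only runs on the dedicated low-priority pool when `enable_backfill_rate_limiting` is set; otherwise it goes through plain `spawn_blocking`, and any rayon work inside the closure is serviced by rayon's global pool instead (see the `spawn_blocking_with_rayon` sketch below).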
@@ -1665,6 +1679,22 @@ impl TaskSpawner {
             WORKER_TASK_NAME,
         )
     }
+
+    /// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
+    fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F)
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        self.executor.spawn_blocking(
+            move || {
+                thread_pool.install(|| {
+                    task();
+                });
+                drop(self.send_idle_on_drop)
+            },
+            WORKER_TASK_NAME,
+        )
+    }
 }
 
 /// This struct will send a message on `self.tx` when it is dropped. An error will be logged
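
Note: `ThreadPool::install` blocks the calling thread until the closure returns, and any rayon parallelism inside it is serviced by the given pool rather than the global one. A minimal self-contained sketch of the same pattern, with names and pool size chosen purely for illustration:

    use rayon::prelude::*;
    use rayon::ThreadPoolBuilder;
    use std::sync::Arc;

    fn main() {
        // A deliberately small pool, standing in for `low_priority_threadpool`.
        let pool = Arc::new(ThreadPoolBuilder::new().num_threads(2).build().unwrap());
        // `install` runs the closure on the pool and blocks until it finishes;
        // the parallel iterator below uses the 2-thread pool, not the global one.
        pool.install(|| {
            let sum: u64 = (0..1_000u64).into_par_iter().sum();
            println!("sum = {sum}");
        });
    }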
27 changes: 27 additions & 0 deletions beacon_node/beacon_processor/src/rayon_manager.rs
@@ -0,0 +1,27 @@
+use rayon::{ThreadPool, ThreadPoolBuilder};
+use std::sync::Arc;
+
+const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4;
+const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 1;
+
+pub struct RayonManager {
+    /// Smaller rayon thread pool for lower-priority, compute-intensive tasks.
+    /// By default ~25% of CPUs or a minimum of 1 thread.
+    pub low_priority_threadpool: Arc<ThreadPool>,
+}
+
+impl Default for RayonManager {
+    fn default() -> Self {
+        let low_prio_threads =
+            (num_cpus::get() / DEFAULT_LOW_PRIORITY_DIVISOR).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT);
+        let low_priority_threadpool = Arc::new(
+            ThreadPoolBuilder::new()
+                .num_threads(low_prio_threads)
+                .build()
+                .expect("failed to build low-priority rayon pool"),
+        );
+        Self {
+            low_priority_threadpool,
+        }
+    }
+}
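
Note: the sizing arithmetic gives roughly a quarter of the logical CPUs: with `DEFAULT_LOW_PRIORITY_DIVISOR = 4`, a 16-CPU host gets 16 / 4 = 4 low-priority threads, while a dual-core host gets 2 / 4 = 0, clamped up to `MINIMUM_LOW_PRIORITY_THREAD_COUNT = 1` (the subject of the "update minimum thread count to 1" commit).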