Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
47a80e5
Use scoped rayon pool for chain segment backfill.
jimmygchen Aug 22, 2025
5cbfdb5
Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba…
eserilev Aug 28, 2025
cc1b870
Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba…
eserilev Aug 29, 2025
28f6ad2
Create RayonManager struct
eserilev Sep 1, 2025
583aaba
Linting
eserilev Sep 1, 2025
16a1b48
Fix comment
eserilev Sep 1, 2025
f81f5ea
Introduce a high prio threadpool for use when backfill rate limiting …
eserilev Sep 1, 2025
067b599
Fix test
eserilev Sep 2, 2025
23bd8bf
Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba…
eserilev Sep 2, 2025
4e884bb
Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba…
eserilev Sep 2, 2025
f1144e4
fix test
eserilev Sep 2, 2025
ba65b98
Merge branch 'unstable' into backfill-verify-kzg-use-scoped-rayon
eserilev Sep 3, 2025
09433eb
Add default impl
eserilev Sep 15, 2025
b5ff451
Remove scoped high prio thread pool in favor of the global thread pool
eserilev Sep 15, 2025
0debde9
Merge branch 'backfill-verify-kzg-use-scoped-rayon' of https://github…
eserilev Sep 16, 2025
1694020
USe the global rayon pool
eserilev Sep 16, 2025
456af43
FMT
eserilev Sep 17, 2025
b289f49
Merge branch 'unstable' of https://github.com/sigp/lighthouse into ba…
eserilev Sep 17, 2025
9b57acc
fix test
eserilev Sep 17, 2025
dabbdc7
Fix
eserilev Sep 17, 2025
3f57327
ugh
eserilev Sep 17, 2025
091aafb
fix
eserilev Sep 17, 2025
6a525cb
low prio
eserilev Sep 17, 2025
2d7e57a
fix comment
eserilev Sep 17, 2025
c0467f4
update minimum thread count to 1
eserilev Sep 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/beacon_processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ logging = { workspace = true }
metrics = { workspace = true }
num_cpus = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
slot_clock = { workspace = true }
strum = { workspace = true }
Expand Down
31 changes: 29 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task.

use crate::rayon_manager::RayonManager;
use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
};
Expand All @@ -47,6 +48,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::TimeLatch;
use logging::crit;
use parking_lot::Mutex;
use rayon::ThreadPool;
pub use scheduler::work_reprocessing_queue;
use serde::{Deserialize, Serialize};
use slot_clock::SlotClock;
Expand Down Expand Up @@ -74,6 +76,7 @@ use work_reprocessing_queue::{
};

mod metrics;
pub mod rayon_manager;
pub mod scheduler;

/// The maximum size of the channel for work events to the `BeaconProcessor`.
Expand Down Expand Up @@ -603,7 +606,7 @@ pub enum Work<E: EthSpec> {
process_fn: BlockingFn,
},
ChainSegment(AsyncFn),
ChainSegmentBackfill(AsyncFn),
ChainSegmentBackfill(BlockingFn),
Status(BlockingFn),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
Expand Down Expand Up @@ -807,6 +810,7 @@ pub struct BeaconProcessor<E: EthSpec> {
pub network_globals: Arc<NetworkGlobals<E>>,
pub executor: TaskExecutor,
pub current_workers: usize,
pub rayon_manager: RayonManager,
pub config: BeaconProcessorConfig,
}

Expand Down Expand Up @@ -1605,7 +1609,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_async(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ChainSegmentBackfill(process_fn) => {
let thread_pool = if self.config.enable_backfill_rate_limiting {
self.rayon_manager.low_priority_threadpool.clone()
} else {
self.rayon_manager.high_priority_threadpool.clone()
};
task_spawner.spawn_blocking_with_rayon(thread_pool, process_fn)
}
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
Expand Down Expand Up @@ -1667,6 +1678,22 @@ impl TaskSpawner {
WORKER_TASK_NAME,
)
}

/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F)
where
F: FnOnce() + Send + 'static,
{
self.executor.spawn_blocking(
move || {
thread_pool.install(|| {
task();
});
drop(self.send_idle_on_drop)
},
WORKER_TASK_NAME,
)
}
}

/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
Expand Down
37 changes: 37 additions & 0 deletions beacon_node/beacon_processor/src/rayon_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::sync::Arc;

pub const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4;
const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 2;

pub struct RayonManager {
/// Smaller rayon thread pool for lower-priority, compute-intensive tasks.
/// By default ~25% of CPUs or a minimum of 2 threads.
pub low_priority_threadpool: Arc<ThreadPool>,
/// Larger rayon thread pool for high-priority, compute-intensive tasks.
/// By default 100% of CPUs.
pub high_priority_threadpool: Arc<ThreadPool>,
}

impl RayonManager {
pub fn new(low_prio_cpu_divisor: usize) -> Self {
let low_prio_threads =
(num_cpus::get() / low_prio_cpu_divisor).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT);
let low_priority_threadpool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(low_prio_threads)
.build()
.expect("failed to build low-priority rayon pool"),
);
let high_priority_threadpool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.build()
.expect("failed to build high-priority rayon pool"),
);
Self {
low_priority_threadpool,
high_priority_threadpool,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub struct IgnoredRpcBlock {
}

/// A backfill batch work that has been queued for processing later.
pub struct QueuedBackfillBatch(pub AsyncFn);
pub struct QueuedBackfillBatch(pub BlockingFn);

pub struct QueuedColumnReconstruction {
pub block_root: Hash256,
Expand Down Expand Up @@ -1042,7 +1042,7 @@ mod tests {
// Now queue a backfill sync batch.
work_reprocessing_tx
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
Box::pin(async {}),
Box::new(|| {}),
)))
.unwrap();
tokio::task::yield_now().await;
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use beacon_chain::{
store::{HotColdDB, ItemStore, StoreConfig},
};
use beacon_chain::{Kzg, LightClientProducerEvent};
use beacon_processor::rayon_manager::{DEFAULT_LOW_PRIORITY_DIVISOR, RayonManager};
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
use environment::RuntimeContext;
Expand Down Expand Up @@ -680,6 +681,7 @@ where
executor: beacon_processor_context.executor.clone(),
current_workers: 0,
config: beacon_processor_config,
rayon_manager: RayonManager::new(DEFAULT_LOW_PRIORITY_DIVISOR),
}
.spawn_manager(
beacon_processor_channels.beacon_processor_rx,
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use beacon_chain::{
};
use beacon_processor::{
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
rayon_manager::{DEFAULT_LOW_PRIORITY_DIVISOR, RayonManager},
};
use directory::DEFAULT_ROOT_DIR;
use eth2::{BeaconNodeHttpClient, Timeouts};
Expand Down Expand Up @@ -214,6 +215,7 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
executor: test_runtime.task_executor.clone(),
current_workers: 0,
config: beacon_processor_config,
rayon_manager: RayonManager::new(DEFAULT_LOW_PRIORITY_DIVISOR),
}
.spawn_manager(
beacon_processor_rx,
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/lighthouse_tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";

/// RPC methods root spans
pub const SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST: &str = "handle_blocks_by_range_request";
Expand Down Expand Up @@ -51,6 +52,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
SPAN_PROCESS_RPC_BLOBS,
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
SPAN_PROCESS_CHAIN_SEGMENT,
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL,
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,
Expand Down
38 changes: 13 additions & 25 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss
use beacon_chain::fetch_blobs::{
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
};
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
use beacon_processor::{
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
WorkEvent as BeaconWorkEvent,
Expand Down Expand Up @@ -500,33 +498,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
process_id: ChainSegmentProcessId,
blocks: Vec<RpcBlock<T::EthSpec>>,
) -> Result<(), Error<T::EthSpec>> {
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process");

let processor = self.clone();
let process_fn = async move {
let notify_execution_layer = if processor
.network_globals
.sync_state
.read()
.is_syncing_finalized()
{
NotifyExecutionLayer::No
} else {
NotifyExecutionLayer::Yes
};
processor
.process_chain_segment(process_id, blocks, notify_execution_layer)
.await;
};
let process_fn = Box::pin(process_fn);

// Back-sync batches are dispatched with a different `Work` variant so
// they can be rate-limited.
let work = if is_backfill {
Work::ChainSegmentBackfill(process_fn)
} else {
Work::ChainSegment(process_fn)
let work = match process_id {
ChainSegmentProcessId::RangeBatchId(_, _) => {
let process_fn = async move {
processor.process_chain_segment(process_id, blocks).await;
};
Work::ChainSegment(Box::pin(process_fn))
}
ChainSegmentProcessId::BackSyncBatchId(_) => {
let process_fn =
move || processor.process_chain_segment_backfill(process_id, blocks);
Work::ChainSegmentBackfill(Box::new(process_fn))
}
};

self.try_send(BeaconWorkEvent {
Expand Down
Loading
Loading