Skip to content

Commit 9a8b9b8

Browse files
authored
Merge of #7924
2 parents 5aed118 + c0467f4 commit 9a8b9b8

File tree

11 files changed

+230
-110
lines changed

11 files changed

+230
-110
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ logging = { workspace = true }
1212
metrics = { workspace = true }
1313
num_cpus = { workspace = true }
1414
parking_lot = { workspace = true }
15+
rayon = { workspace = true }
1516
serde = { workspace = true }
1617
slot_clock = { workspace = true }
1718
strum = { workspace = true }

beacon_node/beacon_processor/src/lib.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
3939
//! task.
4040
41+
use crate::rayon_manager::RayonManager;
4142
use crate::work_reprocessing_queue::{
4243
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
4344
};
@@ -47,6 +48,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
4748
use logging::TimeLatch;
4849
use logging::crit;
4950
use parking_lot::Mutex;
51+
use rayon::ThreadPool;
5052
pub use scheduler::work_reprocessing_queue;
5153
use serde::{Deserialize, Serialize};
5254
use slot_clock::SlotClock;
@@ -74,6 +76,7 @@ use work_reprocessing_queue::{
7476
};
7577

7678
mod metrics;
79+
pub mod rayon_manager;
7780
pub mod scheduler;
7881

7982
/// The maximum size of the channel for work events to the `BeaconProcessor`.
@@ -603,7 +606,7 @@ pub enum Work<E: EthSpec> {
603606
process_fn: BlockingFn,
604607
},
605608
ChainSegment(AsyncFn),
606-
ChainSegmentBackfill(AsyncFn),
609+
ChainSegmentBackfill(BlockingFn),
607610
Status(BlockingFn),
608611
BlocksByRangeRequest(AsyncFn),
609612
BlocksByRootsRequest(AsyncFn),
@@ -807,6 +810,7 @@ pub struct BeaconProcessor<E: EthSpec> {
807810
pub network_globals: Arc<NetworkGlobals<E>>,
808811
pub executor: TaskExecutor,
809812
pub current_workers: usize,
813+
pub rayon_manager: RayonManager,
810814
pub config: BeaconProcessorConfig,
811815
}
812816

@@ -1603,7 +1607,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
16031607
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
16041608
task_spawner.spawn_async(work)
16051609
}
1606-
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
1610+
Work::ChainSegmentBackfill(process_fn) => {
1611+
if self.config.enable_backfill_rate_limiting {
1612+
task_spawner.spawn_blocking_with_rayon(
1613+
self.rayon_manager.low_priority_threadpool.clone(),
1614+
process_fn,
1615+
)
1616+
} else {
1617+
// use the global rayon thread pool if backfill rate limiting is disabled.
1618+
task_spawner.spawn_blocking(process_fn)
1619+
}
1620+
}
16071621
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
16081622
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
16091623
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
@@ -1665,6 +1679,22 @@ impl TaskSpawner {
16651679
WORKER_TASK_NAME,
16661680
)
16671681
}
1682+
1683+
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
1684+
fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F)
1685+
where
1686+
F: FnOnce() + Send + 'static,
1687+
{
1688+
self.executor.spawn_blocking(
1689+
move || {
1690+
thread_pool.install(|| {
1691+
task();
1692+
});
1693+
drop(self.send_idle_on_drop)
1694+
},
1695+
WORKER_TASK_NAME,
1696+
)
1697+
}
16681698
}
16691699

16701700
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use rayon::{ThreadPool, ThreadPoolBuilder};
2+
use std::sync::Arc;
3+
4+
const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4;
5+
const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 1;
6+
7+
pub struct RayonManager {
8+
/// Smaller rayon thread pool for lower-priority, compute-intensive tasks.
9+
/// By default ~25% of CPUs or a minimum of 1 thread.
10+
pub low_priority_threadpool: Arc<ThreadPool>,
11+
}
12+
13+
impl Default for RayonManager {
14+
fn default() -> Self {
15+
let low_prio_threads =
16+
(num_cpus::get() / DEFAULT_LOW_PRIORITY_DIVISOR).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT);
17+
let low_priority_threadpool = Arc::new(
18+
ThreadPoolBuilder::new()
19+
.num_threads(low_prio_threads)
20+
.build()
21+
.expect("failed to build low-priority rayon pool"),
22+
);
23+
Self {
24+
low_priority_threadpool,
25+
}
26+
}
27+
}

beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ pub struct IgnoredRpcBlock {
173173
}
174174

175175
/// A backfill batch work that has been queued for processing later.
176-
pub struct QueuedBackfillBatch(pub AsyncFn);
176+
pub struct QueuedBackfillBatch(pub BlockingFn);
177177

178178
pub struct QueuedColumnReconstruction {
179179
pub block_root: Hash256,
@@ -1084,7 +1084,7 @@ mod tests {
10841084
// Now queue a backfill sync batch.
10851085
work_reprocessing_tx
10861086
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
1087-
Box::pin(async {}),
1087+
Box::new(|| {}),
10881088
)))
10891089
.unwrap();
10901090
tokio::task::yield_now().await;

beacon_node/client/src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use beacon_chain::{
1717
store::{HotColdDB, ItemStore, StoreConfig},
1818
};
1919
use beacon_chain::{Kzg, LightClientProducerEvent};
20+
use beacon_processor::rayon_manager::RayonManager;
2021
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
2122
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
2223
use environment::RuntimeContext;
@@ -680,6 +681,7 @@ where
680681
executor: beacon_processor_context.executor.clone(),
681682
current_workers: 0,
682683
config: beacon_processor_config,
684+
rayon_manager: RayonManager::default(),
683685
}
684686
.spawn_manager(
685687
beacon_processor_channels.beacon_processor_rx,

beacon_node/http_api/src/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use beacon_chain::{
55
};
66
use beacon_processor::{
77
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
8+
rayon_manager::RayonManager,
89
};
910
use directory::DEFAULT_ROOT_DIR;
1011
use eth2::{BeaconNodeHttpClient, Timeouts};
@@ -247,6 +248,7 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
247248
executor: test_runtime.task_executor.clone(),
248249
current_workers: 0,
249250
config: beacon_processor_config,
251+
rayon_manager: RayonManager::default(),
250252
}
251253
.spawn_manager(
252254
beacon_processor_rx,

beacon_node/lighthouse_tracing/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
2626
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
2727
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
2828
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
29+
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";
2930

3031
/// Fork choice root spans
3132
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot";
@@ -61,6 +62,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
6162
SPAN_PROCESS_RPC_BLOBS,
6263
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
6364
SPAN_PROCESS_CHAIN_SEGMENT,
65+
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL,
6466
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
6567
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
6668
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss
66
use beacon_chain::fetch_blobs::{
77
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
88
};
9-
use beacon_chain::{
10-
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
11-
};
9+
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
1210
use beacon_processor::{
1311
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
1412
WorkEvent as BeaconWorkEvent,
@@ -500,33 +498,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
500498
process_id: ChainSegmentProcessId,
501499
blocks: Vec<RpcBlock<T::EthSpec>>,
502500
) -> Result<(), Error<T::EthSpec>> {
503-
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
504501
debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process");
505-
506502
let processor = self.clone();
507-
let process_fn = async move {
508-
let notify_execution_layer = if processor
509-
.network_globals
510-
.sync_state
511-
.read()
512-
.is_syncing_finalized()
513-
{
514-
NotifyExecutionLayer::No
515-
} else {
516-
NotifyExecutionLayer::Yes
517-
};
518-
processor
519-
.process_chain_segment(process_id, blocks, notify_execution_layer)
520-
.await;
521-
};
522-
let process_fn = Box::pin(process_fn);
523503

524504
// Back-sync batches are dispatched with a different `Work` variant so
525505
// they can be rate-limited.
526-
let work = if is_backfill {
527-
Work::ChainSegmentBackfill(process_fn)
528-
} else {
529-
Work::ChainSegment(process_fn)
506+
let work = match process_id {
507+
ChainSegmentProcessId::RangeBatchId(_, _) => {
508+
let process_fn = async move {
509+
processor.process_chain_segment(process_id, blocks).await;
510+
};
511+
Work::ChainSegment(Box::pin(process_fn))
512+
}
513+
ChainSegmentProcessId::BackSyncBatchId(_) => {
514+
let process_fn =
515+
move || processor.process_chain_segment_backfill(process_id, blocks);
516+
Work::ChainSegmentBackfill(Box::new(process_fn))
517+
}
530518
};
531519

532520
self.try_send(BeaconWorkEvent {

0 commit comments

Comments
 (0)