Skip to content

Commit 4548209

Browse files
authored
perf: rm pending queue from MultiproofManager (#19178)
1 parent f8845c6 commit 4548209

File tree

2 files changed

+125
-55
lines changed

2 files changed

+125
-55
lines changed

crates/engine/tree/src/tree/payload_processor/multiproof.rs

Lines changed: 46 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use reth_trie_parallel::{
2424
root::ParallelStateRootError,
2525
};
2626
use std::{
27-
collections::{BTreeMap, VecDeque},
27+
collections::BTreeMap,
2828
ops::DerefMut,
2929
sync::{
3030
mpsc::{channel, Receiver, Sender},
@@ -34,10 +34,6 @@ use std::{
3434
};
3535
use tracing::{debug, error, instrument, trace};
3636

37-
/// Default upper bound for inflight multiproof calculations. These would be sitting in the queue
38-
/// waiting to be processed.
39-
const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 128;
40-
4137
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
4238
/// state.
4339
#[derive(Default, Debug)]
@@ -337,17 +333,10 @@ impl MultiproofInput {
337333
}
338334

339335
/// Manages concurrent multiproof calculations.
340-
/// Takes care of not having more calculations in flight than a given maximum
341-
/// concurrency, further calculation requests are queued and spawn later, after
342-
/// availability has been signaled.
343336
#[derive(Debug)]
344337
pub struct MultiproofManager {
345-
/// Maximum number of proof calculations allowed to be inflight at once.
346-
inflight_limit: usize,
347338
/// Currently running calculations.
348339
inflight: usize,
349-
/// Queued calculations.
350-
pending: VecDeque<PendingMultiproofTask>,
351340
/// Executor for tasks
352341
executor: WorkloadExecutor,
353342
/// Handle to the proof worker pools (storage and account).
@@ -376,22 +365,16 @@ impl MultiproofManager {
376365
proof_worker_handle: ProofWorkerHandle,
377366
) -> Self {
378367
Self {
379-
pending: VecDeque::with_capacity(DEFAULT_MULTIPROOF_INFLIGHT_LIMIT),
380-
inflight_limit: DEFAULT_MULTIPROOF_INFLIGHT_LIMIT,
381-
executor,
382368
inflight: 0,
369+
executor,
383370
metrics,
384371
proof_worker_handle,
385372
missed_leaves_storage_roots: Default::default(),
386373
}
387374
}
388375

389-
const fn is_full(&self) -> bool {
390-
self.inflight >= self.inflight_limit
391-
}
392-
393-
/// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached.
394-
fn spawn_or_queue(&mut self, input: PendingMultiproofTask) {
376+
/// Spawns a new multiproof calculation.
377+
fn spawn(&mut self, input: PendingMultiproofTask) {
395378
// If there are no proof targets, we can just send an empty multiproof back immediately
396379
if input.proof_targets_is_empty() {
397380
debug!(
@@ -402,27 +385,9 @@ impl MultiproofManager {
402385
return
403386
}
404387

405-
if self.is_full() {
406-
self.pending.push_back(input);
407-
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
408-
return;
409-
}
410-
411388
self.spawn_multiproof_task(input);
412389
}
413390

414-
/// Signals that a multiproof calculation has finished and there's room to
415-
/// spawn a new calculation if needed.
416-
fn on_calculation_complete(&mut self) {
417-
self.inflight = self.inflight.saturating_sub(1);
418-
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
419-
420-
if let Some(input) = self.pending.pop_front() {
421-
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
422-
self.spawn_multiproof_task(input);
423-
}
424-
}
425-
426391
/// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
427392
/// multiproof, and dispatching to `spawn_multiproof` otherwise.
428393
fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) {
@@ -508,6 +473,24 @@ impl MultiproofManager {
508473

509474
self.inflight += 1;
510475
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
476+
self.metrics
477+
.pending_storage_multiproofs_histogram
478+
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
479+
self.metrics
480+
.pending_account_multiproofs_histogram
481+
.record(self.proof_worker_handle.pending_account_tasks() as f64);
482+
}
483+
484+
/// Signals that a multiproof calculation has finished.
485+
fn on_calculation_complete(&mut self) {
486+
self.inflight = self.inflight.saturating_sub(1);
487+
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
488+
self.metrics
489+
.pending_storage_multiproofs_histogram
490+
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
491+
self.metrics
492+
.pending_account_multiproofs_histogram
493+
.record(self.proof_worker_handle.pending_account_tasks() as f64);
511494
}
512495

513496
/// Spawns a single multiproof calculation task.
@@ -598,6 +581,12 @@ impl MultiproofManager {
598581

599582
self.inflight += 1;
600583
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
584+
self.metrics
585+
.pending_storage_multiproofs_histogram
586+
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
587+
self.metrics
588+
.pending_account_multiproofs_histogram
589+
.record(self.proof_worker_handle.pending_account_tasks() as f64);
601590
}
602591
}
603592

@@ -606,8 +595,10 @@ impl MultiproofManager {
606595
pub(crate) struct MultiProofTaskMetrics {
607596
/// Histogram of inflight multiproofs.
608597
pub inflight_multiproofs_histogram: Histogram,
609-
/// Histogram of pending multiproofs.
610-
pub pending_multiproofs_histogram: Histogram,
598+
/// Histogram of pending storage multiproofs in the queue.
599+
pub pending_storage_multiproofs_histogram: Histogram,
600+
/// Histogram of pending account multiproofs in the queue.
601+
pub pending_account_multiproofs_histogram: Histogram,
611602

612603
/// Histogram of the number of prefetch proof target accounts.
613604
pub prefetch_proof_targets_accounts_histogram: Histogram,
@@ -657,8 +648,7 @@ pub(crate) struct MultiProofTaskMetrics {
657648
#[derive(Debug)]
658649
pub(super) struct MultiProofTask {
659650
/// The size of proof targets chunk to spawn in one calculation.
660-
///
661-
/// If [`None`], then chunking is disabled.
651+
/// If None, chunking is disabled and all targets are processed in a single proof.
662652
chunk_size: Option<usize>,
663653
/// Task configuration.
664654
config: MultiProofConfig,
@@ -738,10 +728,14 @@ impl MultiProofTask {
738728

739729
// Process proof targets in chunks.
740730
let mut chunks = 0;
741-
let should_chunk = !self.multiproof_manager.is_full();
731+
732+
// Only chunk if account or storage workers are available to take advantage of parallelism.
733+
let should_chunk =
734+
self.multiproof_manager.proof_worker_handle.has_available_account_workers() ||
735+
self.multiproof_manager.proof_worker_handle.has_available_storage_workers();
742736

743737
let mut spawn = |proof_targets| {
744-
self.multiproof_manager.spawn_or_queue(
738+
self.multiproof_manager.spawn(
745739
MultiproofInput {
746740
config: self.config.clone(),
747741
source: None,
@@ -873,10 +867,14 @@ impl MultiProofTask {
873867

874868
// Process state updates in chunks.
875869
let mut chunks = 0;
876-
let should_chunk = !self.multiproof_manager.is_full();
877870

878871
let mut spawned_proof_targets = MultiProofTargets::default();
879872

873+
// Only chunk if account or storage workers are available to take advantage of parallelism.
874+
let should_chunk =
875+
self.multiproof_manager.proof_worker_handle.has_available_account_workers() ||
876+
self.multiproof_manager.proof_worker_handle.has_available_storage_workers();
877+
880878
let mut spawn = |hashed_state_update| {
881879
let proof_targets = get_proof_targets(
882880
&hashed_state_update,
@@ -885,7 +883,7 @@ impl MultiProofTask {
885883
);
886884
spawned_proof_targets.extend_ref(&proof_targets);
887885

888-
self.multiproof_manager.spawn_or_queue(
886+
self.multiproof_manager.spawn(
889887
MultiproofInput {
890888
config: self.config.clone(),
891889
source: Some(source),
@@ -954,7 +952,7 @@ impl MultiProofTask {
954952
/// so that the proofs for accounts and storage slots that were already fetched are not
955953
/// requested again.
956954
/// 2. Using the proof targets, a new multiproof is calculated using
957-
/// [`MultiproofManager::spawn_or_queue`].
955+
/// [`MultiproofManager::spawn`].
958956
/// * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
959957
/// sent back to this task along with the original state update.
960958
/// * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`]

0 commit comments

Comments
 (0)