feat(txpool): add configurable batch timeout to transaction pool insertions #20661
base: main
Conversation
feat: add `batch_timeout` configuration for transaction pool insertions.
refactor: batcher poll method to continuously process available batches and clarify flush conditions.
Force-pushed from a211735 to 16065ff.
CodSpeed Performance Report: merging #20661 will not alter performance.
Force-pushed from 714cd63 to edc2c11.
mattsse left a comment:
this makes sense and I've thought about this as well, because most of the time this ends up batching just a single request, but even waiting a bit should be very beneficial here.
left some suggestions and a question re the poll logic
crates/rpc/rpc/src/eth/builder.rs (outdated)
```rust
max_batch_size: usize,
batch_timeout: Duration,
```
nit: could we introduce a dedicated `BatchConfig` type?
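For illustration, one possible shape for such a type — a sketch only, with field names taken from the builder arguments above rather than from the final PR:

```rust
use std::time::Duration;

/// Hypothetical sketch of the suggested dedicated config type.
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum number of insert requests per batch.
    pub max_batch_size: usize,
    /// How long to buffer requests before flushing a partial batch;
    /// `Duration::ZERO` means flush immediately (original behavior).
    pub batch_timeout: Duration,
}
```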
```rust
let interval = if batch_timeout.is_zero() {
    None
```
can we make this an `Option` then instead?
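For example, a minimal sketch of that construction, assuming the `batch_timeout` value from the diff above:

```rust
// Map a zero timeout to `None` once at construction, so the poll loop can
// branch on the `Option` instead of re-checking `is_zero` every iteration.
let interval =
    (!batch_timeout.is_zero()).then(|| tokio::time::interval(batch_timeout));
```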
```rust
match this.interval.as_mut().as_pin_mut() {
    // Immediate mode (timeout = 0): zero-cost path, original behavior
    None => loop {
        match this.request_rx.as_mut().poll_recv_many(cx, this.buf, *this.max_batch_size) {
            Poll::Ready(0) => return Poll::Ready(()), // Channel closed
            Poll::Ready(_) => {
                Self::spawn_batch(this.pool, this.buf);
                this.buf.reserve(*this.max_batch_size);
                // continue to check for more requests
            }
            Poll::Pending => return Poll::Pending,
        }
    },
```
all of this looks a bit too complex
how should this behave exactly?
can we not just use the interval to yield Pending if it's not ready yet?
Fixed, thanks for the review
…ration and batching modes.
Force-pushed from a892bef to 5579b0b.
…e `BatchConfig` struct.
Force-pushed from 5579b0b to f324ebf.
mattsse left a comment:
I'd like to restructure the poll logic a bit so that this behaves like:
- poll_recv_many
- if batch empty -> return early
- if batch full -> spawn + continue
- if batch not full and interval is set: poll interval
I'd also like to see some manual poll tests for this
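For reference, a minimal sketch of that control flow, assuming the pin-projected fields from the diffs above (`request_rx`, `buf`, `max_batch_size`, `interval: Option<Interval>`); this illustrates the suggested structure, not the PR's final code:

```rust
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    let mut this = self.project();
    loop {
        // Drain up to a full batch of queued requests in one call.
        if let Poll::Ready(0) =
            this.request_rx.as_mut().poll_recv_many(cx, this.buf, *this.max_batch_size)
        {
            if this.buf.is_empty() {
                // Channel closed and nothing left to flush.
                return Poll::Ready(());
            }
        }

        // Batch empty -> return early and wait for more requests.
        if this.buf.is_empty() {
            return Poll::Pending;
        }

        // Batch full -> spawn and continue draining the channel.
        if this.buf.len() >= *this.max_batch_size {
            Self::spawn_batch(this.pool, this.buf);
            continue;
        }

        // Batch not full and an interval is set -> poll the interval;
        // with no interval (immediate mode), flush right away.
        match this.interval.as_mut().as_pin_mut() {
            Some(mut interval) => match interval.poll_tick(cx) {
                Poll::Ready(_) => Self::spawn_batch(this.pool, this.buf),
                Poll::Pending => return Poll::Pending,
            },
            None => Self::spawn_batch(this.pool, this.buf),
        }
    }
}
```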
```rust
continue;
// 1. Collect available requests (non-blocking)
while this.buf.len() < *this.max_batch_size {
    match this.request_rx.as_mut().poll_recv(cx) {
```
I think this should keep using `poll_recv_many`
```rust
if batch_full || timeout_ready {
    Self::spawn_batch(this.pool, this.buf);
    this.buf.reserve(*this.max_batch_size);
```
this is wasteful, because this will most likely over-allocate
```rust
// 2. Check flush conditions
let batch_full = this.buf.len() >= *this.max_batch_size;
```
if the batch is full we can spawn it immediately, and then we only need to reset the interval and can skip polling it
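A possible shape for that fast path, using the same assumed fields as the sketch above; `Interval::reset` restarts the timer so the next partial batch gets a full timeout window:

```rust
// Full batch: flush immediately, restart the timeout window, and skip
// polling the interval entirely on this iteration (illustrative sketch).
if this.buf.len() >= *this.max_batch_size {
    Self::spawn_batch(this.pool, this.buf);
    if let Some(mut interval) = this.interval.as_mut().as_pin_mut() {
        interval.reset();
    }
    continue;
}
```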
```rust
    !this.buf.is_empty()
};

if batch_full || timeout_ready {
```
I find `timeout_ready` a bit confusing here, because if no timeout is set then it represents something else
fixed
…icient request collection and enhance batching logic with new tests.
Introduced a mechanism to force batching of transactions for a specified duration using `--txpool.batch-timeout`.
The previous implementation relied on greedy consumption (`poll_recv_many`), which processes whatever is available immediately. Under low-to-medium load, this often results in small batches (or single transactions), failing to effectively amortize the cost of acquiring the transaction pool lock.
By introducing a timeout, we can enforce buffering up to `max_batch_size` or `batch_timeout`, significantly reducing lock contention and improving throughput by ensuring larger, persistent batches.

Implementation Details
- Used an `Option<tokio::time::Interval>` in `BatchTxProcessor` to keep the default immediate mode (timeout = 0) zero-cost (see the sketch below).
- Refactored the poll loop to support both immediate consumption and time-buffered batching.
- Added config propagation and integration tests.
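For illustration, a hypothetical sketch of the processor state described above (type parameters and field names are inferred from the review diffs, not the merged code):

```rust
use tokio::{sync::mpsc, time::Interval};

struct BatchTxProcessor<Pool, Req> {
    pool: Pool,
    request_rx: mpsc::Receiver<Req>,
    /// Requests buffered for the current batch.
    buf: Vec<Req>,
    max_batch_size: usize,
    /// `None` in the default immediate mode (timeout = 0);
    /// `Some` enables time-buffered batching.
    interval: Option<Interval>,
}
```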