@@ -146,7 +146,6 @@ where
146146 let _enter = debug_span ! ( target: "engine::tree::payload_processor::prewarm" , parent: span, "spawn_all" ) . entered ( ) ;
147147
148148 let ( done_tx, done_rx) = mpsc:: channel ( ) ;
149- let mut executing = 0usize ;
150149
151150 // When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
152151 // max workers to handle potentially many transactions in parallel rather
@@ -165,62 +164,44 @@ where
165164 handles. push ( ctx. spawn_worker ( i, & executor, actions_tx. clone ( ) , done_tx. clone ( ) ) ) ;
166165 }
167166
167+ // Distribute transactions to workers
168168 let mut tx_index = 0usize ;
169+ while let Ok ( tx) = pending. recv ( ) {
170+ // Stop distributing if termination was requested
171+ if ctx. terminate_execution . load ( Ordering :: Relaxed ) {
172+ trace ! (
173+ target: "engine::tree::payload_processor::prewarm" ,
174+ "Termination requested, stopping transaction distribution"
175+ ) ;
176+ break ;
177+ }
178+
179+ let indexed_tx = IndexedTransaction { index : tx_index, tx } ;
180+ let is_system_tx = indexed_tx. tx . tx ( ) . ty ( ) > MAX_STANDARD_TX_TYPE ;
169181
170- // Handle first transaction - special case for system transactions
171- if let Ok ( first_tx) = pending. recv ( ) {
172- // Move the transaction into the indexed wrapper to avoid an extra clone
173- let indexed_tx = IndexedTransaction { index : tx_index, tx : first_tx } ;
174- // Compute metadata from the moved value
175- let tx_ref = indexed_tx. tx . tx ( ) ;
176- let is_system_tx = tx_ref. ty ( ) > MAX_STANDARD_TX_TYPE ;
177- let first_tx_hash = tx_ref. tx_hash ( ) ;
178-
179- // Check if this is a system transaction (type > 4)
180- // System transactions in the first position typically set critical metadata
181- // that affects all subsequent transactions (e.g., L1 block info, fees on L2s).
182- if is_system_tx {
183- // Broadcast system transaction to all workers to ensure they have the
184- // critical state. This is particularly important for L2s like Optimism
185- // where the first deposit transaction contains essential block metadata.
182+ // System transactions (type > 4) in the first position set critical metadata
183+ // that affects all subsequent transactions (e.g., L1 block info on L2s).
184+ // Broadcast the first system transaction to all workers to ensure they have
185+ // the critical state. This is particularly important for L2s like Optimism
186+ // where the first deposit transaction (type 126) contains essential block metadata.
187+ if tx_index == 0 && is_system_tx {
186188 for handle in & handles {
187- if let Err ( err) = handle. send ( indexed_tx. clone ( ) ) {
188- warn ! (
189- target: "engine::tree::payload_processor::prewarm" ,
190- tx_hash = %first_tx_hash,
191- error = %err,
192- "Failed to send deposit transaction to worker"
193- ) ;
194- }
189+ // Ignore send errors: workers listen to terminate_execution and may
190+ // exit early when signaled. Sending to a disconnected worker is
191+ // possible and harmless and should happen at most once due to
192+ // the terminate_execution check above.
193+ let _ = handle. send ( indexed_tx. clone ( ) ) ;
195194 }
196195 } else {
197- // Not a deposit, send to first worker via round-robin
198- if let Err ( err) = handles[ 0 ] . send ( indexed_tx) {
199- warn ! (
200- target: "engine::tree::payload_processor::prewarm" ,
201- task_idx = 0 ,
202- error = %err,
203- "Failed to send transaction to worker"
204- ) ;
205- }
196+ // Round-robin distribution for all other transactions
197+ let worker_idx = tx_index % workers_needed;
198+ // Ignore send errors: workers listen to terminate_execution and may
199+ // exit early when signaled. Sending to a disconnected worker is
200+ // possible and harmless and should happen at most once due to
201+ // the terminate_execution check above.
202+ let _ = handles[ worker_idx] . send ( indexed_tx) ;
206203 }
207- executing += 1 ;
208- tx_index += 1 ;
209- }
210204
211- // Process remaining transactions with round-robin distribution
212- while let Ok ( executable) = pending. recv ( ) {
213- let indexed_tx = IndexedTransaction { index : tx_index, tx : executable } ;
214- let task_idx = executing % workers_needed;
215- if let Err ( err) = handles[ task_idx] . send ( indexed_tx) {
216- warn ! (
217- target: "engine::tree::payload_processor::prewarm" ,
218- task_idx,
219- error = %err,
220- "Failed to send transaction to worker"
221- ) ;
222- }
223- executing += 1 ;
224205 tx_index += 1 ;
225206 }
226207
@@ -230,7 +211,7 @@ where
230211 while done_rx. recv ( ) . is_ok ( ) { }
231212
232213 let _ = actions_tx
233- . send ( PrewarmTaskEvent :: FinishedTxExecution { executed_transactions : executing } ) ;
214+ . send ( PrewarmTaskEvent :: FinishedTxExecution { executed_transactions : tx_index } ) ;
234215 } ) ;
235216 }
236217
0 commit comments