diff --git a/crates/rbuilder/src/live_builder/block_output/bid_value_source/best_bid_sync_source.rs b/crates/rbuilder/src/live_builder/block_output/bid_value_source/best_bid_sync_source.rs index c91ee984e..387237c20 100644 --- a/crates/rbuilder/src/live_builder/block_output/bid_value_source/best_bid_sync_source.rs +++ b/crates/rbuilder/src/live_builder/block_output/bid_value_source/best_bid_sync_source.rs @@ -1,6 +1,7 @@ use super::interfaces::{BidValueObs, BidValueSource}; use alloy_primitives::U256; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; /// Simple struct tracking the last best bid and asking it in a sync way via best_bid_value. pub struct BestBidSyncSource { @@ -30,7 +31,7 @@ impl BestBidSyncSource { } pub fn best_bid_value(&self) -> Option { - *self.best_bid_source_inner.best_bid.lock().unwrap() + *self.best_bid_source_inner.best_bid.lock() } } @@ -41,7 +42,7 @@ struct BestBidSyncSourceInner { impl BidValueObs for BestBidSyncSourceInner { fn update_new_bid(&self, bid: U256) { - let mut best_bid = self.best_bid.lock().unwrap(); + let mut best_bid = self.best_bid.lock(); if best_bid.map_or(true, |old_bid| old_bid < bid) { *best_bid = Some(bid); } diff --git a/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs b/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs index b8e7ae9c9..4f3d4eeb6 100644 --- a/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs +++ b/crates/rbuilder/src/live_builder/block_output/bidding/parallel_sealer_bid_maker.rs @@ -1,4 +1,5 @@ -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use tracing::error; @@ -38,13 +39,13 @@ impl PendingBid { } /// Updates bid, replacing on current (we assume they are always increasing but we don't check it). fn update(&self, bid: Bid) { - let mut current_bid = self.bid.lock().unwrap(); + let mut current_bid = self.bid.lock(); *current_bid = Some(bid); self.bid_notify.notify_one(); } fn consume_bid(&self) -> Option { - let mut current_bid = self.bid.lock().unwrap(); + let mut current_bid = self.bid.lock(); current_bid.take() } } @@ -113,7 +114,7 @@ impl ParallelSealerBidMakerProcess { /// block.finalize_block + self.sink.new_block inside spawn_blocking. async fn check_for_new_bid(&mut self) { - if *self.seal_control.seals_in_progress.lock().unwrap() >= self.max_concurrent_seals { + if *self.seal_control.seals_in_progress.lock() >= self.max_concurrent_seals { return; } if let Some(bid) = self.pending_bid.consume_bid() { @@ -121,7 +122,7 @@ impl ParallelSealerBidMakerProcess { let block = bid.block(); let block_number = block.building_context().block(); // Take sealing "slot" - *self.seal_control.seals_in_progress.lock().unwrap() += 1; + *self.seal_control.seals_in_progress.lock() += 1; let seal_control = self.seal_control.clone(); let sink = self.sink.clone(); tokio::task::spawn_blocking(move || { @@ -134,7 +135,7 @@ impl ParallelSealerBidMakerProcess { ), }; // release sealing "slot" - *seal_control.seals_in_progress.lock().unwrap() -= 1; + *seal_control.seals_in_progress.lock() -= 1; seal_control.notify.notify_one(); }); } diff --git a/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs b/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs index 3c729cb79..08f38cb65 100644 --- a/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs +++ b/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs @@ -1,10 +1,10 @@ -use std::sync::{Arc, Mutex}; +use crate::live_builder::block_output::relay_submit::BlockBuildingSink; +use parking_lot::Mutex; +use std::sync::Arc; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use tracing::error; -use crate::live_builder::block_output::relay_submit::BlockBuildingSink; - use super::interfaces::{Bid, BidMaker}; /// BidMaker with a background task sealing only one bid at a time. @@ -41,14 +41,12 @@ impl PendingBid { } /// Updates bid, replacing on current (we assume they are always increasing but we don't check it). fn update(&self, bid: Bid) { - let mut current_bid = self.bid.lock().unwrap(); - *current_bid = Some(bid); + *self.bid.lock() = Some(bid); self.bid_notify.notify_one(); } fn consume_bid(&self) -> Option { - let mut current_bid = self.bid.lock().unwrap(); - current_bid.take() + self.bid.lock().take() } } diff --git a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs index c0e3a5710..52053b1d0 100644 --- a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs +++ b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs @@ -17,9 +17,10 @@ use crate::{ use ahash::HashMap; use alloy_primitives::{utils::format_ether, U256}; use mockall::automock; +use parking_lot::Mutex; use reth_chainspec::ChainSpec; use reth_primitives::SealedBlock; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tokio::{sync::Notify, time::Instant}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level}; @@ -43,7 +44,7 @@ pub struct BestBlockCell { impl BestBlockCell { pub fn compare_and_update(&self, block: Block) { - let mut best_block = self.block.lock().unwrap(); + let mut best_block = self.block.lock(); let old_value = best_block .as_ref() .map(|b| b.trace.bid_value) @@ -55,7 +56,7 @@ impl BestBlockCell { } pub fn take_best_block(&self) -> Option { - self.block.lock().unwrap().take() + self.block.lock().take() } pub async fn wait_for_change(&self) { diff --git a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs index 1a1e0e184..914f7fa64 100644 --- a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs @@ -5,12 +5,9 @@ use crate::{ }; use alloy_provider::{IpcConnect, Provider, ProviderBuilder}; use futures::StreamExt; +use parking_lot::Mutex; use reth_provider::StateProviderFactory; -use std::{ - pin::pin, - sync::{Arc, Mutex}, - time::Instant, -}; +use std::{pin::pin, sync::Arc, time::Instant}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; @@ -56,7 +53,7 @@ where } }; - let mut orderpool = orderpool.lock().unwrap(); + let mut orderpool = orderpool.lock(); let start = Instant::now(); orderpool.head_updated(block_number, &state); diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index d3ed52078..c807f31c0 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -14,13 +14,9 @@ use self::{ }; use crate::primitives::{serialize::CancelShareBundle, BundleReplacementKey, Order}; use jsonrpsee::RpcModule; +use parking_lot::Mutex; use reth_provider::StateProviderFactory; -use std::{ - net::Ipv4Addr, - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{net::Ipv4Addr, path::PathBuf, sync::Arc, time::Duration}; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{info, trace, warn}; @@ -39,14 +35,14 @@ impl OrderPoolSubscriber { block_number: u64, sink: Box, ) -> OrderPoolSubscriptionId { - self.orderpool.lock().unwrap().add_sink(block_number, sink) + self.orderpool.lock().add_sink(block_number, sink) } pub fn remove_sink( &self, id: &OrderPoolSubscriptionId, ) -> Option> { - self.orderpool.lock().unwrap().remove_sink(id) + self.orderpool.lock().remove_sink(id) } /// Returned AutoRemovingOrderPoolSubscriptionId will call remove when dropped @@ -72,7 +68,7 @@ pub struct AutoRemovingOrderPoolSubscriptionId { impl Drop for AutoRemovingOrderPoolSubscriptionId { fn drop(&mut self) { - self.orderpool.lock().unwrap().remove_sink(&self.id); + self.orderpool.lock().remove_sink(&self.id); } } @@ -272,7 +268,7 @@ where } { - let mut orderpool = orderpool.lock().unwrap(); + let mut orderpool = orderpool.lock(); orderpool.process_commands(new_commands.clone()); } new_commands.clear(); diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index 5e17a06da..8052d0004 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -11,9 +11,10 @@ use crate::{ utils::gen_uid, }; use ahash::HashMap; +use parking_lot::Mutex; use reth_provider::StateProviderFactory; use simulation_job::SimulationJob; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{info_span, Instrument}; @@ -118,7 +119,7 @@ where let (sim_req_sender, sim_req_receiver) = flume::unbounded(); let (sim_results_sender, sim_results_receiver) = mpsc::channel(1024); { - let mut contexts = current_contexts.lock().unwrap(); + let mut contexts = current_contexts.lock(); let sim_context = SimulationContext { block_ctx: ctx, requests: sim_req_receiver, @@ -139,7 +140,7 @@ where // clean up { - let mut contexts = current_contexts.lock().unwrap(); + let mut contexts = current_contexts.lock(); contexts.contexts.remove(&block_context); } } @@ -147,7 +148,7 @@ where ); { - let mut tasks = self.running_tasks.lock().unwrap(); + let mut tasks = self.running_tasks.lock(); tasks.retain(|handle| !handle.is_finished()); tasks.push(handle); } diff --git a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs index d37342b19..4c572dc2a 100644 --- a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs +++ b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs @@ -7,10 +7,11 @@ use crate::{ telemetry, telemetry::add_sim_thread_utilisation_timings, }; +use parking_lot::Mutex; use reth::revm::cached::CachedReads; use reth_provider::StateProviderFactory; use std::{ - sync::{Arc, Mutex}, + sync::Arc, thread::sleep, time::{Duration, Instant}, }; @@ -34,7 +35,7 @@ pub fn run_sim_worker

( } let current_sim_context = loop { let next_ctx = { - let ctxs = ctx.lock().unwrap(); + let ctxs = ctx.lock(); ctxs.contexts.iter().next().map(|(_, c)| c.clone()) }; // @Perf chose random context so its more fair when we have 2 instead of 1 diff --git a/crates/rbuilder/src/telemetry/dynamic_logs.rs b/crates/rbuilder/src/telemetry/dynamic_logs.rs index 303a75c98..c88aed81c 100644 --- a/crates/rbuilder/src/telemetry/dynamic_logs.rs +++ b/crates/rbuilder/src/telemetry/dynamic_logs.rs @@ -1,11 +1,8 @@ //! This module provides a functionality to dynamically change the log level use lazy_static::lazy_static; -use std::{ - fs::File, - path::PathBuf, - sync::{Arc, Mutex}, -}; +use parking_lot::Mutex; +use std::{fs::File, path::PathBuf, sync::Arc}; use tracing_subscriber::{ filter::Filtered, fmt, layer::SubscriberExt, reload, reload::Handle, util::SubscriberInitExt, EnvFilter, Layer, Registry, @@ -41,13 +38,13 @@ impl Default for LoggerConfig { } pub fn default_log_config() -> LoggerConfig { - DEFAULT_CONFIG.lock().unwrap().clone() + DEFAULT_CONFIG.lock().clone() } /// Reloads the log layer with the provided config pub fn set_log_config(config: LoggerConfig) -> eyre::Result<()> { let (env, write_layer) = create_filter_and_write_layer(&config)?; - let handle = RELOAD_HANDLE.lock().unwrap(); + let handle = RELOAD_HANDLE.lock(); handle .as_ref() .ok_or_else(|| eyre::eyre!("tracing subscriber is not set up"))? @@ -69,7 +66,7 @@ pub fn reset_log_config() -> eyre::Result<()> { /// To reload env filter, use `set_env_filter` and `reset_env_filter` functions pub fn setup_reloadable_tracing_subscriber(config: LoggerConfig) -> eyre::Result<()> { { - let mut default_config = DEFAULT_CONFIG.lock().unwrap(); + let mut default_config = DEFAULT_CONFIG.lock(); *default_config = config.clone(); } @@ -78,7 +75,7 @@ pub fn setup_reloadable_tracing_subscriber(config: LoggerConfig) -> eyre::Result let (reload_layer, reload_handle) = reload::Layer::new(layer); { - let mut handle = RELOAD_HANDLE.lock().unwrap(); + let mut handle = RELOAD_HANDLE.lock(); *handle = Some(reload_handle); } tracing_subscriber::registry().with(reload_layer).init(); diff --git a/crates/rbuilder/src/utils/error_storage.rs b/crates/rbuilder/src/utils/error_storage.rs index 25bf8d79e..dab335cbf 100644 --- a/crates/rbuilder/src/utils/error_storage.rs +++ b/crates/rbuilder/src/utils/error_storage.rs @@ -4,12 +4,9 @@ use crossbeam_queue::ArrayQueue; use lazy_static::lazy_static; +use parking_lot::Mutex; use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, Executor, SqliteConnection}; -use std::{ - path::Path, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{path::Path, sync::Arc, time::Duration}; use tokio_util::sync::CancellationToken; use tracing::{error, info_span, warn}; @@ -36,7 +33,7 @@ lazy_static! { } fn event_queue() -> Option>> { - EVENT_QUEUE.lock().unwrap().clone() + EVENT_QUEUE.lock().clone() } /// Spawn a new error storage writer. @@ -46,7 +43,7 @@ pub async fn spawn_error_storage_writer( global_cancel: CancellationToken, ) -> eyre::Result<()> { let mut storage = ErrorEventStorage::new_from_path(db_path).await?; - *EVENT_QUEUE.lock().unwrap() = Some(Arc::new(ArrayQueue::new(MAX_PENDING_EVENTS))); + *EVENT_QUEUE.lock() = Some(Arc::new(ArrayQueue::new(MAX_PENDING_EVENTS))); tokio::spawn(async move { while !global_cancel.is_cancelled() { if let Some(event_queue) = event_queue() { diff --git a/crates/rbuilder/src/utils/noncer.rs b/crates/rbuilder/src/utils/noncer.rs index 6e927e854..2fa7b218a 100644 --- a/crates/rbuilder/src/utils/noncer.rs +++ b/crates/rbuilder/src/utils/noncer.rs @@ -1,9 +1,10 @@ use ahash::HashMap; use alloy_primitives::{Address, B256}; +use parking_lot::Mutex; use reth::providers::StateProviderBox; use reth_errors::ProviderResult; use reth_provider::StateProviderFactory; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; /// Struct to get nonces for Addresses, caching the results. /// NonceCache contains the data (but doesn't allow you to query it) and NonceCacheRef is a reference that allows you to query it. @@ -48,7 +49,7 @@ pub struct NonceCacheRef { impl NonceCacheRef { pub fn nonce(&self, address: Address) -> ProviderResult { - let mut cache = self.cache.lock().unwrap(); + let mut cache = self.cache.lock(); if let Some(nonce) = cache.get(&address) { return Ok(*nonce); } diff --git a/crates/rbuilder/src/utils/provider_factory_reopen.rs b/crates/rbuilder/src/utils/provider_factory_reopen.rs index 1e4c674db..997e23825 100644 --- a/crates/rbuilder/src/utils/provider_factory_reopen.rs +++ b/crates/rbuilder/src/utils/provider_factory_reopen.rs @@ -1,6 +1,7 @@ use crate::telemetry::{inc_provider_bad_reopen_counter, inc_provider_reopen_counter}; use alloy_eips::{BlockNumHash, BlockNumberOrTag}; use alloy_primitives::{BlockHash, BlockNumber}; +use parking_lot::{Mutex, RwLock}; use reth::providers::{BlockHashReader, ChainSpecProvider, ProviderFactory}; use reth_chainspec::ChainInfo; use reth_db::{Database, DatabaseError}; @@ -13,11 +14,7 @@ use reth_provider::{ HeaderProvider, StateProviderBox, StateProviderFactory, StaticFileProviderFactory, }; use revm_primitives::{B256, U256}; -use std::{ - ops::RangeBounds, - path::PathBuf, - sync::{Arc, Mutex, RwLock}, -}; +use std::{ops::RangeBounds, path::PathBuf, sync::Arc}; use tracing::debug; /// This struct is used as a workaround for https://github.com/paradigmxyz/reth/issues/7836 @@ -71,7 +68,7 @@ impl ProviderFactoryReopener /// This will currently available provider factory without verifying if its correct, it can be used /// when consistency is not absolutely required pub fn provider_factory_unchecked(&self) -> ProviderFactory { - self.provider_factory.lock().unwrap().clone() + self.provider_factory.lock().clone() } /// This will check if historical block hashes for the given block is correct and if not it will reopen @@ -85,13 +82,10 @@ impl ProviderFactoryReopener .provider_factory_unchecked() .last_block_number() .map_err(|err| eyre::eyre!("Error getting best block number: {:?}", err))?; - let mut provider_factory = self.provider_factory.lock().unwrap(); + let mut provider_factory = self.provider_factory.lock(); // Don't need to check consistency for the block that was just checked. - let last_consistent_block_guard = self.last_consistent_block.read().unwrap(); - let last_consistent_block = *last_consistent_block_guard; - // Drop before write might be attempted to avoid deadlock! - drop(last_consistent_block_guard); + let last_consistent_block = *self.last_consistent_block.read(); if !self.testing_mode && last_consistent_block != Some(best_block_number) { match check_provider_factory_health(best_block_number, &provider_factory) { Ok(()) => {} @@ -120,8 +114,7 @@ impl ProviderFactoryReopener } } - let mut last_consistent_block = self.last_consistent_block.write().unwrap(); - *last_consistent_block = Some(best_block_number); + *self.last_consistent_block.write() = Some(best_block_number); } Ok(provider_factory.clone()) } diff --git a/crates/transaction-pool-bundle-ext/bundle_pool_ops/rbuilder/src/lib.rs b/crates/transaction-pool-bundle-ext/bundle_pool_ops/rbuilder/src/lib.rs index 0cdff24ef..c66bc3a0d 100644 --- a/crates/transaction-pool-bundle-ext/bundle_pool_ops/rbuilder/src/lib.rs +++ b/crates/transaction-pool-bundle-ext/bundle_pool_ops/rbuilder/src/lib.rs @@ -9,7 +9,6 @@ use std::{fmt::Formatter, path::Path, sync::Arc, time::Duration}; use alloy_primitives::U256; use alloy_rpc_types_beacon::events::PayloadAttributesEvent; use derive_more::From; -use rbuilder::live_builder::cli::LiveBuilderConfig; use rbuilder::{ building::{ builders::{ @@ -20,6 +19,7 @@ use rbuilder::{ }, live_builder::{ base_config::load_config_toml_and_env, + cli::LiveBuilderConfig, config::{create_builders, BuilderConfig, Config, SpecificBuilderConfig}, order_input::{rpc_server::RawCancelBundle, ReplaceableOrderPoolCommand}, payload_events::MevBoostSlotData,