Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::interfaces::{BidValueObs, BidValueSource};
use alloy_primitives::U256;
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;

/// Simple struct tracking the last best bid and asking it in a sync way via best_bid_value.
pub struct BestBidSyncSource {
Expand Down Expand Up @@ -30,7 +31,7 @@ impl BestBidSyncSource {
}

pub fn best_bid_value(&self) -> Option<U256> {
*self.best_bid_source_inner.best_bid.lock().unwrap()
*self.best_bid_source_inner.best_bid.lock()
}
}

Expand All @@ -41,7 +42,7 @@ struct BestBidSyncSourceInner {

impl BidValueObs for BestBidSyncSourceInner {
fn update_new_bid(&self, bid: U256) {
let mut best_bid = self.best_bid.lock().unwrap();
let mut best_bid = self.best_bid.lock();
if best_bid.map_or(true, |old_bid| old_bid < bid) {
*best_bid = Some(bid);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tracing::error;
Expand Down Expand Up @@ -38,13 +39,13 @@ impl PendingBid {
}
/// Updates bid, replacing on current (we assume they are always increasing but we don't check it).
fn update(&self, bid: Bid) {
let mut current_bid = self.bid.lock().unwrap();
let mut current_bid = self.bid.lock();
*current_bid = Some(bid);
self.bid_notify.notify_one();
}

fn consume_bid(&self) -> Option<Bid> {
let mut current_bid = self.bid.lock().unwrap();
let mut current_bid = self.bid.lock();
current_bid.take()
}
}
Expand Down Expand Up @@ -113,15 +114,15 @@ impl ParallelSealerBidMakerProcess {

/// block.finalize_block + self.sink.new_block inside spawn_blocking.
async fn check_for_new_bid(&mut self) {
if *self.seal_control.seals_in_progress.lock().unwrap() >= self.max_concurrent_seals {
if *self.seal_control.seals_in_progress.lock() >= self.max_concurrent_seals {
return;
}
if let Some(bid) = self.pending_bid.consume_bid() {
let payout_tx_val = bid.payout_tx_value();
let block = bid.block();
let block_number = block.building_context().block();
// Take sealing "slot"
*self.seal_control.seals_in_progress.lock().unwrap() += 1;
*self.seal_control.seals_in_progress.lock() += 1;
let seal_control = self.seal_control.clone();
let sink = self.sink.clone();
tokio::task::spawn_blocking(move || {
Expand All @@ -134,7 +135,7 @@ impl ParallelSealerBidMakerProcess {
),
};
// release sealing "slot"
*seal_control.seals_in_progress.lock().unwrap() -= 1;
*seal_control.seals_in_progress.lock() -= 1;
seal_control.notify.notify_one();
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::{Arc, Mutex};
use crate::live_builder::block_output::relay_submit::BlockBuildingSink;
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::live_builder::block_output::relay_submit::BlockBuildingSink;

use super::interfaces::{Bid, BidMaker};

/// BidMaker with a background task sealing only one bid at a time.
Expand Down Expand Up @@ -41,14 +41,12 @@ impl PendingBid {
}
/// Updates bid, replacing on current (we assume they are always increasing but we don't check it).
fn update(&self, bid: Bid) {
let mut current_bid = self.bid.lock().unwrap();
*current_bid = Some(bid);
*self.bid.lock() = Some(bid);
self.bid_notify.notify_one();
}

fn consume_bid(&self) -> Option<Bid> {
let mut current_bid = self.bid.lock().unwrap();
current_bid.take()
self.bid.lock().take()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use crate::{
use ahash::HashMap;
use alloy_primitives::{utils::format_ether, U256};
use mockall::automock;
use parking_lot::Mutex;
use reth_chainspec::ChainSpec;
use reth_primitives::SealedBlock;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::{sync::Notify, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level};
Expand All @@ -43,7 +44,7 @@ pub struct BestBlockCell {

impl BestBlockCell {
pub fn compare_and_update(&self, block: Block) {
let mut best_block = self.block.lock().unwrap();
let mut best_block = self.block.lock();
let old_value = best_block
.as_ref()
.map(|b| b.trace.bid_value)
Expand All @@ -55,7 +56,7 @@ impl BestBlockCell {
}

pub fn take_best_block(&self) -> Option<Block> {
self.block.lock().unwrap().take()
self.block.lock().take()
}

pub async fn wait_for_change(&self) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ use crate::{
};
use alloy_provider::{IpcConnect, Provider, ProviderBuilder};
use futures::StreamExt;
use parking_lot::Mutex;
use reth_provider::StateProviderFactory;
use std::{
pin::pin,
sync::{Arc, Mutex},
time::Instant,
};
use std::{pin::pin, sync::Arc, time::Instant};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -56,7 +53,7 @@ where
}
};

let mut orderpool = orderpool.lock().unwrap();
let mut orderpool = orderpool.lock();
let start = Instant::now();

orderpool.head_updated(block_number, &state);
Expand Down
16 changes: 6 additions & 10 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@ use self::{
};
use crate::primitives::{serialize::CancelShareBundle, BundleReplacementKey, Order};
use jsonrpsee::RpcModule;
use parking_lot::Mutex;
use reth_provider::StateProviderFactory;
use std::{
net::Ipv4Addr,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
use std::{net::Ipv4Addr, path::PathBuf, sync::Arc, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{info, trace, warn};
Expand All @@ -39,14 +35,14 @@ impl OrderPoolSubscriber {
block_number: u64,
sink: Box<dyn ReplaceableOrderSink>,
) -> OrderPoolSubscriptionId {
self.orderpool.lock().unwrap().add_sink(block_number, sink)
self.orderpool.lock().add_sink(block_number, sink)
}

pub fn remove_sink(
&self,
id: &OrderPoolSubscriptionId,
) -> Option<Box<dyn ReplaceableOrderSink>> {
self.orderpool.lock().unwrap().remove_sink(id)
self.orderpool.lock().remove_sink(id)
}

/// Returned AutoRemovingOrderPoolSubscriptionId will call remove when dropped
Expand All @@ -72,7 +68,7 @@ pub struct AutoRemovingOrderPoolSubscriptionId {

impl Drop for AutoRemovingOrderPoolSubscriptionId {
fn drop(&mut self) {
self.orderpool.lock().unwrap().remove_sink(&self.id);
self.orderpool.lock().remove_sink(&self.id);
}
}

Expand Down Expand Up @@ -272,7 +268,7 @@ where
}

{
let mut orderpool = orderpool.lock().unwrap();
let mut orderpool = orderpool.lock();
orderpool.process_commands(new_commands.clone());
}
new_commands.clear();
Expand Down
9 changes: 5 additions & 4 deletions crates/rbuilder/src/live_builder/simulation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use crate::{
utils::gen_uid,
};
use ahash::HashMap;
use parking_lot::Mutex;
use reth_provider::StateProviderFactory;
use simulation_job::SimulationJob;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
Expand Down Expand Up @@ -118,7 +119,7 @@ where
let (sim_req_sender, sim_req_receiver) = flume::unbounded();
let (sim_results_sender, sim_results_receiver) = mpsc::channel(1024);
{
let mut contexts = current_contexts.lock().unwrap();
let mut contexts = current_contexts.lock();
let sim_context = SimulationContext {
block_ctx: ctx,
requests: sim_req_receiver,
Expand All @@ -139,15 +140,15 @@ where

// clean up
{
let mut contexts = current_contexts.lock().unwrap();
let mut contexts = current_contexts.lock();
contexts.contexts.remove(&block_context);
}
}
.instrument(span),
);

{
let mut tasks = self.running_tasks.lock().unwrap();
let mut tasks = self.running_tasks.lock();
tasks.retain(|handle| !handle.is_finished());
tasks.push(handle);
}
Expand Down
5 changes: 3 additions & 2 deletions crates/rbuilder/src/live_builder/simulation/sim_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use crate::{
telemetry,
telemetry::add_sim_thread_utilisation_timings,
};
use parking_lot::Mutex;
use reth::revm::cached::CachedReads;
use reth_provider::StateProviderFactory;
use std::{
sync::{Arc, Mutex},
sync::Arc,
thread::sleep,
time::{Duration, Instant},
};
Expand All @@ -34,7 +35,7 @@ pub fn run_sim_worker<P>(
}
let current_sim_context = loop {
let next_ctx = {
let ctxs = ctx.lock().unwrap();
let ctxs = ctx.lock();
ctxs.contexts.iter().next().map(|(_, c)| c.clone())
};
// @Perf chose random context so its more fair when we have 2 instead of 1
Expand Down
15 changes: 6 additions & 9 deletions crates/rbuilder/src/telemetry/dynamic_logs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
//! This module provides a functionality to dynamically change the log level

use lazy_static::lazy_static;
use std::{
fs::File,
path::PathBuf,
sync::{Arc, Mutex},
};
use parking_lot::Mutex;
use std::{fs::File, path::PathBuf, sync::Arc};
use tracing_subscriber::{
filter::Filtered, fmt, layer::SubscriberExt, reload, reload::Handle, util::SubscriberInitExt,
EnvFilter, Layer, Registry,
Expand Down Expand Up @@ -41,13 +38,13 @@ impl Default for LoggerConfig {
}

pub fn default_log_config() -> LoggerConfig {
DEFAULT_CONFIG.lock().unwrap().clone()
DEFAULT_CONFIG.lock().clone()
}

/// Reloads the log layer with the provided config
pub fn set_log_config(config: LoggerConfig) -> eyre::Result<()> {
let (env, write_layer) = create_filter_and_write_layer(&config)?;
let handle = RELOAD_HANDLE.lock().unwrap();
let handle = RELOAD_HANDLE.lock();
handle
.as_ref()
.ok_or_else(|| eyre::eyre!("tracing subscriber is not set up"))?
Expand All @@ -69,7 +66,7 @@ pub fn reset_log_config() -> eyre::Result<()> {
/// To reload env filter, use `set_env_filter` and `reset_env_filter` functions
pub fn setup_reloadable_tracing_subscriber(config: LoggerConfig) -> eyre::Result<()> {
{
let mut default_config = DEFAULT_CONFIG.lock().unwrap();
let mut default_config = DEFAULT_CONFIG.lock();
*default_config = config.clone();
}

Expand All @@ -78,7 +75,7 @@ pub fn setup_reloadable_tracing_subscriber(config: LoggerConfig) -> eyre::Result
let (reload_layer, reload_handle) = reload::Layer::new(layer);

{
let mut handle = RELOAD_HANDLE.lock().unwrap();
let mut handle = RELOAD_HANDLE.lock();
*handle = Some(reload_handle);
}
tracing_subscriber::registry().with(reload_layer).init();
Expand Down
11 changes: 4 additions & 7 deletions crates/rbuilder/src/utils/error_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@

use crossbeam_queue::ArrayQueue;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, Executor, SqliteConnection};
use std::{
path::Path,
sync::{Arc, Mutex},
time::Duration,
};
use std::{path::Path, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span, warn};

Expand All @@ -36,7 +33,7 @@ lazy_static! {
}

fn event_queue() -> Option<Arc<ArrayQueue<ErrorEvent>>> {
EVENT_QUEUE.lock().unwrap().clone()
EVENT_QUEUE.lock().clone()
}

/// Spawn a new error storage writer.
Expand All @@ -46,7 +43,7 @@ pub async fn spawn_error_storage_writer(
global_cancel: CancellationToken,
) -> eyre::Result<()> {
let mut storage = ErrorEventStorage::new_from_path(db_path).await?;
*EVENT_QUEUE.lock().unwrap() = Some(Arc::new(ArrayQueue::new(MAX_PENDING_EVENTS)));
*EVENT_QUEUE.lock() = Some(Arc::new(ArrayQueue::new(MAX_PENDING_EVENTS)));
tokio::spawn(async move {
while !global_cancel.is_cancelled() {
if let Some(event_queue) = event_queue() {
Expand Down
5 changes: 3 additions & 2 deletions crates/rbuilder/src/utils/noncer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use ahash::HashMap;
use alloy_primitives::{Address, B256};
use parking_lot::Mutex;
use reth::providers::StateProviderBox;
use reth_errors::ProviderResult;
use reth_provider::StateProviderFactory;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

/// Struct to get nonces for Addresses, caching the results.
/// NonceCache contains the data (but doesn't allow you to query it) and NonceCacheRef is a reference that allows you to query it.
Expand Down Expand Up @@ -48,7 +49,7 @@ pub struct NonceCacheRef {

impl NonceCacheRef {
pub fn nonce(&self, address: Address) -> ProviderResult<u64> {
let mut cache = self.cache.lock().unwrap();
let mut cache = self.cache.lock();
if let Some(nonce) = cache.get(&address) {
return Ok(*nonce);
}
Expand Down
Loading
Loading