diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml
index 4df75facf80..381ccee5bea 100644
--- a/.github/workflows/cargo.yml
+++ b/.github/workflows/cargo.yml
@@ -56,8 +56,6 @@ jobs:
       - uses: actions/checkout@v6
         with:
-          submodules: 'recursive'
-        with:
           submodules: 'recursive'
           token: ${{ secrets.ACCESS_TOKEN }}
diff --git a/bam-local-cluster/src/cluster_manager.rs b/bam-local-cluster/src/cluster_manager.rs
index b5e32a6349f..49fd7e46c61 100644
--- a/bam-local-cluster/src/cluster_manager.rs
+++ b/bam-local-cluster/src/cluster_manager.rs
@@ -370,7 +370,10 @@ impl BamLocalCluster {
             solana_local_cluster::local_cluster::DEFAULT_MINT_LAMPORTS,
             &vote_keypairs,
             stakes,
-            ClusterType::MainnetBeta,
+            ClusterType::Development, // don't use mainnet, since we de-dupe local TVU IP
+            // addresses and every validator has a TVU IP of 127.0.0.1
+
+            // see: https://github.com/jito-foundation/jito-solana/blob/ba3cfa5fe84ac1061427aa25e2a3e8e6bb7a5914/turbine/src/cluster_nodes.rs#L389-L392
         );

         let runtime = Runtime::new().expect("Could not create Tokio runtime");
diff --git a/core/src/bam_manager.rs b/core/src/bam_manager.rs
index 856c58c3e27..58684349a85 100644
--- a/core/src/bam_manager.rs
+++ b/core/src/bam_manager.rs
@@ -135,7 +135,7 @@ impl BamManager {
             let url = bam_url.lock().unwrap().clone();
             if let Some(url) = url {
                 let result = runtime.block_on(BamConnection::try_init(
-                    url,
+                    url.clone(),
                     dependencies.cluster_info.clone(),
                     dependencies.batch_sender.clone(),
                     dependencies.outbound_receiver.clone(),
@@ -165,7 +165,7 @@ impl BamManager {
                     }
                 }
                 Err(e) => {
-                    error!("Failed to connect to BAM: {e}");
+                    error!("Failed to connect to BAM with url: {url}: {e}");
                 }
             }
         }
diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs
index a569dbeffc1..4ebeafa870e 100644
--- a/core/src/banking_stage/consume_worker.rs
+++ b/core/src/banking_stage/consume_worker.rs
@@ -296,7 +296,7 @@ impl ConsumeWorker {
             return FinishedConsumeWorkExtraInfo {
                 processed_results: vec![
                     TransactionResult::NotCommitted(
-                        NotCommittedReason::PohTimeout,
+                        NotCommittedReason::PohTimeout, // Note: ChannelFull, ChannelDisconnected, MaxHeightReached are misreported as PohTimeout
                     );
                     work.transactions.len()
                 ],
diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs
index cfac8bd447a..83d9b032821 100644
--- a/core/src/banking_stage/consumer.rs
+++ b/core/src/banking_stage/consumer.rs
@@ -146,12 +146,16 @@ pub struct Consumer {

 struct SeqNotConflictBatchReusables {
     aggregate_write_locks: AHashSet<Pubkey>,
     aggregate_read_locks: AHashSet<Pubkey>,
+    transaction_write_locks: Vec<Pubkey>,
+    transaction_read_locks: Vec<Pubkey>,
 }

 impl SeqNotConflictBatchReusables {
     pub fn clear(&mut self) {
         self.aggregate_write_locks.clear();
         self.aggregate_read_locks.clear();
+        self.transaction_write_locks.clear();
+        self.transaction_read_locks.clear();
     }
 }

@@ -528,7 +532,7 @@ impl Consumer {
             .zip(batch.sanitized_transactions())
             .filter_map(|(processing_result, tx)| {
                 if processing_result.was_processed() {
-                    Some(tx.to_versioned_transaction())
+                    Some((tx.to_versioned_transaction(), tx))
                 } else {
                     None
                 }
@@ -541,14 +545,13 @@ impl Consumer {
         let mut reusables = self.seq_not_conflict_batch_reusables.take();

         // Entries do **not** yet support conflicting transactions. To get around this we create
-        // a lists of transactions that are non-conflicting to shred out into entries. If we don't do
-        // this then blocks are rejected by consensus/replay.
+        // lists of transactions that are non-conflicting to shred out into entries. If we don't do
+        // this, then blocks are rejected by consensus/replay.
         let batches = Self::create_sequential_non_conflicting_batches(
             &mut reusables,
-            processed_transactions
-                .into_iter()
-                .zip(batch.sanitized_transactions().iter()),
+            processed_transactions.into_iter(),
         );
+        self.seq_not_conflict_batch_reusables.set(reusables);
         let hashes = batches
             .iter()
             .map(|batch| hash_transactions(batch))
@@ -652,9 +655,9 @@ impl Consumer {
         }
     }

-    fn create_sequential_non_conflicting_batches<'a>(
+    fn create_sequential_non_conflicting_batches<'a, T: TransactionWithMeta + 'a>(
         reusables: &mut SeqNotConflictBatchReusables,
-        transactions: impl Iterator,
+        processed_transactions: impl Iterator<Item = (VersionedTransaction, &'a T)>,
     ) -> Vec<Vec<VersionedTransaction>> {
         let mut result = vec![];
         let mut current_batch = vec![];
@@ -662,31 +665,39 @@ impl Consumer {
         let SeqNotConflictBatchReusables {
             aggregate_write_locks,
             aggregate_read_locks,
+            transaction_write_locks,
+            transaction_read_locks,
         } = reusables;

-        for (transaction, transaction_info) in transactions {
-            let account_keys = transaction_info.account_keys();
-            let write_account_locks = account_keys
-                .iter()
-                .enumerate()
-                .filter_map(|(index, key)| transaction_info.is_writable(index).then_some(key));
-            let read_account_locks = account_keys
-                .iter()
-                .enumerate()
-                .filter_map(|(index, key)| (!transaction_info.is_writable(index)).then_some(key));
-            let has_contention = write_account_locks.clone().any(|key| {
-                aggregate_write_locks.contains(key) || aggregate_read_locks.contains(key)
-            }) || read_account_locks
-                .clone()
-                .any(|key| aggregate_write_locks.contains(key));
+        for (transaction, transaction_info) in processed_transactions {
+            transaction_write_locks.clear();
+            transaction_read_locks.clear();
+            let mut has_contention = false;
+            for (key, writable) in
+                TransactionAccountLocksIterator::new(transaction_info).accounts_with_is_writable()
+            {
+                if writable {
+                    transaction_write_locks.push(*key);
+                } else {
+                    transaction_read_locks.push(*key);
+                }
+
+                if !has_contention
+                    && (aggregate_write_locks.contains(key)
+                        || (writable && aggregate_read_locks.contains(key)))
+                {
+                    has_contention = true;
+                }
+            }
+
             if has_contention {
                 result.push(std::mem::take(&mut current_batch));
                 aggregate_write_locks.clear();
                 aggregate_read_locks.clear();
             }
             current_batch.push(transaction);
-            aggregate_write_locks.extend(write_account_locks.cloned());
-            aggregate_read_locks.extend(read_account_locks.cloned());
+            aggregate_write_locks.extend(transaction_write_locks.drain(..));
+            aggregate_read_locks.extend(transaction_read_locks.drain(..));
         }

         if !current_batch.is_empty() {
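For context on the consumer.rs hunks above: entries cannot contain conflicting transactions, so `create_sequential_non_conflicting_batches` greedily walks the processed transactions and seals the current batch whenever a transaction's locks contend with the locks accumulated so far. A minimal standalone sketch of that policy, using `std::collections::HashSet` and `String` keys in place of `AHashSet<Pubkey>` and real account keys (`Tx` is a hypothetical stand-in for the processed transaction type):

```rust
use std::collections::HashSet;

/// Simplified stand-in for a processed transaction: the accounts it locks,
/// each flagged as writable (true) or read-only (false).
struct Tx {
    locks: Vec<(String, bool)>,
}

/// Greedily split `txs` into runs with no intra-run conflicts: any lock
/// conflicts with an earlier write on the same account, and a write
/// additionally conflicts with an earlier read.
fn split_non_conflicting(txs: Vec<Tx>) -> Vec<Vec<Tx>> {
    let mut batches = vec![];
    let mut current = vec![];
    let (mut write_locks, mut read_locks) = (HashSet::new(), HashSet::new());

    for tx in txs {
        let conflicts = tx.locks.iter().any(|(key, writable)| {
            write_locks.contains(key) || (*writable && read_locks.contains(key))
        });
        if conflicts {
            // Seal the current batch and start a fresh lock scope.
            batches.push(std::mem::take(&mut current));
            write_locks.clear();
            read_locks.clear();
        }
        for (key, writable) in &tx.locks {
            if *writable {
                write_locks.insert(key.clone());
            } else {
                read_locks.insert(key.clone());
            }
        }
        current.push(tx);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    let a = |w| ("A".to_string(), w);
    let txs = vec![
        Tx { locks: vec![a(true)] },  // writes A
        Tx { locks: vec![a(false)] }, // reads A: conflicts, new batch starts
        Tx { locks: vec![a(false)] }, // reads A again: no conflict, same batch
    ];
    let batches = split_non_conflicting(txs);
    assert_eq!(batches.iter().map(Vec::len).collect::<Vec<_>>(), vec![1, 2]);
}
```

The contention rule mirrors the patch exactly: `aggregate_write_locks.contains(key) || (writable && aggregate_read_locks.contains(key))`, so read-after-read never splits a batch.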
diff --git a/core/src/banking_stage/transaction_scheduler/bam_receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/bam_receive_and_buffer.rs
index 6371d549b5b..aa13a55d11e 100644
--- a/core/src/banking_stage/transaction_scheduler/bam_receive_and_buffer.rs
+++ b/core/src/banking_stage/transaction_scheduler/bam_receive_and_buffer.rs
@@ -34,7 +34,7 @@ use {
     itertools::Itertools,
     jito_protos::proto::bam_types::{
         atomic_txn_batch_result, not_committed::Reason, AtomicTxnBatch, DeserializationErrorReason,
-        Packet, SchedulingError,
+        Packet, SchedulingError, TransactionErrorReason,
     },
     solana_accounts_db::account_locks::validate_account_locks,
     solana_clock::{Slot, MAX_PROCESSING_AGE},
@@ -63,11 +63,8 @@ use {
     },
 };

-type PrevalidationResult = Result<(AtomicTxnBatch, bool, u32, u64), (Reason, u32)>;
-type PrevalidationOutput = (Vec<PrevalidationResult>, ReceivingStats);
-
+type PrevalidationResult = Result<(usize, bool, u32, u64), (Reason, u32)>;
 type VerifyResult = Result<(Vec, bool, u32, u64), (Reason, u32)>;
-type VerifyOutput = (Vec<VerifyResult>, ReceivingStats);

 pub struct BamReceiveAndBuffer {
     bam_enabled: Arc<AtomicBool>,
@@ -140,6 +137,9 @@ impl BamReceiveAndBuffer {
         let mut last_metrics_report = Instant::now();
         let mut metrics = BamReceiveAndBufferMetrics::default();
         let mut stats = ReceivingStats::default();
+        let mut prevalidated = Vec::new();
+        let mut packet_batches = Vec::new();
+        let mut verify_results = Vec::new();

         const METRICS_REPORT_INTERVAL: Duration = Duration::from_millis(20);
         let mut recv_buffer = Vec::with_capacity(ATOMIC_TXN_BATCH_BURST * 2);
@@ -177,13 +177,19 @@ impl BamReceiveAndBuffer {
                 .and_then(|leader_state| leader_state.load().working_bank().map(|b| b.slot()))
                 .unwrap_or_else(|| bank_forks.read().unwrap().working_bank().slot());

-            let ((verify_results, deserialize_stats), duration_us) =
-                measure_us!(Self::batch_verify(&recv_buffer, current_slot, &mut metrics));
+            let (deserialize_stats, duration_us) = measure_us!(Self::batch_verify(
+                &recv_buffer,
+                current_slot,
+                &mut metrics,
+                &mut prevalidated,
+                &mut packet_batches,
+                &mut verify_results,
+            ));
             stats.accumulate(deserialize_stats);
             metrics.increment_total_us(duration_us);
             recv_buffer.clear();

-            for result in verify_results {
+            for result in verify_results.drain(..) {
                 match result {
                     Ok((verified_batch, revert_on_error, seq_id, max_schedule_slot)) => {
                         metrics
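The receive-loop change above hoists `prevalidated`, `packet_batches`, and `verify_results` out of the loop and threads them through `batch_verify` as `&mut` parameters, so steady-state iterations reuse capacity instead of reallocating. A minimal sketch of the pattern (the `process` function and its types are hypothetical, not the BAM API):

```rust
/// Fill `out` with the squares of `input`, reusing its allocation.
/// `clear()` drops the old contents but keeps capacity, so once `out` has
/// grown to the working-set size, later calls allocate nothing.
fn process(input: &[u32], out: &mut Vec<u64>) {
    out.clear();
    out.reserve(input.len());
    out.extend(input.iter().map(|&x| u64::from(x) * u64::from(x)));
}

fn main() {
    let mut out = Vec::new(); // allocated once, outside the loop
    for batch in [&[1u32, 2, 3][..], &[4, 5][..]] {
        process(batch, &mut out);
        // drain(..) hands the items out while keeping the buffer's
        // capacity, mirroring how the loop drains `verify_results`.
        for item in out.drain(..) {
            let _ = item;
        }
    }
}
```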
@@ -304,7 +310,7 @@ impl BamReceiveAndBuffer {
             .is_active(&agave_feature_set::static_instruction_limit::ID);

         let mut cost: u64 = 0;
-        let mut txns_max_age = vec![];
+        let mut txns_max_age = Vec::with_capacity(verified_batch.len());

         // Checks are taken from receive_and_buffer.rs:
         // SanitizedTransactionReceiveAndBuffer::buffer_packets
@@ -513,7 +519,7 @@ impl BamReceiveAndBuffer {
                 Err(Reason::TransactionError(
                     jito_protos::proto::bam_types::TransactionError {
                         index: index as u32,
-                        reason: DeserializationErrorReason::SanitizeError as i32,
+                        reason: TransactionErrorReason::SanitizeFailure as i32,
                     },
                 )),
                 stats,
@@ -572,12 +578,13 @@ impl BamReceiveAndBuffer {
     fn prevalidate_batches(
         atomic_txn_batches: &[AtomicTxnBatch],
         current_slot: Slot,
-    ) -> PrevalidationOutput {
+        prevalidated: &mut Vec<PrevalidationResult>,
+    ) -> ReceivingStats {
         let mut stats = ReceivingStats::default();

-        let prevalidated = atomic_txn_batches
-            .iter()
-            .map(|atomic_txn_batch| {
+        prevalidated.clear();
+        prevalidated.extend(atomic_txn_batches.iter().enumerate().map(
+            |(batch_index, atomic_txn_batch)| {
                 if atomic_txn_batch.max_schedule_slot < current_slot {
                     stats.num_dropped_without_parsing += 1;
                     return Err((
@@ -636,22 +643,25 @@ impl BamReceiveAndBuffer {
                 };

                 Ok((
-                    atomic_txn_batch.clone(),
+                    batch_index,
                     revert_on_error,
                     atomic_txn_batch.seq_id,
                     atomic_txn_batch.max_schedule_slot,
                 ))
-            })
-            .collect();
+            },
+        ));

-        (prevalidated, stats)
+        stats
     }

     fn batch_verify(
         atomic_txn_batches: &[AtomicTxnBatch],
         current_slot: Slot,
         metrics: &mut BamReceiveAndBufferMetrics,
-    ) -> VerifyOutput {
+        prevalidated: &mut Vec<PrevalidationResult>,
+        packet_batches: &mut Vec<PacketBatch>,
+        results: &mut Vec<VerifyResult>,
+    ) -> ReceivingStats {
         fn proto_packet_to_packet(from_packet: &Packet) -> BytesPacket {
             let copy_len = min(PACKET_DATA_SIZE, from_packet.data.len());
             let mut to_packet = BytesPacket::new(
@@ -716,15 +726,15 @@ impl BamReceiveAndBuffer {
         let mut stats = ReceivingStats::default();

-        let (pre_validated, preverify_stats) =
-            Self::prevalidate_batches(atomic_txn_batches, current_slot);
+        let preverify_stats =
+            Self::prevalidate_batches(atomic_txn_batches, current_slot, prevalidated);
         stats.accumulate(preverify_stats);

-        let mut packet_batches: Vec<PacketBatch> = Vec::new();
+        packet_batches.clear();
+        packet_batches.reserve(prevalidated.len());
         let mut packet_count = 0;
-        pre_validated.iter().flatten().for_each(|result| {
-            let solana_packet_batch: Vec<_> = result
-                .0
+        prevalidated.iter().flatten().for_each(|result| {
+            let solana_packet_batch: Vec<_> = atomic_txn_batches[result.0]
                 .packets
                 .iter()
                 .map(proto_packet_to_packet)
@@ -736,7 +746,7 @@ impl BamReceiveAndBuffer {
         });

         let mut verify_packet_batch_time_us = Measure::start("verify_packet_batch_time_us");
-        ed25519_verify(&mut packet_batches, false, packet_count);
+        ed25519_verify(packet_batches, false, packet_count);
         verify_packet_batch_time_us.stop();

         metrics
@@ -749,25 +759,24 @@ impl BamReceiveAndBuffer {
             .sigverify_metrics
             .increment_total_verify_time(verify_packet_batch_time_us.as_us());

+        results.clear();
+        results.reserve(prevalidated.len());
         let mut packet_batch_iter = packet_batches.iter();
-        let results = pre_validated
-            .into_iter()
-            .map(|pre_result| {
-                pre_result.and_then(|(_, revert_on_error, seq_id, max_schedule_slot)| {
-                    let batch = packet_batch_iter.next().unwrap();
-
-                    let deserialized = batch
-                        .iter()
-                        .enumerate()
-                        .map(|(i, pkt)| pkt_to_shared_bytes(&pkt, i, seq_id, metrics))
-                        .collect::<Result<Vec<_>, _>>()?;
-
-                    Ok((deserialized, revert_on_error, seq_id, max_schedule_slot))
-                })
-            })
-            .collect();
+        for pre_result in prevalidated.drain(..) {
+            let result = pre_result.and_then(|(_, revert_on_error, seq_id, max_schedule_slot)| {
+                let batch = packet_batch_iter.next().unwrap();
+                let deserialized = batch
+                    .iter()
+                    .enumerate()
+                    .map(|(i, pkt)| pkt_to_shared_bytes(&pkt, i, seq_id, metrics))
+                    .collect::<Result<Vec<_>, _>>()?;

-        (results, stats)
+                Ok((deserialized, revert_on_error, seq_id, max_schedule_slot))
+            });
+            results.push(result);
+        }
+
+        stats
     }
 }
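Note the shape of the `prevalidate_batches` change: `PrevalidationResult` now carries a `usize` index into the caller's slice instead of a cloned `AtomicTxnBatch`, so prevalidation stays allocation-free and the batch data is only looked up (`atomic_txn_batches[result.0]` above) where it is actually needed. The same before/after, as a standalone sketch with a hypothetical `Batch` type:

```rust
#[derive(Clone)]
struct Batch {
    seq_id: u32,
    payload: Vec<u8>, // cloning this per batch is the cost being avoided
}

// Before: each Ok carries a full clone of the batch.
fn prevalidate_cloning(batches: &[Batch]) -> Vec<Result<Batch, u32>> {
    batches
        .iter()
        .map(|b| {
            if b.payload.is_empty() {
                Err(b.seq_id)
            } else {
                Ok(b.clone()) // deep copy of the payload
            }
        })
        .collect()
}

// After: each Ok carries only an index; callers index back into `batches`.
fn prevalidate_by_index(batches: &[Batch]) -> Vec<Result<usize, u32>> {
    batches
        .iter()
        .enumerate()
        .map(|(i, b)| if b.payload.is_empty() { Err(b.seq_id) } else { Ok(i) })
        .collect()
}

fn main() {
    let batches = vec![
        Batch { seq_id: 1, payload: vec![0xAB; 1232] },
        Batch { seq_id: 2, payload: vec![] },
    ];
    let _with_clones = prevalidate_cloning(&batches);
    for r in prevalidate_by_index(&batches) {
        if let Ok(i) = r {
            assert_eq!(batches[i].seq_id, 1); // look up only when needed
        }
    }
}
```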
@@ -805,7 +814,7 @@ impl ReceiveAndBuffer for BamReceiveAndBuffer {

             // If BAM is not enabled, drain the channel
             if !is_bam_enabled {
-                stats.num_dropped_without_parsing += stats.num_received;
+                stats.num_dropped_without_parsing += batch.txns_max_age.len();
                 continue;
             }
@@ -1164,6 +1173,25 @@ mod tests {
         assert_eq!(actual_length, expected_length);
     }

+    fn run_batch_verify(
+        batches: &[AtomicTxnBatch],
+        current_slot: Slot,
+        metrics: &mut BamReceiveAndBufferMetrics,
+    ) -> (Vec<VerifyResult>, ReceivingStats) {
+        let mut prevalidated = Vec::new();
+        let mut packet_batches = Vec::new();
+        let mut results = Vec::new();
+        let stats = BamReceiveAndBuffer::batch_verify(
+            batches,
+            current_slot,
+            metrics,
+            &mut prevalidated,
+            &mut packet_batches,
+            &mut results,
+        );
+        (results, stats)
+    }
+
     #[test_case(setup_bam_receive_and_buffer; "testcase-bam")]
     fn test_receive_and_buffer_simple_transfer(
         setup_receive_and_buffer: impl FnOnce(
@@ -1261,8 +1289,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, _batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[bundle], Slot::MAX, &mut stats);
+        let (results, _batch_stats) = run_batch_verify(&[bundle], Slot::MAX, &mut stats);

         assert_eq!(results.len(), 1);
         assert!(results[0].is_ok());
@@ -1282,8 +1309,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[batch], Slot::MAX, &mut stats);
+        let (results, batch_stats) = run_batch_verify(&[batch], Slot::MAX, &mut stats);

         assert_eq!(results.len(), 1);
         assert!(results[0].is_err());
@@ -1307,8 +1333,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, _batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[batch], Slot::MAX, &mut stats);
+        let (results, _batch_stats) = run_batch_verify(&[batch], Slot::MAX, &mut stats);

         assert_eq!(results.len(), 1);
         assert!(results[0].is_err());
@@ -1338,8 +1363,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, _batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[batch], Slot::MAX, &mut stats);
+        let (results, _batch_stats) = run_batch_verify(&[batch], Slot::MAX, &mut stats);

         assert_eq!(results.len(), 1);
         assert!(results[0].is_ok());
@@ -1399,8 +1423,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[bundle], Slot::MAX, &mut stats);
+        let (results, batch_stats) = run_batch_verify(&[bundle], Slot::MAX, &mut stats);
         assert_eq!(results.len(), 1);
         assert!(results[0].is_err());
         assert_eq!(batch_stats.num_dropped_without_parsing, 1);
@@ -1432,8 +1455,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, _batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[batch], Slot::MAX, &mut stats);
+        let (results, _batch_stats) = run_batch_verify(&[batch], Slot::MAX, &mut stats);

         assert_eq!(results.len(), 1);
         assert!(results[0].is_ok());
@@ -1498,8 +1520,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, _batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[batch], Slot::MAX, &mut stats);
+        let (results, _batch_stats) = run_batch_verify(&[batch], Slot::MAX, &mut stats);

         assert_eq!(results.len(), 1);
         assert!(results[0].is_ok());
@@ -1544,8 +1565,7 @@ mod tests {
         };

         let mut stats = BamReceiveAndBufferMetrics::default();
-        let (results, _batch_stats) =
-            BamReceiveAndBuffer::batch_verify(&[batch], Slot::MAX, &mut stats);
+        let (results, _batch_stats) = run_batch_verify(&[batch], Slot::MAX, &mut stats);

         assert_eq!(results.len(), 1);
         assert!(results[0].is_err());
diff --git a/core/src/banking_stage/transaction_scheduler/bam_scheduler.rs b/core/src/banking_stage/transaction_scheduler/bam_scheduler.rs
index e9d86fb30dd..b8ade71ba5a 100644
--- a/core/src/banking_stage/transaction_scheduler/bam_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/bam_scheduler.rs
@@ -278,7 +278,7 @@ impl BamScheduler {
             };
         }

-        // Schedulit
+        // Schedule it
         let mut work = self.get_or_create_work_object();
         let batch_id = self.get_next_schedule_id();
         *num_scheduled += batch_ids.len();
@@ -355,7 +355,7 @@ impl BamScheduler {
                 .iter()
                 .filter_map(|priority_id| container.get_batch(priority_id.id))
                 .flat_map(|(batch_ids, _, _)| batch_ids.into_iter())
-                .cloned(),
+                .copied(),
         );

         output.transactions.clear();
diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs
index 9f5fbbc1a3e..73899c7bfd9 100644
--- a/validator/src/admin_rpc_service.rs
+++ b/validator/src/admin_rpc_service.rs
@@ -582,23 +582,26 @@ impl AdminRpc for AdminRpcImpl {
     fn set_bam_url(&self, meta: Self::Metadata, bam_url: Option<String>) -> Result<()> {
         let old_bam_url = meta.bam_url.lock().unwrap().clone();
+        let (bam_url, manual_disconnect) = match bam_url {
+            Some(url) if url.trim().is_empty() => (None, true),
+            Some(url) => (Some(url), false),
+            None => (None, false),
+        };
         let new_bam_url = bam_url.as_ref().map(|url| url.to_string());
         debug!("set_bam_url old= {old_bam_url:?}, new={new_bam_url:?}");

         if let Some(new_bam_url) = &new_bam_url {
-            if !new_bam_url.is_empty() {
-                if let Err(e) = Endpoint::from_str(new_bam_url) {
-                    return Err(jsonrpc_core::error::Error::invalid_params(format!(
-                        "Could not create endpoint: {e}"
-                    )));
-                }
-            } else {
-                datapoint_info!(
-                    "bam_manually_disconnected",
-                    ("count", 1, i64),
-                    ("previous_bam_url", old_bam_url.unwrap_or_default(), String)
-                );
+            if let Err(e) = Endpoint::from_str(new_bam_url) {
+                return Err(jsonrpc_core::error::Error::invalid_params(format!(
+                    "Could not create endpoint: {e}"
+                )));
             }
+        } else if manual_disconnect {
+            datapoint_info!(
+                "bam_manually_disconnected",
+                ("count", 1, i64),
+                ("previous_bam_url", old_bam_url.unwrap_or_default(), String)
+            );
         }

         *meta.bam_url.lock().unwrap() = bam_url;
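The `set_bam_url` rework above normalizes its input up front into a `(bam_url, manual_disconnect)` pair, which keeps the validation and metrics branches below it flat. The mapping, restated as a self-contained sketch (a plain `match`, no admin-RPC types):

```rust
/// None      -> no URL, not an operator-initiated disconnect
/// Some("")  -> no URL, but *is* a manual disconnect (emit the datapoint)
/// Some(url) -> candidate URL, still to be validated as an endpoint
fn normalize(bam_url: Option<String>) -> (Option<String>, bool) {
    match bam_url {
        Some(url) if url.trim().is_empty() => (None, true),
        Some(url) => (Some(url), false),
        None => (None, false),
    }
}

fn main() {
    assert_eq!(normalize(None), (None, false));
    assert_eq!(normalize(Some("  ".into())), (None, true));
    assert_eq!(
        normalize(Some("http://bam.example:50055".into())),
        (Some("http://bam.example:50055".into()), false)
    );
}
```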
diff --git a/validator/src/commands/bam/mod.rs b/validator/src/commands/bam/mod.rs
index 6fd382ebd26..99e77ceaa49 100644
--- a/validator/src/commands/bam/mod.rs
+++ b/validator/src/commands/bam/mod.rs
@@ -25,7 +25,7 @@ const DEFAULT_BAM_HTTPS_PORT: u16 = 50056;
 /// Normalize and validate BAM URL from a string input.
 ///
 /// Takes a potentially incomplete URL and normalizes it by adding default
-/// scheme and port if missing.
+/// scheme and port if missing. Empty input is handled by `extract_bam_url`.
 ///
 /// # Default values
 /// - HTTP URLs default to port 50055
@@ -35,11 +35,6 @@ const DEFAULT_BAM_HTTPS_PORT: u16 = 50056;
 /// # Errors
 /// Returns an error if the URL is invalid or uses an unsupported scheme.
 fn normalize_bam_url(url_str: &str) -> Result<String, BamUrlError> {
-    // If empty, return empty string to disable BAM
-    if url_str.trim().is_empty() {
-        return Ok(String::new());
-    }
-
     let url_str_to_parse = if url_str.contains("://") {
         url_str.into()
     } else {
@@ -117,7 +112,12 @@ fn normalize_bam_url(url_str: &str) -> Result<String, BamUrlError> {
 /// ```
 pub fn extract_bam_url(matches: &ArgMatches) -> Result<Option<String>, BamUrlError> {
     match matches.value_of("bam_url") {
-        Some(url) => normalize_bam_url(url).map(Some),
+        Some(url) => {
+            if url.trim().is_empty() {
+                return Ok(None);
+            }
+            normalize_bam_url(url).map(Some)
+        }
         None => Ok(None),
     }
 }
@@ -138,6 +138,10 @@ pub fn command(_default_args: &DefaultArgs) -> App<'_, '_> {
 }

 pub fn execute(subcommand_matches: &ArgMatches, ledger_path: &Path) -> crate::commands::Result<()> {
+    if !subcommand_matches.is_present("bam_url") {
+        return Ok(());
+    }
+
     let bam_url = extract_bam_url(subcommand_matches)
         .map_err(|e| -> Box<dyn std::error::Error> { Box::new(e) })?;
     let admin_client = admin_rpc_service::connect(ledger_path);
@@ -245,16 +249,15 @@ mod tests {
     }

     // Empty inputs
-    #[test_case("", ""; "empty string")]
-    #[test_case(" ", ""; "spaces only")]
-    #[test_case("\t\n ", "" ; "whitespace only")]
-    fn test_extract_bam_url_empty_inputs(input: &str, expected: &str) {
+    #[test_case("", "empty string")]
+    #[test_case(" ", "spaces only")]
+    #[test_case("\t\n ", "whitespace only")]
+    fn test_extract_bam_url_empty_inputs(input: &str, _case: &str) {
         let matches = create_test_matches(Some(input));
         let result = extract_bam_url(&matches);
-        assert_eq!(
-            result.unwrap(),
-            expected.to_string().into(),
-            "Failed for input: '{input}', expected: '{expected}'"
+        assert!(
+            result.unwrap().is_none(),
+            "Expected None for input: '{input}'"
        );
    }
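As background for the `normalize_bam_url` doc comment above: the defaults are port 50055 for `http` and 50056 for `https`. A minimal, illustrative re-implementation of just the port-defaulting rule (a sketch only: the real function also parses and validates the URL and rejects unsupported schemes, and the bare-host default scheme of `http` is an assumption here):

```rust
const DEFAULT_BAM_HTTP_PORT: u16 = 50055;
const DEFAULT_BAM_HTTPS_PORT: u16 = 50056;

/// Illustrative only: append the scheme-appropriate default port when the
/// input has none. Assumes well-formed "scheme://host[:port]" input with a
/// colon-free host (no IPv6 literals), unlike the real implementation.
fn with_default_port(url: &str) -> String {
    let (scheme, rest) = url.split_once("://").unwrap_or(("http", url));
    if rest.contains(':') {
        return format!("{scheme}://{rest}"); // port already present
    }
    let port = if scheme == "https" {
        DEFAULT_BAM_HTTPS_PORT
    } else {
        DEFAULT_BAM_HTTP_PORT
    };
    format!("{scheme}://{rest}:{port}")
}

fn main() {
    assert_eq!(with_default_port("http://bam.example"), "http://bam.example:50055");
    assert_eq!(with_default_port("https://bam.example"), "https://bam.example:50056");
    assert_eq!(with_default_port("https://bam.example:9000"), "https://bam.example:9000");
}
```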