Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ jobs:

- uses: actions/checkout@v6
with:
submodules: 'recursive'
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate?

with:
submodules: 'recursive'
token: ${{ secrets.ACCESS_TOKEN }}

Expand Down
5 changes: 4 additions & 1 deletion bam-local-cluster/src/cluster_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,10 @@ impl BamLocalCluster {
solana_local_cluster::local_cluster::DEFAULT_MINT_LAMPORTS,
&vote_keypairs,
stakes,
ClusterType::MainnetBeta,
ClusterType::Development, // don't use mainnet, since we de-dupe local tvu ip
// addresses, every validator has TVU IP of 127.0.0.1

// see: https://github.com/jito-foundation/jito-solana/blob/ba3cfa5fe84ac1061427aa25e2a3e8e6bb7a5914/turbine/src/cluster_nodes.rs#L389-L392
);

let runtime = Runtime::new().expect("Could not create Tokio runtime");
Expand Down
4 changes: 2 additions & 2 deletions core/src/bam_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl BamManager {
let url = bam_url.lock().unwrap().clone();
if let Some(url) = url {
let result = runtime.block_on(BamConnection::try_init(
url,
url.clone(),
dependencies.cluster_info.clone(),
dependencies.batch_sender.clone(),
dependencies.outbound_receiver.clone(),
Expand Down Expand Up @@ -165,7 +165,7 @@ impl BamManager {
}
}
Err(e) => {
error!("Failed to connect to BAM: {e}");
error!("Failed to connect to BAM with url: {url}: {e}");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
return FinishedConsumeWorkExtraInfo {
processed_results: vec![
TransactionResult::NotCommitted(
NotCommittedReason::PohTimeout,
NotCommittedReason::PohTimeout, // Note: ChannelFull, ChannelDisconnected, MaxHeightReached are misreported as PohTimeout
);
work.transactions.len()
],
Expand Down
61 changes: 36 additions & 25 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,16 @@ pub struct Consumer {
struct SeqNotConflictBatchReusables {
aggregate_write_locks: AHashSet<Pubkey>,
aggregate_read_locks: AHashSet<Pubkey>,
transaction_write_locks: Vec<Pubkey>,
transaction_read_locks: Vec<Pubkey>,
}

impl SeqNotConflictBatchReusables {
pub fn clear(&mut self) {
self.aggregate_write_locks.clear();
self.aggregate_read_locks.clear();
self.transaction_write_locks.clear();
self.transaction_read_locks.clear();
}
}

Expand Down Expand Up @@ -528,7 +532,7 @@ impl Consumer {
.zip(batch.sanitized_transactions())
.filter_map(|(processing_result, tx)| {
if processing_result.was_processed() {
Some(tx.to_versioned_transaction())
Some((tx.to_versioned_transaction(), tx))
} else {
None
}
Expand All @@ -541,14 +545,13 @@ impl Consumer {
let mut reusables = self.seq_not_conflict_batch_reusables.take();

// Entries do **not** yet support conflicting transactions. To get around this we create
// a lists of transactions that are non-conflicting to shred out into entries. If we don't do
// this then blocks are rejected by consensus/replay.
// lists of transactions that are non-conflicting to shred out into entries. If we don't do
// this, then blocks are rejected by consensus/replay.
let batches = Self::create_sequential_non_conflicting_batches(
&mut reusables,
processed_transactions
.into_iter()
.zip(batch.sanitized_transactions().iter()),
processed_transactions.into_iter(),
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already zipped above

);
self.seq_not_conflict_batch_reusables.set(reusables);
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't lose the reusables, otherwise we keep the empty containers

let hashes = batches
.iter()
.map(|batch| hash_transactions(batch))
Expand Down Expand Up @@ -652,41 +655,49 @@ impl Consumer {
}
}

fn create_sequential_non_conflicting_batches<'a>(
fn create_sequential_non_conflicting_batches<'a, T: TransactionWithMeta + 'a>(
reusables: &mut SeqNotConflictBatchReusables,
transactions: impl Iterator<Item = (VersionedTransaction, &'a (impl TransactionWithMeta + 'a))>,
processed_transactions: impl Iterator<Item = (VersionedTransaction, &'a T)>,
) -> Vec<Vec<VersionedTransaction>> {
let mut result = vec![];
let mut current_batch = vec![];
reusables.clear();
let SeqNotConflictBatchReusables {
aggregate_write_locks,
aggregate_read_locks,
transaction_write_locks,
transaction_read_locks,
} = reusables;

for (transaction, transaction_info) in transactions {
let account_keys = transaction_info.account_keys();
let write_account_locks = account_keys
Copy link
Copy Markdown
Collaborator Author

@esemeniuc esemeniuc Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

old code does 3 iterations over all account keys for the txn

.iter()
.enumerate()
.filter_map(|(index, key)| transaction_info.is_writable(index).then_some(key));
let read_account_locks = account_keys
.iter()
.enumerate()
.filter_map(|(index, key)| (!transaction_info.is_writable(index)).then_some(key));
let has_contention = write_account_locks.clone().any(|key| {
Copy link
Copy Markdown
Collaborator Author

@esemeniuc esemeniuc Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a full iteration over all account keys

aggregate_write_locks.contains(key) || aggregate_read_locks.contains(key)
}) || read_account_locks
.clone()
.any(|key| aggregate_write_locks.contains(key));
for (transaction, transaction_info) in processed_transactions {
transaction_write_locks.clear();
transaction_read_locks.clear();
let mut has_contention = false;
for (key, writable) in
TransactionAccountLocksIterator::new(transaction_info).accounts_with_is_writable()
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single iteration over all account keys

{
if writable {
transaction_write_locks.push(*key);
} else {
transaction_read_locks.push(*key);
}

if !has_contention
&& (aggregate_write_locks.contains(key)
|| (writable && aggregate_read_locks.contains(key)))
{
has_contention = true;
}
}

if has_contention {
result.push(std::mem::take(&mut current_batch));
aggregate_write_locks.clear();
aggregate_read_locks.clear();
}
current_batch.push(transaction);
aggregate_write_locks.extend(write_account_locks.cloned());
aggregate_read_locks.extend(read_account_locks.cloned());
aggregate_write_locks.extend(transaction_write_locks.drain(..));
aggregate_read_locks.extend(transaction_read_locks.drain(..));
}

if !current_batch.is_empty() {
Expand Down
Loading