Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions binaries/cuprated/src/config/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ impl From<BlockDownloaderConfig> for cuprate_p2p::block_downloader::BlockDownloa
Self {
buffer_bytes: value.buffer_bytes,
in_progress_queue_bytes: value.in_progress_queue_bytes,
order_blocks: true,
stop_height: None,
check_client_pool_interval: value.check_client_pool_interval,
target_batch_bytes: value.target_batch_bytes,
initial_batch_len: 1,
Expand Down
17 changes: 12 additions & 5 deletions p2p/p2p/src/block_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ pub struct BlockDownloaderConfig {
pub buffer_bytes: usize,
/// The size of the in progress queue (in bytes) at which we stop requesting more blocks.
pub in_progress_queue_bytes: usize,
/// Whether blocks be ordered before being returned.
pub order_blocks: bool,
/// The height we should sync to, the block at this height will _not_ be synced.
///
/// If [`None`] we will sync all blocks we can find.
pub stop_height: Option<usize>,
/// The [`Duration`] between checking the client pool for free peers.
pub check_client_pool_interval: Duration,
/// The target size of a single batch of blocks (in bytes).
Expand Down Expand Up @@ -251,7 +257,7 @@ where
block_download_tasks: JoinSet::new(),
chain_entry_task: JoinSet::new(),
inflight_requests: BTreeMap::new(),
block_queue: BlockQueue::new(buffer_appender),
block_queue: BlockQueue::new(buffer_appender, config.order_blocks),
failed_batches: BinaryHeap::new(),
config,
}
Expand Down Expand Up @@ -450,6 +456,7 @@ where
) -> Option<ClientDropGuard<N>> {
// We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup.
if self.chain_entry_task.len() < 2
&& self.config.stop_height.is_none_or(|stop_height| chain_tracker.top_height() < stop_height)
// If we have had too many failures then assume the tip has been found so no more chain entries.
&& self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED
// Check we have a big buffer of pending block IDs to retrieve, we don't want to be waiting around
Expand Down Expand Up @@ -633,7 +640,7 @@ where
/// Starts the main loop of the block downloader.
async fn run(mut self) -> Result<(), BlockDownloadError> {
let mut chain_tracker =
initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc).await?;
initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc, self.config.stop_height).await?;

let mut pending_peers = BTreeMap::new();

Expand All @@ -652,7 +659,7 @@ where
self.check_for_free_clients(&mut chain_tracker, &mut pending_peers).await?;

// If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found.
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
if self.inflight_requests.is_empty() && (self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED || self.config.stop_height.is_some_and(|h| chain_tracker.top_height() == h)) {
tracing::debug!("Failed to find any more chain entries, probably fround the top");
return Ok(());
}
Expand All @@ -666,15 +673,15 @@ where
self.handle_download_batch_res(start_height, result, &mut chain_tracker, &mut pending_peers).await?;

// If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found.
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
if self.inflight_requests.is_empty() && (self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED || self.config.stop_height.is_some_and(|h| chain_tracker.top_height() == h )) {
tracing::debug!("Failed to find any more chain entries, probably fround the top");
return Ok(());
}
}
Some(Ok(res)) = self.chain_entry_task.join_next() => {
match res {
Ok((client, entry)) => {
match chain_tracker.add_entry(entry, &mut self.our_chain_svc).await {
match chain_tracker.add_entry(entry, &mut self.our_chain_svc, self.config.stop_height).await {
Ok(()) => {
tracing::debug!("Successfully added chain entry to chain tracker.");
self.amount_of_empty_chain_entries = 0;
Expand Down
91 changes: 87 additions & 4 deletions p2p/p2p/src/block_downloader/block_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,92 @@ impl Ord for ReadyQueueBatch {
}
}

/// The block queue that passes downloaded block batches to the receiver.
pub(crate) enum BlockQueue {
/// A queue that does order the batches.
Ordered(BlockQueueOrdered),
/// A queue that does not order the batches.
Unordered(BlockQueueUnordered),
}

impl BlockQueue {
/// Creates a new [`BlockQueue`].
pub(crate) const fn new(
buffer_appender: BufferAppender<BlockBatch>,
order_blocks: bool,
) -> Self {
if order_blocks {
Self::Ordered(BlockQueueOrdered::new(buffer_appender))
} else {
Self::Unordered(BlockQueueUnordered::new(buffer_appender))
}
}

/// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
pub(crate) fn oldest_ready_batch(&self) -> Option<usize> {
match self {
Self::Ordered(q) => q.oldest_ready_batch(),
Self::Unordered(_) => None,
}
}

/// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
pub(crate) const fn size(&self) -> usize {
match self {
Self::Ordered(q) => q.size(),
Self::Unordered(_) => 0,
}
}

/// Adds an incoming batch to the queue and checks if we can push any batches into the [`async_buffer`].
///
/// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if
/// there are no batches inflight then this should be [`None`].
pub(crate) async fn add_incoming_batch(
&mut self,
new_batch: ReadyQueueBatch,
oldest_in_flight_start_height: Option<usize>,
) -> Result<(), BlockDownloadError> {
match self {
Self::Ordered(q) => {
q.add_incoming_batch(new_batch, oldest_in_flight_start_height)
.await
}
Self::Unordered(q) => q.add_incoming_batch(new_batch).await,
}
}
}

/// A block queue that does not order the batches before giving them to the receiver.
pub(crate) struct BlockQueueUnordered {
/// The [`BufferAppender`] that gives blocks to Cuprate.
buffer_appender: BufferAppender<BlockBatch>,
}

impl BlockQueueUnordered {
/// Creates a new [`BlockQueueUnordered`].
const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
Self { buffer_appender }
}

/// Pushes the batch into the [`async_buffer`].
pub(crate) async fn add_incoming_batch(
&mut self,
new_batch: ReadyQueueBatch,
) -> Result<(), BlockDownloadError> {
let size = new_batch.block_batch.size;
self.buffer_appender
.send(new_batch.block_batch, size)
.await
.map_err(|_| BlockDownloadError::BufferWasClosed)?;

Ok(())
}
}

/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
/// oldest batch has been downloaded.
pub(crate) struct BlockQueue {
pub(crate) struct BlockQueueOrdered {
/// A queue of ready batches.
ready_batches: BinaryHeap<ReadyQueueBatch>,
/// The size, in bytes, of all the batches in [`Self::ready_batches`].
Expand All @@ -53,8 +136,8 @@ pub(crate) struct BlockQueue {
buffer_appender: BufferAppender<BlockBatch>,
}

impl BlockQueue {
/// Creates a new [`BlockQueue`].
impl BlockQueueOrdered {
/// Creates a new [`BlockQueueOrdered`].
pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
Self {
ready_batches: BinaryHeap::new(),
Expand Down Expand Up @@ -146,7 +229,7 @@ mod tests {
block_on(async move {
let (buffer_tx, mut buffer_rx) = cuprate_async_buffer::new_buffer(usize::MAX);

let mut queue = BlockQueue::new(buffer_tx);
let mut queue = BlockQueueOrdered::new(buffer_tx);

let mut sorted_batches = BTreeSet::from_iter(batches.clone());
let mut soreted_batch_2 = sorted_batches.clone();
Expand Down
38 changes: 35 additions & 3 deletions p2p/p2p/src/block_downloader/chain_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{cmp::min, collections::VecDeque, mem};

use rand_distr::num_traits::Saturating;
use cuprate_fixed_bytes::ByteArrayVec;
use tower::{Service, ServiceExt};

Expand Down Expand Up @@ -76,15 +76,29 @@ pub(crate) struct ChainTracker<N: NetworkZone> {
impl<N: NetworkZone> ChainTracker<N> {
/// Creates a new chain tracker.
pub(crate) async fn new<C>(
new_entry: ChainEntry<N>,
mut new_entry: ChainEntry<N>,
first_height: usize,
our_genesis: [u8; 32],
previous_hash: [u8; 32],
our_chain_svc: &mut C,
stop_height: Option<usize>,
) -> Result<Self, ChainTrackerError>
where
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
{
if let Some(stop_height) = stop_height {
let new_top_height = first_height + new_entry.ids.len();
if new_top_height >= stop_height {
new_entry
.ids
.truncate(stop_height.saturating_sub(first_height));
}

if new_entry.ids.is_empty() {
return Err(ChainTrackerError::NewEntryIsEmpty);
}
}

let top_seen_hash = *new_entry.ids.last().unwrap();
let mut entries = VecDeque::with_capacity(1);
entries.push_back(new_entry);
Expand Down Expand Up @@ -149,6 +163,7 @@ impl<N: NetworkZone> ChainTracker<N> {
&mut self,
mut chain_entry: ChainEntry<N>,
our_chain_svc: &mut C,
stop_height: Option<usize>,
) -> Result<(), ChainTrackerError>
where
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
Expand All @@ -167,13 +182,30 @@ impl<N: NetworkZone> ChainTracker<N> {
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}

let new_entry = ChainEntry {
let mut new_entry = ChainEntry {
// ignore the first block - we already know it.
ids: chain_entry.ids.split_off(1),
peer: chain_entry.peer,
handle: chain_entry.handle,
};

if let Some(stop_height) = stop_height {
let new_top_height = self.top_height() + new_entry.ids.len();
if new_top_height >= stop_height {
new_entry
.ids
.truncate(stop_height.saturating_sub(self.top_height()));
}

if self.top_height() + new_entry.ids.len() > stop_height {
panic!()
}

if new_entry.ids.is_empty() {
return Err(ChainTrackerError::NewEntryIsEmpty);
}
}

self.top_seen_hash = *new_entry.ids.last().unwrap();

self.unknown_entries.push_back(new_entry);
Expand Down
2 changes: 2 additions & 0 deletions p2p/p2p/src/block_downloader/request_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
pub(super) async fn initial_chain_search<N: NetworkZone, C>(
peer_set: &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
mut our_chain_svc: C,
stop_height: Option<usize>
) -> Result<ChainTracker<N>, BlockDownloadError>
where
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
Expand Down Expand Up @@ -220,6 +221,7 @@ where
our_genesis,
previous_id,
&mut our_chain_svc,
stop_height
)
.await
.map_err(|_| BlockDownloadError::ChainInvalid)?;
Expand Down
57 changes: 53 additions & 4 deletions p2p/p2p/src/block_downloader/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use monero_serai::{
transaction::{Input, Timelock, Transaction, TransactionPrefix},
};
use proptest::{collection::vec, prelude::*};
use proptest::sample::SizeRange;
use proptest::strategy::ValueTree;
use proptest::test_runner::TestRunner;
use tokio::{sync::mpsc, time::timeout};
use tower::{buffer::Buffer, service_fn, Service, ServiceExt};

Expand Down Expand Up @@ -43,9 +46,9 @@ proptest! {
timeout: 60 * 1000,
.. ProptestConfig::default()
})]

#[test]
fn test_block_downloader(blockchain in dummy_blockchain_stragtegy(), peers in 1_usize..128) {
fn test_block_downloader(blockchain in dummy_blockchain_stragtegy(1..50_000), peers in 1_usize..128) {
let blockchain = Arc::new(blockchain);

let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
Expand All @@ -70,6 +73,8 @@ proptest! {
BlockDownloaderConfig {
buffer_bytes: 1_000,
in_progress_queue_bytes: 10_000,
order_blocks: true,
stop_height: None,
check_client_pool_interval: Duration::from_secs(5),
target_batch_bytes: 5_000,
initial_batch_len: 1,
Expand All @@ -85,6 +90,50 @@ proptest! {
}).await
}).unwrap();
}

#[test]
fn test_block_downloader_unordered(blockchain in dummy_blockchain_stragtegy(15_000..20_000), peers in 1_usize..128) {
let blockchain = Arc::new(blockchain);

let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();

tokio_pool.block_on(async move {
timeout(Duration::from_secs(600), async move {
let (new_connection_tx, new_connection_rx) = mpsc::channel(peers);

let peer_set = PeerSet::new(new_connection_rx);

for _ in 0..peers {
let client = mock_block_downloader_client(Arc::clone(&blockchain));

new_connection_tx.try_send(client).unwrap();
}

let stream = download_blocks(
Buffer::new(peer_set, 10).boxed_clone(),
OurChainSvc {
genesis: *blockchain.blocks.first().unwrap().0
},
BlockDownloaderConfig {
buffer_bytes: 1_000,
in_progress_queue_bytes: 10_000,
order_blocks: false,
stop_height: Some(15_000),
check_client_pool_interval: Duration::from_secs(5),
target_batch_bytes: 5_000,
initial_batch_len: 1,
});

let blocks = stream.map(|blocks| blocks.blocks).concat().await;

assert_eq!(blocks.len(), 14_999);

for block in blocks.into_iter() {
assert!(block.0.number().unwrap() < 15_000);
}
}).await
}).unwrap();
}
}

prop_compose! {
Expand Down Expand Up @@ -148,8 +197,8 @@ impl Debug for MockBlockchain {

prop_compose! {
/// Returns a strategy to generate a [`MockBlockchain`].
fn dummy_blockchain_stragtegy()(
blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..50_000),
fn dummy_blockchain_stragtegy(size: impl Into<SizeRange>)(
blocks in vec(dummy_block_stragtegy(0, [0; 32]), size),
) -> MockBlockchain {
let mut blockchain = IndexMap::new();

Expand Down
Loading