diff --git a/CLI.md b/CLI.md index 335fa4d12883..a624ba7df658 100644 --- a/CLI.md +++ b/CLI.md @@ -184,6 +184,24 @@ Client implementation and command-line tool for the Linera blockchain * `--max-joined-tasks ` — Maximum number of tasks that can are joined concurrently in the client Default value: `100` +* `--max-in-flight-requests ` — Maximum concurrent requests per validator node + + Default value: `100` +* `--max-accepted-latency-ms ` — Maximum expected latency in milliseconds for score normalization + + Default value: `5000` +* `--cache-ttl-ms ` — Time-to-live for cached responses in milliseconds + + Default value: `2000` +* `--cache-max-size ` — Maximum number of entries in the cache + + Default value: `1000` +* `--max-request-ttl-ms ` — Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds) + + Default value: `200` +* `--alpha ` — Smoothing factor for Exponential Moving Averages (0 < alpha < 1). Higher values give more weight to recent observations. Typical values are between 0.01 and 0.5. A value of 0.1 means that 10% of the new observation is considered and 90% of the previous average is retained + + Default value: `0.1` * `--storage ` — Storage configuration for the blockchain history * `--storage-max-concurrent-queries ` — The maximal number of simultaneous queries to the database * `--storage-max-stream-queries ` — The maximal number of simultaneous stream queries to the database diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index 2af0bbcd5a92..a5b85e35e464 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -291,6 +291,7 @@ where options.chain_worker_ttl, options.sender_chain_worker_ttl, options.to_chain_client_options(), + options.to_requests_scheduler_config(), ); #[cfg(not(web))] @@ -351,6 +352,7 @@ where cross_chain_message_delivery: CrossChainMessageDelivery::Blocking, ..ChainClientOptions::test_default() }, + linera_core::client::RequestsSchedulerConfig::default(), ); ClientContext { diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index d93e04d15cde..07da4986ba11 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -186,6 +186,58 @@ pub struct ClientContextOptions { /// Maximum number of tasks that can are joined concurrently in the client. #[arg(long, default_value = "100")] pub max_joined_tasks: usize, + + /// Maximum concurrent requests per validator node. + #[arg( + long, + default_value_t = linera_core::client::requests_scheduler::MAX_IN_FLIGHT_REQUESTS, + env = "LINERA_REQUESTS_SCHEDULER_MAX_IN_FLIGHT_REQUESTS" + )] + pub max_in_flight_requests: usize, + + /// Maximum expected latency in milliseconds for score normalization. + #[arg( + long, + default_value_t = linera_core::client::requests_scheduler::MAX_ACCEPTED_LATENCY_MS, + env = "LINERA_REQUESTS_SCHEDULER_MAX_ACCEPTED_LATENCY_MS" + )] + pub max_accepted_latency_ms: f64, + + /// Time-to-live for cached responses in milliseconds. + #[arg( + long, + default_value_t = linera_core::client::requests_scheduler::CACHE_TTL_MS, + env = "LINERA_REQUESTS_SCHEDULER_CACHE_TTL_MS" + )] + pub cache_ttl_ms: u64, + + /// Maximum number of entries in the cache. + #[arg( + long, + default_value_t = linera_core::client::requests_scheduler::CACHE_MAX_SIZE, + env = "LINERA_REQUESTS_SCHEDULER_CACHE_MAX_SIZE" + )] + pub cache_max_size: usize, + + /// Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds). 
+ #[arg( + long, + default_value_t = linera_core::client::requests_scheduler::MAX_REQUEST_TTL_MS, + env = "LINERA_REQUESTS_SCHEDULER_MAX_REQUEST_TTL_MS" + )] + pub max_request_ttl_ms: u64, + + /// Smoothing factor for Exponential Moving Averages (0 < alpha < 1). + /// Higher values give more weight to recent observations. + /// Typical values are between 0.01 and 0.5. + /// A value of 0.1 means that 10% of the new observation is considered + /// and 90% of the previous average is retained. + #[arg( + long, + default_value_t = linera_core::client::requests_scheduler::ALPHA_SMOOTHING_FACTOR, + env = "LINERA_REQUESTS_SCHEDULER_ALPHA" + )] + pub alpha: f64, } impl ClientContextOptions { @@ -218,6 +270,20 @@ impl ClientContextOptions { report_interval_secs: self.timing_interval, } } + + /// Creates [`RequestsSchedulerConfig`] with the corresponding values. + pub(crate) fn to_requests_scheduler_config( + &self, + ) -> linera_core::client::RequestsSchedulerConfig { + linera_core::client::RequestsSchedulerConfig { + max_in_flight_requests: self.max_in_flight_requests, + max_accepted_latency_ms: self.max_accepted_latency_ms, + cache_ttl_ms: self.cache_ttl_ms, + cache_max_size: self.cache_max_size, + max_request_ttl_ms: self.max_request_ttl_ms, + alpha: self.alpha, + } + } } #[derive(Debug, Clone, clap::Args)] diff --git a/linera-client/src/unit_tests/chain_listener.rs b/linera-client/src/unit_tests/chain_listener.rs index 28f89441f824..a64501681970 100644 --- a/linera-client/src/unit_tests/chain_listener.rs +++ b/linera-client/src/unit_tests/chain_listener.rs @@ -119,6 +119,7 @@ async fn test_chain_listener() -> anyhow::Result<()> { Duration::from_secs(30), Duration::from_secs(1), ChainClientOptions::test_default(), + linera_core::client::RequestsSchedulerConfig::default(), )), }; context @@ -204,6 +205,7 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> { Duration::from_secs(30), Duration::from_secs(1), ChainClientOptions::test_default(), + linera_core::client::RequestsSchedulerConfig::default(), )), }; let context = Arc::new(Mutex::new(context)); diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index f271088ebfbf..d413fc551734 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -86,6 +86,9 @@ mod chain_client_state; #[cfg(test)] #[path = "../unit_tests/client_tests.rs"] mod client_tests; +pub mod requests_scheduler; + +pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights}; mod received_log; mod validator_trackers; @@ -149,6 +152,8 @@ pub struct Client { /// Local node to manage the execution state and the local storage of the chains that we are /// tracking. local_node: LocalNodeClient, + /// Manages the requests sent to validator nodes. + requests_scheduler: RequestsScheduler, /// The admin chain ID. admin_id: ChainId, /// Chains that should be tracked by the client. 
@@ -175,6 +180,7 @@ impl Client { chain_worker_ttl: Duration, sender_chain_worker_ttl: Duration, options: ChainClientOptions, + requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig, ) -> Self { let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect())); let state = WorkerState::new_for_client( @@ -188,10 +194,12 @@ impl Client { .with_chain_worker_ttl(chain_worker_ttl) .with_sender_chain_worker_ttl(sender_chain_worker_ttl); let local_node = LocalNodeClient::new(state); + let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config); Self { environment, local_node, + requests_scheduler, chains: papaya::HashMap::new(), admin_id, tracked_chains, @@ -347,8 +355,10 @@ impl Client { .checked_sub(u64::from(next_height)) .ok_or(ArithmeticError::Overflow)? .min(self.options.certificate_download_batch_size); - let certificates = remote_node - .query_certificates_from(chain_id, next_height, limit) + + let certificates = self + .requests_scheduler + .download_certificates(remote_node, chain_id, next_height, limit) .await?; let Some(info) = self.process_certificates(remote_node, certificates).await? else { break; @@ -362,20 +372,17 @@ impl Client { async fn download_blobs( &self, - remote_node: &RemoteNode, - blob_ids: impl IntoIterator, + remote_nodes: &[RemoteNode], + blob_ids: &[BlobId], ) -> Result<(), ChainClientError> { - self.local_node - .store_blobs( - &futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move { - remote_node.try_download_blob(blob_id).await.unwrap() - })) - .buffer_unordered(self.options.max_joined_tasks) - .collect::>() - .await, - ) - .await - .map_err(Into::into) + let blobs = &self + .requests_scheduler + .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout) + .await? + .ok_or_else(|| { + ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec())) + })?; + self.local_node.store_blobs(blobs).await.map_err(Into::into) } /// Tries to process all the certificates, requesting any missing blobs from the given node. @@ -383,7 +390,7 @@ impl Client { #[instrument(level = "trace", skip_all)] async fn process_certificates( &self, - remote_node: &RemoteNode, + remote_node: &RemoteNode, certificates: Vec, ) -> Result>, ChainClientError> { let mut info = None; @@ -398,7 +405,8 @@ impl Client { .await { Err(LocalNodeError::BlobsNotFound(blob_ids)) => { - self.download_blobs(remote_node, blob_ids).await?; + self.download_blobs(&[remote_node.clone()], &blob_ids) + .await?; } x => { x?; @@ -409,7 +417,8 @@ impl Client { info = Some( match self.handle_certificate(certificate.clone()).await { Err(LocalNodeError::BlobsNotFound(blob_ids)) => { - self.download_blobs(remote_node, blob_ids).await?; + self.download_blobs(&[remote_node.clone()], &blob_ids) + .await?; self.handle_certificate(certificate).await? } x => x?, @@ -663,7 +672,6 @@ impl Client { ) -> Result<(), ChainClientError> { let certificate = Box::new(certificate); let block = certificate.block(); - // Recover history from the network. 
self.download_certificates(block.header.chain_id, block.header.height) .await?; @@ -672,14 +680,9 @@ impl Client { if let Err(err) = self.process_certificate(certificate.clone()).await { match &err { LocalNodeError::BlobsNotFound(blob_ids) => { - let blobs = RemoteNode::download_blobs( - blob_ids, - &self.validator_nodes().await?, - self.options.blob_download_timeout, - ) - .await - .ok_or(err)?; - self.local_node.store_blobs(&blobs).await?; + self.download_blobs(&self.validator_nodes().await?, blob_ids) + .await + .map_err(|_| err)?; self.process_certificate(certificate).await?; } _ => { @@ -716,14 +719,7 @@ impl Client { if let Err(err) = self.handle_certificate(certificate.clone()).await { match &err { LocalNodeError::BlobsNotFound(blob_ids) => { - let blobs = RemoteNode::download_blobs( - blob_ids, - &nodes, - self.options.blob_download_timeout, - ) - .await - .ok_or(err)?; - self.local_node.store_blobs(&blobs).await?; + self.download_blobs(&nodes, blob_ids).await?; self.handle_certificate(certificate.clone()).await?; } _ => { @@ -777,8 +773,13 @@ impl Client { // anything from the validator - let the function try the other validators return Err(()); } - let certificates = remote_node - .download_certificates_by_heights(sender_chain_id, remote_heights) + let certificates = self + .requests_scheduler + .download_certificates_by_heights( + &remote_node, + sender_chain_id, + remote_heights, + ) .await .map_err(|_| ())?; let mut certificates_with_check_results = vec![]; @@ -934,8 +935,13 @@ impl Client { // Stop if we've reached the height we've already processed. while current_height >= next_outbox_height { // Download the certificate for this height. - let downloaded = remote_node - .download_certificates_by_heights(sender_chain_id, vec![current_height]) + let downloaded = self + .requests_scheduler + .download_certificates_by_heights( + remote_node, + sender_chain_id, + vec![current_height], + ) .await?; let Some(certificate) = downloaded.into_iter().next() else { return Err(ChainClientError::CannotDownloadMissingSenderBlock { @@ -1119,9 +1125,9 @@ impl Client { if !required_blob_ids.is_empty() { let mut blobs = Vec::new(); for blob_id in required_blob_ids { - let blob_content = match remote_node - .node - .download_pending_blob(chain_id, blob_id) + let blob_content = match self + .requests_scheduler + .download_pending_blob(remote_node, chain_id, blob_id) .await { Ok(content) => content, @@ -1217,9 +1223,9 @@ impl Client { Err(LocalNodeError::BlobsNotFound(blob_ids)) => { let mut blobs = Vec::new(); for blob_id in blob_ids { - let blob_content = remote_node - .node - .download_pending_blob(chain_id, blob_id) + let blob_content = self + .requests_scheduler + .download_pending_blob(remote_node, chain_id, blob_id) .await?; blobs.push(Blob::new(blob_content)); } @@ -1248,7 +1254,10 @@ impl Client { communicate_concurrently( remote_nodes, async move |remote_node| { - let certificate = remote_node.download_certificate_for_blob(blob_id).await?; + let certificate = self + .requests_scheduler + .download_certificate_for_blob(&remote_node, blob_id) + .await?; self.receive_sender_certificate( certificate, ReceiveCertificateMode::NeedsCheck, @@ -4123,7 +4132,7 @@ impl ChainClient { } /// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on -/// each subsequent node. Returns error `err` is all of the nodes fail. +/// each subsequent node. Returns error `err` if all of the nodes fail. 
async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>( nodes: &[RemoteNode], f: F, diff --git a/linera-core/src/client/requests_scheduler/cache.rs b/linera-core/src/client/requests_scheduler/cache.rs new file mode 100644 index 000000000000..8798008eb818 --- /dev/null +++ b/linera-core/src/client/requests_scheduler/cache.rs @@ -0,0 +1,387 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashMap, sync::Arc}; + +use linera_base::time::{Duration, Instant}; + +#[cfg(with_metrics)] +use super::scheduler::metrics; + +/// Cached result entry with timestamp for TTL expiration +#[derive(Debug, Clone)] +pub(super) struct CacheEntry { + result: Arc, + cached_at: Instant, +} + +/// Cache for request results with TTL-based expiration and LRU eviction. +/// +/// This cache supports: +/// - Exact match lookups +/// - Subsumption-based lookups (larger requests can satisfy smaller ones) +/// - TTL-based expiration +/// - LRU eviction +#[derive(Debug, Clone)] +pub(super) struct RequestsCache { + /// Cache of recently completed requests with their results and timestamps. + /// Used to avoid re-executing requests for the same data within the TTL window. + cache: Arc>>>, + /// Time-to-live for cached entries. Entries older than this duration are considered expired. + cache_ttl: Duration, + /// Maximum number of entries to store in the cache. When exceeded, oldest entries are evicted (LRU). + max_cache_size: usize, +} + +impl RequestsCache +where + K: Eq + std::hash::Hash + std::fmt::Debug + Clone + SubsumingKey, + R: Clone + std::fmt::Debug, +{ + /// Creates a new `RequestsCache` with the specified TTL and maximum size. + /// + /// # Arguments + /// - `cache_ttl`: Time-to-live for cached entries + /// - `max_cache_size`: Maximum number of entries in the cache + pub(super) fn new(cache_ttl: Duration, max_cache_size: usize) -> Self { + Self { + cache: Arc::new(tokio::sync::RwLock::new(HashMap::new())), + cache_ttl, + max_cache_size, + } + } + + /// Attempts to retrieve a cached result for the given key. + /// + /// This method performs both exact match lookups and subsumption-based lookups. + /// If a larger request that contains all the data needed by this request is cached, + /// we can extract the subset result instead of making a new request. + /// + /// # Returns + /// - `Some(T)` if a cached result is found (either exact or subsumed) + /// - `None` if no suitable cached result exists + pub(super) async fn get(&self, key: &K) -> Option + where + T: TryFrom, + { + let cache = self.cache.read().await; + + // Check cache for exact match first + if let Some(entry) = cache.get(key) { + tracing::trace!( + key = ?key, + "cache hit (exact match) - returning cached result" + ); + #[cfg(with_metrics)] + metrics::REQUEST_CACHE_HIT.inc(); + return T::try_from((*entry.result).clone()).ok(); + } + + // Check cache for subsuming requests + for (cached_key, entry) in cache.iter() { + if cached_key.subsumes(key) { + if let Some(extracted) = key.try_extract_result(cached_key, &entry.result) { + tracing::trace!( + key = ?key, + "cache hit (subsumption) - extracted result from larger cached request" + ); + #[cfg(with_metrics)] + metrics::REQUEST_CACHE_HIT.inc(); + return T::try_from(extracted).ok(); + } + } + } + + None + } + + /// Stores a result in the cache with LRU eviction if cache is full. + /// + /// If the cache is at capacity, this method removes the oldest expired entries first. + /// Entries are considered "oldest" based on their cached_at timestamp. 
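+    ///
+    /// A minimal usage sketch (the scheduler stores `RequestKey`/`RequestResult` pairs;
+    /// here we reuse the `RangeKey`/`RangeResult` stand-ins from the `tests` module below):
+    ///
+    /// ```ignore
+    /// let cache = RequestsCache::new(Duration::from_millis(2_000), 1_000);
+    /// cache
+    ///     .store(RangeKey { start: 0, end: 5 }, Arc::new(RangeResult(vec![0, 1, 2, 3, 4, 5])))
+    ///     .await;
+    /// // A smaller, subsumed request is answered from the cached entry.
+    /// let hit: Option<RangeResult> = cache.get(&RangeKey { start: 2, end: 4 }).await;
+    /// assert_eq!(hit, Some(RangeResult(vec![2, 3, 4])));
+    /// ```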
+ /// + /// # Arguments + /// - `key`: The request key to cache + /// - `result`: The result to cache + pub(super) async fn store(&self, key: K, result: Arc) { + self.evict_expired_entries().await; // Clean up expired entries first + let mut cache = self.cache.write().await; + // Insert new entry + cache.insert( + key.clone(), + CacheEntry { + result, + cached_at: Instant::now(), + }, + ); + tracing::trace!( + key = ?key, + "stored result in cache" + ); + } + + /// Removes all cache entries that are older than the configured cache TTL. + /// + /// This method scans the cache and removes entries where the time elapsed since + /// `cached_at` exceeds `cache_ttl`. It's useful for explicitly cleaning up stale + /// cache entries rather than relying on lazy expiration checks. + /// + /// # Returns + /// The number of entries that were evicted + async fn evict_expired_entries(&self) -> usize { + let mut cache = self.cache.write().await; + let now = Instant::now(); + // Not strictly smaller b/c we want to add a new entry after eviction. + if cache.len() < self.max_cache_size { + return 0; // No need to evict if under max size + } + let mut expired_keys = 0usize; + + cache.retain(|_key, entry| { + if now.duration_since(entry.cached_at) > self.cache_ttl { + expired_keys += 1; + false + } else { + true + } + }); + + if expired_keys > 0 { + tracing::trace!(count = expired_keys, "evicted expired cache entries"); + } + + expired_keys + } +} + +/// Trait for request keys that support subsumption-based matching and result extraction. +pub(super) trait SubsumingKey { + /// Checks if this request fully subsumes another request. + /// + /// Request `self` subsumes request `other` if `self`'s result would contain all the data that + /// `other`'s result would contain. This means `other`'s request is redundant if `self` is already + /// in-flight or cached. + fn subsumes(&self, other: &Self) -> bool; + + /// Attempts to extract a subset result for this request from a larger request's result. + /// + /// This is used when a request A subsumes this request B. We can extract B's result + /// from A's result by filtering the certificates to only those requested by B. + /// + /// # Arguments + /// - `from`: The key of the larger request that subsumes this one + /// - `result`: The result from the larger request + /// + /// # Returns + /// - `Some(result)` with the extracted subset if possible + /// - `None` if extraction is not possible (wrong variant, different chain, etc.) 
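+    ///
+    /// # Example
+    ///
+    /// A minimal sketch using the `RangeKey`/`RangeResult` test types from the `tests`
+    /// module below, where a range request subsumes any of its sub-ranges:
+    ///
+    /// ```ignore
+    /// let large = RangeKey { start: 0, end: 10 };
+    /// let small = RangeKey { start: 3, end: 7 };
+    /// assert!(large.subsumes(&small));
+    /// let full = RangeResult((0..=10).collect());
+    /// assert_eq!(
+    ///     small.try_extract_result(&large, &full),
+    ///     Some(RangeResult((3..=7).collect())),
+    /// );
+    /// ```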
+ fn try_extract_result(&self, from: &Self, result: &R) -> Option; +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use linera_base::time::Duration; + + use super::*; + + // Mock key type for testing: represents a range request [start, end] + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct RangeKey { + start: u64, + end: u64, + } + + // Mock result type: vector of values in the range + #[derive(Debug, Clone, PartialEq)] + struct RangeResult(Vec); + + impl SubsumingKey for RangeKey { + fn subsumes(&self, other: &Self) -> bool { + // This range subsumes another if it contains the other's range + self.start <= other.start && self.end >= other.end + } + + fn try_extract_result(&self, from: &Self, result: &RangeResult) -> Option { + if !from.subsumes(self) { + return None; + } + // Extract values that fall within our range + let filtered: Vec = result + .0 + .iter() + .filter(|&&v| v >= self.start && v <= self.end) + .copied() + .collect(); + Some(RangeResult(filtered)) + } + } + + #[tokio::test] + async fn test_cache_miss_on_empty_cache() { + let cache: RequestsCache = + RequestsCache::new(Duration::from_secs(60), 10); + let key = RangeKey { start: 0, end: 5 }; + let result: Option = cache.get(&key).await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_exact_match_hit() { + let cache = RequestsCache::new(Duration::from_secs(60), 10); + let key = RangeKey { start: 0, end: 5 }; + let result = RangeResult(vec![0, 1, 2, 3, 4, 5]); + + cache.store(key.clone(), Arc::new(result.clone())).await; + let retrieved: Option = cache.get(&key).await; + + assert_eq!(retrieved, Some(result)); + } + + #[tokio::test] + async fn test_exact_match_takes_priority_over_subsumption() { + let cache = RequestsCache::new(Duration::from_secs(60), 10); + + // Store a larger range + let large_key = RangeKey { start: 0, end: 10 }; + let large_result = RangeResult(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + cache + .store(large_key.clone(), Arc::new(large_result.clone())) + .await; + + // Store an exact match + let exact_key = RangeKey { start: 2, end: 5 }; + let exact_result = RangeResult(vec![2, 3, 4, 5]); + cache + .store(exact_key.clone(), Arc::new(exact_result.clone())) + .await; + + // Should get exact match, not extracted from larger range + let retrieved: Option = cache.get(&exact_key).await; + assert_eq!(retrieved, Some(exact_result)); + } + + #[tokio::test] + async fn test_subsumption_hit() { + let cache = RequestsCache::new(Duration::from_secs(60), 10); + + // Store a larger range + let large_key = RangeKey { start: 0, end: 10 }; + let large_result = RangeResult(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + cache.store(large_key, Arc::new(large_result.clone())).await; + + // Request a subset + let subset_key = RangeKey { start: 3, end: 7 }; + let retrieved: Option = cache.get(&subset_key).await; + + assert_eq!(retrieved, Some(RangeResult(vec![3, 4, 5, 6, 7]))); + } + + #[tokio::test] + async fn test_subsumption_miss_when_no_overlap() { + let cache = RequestsCache::new(Duration::from_secs(60), 10); + + let key1 = RangeKey { start: 0, end: 5 }; + let result1 = RangeResult(vec![0, 1, 2, 3, 4, 5]); + cache.store(key1, Arc::new(result1)).await; + + // Non-overlapping range + let key2 = RangeKey { start: 10, end: 15 }; + let retrieved: Option = cache.get(&key2).await; + + assert!(retrieved.is_none()); + } + + #[tokio::test] + async fn test_eviction_when_exceeding_max_size() { + let cache_size = 3u64; + let cache = RequestsCache::new(Duration::from_millis(50), cache_size as usize); + + // 
Fill cache to max size + for i in 0..cache_size { + let key = RangeKey { + start: i * 10, + end: i * 10, + }; + cache.store(key, Arc::new(RangeResult(vec![i * 10]))).await; + // Small delay to ensure different timestamps + tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; + } + + // Wait for first entry to expire + tokio::time::sleep(tokio::time::Duration::from_millis(60)).await; + + // Cache is now at max size (3) with expired entries, so next store triggers eviction. + let key_4 = RangeKey { + start: 100, + end: 100, + }; + cache + .store(key_4.clone(), Arc::new(RangeResult(vec![100]))) + .await; + + let cache_guard = cache.cache.read().await; + // Expired entries should have been evicted + let first_key = RangeKey { start: 0, end: 5 }; + assert!(!cache_guard.contains_key(&first_key)); + + // Latest entries should still be there + assert!(cache_guard.contains_key(&key_4)); + } + #[tokio::test] + async fn test_subsumption_with_extraction_failure_tries_next() { + // Mock key that subsumes but extraction returns None + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct FailingKey { + id: u64, + always_fail_extraction: bool, + } + + #[derive(Debug, Clone, PartialEq)] + struct SimpleResult(u64); + + impl SubsumingKey for FailingKey { + fn subsumes(&self, other: &Self) -> bool { + self.id >= other.id + } + + fn try_extract_result( + &self, + from: &Self, + _result: &SimpleResult, + ) -> Option { + if from.always_fail_extraction { + None + } else { + Some(SimpleResult(self.id)) + } + } + } + + let cache = RequestsCache::::new(Duration::from_secs(60), 10); + + // Store entry that subsumes but fails extraction + let failing_key = FailingKey { + id: 10, + always_fail_extraction: true, + }; + cache.store(failing_key, Arc::new(SimpleResult(10))).await; + + // Store entry that subsumes and succeeds extraction + let working_key = FailingKey { + id: 20, + always_fail_extraction: false, + }; + cache.store(working_key, Arc::new(SimpleResult(20))).await; + + // Request should find the working one + let target_key = FailingKey { + id: 5, + always_fail_extraction: false, + }; + let retrieved: Option = cache.get(&target_key).await; + + assert!(retrieved.is_some()); + } +} diff --git a/linera-core/src/client/requests_scheduler/in_flight_tracker.rs b/linera-core/src/client/requests_scheduler/in_flight_tracker.rs new file mode 100644 index 000000000000..33786b3ee14a --- /dev/null +++ b/linera-core/src/client/requests_scheduler/in_flight_tracker.rs @@ -0,0 +1,206 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashMap, fmt::Debug, sync::Arc}; + +use linera_base::time::{Duration, Instant}; +use tokio::sync::broadcast; + +use super::{ + cache::SubsumingKey, + request::{RequestKey, RequestResult}, +}; +use crate::node::NodeError; + +/// Tracks in-flight requests to deduplicate concurrent requests for the same data. +/// +/// This structure manages a map of request keys to in-flight entries, each containing +/// broadcast senders for notifying waiters when a request completes, as well as timing +/// information and alternative data sources. +#[derive(Debug, Clone)] +pub(super) struct InFlightTracker { + /// Maps request keys to in-flight entries containing broadcast senders and metadata + entries: Arc>>>, + /// Maximum duration before an in-flight request is considered stale and deduplication is skipped + timeout: Duration, +} + +impl InFlightTracker { + /// Creates a new `InFlightTracker` with the specified timeout. 
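+    ///
+    /// A minimal lifecycle sketch (`key` and `result` are illustrative placeholders):
+    ///
+    /// ```ignore
+    /// let tracker = InFlightTracker::new(Duration::from_millis(200));
+    /// tracker.insert_new(key.clone()).await;             // first requester registers the key
+    /// let duplicate = tracker.try_subscribe(&key).await; // later duplicates wait on the broadcast
+    /// let waiters = tracker.complete_and_broadcast(&key, Arc::new(result)).await;
+    /// ```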
+ /// + /// # Arguments + /// - `timeout`: Maximum duration before an in-flight request is considered too old to deduplicate against + pub(super) fn new(timeout: Duration) -> Self { + Self { + entries: Arc::new(tokio::sync::RwLock::new(HashMap::new())), + timeout, + } + } + + /// Attempts to subscribe to an existing in-flight request (exact or subsuming match). + /// + /// Searches for either an exact key match or a subsuming request (whose result would + /// contain all the data needed by this request). Returns information about which type + /// of match was found, along with subscription details. + /// + /// # Arguments + /// - `key`: The request key to look up + /// + /// # Returns + /// - `None`: No matching in-flight request found. Also returned if the found request is stale (exceeds timeout). + /// - `Some(InFlightMatch::Subsuming { key, outcome })`: Subsuming request found + pub(super) async fn try_subscribe(&self, key: &RequestKey) -> Option { + let in_flight = self.entries.read().await; + + if let Some(entry) = in_flight.get(key) { + let elapsed = Instant::now().duration_since(entry.started_at); + + if elapsed <= self.timeout { + return Some(InFlightMatch::Exact(Subscribed(entry.sender.subscribe()))); + } + } + + // Sometimes a request key may not have the exact match but may be subsumed by a larger one. + for (in_flight_key, entry) in in_flight.iter() { + if in_flight_key.subsumes(key) { + let elapsed = Instant::now().duration_since(entry.started_at); + + if elapsed <= self.timeout { + return Some(InFlightMatch::Subsuming { + key: in_flight_key.clone(), + outcome: Subscribed(entry.sender.subscribe()), + }); + } + } + } + + None + } + + /// Inserts a new in-flight request entry. + /// + /// Creates a new broadcast channel and in-flight entry for the given key, + /// marking the start time as now. + /// + /// # Arguments + /// - `key`: The request key to insert + pub(super) async fn insert_new(&self, key: RequestKey) { + let (sender, _receiver) = broadcast::channel(1); + let mut in_flight = self.entries.write().await; + + in_flight.insert( + key, + InFlightEntry { + sender, + started_at: Instant::now(), + alternative_peers: Arc::new(tokio::sync::RwLock::new(Vec::new())), + }, + ); + } + + /// Completes an in-flight request by removing it and broadcasting the result. + /// + /// Removes the entry for the given key and broadcasts the result to all waiting + /// subscribers. Logs the number of waiters that received the notification. + /// + /// # Arguments + /// - `key`: The request key to complete + /// - `result`: The result to broadcast to waiters + pub(super) async fn complete_and_broadcast( + &self, + key: &RequestKey, + result: Arc>, + ) -> usize { + let mut in_flight = self.entries.write().await; + + if let Some(entry) = in_flight.remove(key) { + let waiter_count = entry.sender.receiver_count(); + tracing::trace!( + key = ?key, + waiters = waiter_count, + "request completed; broadcasting result to waiters", + ); + if waiter_count != 0 { + if let Err(err) = entry.sender.send(result) { + tracing::warn!( + key = ?key, + error = ?err, + "failed to broadcast result to waiters" + ); + } + } + return waiter_count; + } + 0 + } + + /// Registers an alternative peer for an in-flight request. + /// + /// If an entry exists for the given key, registers the peer as an alternative source + /// (if not already registered). 
+ /// + /// # Arguments + /// - `key`: The request key + /// - `peer`: The peer to register as an alternative + pub(super) async fn add_alternative_peer(&self, key: &RequestKey, peer: N) + where + N: PartialEq + Eq, + { + if let Some(entry) = self.entries.read().await.get(key) { + // Register this peer as an alternative source if not already present + { + let mut alt_peers = entry.alternative_peers.write().await; + if !alt_peers.contains(&peer) { + alt_peers.push(peer); + } + } + } + } + + /// Retrieves the list of alternative peers registered for an in-flight request. + /// + /// Returns a clone of the alternative peers list if an entry exists for the given key. + /// + /// # Arguments + /// - `key`: The request key to look up + /// + /// # Returns + /// - `Vec`: List of alternative peers (empty if no entry exists) + pub(super) async fn get_alternative_peers(&self, key: &RequestKey) -> Option> { + let in_flight = self.entries.read().await; + + let entry = in_flight.get(key)?; + let peers = entry.alternative_peers.read().await; + Some(peers.clone()) + } +} + +/// Type of in-flight request match found. +#[derive(Debug)] +pub(super) enum InFlightMatch { + /// Exact key match found + Exact(Subscribed), + /// Subsuming key match found (larger request that contains this request) + Subsuming { + /// The key of the subsuming request + key: RequestKey, + /// Outcome of attempting to subscribe + outcome: Subscribed, + }, +} + +/// Outcome of attempting to subscribe to an in-flight request. +/// Successfully subscribed; receiver will be notified when request completes +#[derive(Debug)] +pub(super) struct Subscribed(pub(super) broadcast::Receiver>>); + +/// In-flight request entry that tracks when the request was initiated. +#[derive(Debug)] +pub(super) struct InFlightEntry { + /// Broadcast sender for notifying waiters when the request completes + sender: broadcast::Sender>>, + /// Time when this request was initiated + started_at: Instant, + /// Alternative peers that can provide this data if the primary request fails + alternative_peers: Arc>>, +} diff --git a/linera-core/src/client/requests_scheduler/mod.rs b/linera-core/src/client/requests_scheduler/mod.rs new file mode 100644 index 000000000000..67cb024a292d --- /dev/null +++ b/linera-core/src/client/requests_scheduler/mod.rs @@ -0,0 +1,53 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! This module manages communication with validator nodes, including connection pooling, +//! load balancing, request deduplication, caching, and performance tracking. + +mod cache; +mod in_flight_tracker; +mod node_info; +mod request; +mod scheduler; +mod scoring; + +pub use scheduler::RequestsScheduler; +pub use scoring::ScoringWeights; + +// Module constants - default values for RequestsSchedulerConfig +pub const MAX_IN_FLIGHT_REQUESTS: usize = 100; +pub const MAX_ACCEPTED_LATENCY_MS: f64 = 5000.0; +pub const CACHE_TTL_MS: u64 = 2000; +pub const CACHE_MAX_SIZE: usize = 1000; +pub const MAX_REQUEST_TTL_MS: u64 = 200; +pub const ALPHA_SMOOTHING_FACTOR: f64 = 0.1; + +/// Configuration for the `RequestsScheduler`. 
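+///
+/// # Example
+///
+/// A minimal sketch of overriding one field while keeping the defaults above
+/// (the value is illustrative, not a recommendation):
+///
+/// ```ignore
+/// let config = RequestsSchedulerConfig {
+///     max_in_flight_requests: 50,
+///     ..RequestsSchedulerConfig::default()
+/// };
+/// ```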
+#[derive(Debug, Clone)] +pub struct RequestsSchedulerConfig { + /// Maximum concurrent requests per validator node + pub max_in_flight_requests: usize, + /// Maximum expected latency in milliseconds for score normalization + pub max_accepted_latency_ms: f64, + /// Time-to-live for cached responses in milliseconds + pub cache_ttl_ms: u64, + /// Maximum number of entries in the cache + pub cache_max_size: usize, + /// Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds) + pub max_request_ttl_ms: u64, + /// Smoothing factor for Exponential Moving Averages (0 < alpha < 1) + pub alpha: f64, +} + +impl Default for RequestsSchedulerConfig { + fn default() -> Self { + Self { + max_in_flight_requests: MAX_IN_FLIGHT_REQUESTS, + max_accepted_latency_ms: MAX_ACCEPTED_LATENCY_MS, + cache_ttl_ms: CACHE_TTL_MS, + cache_max_size: CACHE_MAX_SIZE, + max_request_ttl_ms: MAX_REQUEST_TTL_MS, + alpha: ALPHA_SMOOTHING_FACTOR, + } + } +} diff --git a/linera-core/src/client/requests_scheduler/node_info.rs b/linera-core/src/client/requests_scheduler/node_info.rs new file mode 100644 index 000000000000..7e0ddce44d59 --- /dev/null +++ b/linera-core/src/client/requests_scheduler/node_info.rs @@ -0,0 +1,142 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use custom_debug_derive::Debug; +use tokio::sync::Semaphore; + +use super::scoring::ScoringWeights; +use crate::{environment::Environment, remote_node::RemoteNode}; + +/// Tracks performance metrics and request capacity for a validator node using +/// Exponential Moving Averages (EMA) for adaptive scoring. +/// +/// This struct wraps a `RemoteNode` with performance tracking that adapts quickly +/// to changing network conditions. The scoring system uses EMAs to weight recent +/// performance more heavily than historical data. +#[derive(Debug, Clone)] +pub(super) struct NodeInfo { + /// The underlying validator node connection + pub(super) node: RemoteNode, + + /// Semaphore to limit concurrent in-flight requests. + /// It's created with a limit set to `max_in_flight` from configuration. + pub(super) in_flight_semaphore: Arc, + + /// Exponential Moving Average of latency in milliseconds + /// Adapts quickly to changes in response time + ema_latency_ms: f64, + + /// Exponential Moving Average of success rate (0.0 to 1.0) + /// Tracks recent success/failure patterns + ema_success_rate: f64, + + /// Total number of requests processed (for monitoring and cold-start handling) + total_requests: u64, + + /// Configuration for scoring weights + weights: ScoringWeights, + + /// EMA smoothing factor (0 < alpha < 1) + /// Higher values give more weight to recent observations + alpha: f64, + + /// Maximum expected latency in milliseconds for score normalization + max_expected_latency_ms: f64, + + /// Maximum expected in-flight requests for score normalization + max_in_flight: usize, +} + +impl NodeInfo { + /// Creates a new `NodeInfo` with custom configuration. 
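+    ///
+    /// A minimal construction sketch; the numeric arguments mirror the module defaults
+    /// and are illustrative rather than tuned values:
+    ///
+    /// ```ignore
+    /// let info = NodeInfo::with_config(
+    ///     remote_node,                // the validator's `RemoteNode`
+    ///     ScoringWeights::default(),
+    ///     0.1,    // alpha: EMA smoothing factor
+    ///     5000.0, // max expected latency (ms) for score normalization
+    ///     100,    // max concurrent in-flight requests
+    /// );
+    /// ```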
+ pub(super) fn with_config( + node: RemoteNode, + weights: ScoringWeights, + alpha: f64, + max_expected_latency_ms: f64, + max_in_flight: usize, + ) -> Self { + assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range"); + Self { + node, + ema_latency_ms: 100.0, // Start with reasonable latency expectation + ema_success_rate: 1.0, // Start optimistically with 100% success + in_flight_semaphore: Arc::new(tokio::sync::Semaphore::new(max_in_flight)), + total_requests: 0, + weights, + alpha, + max_expected_latency_ms, + max_in_flight, + } + } + + /// Calculates a normalized performance score (0.0 to 1.0) using weighted metrics. + /// + /// The score combines three normalized components: + /// - **Latency score**: Inversely proportional to EMA latency + /// - **Success score**: Directly proportional to EMA success rate + /// - **Load score**: Inversely proportional to current load + /// + /// Returns a score from 0.0 to 1.0, where higher values indicate better performance. + pub(super) async fn calculate_score(&self) -> f64 { + // 1. Normalize Latency (lower is better, so we invert) + let latency_score = 1.0 + - (self.ema_latency_ms.min(self.max_expected_latency_ms) + / self.max_expected_latency_ms); + + // 2. Success Rate is already normalized [0, 1] + let success_score = self.ema_success_rate; + + // 3. Normalize Load (lower is better, so we invert) + let current_load = + (self.max_in_flight as f64) - (self.in_flight_semaphore.available_permits() as f64); + let load_score = + 1.0 - (current_load.min(self.max_in_flight as f64) / self.max_in_flight as f64); + + // 4. Apply cold-start penalty for nodes with very few requests + let confidence_factor = (self.total_requests as f64 / 10.0).min(1.0); + + // 5. Combine with weights + let raw_score = (self.weights.latency * latency_score) + + (self.weights.success * success_score) + + (self.weights.load * load_score); + + // Apply confidence factor to penalize nodes with too few samples + raw_score * (0.5 + 0.5 * confidence_factor) + } + + /// Updates performance metrics using Exponential Moving Average. + /// + /// # Arguments + /// - `success`: Whether the request completed successfully + /// - `response_time_ms`: The request's response time in milliseconds + /// + /// Uses EMA formula: new_value = (alpha * observation) + ((1 - alpha) * old_value) + /// This gives more weight to recent observations while maintaining some history. + pub(super) fn update_metrics(&mut self, success: bool, response_time_ms: u64) { + let response_time_f64 = response_time_ms as f64; + + // Update latency EMA + self.ema_latency_ms = + (self.alpha * response_time_f64) + ((1.0 - self.alpha) * self.ema_latency_ms); + + // Update success rate EMA + let success_value = if success { 1.0 } else { 0.0 }; + self.ema_success_rate = + (self.alpha * success_value) + ((1.0 - self.alpha) * self.ema_success_rate); + + self.total_requests += 1; + } + + /// Returns the current EMA success rate. + pub(super) fn ema_success_rate(&self) -> f64 { + self.ema_success_rate + } + + /// Returns the total number of requests processed. + pub(super) fn total_requests(&self) -> u64 { + self.total_requests + } +} diff --git a/linera-core/src/client/requests_scheduler/request.rs b/linera-core/src/client/requests_scheduler/request.rs new file mode 100644 index 000000000000..308e8232e57b --- /dev/null +++ b/linera-core/src/client/requests_scheduler/request.rs @@ -0,0 +1,577 @@ +// Copyright (c) Zefchain Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use linera_base::{ + data_types::{Blob, BlobContent, BlockHeight}, + identifiers::{BlobId, ChainId}, +}; +use linera_chain::types::ConfirmedBlockCertificate; + +use crate::client::requests_scheduler::cache::SubsumingKey; + +/// Unique identifier for different types of download requests. +/// +/// Used for request deduplication to avoid redundant downloads of the same data. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum RequestKey { + /// Download certificates by specific heights + Certificates { + chain_id: ChainId, + heights: Vec, + }, + /// Download a blob by ID + Blob(BlobId), + /// Download a pending blob + PendingBlob { chain_id: ChainId, blob_id: BlobId }, + /// Download certificate for a specific blob + CertificateForBlob(BlobId), +} + +impl RequestKey { + /// Returns the chain ID associated with the request, if applicable. + pub(super) fn chain_id(&self) -> Option { + match self { + RequestKey::Certificates { chain_id, .. } => Some(*chain_id), + RequestKey::PendingBlob { chain_id, .. } => Some(*chain_id), + _ => None, + } + } + + /// Converts certificate-related requests to a common representation of (chain_id, sorted heights). + /// + /// This helper method normalizes both `Certificates` and `CertificatesByHeights` variants + /// into a uniform format for easier comparison and overlap detection. + /// + /// # Returns + /// - `Some((chain_id, heights))` for certificate requests, where heights are sorted + /// - `None` for non-certificate requests (Blob, PendingBlob, CertificateForBlob) + fn heights(&self) -> Option> { + match self { + RequestKey::Certificates { heights, .. } => Some(heights.clone()), + _ => None, + } + } +} + +/// Result types that can be shared across deduplicated requests +#[derive(Debug, Clone)] +pub enum RequestResult { + Certificates(Vec), + Blob(Option), + BlobContent(BlobContent), + Certificate(Box), +} + +/// Marker trait for types that can be converted to/from `RequestResult` +/// for use in the requests cache. 
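+///
+/// For example, `Vec<ConfirmedBlockCertificate>` qualifies because the `From` and
+/// `TryFrom` conversions to and from [`RequestResult`] below give it both required
+/// bounds via the blanket implementation.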
+pub trait Cacheable: TryFrom + Into {} +impl Cacheable for T where T: TryFrom + Into {} + +impl From> for RequestResult { + fn from(blob: Option) -> Self { + RequestResult::Blob(blob) + } +} + +impl From> for RequestResult { + fn from(certs: Vec) -> Self { + RequestResult::Certificates(certs) + } +} + +impl From for RequestResult { + fn from(content: BlobContent) -> Self { + RequestResult::BlobContent(content) + } +} + +impl From for RequestResult { + fn from(cert: ConfirmedBlockCertificate) -> Self { + RequestResult::Certificate(Box::new(cert)) + } +} + +impl TryFrom for Option { + type Error = (); + + fn try_from(result: RequestResult) -> Result { + match result { + RequestResult::Blob(blob) => Ok(blob), + _ => Err(()), + } + } +} + +impl TryFrom for Vec { + type Error = (); + + fn try_from(result: RequestResult) -> Result { + match result { + RequestResult::Certificates(certs) => Ok(certs), + _ => Err(()), + } + } +} + +impl TryFrom for BlobContent { + type Error = (); + + fn try_from(result: RequestResult) -> Result { + match result { + RequestResult::BlobContent(content) => Ok(content), + _ => Err(()), + } + } +} + +impl TryFrom for ConfirmedBlockCertificate { + type Error = (); + + fn try_from(result: RequestResult) -> Result { + match result { + RequestResult::Certificate(cert) => Ok(*cert), + _ => Err(()), + } + } +} + +impl SubsumingKey for super::request::RequestKey { + fn subsumes(&self, other: &Self) -> bool { + // Different chains can't subsume each other + if self.chain_id() != other.chain_id() { + return false; + } + + let (in_flight_req_heights, new_req_heights) = match (self.heights(), other.heights()) { + (Some(range1), Some(range2)) => (range1, range2), + _ => return false, // We subsume only certificate requests + }; + + let mut in_flight_req_heights_iter = in_flight_req_heights.into_iter(); + + for new_height in new_req_heights { + if !in_flight_req_heights_iter.any(|h| h == new_height) { + return false; // Found a height not covered by in-flight request + } + } + true + } + + fn try_extract_result( + &self, + in_flight_request: &RequestKey, + result: &RequestResult, + ) -> Option { + // Only certificate results can be extracted + let certificates = match result { + RequestResult::Certificates(certs) => certs, + _ => return None, + }; + + if !in_flight_request.subsumes(self) { + return None; // Can't extract if not subsumed + } + + let mut requested_heights = self.heights()?; + if requested_heights.is_empty() { + return Some(RequestResult::Certificates(vec![])); // Nothing requested + } + let mut certificates_iter = certificates.iter(); + let mut collected = vec![]; + while let Some(height) = requested_heights.first() { + // Remove certs below the requested height. 
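+            // Scan forward through the subsuming request's certificates until one matches
+            // the requested height; if any requested height is missing, extraction fails.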
+ if let Some(cert) = certificates_iter.find(|cert| &cert.value().height() == height) { + collected.push(cert.clone()); + requested_heights.remove(0); + } else { + return None; // Missing a requested height + } + } + + Some(RequestResult::Certificates(collected)) + } +} + +#[cfg(test)] +mod tests { + use linera_base::{crypto::CryptoHash, data_types::BlockHeight, identifiers::ChainId}; + + use super::{RequestKey, SubsumingKey}; + + #[test] + fn test_subsumes_complete_containment() { + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let large = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(11), BlockHeight(12), BlockHeight(13)], + }; + let small = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12)], + }; + assert!(large.subsumes(&small)); + assert!(!small.subsumes(&large)); + } + + #[test] + fn test_subsumes_partial_containment() { + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12), BlockHeight(13)], + }; + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12), BlockHeight(14)], + }; + assert!(!req1.subsumes(&req2)); + assert!(!req2.subsumes(&req1)); + } + + #[test] + fn test_subsumes_different_chains() { + let chain1 = ChainId(CryptoHash::test_hash("chain1")); + let chain2 = ChainId(CryptoHash::test_hash("chain2")); + let req1 = RequestKey::Certificates { + chain_id: chain1, + heights: vec![BlockHeight(12)], + }; + let req2 = RequestKey::Certificates { + chain_id: chain2, + heights: vec![BlockHeight(12)], + }; + assert!(!req1.subsumes(&req2)); + } + + // Helper function to create a test certificate at a specific height + fn make_test_cert( + height: u64, + chain_id: ChainId, + ) -> linera_chain::types::ConfirmedBlockCertificate { + use linera_base::{ + crypto::ValidatorKeypair, + data_types::{Round, Timestamp}, + }; + use linera_chain::{ + block::ConfirmedBlock, + data_types::{BlockExecutionOutcome, LiteValue, LiteVote}, + test::{make_first_block, BlockTestExt, VoteTestExt}, + }; + + let keypair = ValidatorKeypair::generate(); + let mut proposed_block = make_first_block(chain_id).with_timestamp(Timestamp::from(height)); + + // Set the correct height + proposed_block.height = BlockHeight(height); + + // Create a Block from the proposed block with default execution outcome + let block = BlockExecutionOutcome::default().with(proposed_block); + + // Create a ConfirmedBlock + let confirmed_block = ConfirmedBlock::new(block); + + // Create a LiteVote and convert to Vote + let lite_vote = LiteVote::new( + LiteValue::new(&confirmed_block), + Round::MultiLeader(0), + &keypair.secret_key, + ); + + // Convert to full vote + let vote = lite_vote.with_value(confirmed_block).unwrap(); + + // Convert vote to certificate + vote.into_certificate(keypair.secret_key.public()) + } + + #[test] + fn test_try_extract_result_non_certificate_result() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12)], + }; + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12)], + }; + + // Non-certificate result should return None + let blob_result = RequestResult::Blob(None); + assert!(req1.try_extract_result(&req2, &blob_result).is_none()); + } + + #[test] + fn test_try_extract_result_empty_request_range() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = 
RequestKey::Certificates { + chain_id, + heights: vec![], + }; + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10)], + }; + + let certs = vec![make_test_cert(10, chain_id)]; + let result = RequestResult::Certificates(certs); + + // Empty request is always extractable, should return empty result + match req1.try_extract_result(&req2, &result) { + Some(RequestResult::Certificates(extracted_certs)) => { + assert!(extracted_certs.is_empty()); + } + _ => panic!("Expected Some empty Certificates result"), + } + } + + #[test] + fn test_try_extract_result_empty_result_range() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12)], + }; + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12)], + }; + + let result = RequestResult::Certificates(vec![]); // Empty result + + // Empty result should return None + assert!(req1.try_extract_result(&req2, &result).is_none()); + } + + #[test] + fn test_try_extract_result_non_overlapping_ranges() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let new_req = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10)], + }; + let in_flight_req = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(11)], + }; + + // Result does not contain all requested heights + let certs = vec![make_test_cert(11, chain_id)]; + let result = RequestResult::Certificates(certs); + + // No overlap, should return None + assert!(new_req + .try_extract_result(&in_flight_req, &result) + .is_none()); + } + + #[test] + fn test_try_extract_result_partial_overlap_missing_start() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10), BlockHeight(11), BlockHeight(12)], + }; + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(11), BlockHeight(12)], + }; + + // Result missing the first height (10) + let certs = vec![make_test_cert(11, chain_id), make_test_cert(12, chain_id)]; + let result = RequestResult::Certificates(certs); + + // Missing start height, should return None + assert!(req1.try_extract_result(&req2, &result).is_none()); + } + + #[test] + fn test_try_extract_result_partial_overlap_missing_end() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10), BlockHeight(11), BlockHeight(12)], + }; + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10), BlockHeight(11)], + }; + + // Result missing the last height (14) + let certs = vec![make_test_cert(10, chain_id), make_test_cert(11, chain_id)]; + let result = RequestResult::Certificates(certs); + + // Missing end height, should return None + assert!(req1.try_extract_result(&req2, &result).is_none()); + } + + #[test] + fn test_try_extract_result_partial_overlap_missing_middle() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let new_req = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10), BlockHeight(12), BlockHeight(13)], + }; + let in_flight_req = RequestKey::Certificates { + chain_id, + heights: vec![ + BlockHeight(10), + BlockHeight(12), + BlockHeight(13), + BlockHeight(14), + ], + }; + + let certs = vec![ + make_test_cert(10, 
chain_id), + make_test_cert(13, chain_id), + make_test_cert(14, chain_id), + ]; + let result = RequestResult::Certificates(certs); + + assert!(new_req + .try_extract_result(&in_flight_req, &result) + .is_none()); + assert!(in_flight_req + .try_extract_result(&new_req, &result) + .is_none()); + } + + #[test] + fn test_try_extract_result_exact_match() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10), BlockHeight(11), BlockHeight(12)], + }; // [10, 11, 12] + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10), BlockHeight(11), BlockHeight(12)], + }; + + let certs = vec![ + make_test_cert(10, chain_id), + make_test_cert(11, chain_id), + make_test_cert(12, chain_id), + ]; + let result = RequestResult::Certificates(certs.clone()); + + // Exact match should return all certificates + let extracted = req1.try_extract_result(&req2, &result); + assert!(extracted.is_some()); + match extracted.unwrap() { + RequestResult::Certificates(extracted_certs) => { + assert_eq!(extracted_certs, certs); + } + _ => panic!("Expected Certificates result"), + } + } + + #[test] + fn test_try_extract_result_superset_extraction() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12), BlockHeight(13)], + }; + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(12), BlockHeight(13)], + }; + + // Result has more certificates than requested + let certs = vec![ + make_test_cert(10, chain_id), + make_test_cert(11, chain_id), + make_test_cert(12, chain_id), + make_test_cert(13, chain_id), + make_test_cert(14, chain_id), + ]; + let result = RequestResult::Certificates(certs); + + // Should extract only the requested range [12, 13] + let extracted = req1.try_extract_result(&req2, &result); + assert!(extracted.is_some()); + match extracted.unwrap() { + RequestResult::Certificates(extracted_certs) => { + assert_eq!(extracted_certs.len(), 2); + assert_eq!(extracted_certs[0].value().height(), BlockHeight(12)); + assert_eq!(extracted_certs[1].value().height(), BlockHeight(13)); + } + _ => panic!("Expected Certificates result"), + } + } + + #[test] + fn test_try_extract_result_single_height() { + use super::RequestResult; + + let chain_id = ChainId(CryptoHash::test_hash("chain1")); + let req1 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(15)], + }; // [15] + let req2 = RequestKey::Certificates { + chain_id, + heights: vec![BlockHeight(10), BlockHeight(15), BlockHeight(20)], + }; + + let certs = vec![ + make_test_cert(10, chain_id), + make_test_cert(15, chain_id), + make_test_cert(20, chain_id), + ]; + let result = RequestResult::Certificates(certs); + + // Should extract only height 15 + let extracted = req1.try_extract_result(&req2, &result); + assert!(extracted.is_some()); + match extracted.unwrap() { + RequestResult::Certificates(extracted_certs) => { + assert_eq!(extracted_certs.len(), 1); + assert_eq!(extracted_certs[0].value().height(), BlockHeight(15)); + } + _ => panic!("Expected Certificates result"), + } + } + + #[test] + fn test_try_extract_result_different_chains() { + use super::RequestResult; + + let chain1 = ChainId(CryptoHash::test_hash("chain1")); + let chain2 = ChainId(CryptoHash::test_hash("chain2")); + let req1 = RequestKey::Certificates { + chain_id: chain1, + heights: vec![BlockHeight(12)], + 
}; + let req2 = RequestKey::Certificates { + chain_id: chain2, + heights: vec![BlockHeight(12)], + }; + + let certs = vec![make_test_cert(12, chain1)]; + let result = RequestResult::Certificates(certs); + + // Different chains should return None + assert!(req1.try_extract_result(&req2, &result).is_none()); + } +} diff --git a/linera-core/src/client/requests_scheduler/scheduler.rs b/linera-core/src/client/requests_scheduler/scheduler.rs new file mode 100644 index 000000000000..dca0a304ab76 --- /dev/null +++ b/linera-core/src/client/requests_scheduler/scheduler.rs @@ -0,0 +1,1291 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{cmp::Ordering, collections::BTreeMap, future::Future, sync::Arc}; + +use custom_debug_derive::Debug; +use futures::stream::{FuturesUnordered, StreamExt}; +use linera_base::{ + crypto::ValidatorPublicKey, + data_types::{Blob, BlobContent, BlockHeight}, + identifiers::{BlobId, ChainId}, + time::{Duration, Instant}, +}; +use linera_chain::types::ConfirmedBlockCertificate; +use rand::{ + distributions::{Distribution, WeightedIndex}, + prelude::SliceRandom as _, +}; +use tracing::instrument; + +use super::{ + cache::{RequestsCache, SubsumingKey}, + in_flight_tracker::{InFlightMatch, InFlightTracker}, + node_info::NodeInfo, + request::{RequestKey, RequestResult}, + scoring::ScoringWeights, +}; +use crate::{ + client::{ + communicate_concurrently, + requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable}, + RequestsSchedulerConfig, + }, + environment::Environment, + node::{NodeError, ValidatorNode}, + remote_node::RemoteNode, +}; + +#[cfg(with_metrics)] +pub(super) mod metrics { + use std::sync::LazyLock; + + use linera_base::prometheus_util::{ + exponential_bucket_latencies, register_histogram_vec, register_int_counter, + register_int_counter_vec, + }; + use prometheus::{HistogramVec, IntCounter, IntCounterVec}; + + /// Histogram of response times per validator (in milliseconds) + pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock = LazyLock::new(|| { + register_histogram_vec( + "requests_scheduler_response_time_ms", + "Response time for requests to validators in milliseconds", + &["validator"], + exponential_bucket_latencies(10000.0), // up to 10 seconds + ) + }); + + /// Counter of total requests made to each validator + pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock = LazyLock::new(|| { + register_int_counter_vec( + "requests_scheduler_request_total", + "Total number of requests made to each validator", + &["validator"], + ) + }); + + /// Counter of successful requests per validator + pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock = LazyLock::new(|| { + register_int_counter_vec( + "requests_scheduler_request_success", + "Number of successful requests to each validator", + &["validator"], + ) + }); + + /// Counter for requests that were resolved from the response cache. + pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock = LazyLock::new(|| { + register_int_counter( + "requests_scheduler_request_deduplication_total", + "Number of requests that were deduplicated by finding the result in the cache.", + ) + }); + + /// Counter for requests that were served from cache + pub static REQUEST_CACHE_HIT: LazyLock = LazyLock::new(|| { + register_int_counter( + "requests_scheduler_request_cache_hit_total", + "Number of requests that were served from cache", + ) + }); +} + +/// Manages a pool of validator nodes with intelligent load balancing and performance tracking. 
+/// +/// The `RequestsScheduler` maintains performance metrics for each validator node using +/// Exponential Moving Averages (EMA) and uses these metrics to make intelligent routing +/// decisions. It prevents node overload through request capacity limits and automatically +/// retries failed requests on alternative nodes. +/// +/// # Examples +/// +/// ```ignore +/// // Create with default configuration (balanced scoring) +/// let manager = RequestsScheduler::new(validator_nodes, RequestsSchedulerConfig::default()); +/// +/// // Create with custom configuration prioritizing low latency +/// let latency_weights = ScoringWeights { +/// latency: 0.6, +/// success: 0.3, +/// load: 0.1, +/// }; +/// let manager = RequestsScheduler::with_config( +/// validator_nodes, +/// 15, // max 15 concurrent requests per node +/// latency_weights, // custom scoring weights +/// 0.2, // higher alpha for faster adaptation +/// 3000.0, // max expected latency (3 seconds) +/// Duration::from_secs(60), // 60 second cache TTL +/// 200, // cache up to 200 entries +/// Duration::from_millis(200), // stop deduplicating in-flight requests after 200 ms +/// ); +/// ``` +#[derive(Debug, Clone)] +pub struct RequestsScheduler { + /// Thread-safe map of validator nodes indexed by their public keys. + /// Each node is wrapped with EMA-based performance tracking information. + nodes: Arc>>>, + /// Maximum number of concurrent requests allowed per node. + /// Prevents overwhelming individual validators with too many parallel requests. + max_requests_per_node: usize, + /// Default scoring weights applied to new nodes. + weights: ScoringWeights, + /// Default EMA smoothing factor for new nodes. + alpha: f64, + /// Default maximum expected latency in milliseconds for score normalization. + max_expected_latency: f64, + /// Tracks in-flight requests to deduplicate concurrent requests for the same data. + in_flight_tracker: InFlightTracker>, + /// Cache of recently completed requests with their results and timestamps. + cache: RequestsCache, +} + +impl RequestsScheduler { + /// Creates a new `RequestsScheduler` with the provided configuration. + pub fn new( + nodes: impl IntoIterator>, + config: RequestsSchedulerConfig, + ) -> Self { + Self::with_config( + nodes, + config.max_in_flight_requests, + ScoringWeights::default(), + config.alpha, + config.max_accepted_latency_ms, + Duration::from_millis(config.cache_ttl_ms), + config.cache_max_size, + Duration::from_millis(config.max_request_ttl_ms), + ) + } + + /// Creates a new `RequestsScheduler` with custom configuration. 
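The doc comment above leans on EMA-smoothed latency and success metrics. Below is a minimal sketch of the update rule this implies, under the assumption that `NodeInfo::update_metrics` (not shown in this diff) applies the standard exponential moving average; `ema` is a hypothetical helper.

```rust
// Hypothetical helper illustrating the assumed EMA update; the real smoothing
// lives in `NodeInfo::update_metrics`.
fn ema(previous: f64, observation: f64, alpha: f64) -> f64 {
    alpha * observation + (1.0 - alpha) * previous
}

// With alpha = 0.1, a single 500 ms sample nudges a 100 ms latency average to
// 0.1 * 500.0 + 0.9 * 100.0 = 140.0 ms.
```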
+ /// + /// # Arguments + /// - `nodes`: Initial set of validator nodes + /// - `max_requests_per_node`: Maximum concurrent requests per node + /// - `weights`: Scoring weights for performance metrics + /// - `alpha`: EMA smoothing factor (0 < alpha < 1) + /// - `max_expected_latency_ms`: Maximum expected latency for score normalization + /// - `cache_ttl`: Time-to-live for cached responses + /// - `max_cache_size`: Maximum number of entries in the cache + /// - `max_request_ttl`: Maximum latency for an in-flight request before we stop deduplicating it + #[expect(clippy::too_many_arguments)] + pub fn with_config( + nodes: impl IntoIterator>, + max_requests_per_node: usize, + weights: ScoringWeights, + alpha: f64, + max_expected_latency_ms: f64, + cache_ttl: Duration, + max_cache_size: usize, + max_request_ttl: Duration, + ) -> Self { + assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range"); + Self { + nodes: Arc::new(tokio::sync::RwLock::new( + nodes + .into_iter() + .map(|node| { + ( + node.public_key, + NodeInfo::with_config( + node, + weights, + alpha, + max_expected_latency_ms, + max_requests_per_node, + ), + ) + }) + .collect(), + )), + max_requests_per_node, + weights, + alpha, + max_expected_latency: max_expected_latency_ms, + in_flight_tracker: InFlightTracker::new(max_request_ttl), + cache: RequestsCache::new(cache_ttl, max_cache_size), + } + } + + /// Executes an operation with an automatically selected peer, handling deduplication, + /// tracking, and peer selection. + /// + /// This method provides a high-level API for executing operations against remote nodes + /// while leveraging the [`RequestsScheduler`]'s intelligent peer selection, performance tracking, + /// and request deduplication capabilities. + /// + /// # Type Parameters + /// - `R`: The inner result type (what the operation returns on success) + /// - `F`: The async closure type that takes a `RemoteNode` and returns a future + /// - `Fut`: The future type returned by the closure + /// + /// # Arguments + /// - `key`: Unique identifier for request deduplication + /// - `operation`: Async closure that takes a selected peer and performs the operation + /// + /// # Returns + /// The result from the operation, potentially from cache or a deduplicated in-flight request + /// + /// # Example + /// ```ignore + /// let result: Result, NodeError> = requests_scheduler + /// .with_best( + /// RequestKey::Certificates { chain_id, start, limit }, + /// |peer| async move { + /// peer.download_certificates_from(chain_id, start, limit).await + /// } + /// ) + /// .await; + /// ``` + #[allow(unused)] + async fn with_best(&self, key: RequestKey, operation: F) -> Result + where + R: Cacheable + Clone + Send + 'static, + F: FnOnce(RemoteNode) -> Fut, + Fut: Future>, + { + // Select the best available peer + let peer = self + .select_best_peer() + .await + .ok_or_else(|| NodeError::WorkerError { + error: "No validators available".to_string(), + })?; + self.with_peer(key, peer, operation).await + } + + /// Executes an operation with a specific peer. + /// + /// Similar to [`with_best`](Self::with_best), but uses the provided peer directly + /// instead of selecting the best available peer. This is useful when you need to + /// query a specific validator node. 
+ /// + /// # Type Parameters + /// - `R`: The inner result type (what the operation returns on success) + /// - `F`: The async closure type that takes a `RemoteNode` and returns a future + /// - `Fut`: The future type returned by the closure + /// + /// # Arguments + /// - `key`: Unique identifier for request deduplication + /// - `peer`: The specific peer to use for the operation + /// - `operation`: Async closure that takes the peer and performs the operation + /// + /// # Returns + /// The result from the operation, potentially from cache or a deduplicated in-flight request + async fn with_peer( + &self, + key: RequestKey, + peer: RemoteNode, + operation: F, + ) -> Result + where + R: Cacheable + Clone + Send + 'static, + F: FnOnce(RemoteNode) -> Fut, + Fut: Future>, + { + self.add_peer(peer.clone()).await; + self.in_flight_tracker + .add_alternative_peer(&key, peer.clone()) + .await; + self.deduplicated_request(key, peer, |peer| async { + self.track_request(peer, operation).await + }) + .await + } + + #[instrument(level = "trace", skip_all)] + async fn download_blob( + &self, + peers: &[RemoteNode], + blob_id: BlobId, + timeout: Duration, + ) -> Result, NodeError> { + let key = RequestKey::Blob(blob_id); + let mut peers = peers.to_vec(); + peers.shuffle(&mut rand::thread_rng()); + communicate_concurrently( + &peers, + async move |peer| { + self.with_peer(key, peer, |peer| async move { + peer.download_blob(blob_id).await + }) + .await + }, + |errors| errors.last().cloned().unwrap(), + timeout, + ) + .await + .map_err(|(_validator, error)| error) + } + + /// Downloads the blobs with the given IDs. This is done in one concurrent task per blob. + /// Uses intelligent peer selection based on scores and load balancing. + /// Returns `None` if it couldn't find all blobs. 
+ #[instrument(level = "trace", skip_all)] + pub async fn download_blobs( + &self, + peers: &[RemoteNode], + blob_ids: &[BlobId], + timeout: Duration, + ) -> Result>, NodeError> { + let mut stream = blob_ids + .iter() + .map(|blob_id| self.download_blob(peers, *blob_id, timeout)) + .collect::>(); + + let mut blobs = Vec::new(); + while let Some(maybe_blob) = stream.next().await { + blobs.push(maybe_blob?); + } + Ok(blobs.into_iter().collect::>>()) + } + + pub async fn download_certificates( + &self, + peer: &RemoteNode, + chain_id: ChainId, + start: BlockHeight, + limit: u64, + ) -> Result, NodeError> { + let heights = (start.0..start.0 + limit) + .map(BlockHeight) + .collect::>(); + self.with_peer( + RequestKey::Certificates { + chain_id, + heights: heights.clone(), + }, + peer.clone(), + |peer| async move { + Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await + }, + ) + .await + } + + pub async fn download_certificates_by_heights( + &self, + peer: &RemoteNode, + chain_id: ChainId, + heights: Vec, + ) -> Result, NodeError> { + self.with_peer( + RequestKey::Certificates { + chain_id, + heights: heights.clone(), + }, + peer.clone(), + |peer| async move { + peer.download_certificates_by_heights(chain_id, heights) + .await + }, + ) + .await + } + + pub async fn download_certificate_for_blob( + &self, + peer: &RemoteNode, + blob_id: BlobId, + ) -> Result { + self.with_peer( + RequestKey::CertificateForBlob(blob_id), + peer.clone(), + |peer| async move { peer.download_certificate_for_blob(blob_id).await }, + ) + .await + } + + pub async fn download_pending_blob( + &self, + peer: &RemoteNode, + chain_id: ChainId, + blob_id: BlobId, + ) -> Result { + self.with_peer( + RequestKey::PendingBlob { chain_id, blob_id }, + peer.clone(), + |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await }, + ) + .await + } + + /// Returns the alternative peers registered for an in-flight request, if any. + /// + /// This can be used to retry a failed request with alternative data sources + /// that were registered during request deduplication. + pub async fn get_alternative_peers( + &self, + key: &RequestKey, + ) -> Option>> { + self.in_flight_tracker.get_alternative_peers(key).await + } + + /// Returns current performance metrics for all managed nodes. + /// + /// Each entry contains: + /// - Performance score (f64, normalized 0.0-1.0) + /// - EMA success rate (f64, 0.0-1.0) + /// - Total requests processed (u64) + /// + /// Useful for monitoring and debugging node performance. + pub async fn get_node_scores(&self) -> BTreeMap { + let nodes = self.nodes.read().await; + let mut result = BTreeMap::new(); + + for (key, info) in nodes.iter() { + let score = info.calculate_score().await; + result.insert( + *key, + (score, info.ema_success_rate(), info.total_requests()), + ); + } + + result + } + + /// Wraps a request operation with performance tracking and capacity management. + /// + /// This method: + /// 1. Acquires a request slot (blocks asynchronously until one is available) + /// 2. Executes the provided operation with the selected peer + /// 3. Measures response time + /// 4. Updates node metrics based on success/failure + /// 5. 
Releases the request slot + /// + /// # Arguments + /// - `peer`: The remote node to execute the operation on + /// - `operation`: Async closure that performs the actual request with the selected peer + /// + /// # Behavior + /// If no slot is available, this method will wait asynchronously (without polling) + /// until another request completes and releases its slot. The task will be efficiently + /// suspended and woken by the async runtime using notification mechanisms. + async fn track_request( + &self, + peer: RemoteNode, + operation: F, + ) -> Result + where + F: FnOnce(RemoteNode) -> Fut, + Fut: Future>, + { + let start_time = Instant::now(); + let public_key = peer.public_key; + + // Acquire request slot + let nodes = self.nodes.read().await; + let node = nodes.get(&public_key).expect("Node must exist"); + let semaphore = node.in_flight_semaphore.clone(); + let permit = semaphore.acquire().await.unwrap(); + drop(nodes); + + // Execute the operation + let result = operation(peer).await; + + // Update metrics and release slot + let response_time_ms = start_time.elapsed().as_millis() as u64; + drop(permit); // Explicitly drop the permit to release the slot + let is_success = result.is_ok(); + { + let mut nodes = self.nodes.write().await; + if let Some(info) = nodes.get_mut(&public_key) { + info.update_metrics(is_success, response_time_ms); + let score = info.calculate_score().await; + tracing::trace!( + node = %public_key, + address = %info.node.node.address(), + success = %is_success, + response_time_ms = %response_time_ms, + score = %score, + total_requests = %info.total_requests(), + "Request completed" + ); + } + } + + // Record Prometheus metrics + #[cfg(with_metrics)] + { + let validator_name = public_key.to_string(); + metrics::VALIDATOR_RESPONSE_TIME + .with_label_values(&[&validator_name]) + .observe(response_time_ms as f64); + metrics::VALIDATOR_REQUEST_TOTAL + .with_label_values(&[&validator_name]) + .inc(); + if is_success { + metrics::VALIDATOR_REQUEST_SUCCESS + .with_label_values(&[&validator_name]) + .inc(); + } + } + + result + } + + /// Deduplicates concurrent requests for the same data. + /// + /// If a request for the same key is already in flight, this method waits for + /// the existing request to complete and returns its result. Otherwise, it + /// executes the operation and broadcasts the result to all waiting callers. + /// + /// This method also performs **subsumption-based deduplication**: if a larger + /// request that contains all the data needed by this request is already cached + /// or in flight, we can extract the subset result instead of making a new request. 
+ /// + /// # Arguments + /// - `key`: Unique identifier for the request + /// - `operation`: Async closure that performs the actual request + /// + /// # Returns + /// The result from either the in-flight request or the newly executed operation + async fn deduplicated_request( + &self, + key: RequestKey, + peer: N, + operation: F, + ) -> Result + where + T: Cacheable + Clone + Send + 'static, + F: FnOnce(N) -> Fut, + Fut: Future>, + { + // Check cache for exact or subsuming match + if let Some(result) = self.cache.get(&key).await { + return Ok(result); + } + + // Check if there's an in-flight request (exact or subsuming) + if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await { + match in_flight_match { + InFlightMatch::Exact(Subscribed(mut receiver)) => { + tracing::trace!( + key = ?key, + "deduplicating request (exact match) - joining existing in-flight request" + ); + #[cfg(with_metrics)] + metrics::REQUEST_CACHE_DEDUPLICATION.inc(); + // Wait for result from existing request + match receiver.recv().await { + Ok(result) => match result.as_ref().clone() { + Ok(res) => match T::try_from(res) { + Ok(converted) => { + tracing::trace!( + key = ?key, + "received result from deduplicated in-flight request" + ); + return Ok(converted); + } + Err(_) => { + tracing::trace!( + key = ?key, + "failed to convert result from deduplicated in-flight request, will execute independently" + ); + } + }, + Err(e) => { + tracing::trace!( + key = ?key, + error = %e, + "in-flight request failed", + ); + // Fall through to execute a new request + } + }, + Err(_) => { + tracing::trace!( + key = ?key, + "in-flight request sender dropped" + ); + // Fall through to execute a new request + } + } + } + InFlightMatch::Subsuming { + key: subsuming_key, + outcome: Subscribed(mut receiver), + } => { + tracing::trace!( + key = ?key, + subsumed_by = ?subsuming_key, + "deduplicating request (subsumption) - joining larger in-flight request" + ); + #[cfg(with_metrics)] + metrics::REQUEST_CACHE_DEDUPLICATION.inc(); + // Wait for result from the subsuming request + match receiver.recv().await { + Ok(result) => { + match result.as_ref() { + Ok(res) => { + if let Some(extracted) = + key.try_extract_result(&subsuming_key, res) + { + tracing::trace!( + key = ?key, + "extracted subset result from larger in-flight request" + ); + match T::try_from(extracted) { + Ok(converted) => return Ok(converted), + Err(_) => { + tracing::trace!( + key = ?key, + "failed to convert extracted result, will execute independently" + ); + } + } + } else { + // Extraction failed, fall through to execute our own request + tracing::trace!( + key = ?key, + "failed to extract from subsuming request, will execute independently" + ); + } + } + Err(e) => { + tracing::trace!( + key = ?key, + error = %e, + "subsuming in-flight request failed", + ); + // Fall through to execute our own request + } + } + } + Err(_) => { + tracing::trace!( + key = ?key, + "subsuming in-flight request sender dropped" + ); + } + } + } + } + }; + + // Create new in-flight entry for this request + self.in_flight_tracker.insert_new(key.clone()).await; + + // Execute the actual request + tracing::trace!(key = ?key, "executing new request"); + let result = operation(peer).await; + let result_for_broadcast: Result = result.clone().map(Into::into); + let shared_result = Arc::new(result_for_broadcast); + + // Broadcast result and clean up + self.in_flight_tracker + .complete_and_broadcast(&key, shared_result.clone()) + .await; + + if let Ok(success) = 
shared_result.as_ref() { + self.cache + .store(key.clone(), Arc::new(success.clone())) + .await; + } + result + } + + /// Returns all peers ordered by their score (highest first). + /// + /// Only includes peers that can currently accept requests. Each peer is paired + /// with its calculated score based on latency, success rate, and availability. + /// + /// # Returns + /// A vector of `(score, peer)` tuples sorted by score in descending order. + /// Returns an empty vector if no peers can accept requests. + async fn peers_by_score(&self) -> Vec<(f64, RemoteNode)> { + let nodes = self.nodes.read().await; + + // Filter nodes that can accept requests and calculate their scores + let mut scored_nodes = Vec::new(); + for info in nodes.values() { + let score = info.calculate_score().await; + scored_nodes.push((score, info.node.clone())); + } + + // Sort by score (highest first) + scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal)); + + scored_nodes + } + + /// Selects the best available peer using weighted random selection from top performers. + /// + /// This method: + /// 1. Filters nodes that have available request capacity + /// 2. Sorts them by performance score + /// 3. Performs weighted random selection from the top 3 performers + /// + /// This approach balances between choosing high-performing nodes and distributing + /// load across multiple validators to avoid creating hotspots. + /// + /// Returns `None` if no nodes are available or all are at capacity. + async fn select_best_peer(&self) -> Option> { + let scored_nodes = self.peers_by_score().await; + + if scored_nodes.is_empty() { + return None; + } + + // Use weighted random selection from top performers (top 3 or all if less) + let top_count = scored_nodes.len().min(3); + let top_nodes = &scored_nodes[..top_count]; + + // Create weights based on normalized scores + // Add small epsilon to prevent zero weights + let weights: Vec = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect(); + + if let Ok(dist) = WeightedIndex::new(&weights) { + let mut rng = rand::thread_rng(); + let index = dist.sample(&mut rng); + Some(top_nodes[index].1.clone()) + } else { + // Fallback to the best node if weights are invalid + tracing::warn!("failed to create weighted distribution, defaulting to best node"); + Some(scored_nodes[0].1.clone()) + } + } + + /// Adds a new peer to the manager if it doesn't already exist. 
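As a worked example of the top-3 weighted draw performed by `select_best_peer`, using the same `rand` APIs; the scores here are made up for illustration.

```rust
use rand::distributions::{Distribution, WeightedIndex};

// Suppose the three best peers scored 0.9, 0.6 and 0.3.
let scores = [0.9_f64, 0.6, 0.3];
let weights: Vec<f64> = scores.iter().map(|s| s.max(0.01)).collect();
let dist = WeightedIndex::new(&weights).expect("weights are positive");
let index = dist.sample(&mut rand::thread_rng());
// Index 0 is drawn with probability 0.9 / 1.8 = 50%, index 1 with ~33%, index 2 with ~17%.
```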
+ async fn add_peer(&self, node: RemoteNode) { + let mut nodes = self.nodes.write().await; + let public_key = node.public_key; + nodes.entry(public_key).or_insert_with(|| { + NodeInfo::with_config( + node, + self.weights, + self.alpha, + self.max_expected_latency, + self.max_requests_per_node, + ) + }); + } +} + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + use linera_base::{ + crypto::{CryptoHash, InMemorySigner}, + data_types::BlockHeight, + identifiers::ChainId, + time::Duration, + }; + use linera_chain::types::ConfirmedBlockCertificate; + use tokio::sync::oneshot; + + use super::{super::request::RequestKey, *}; + use crate::{client::requests_scheduler::MAX_REQUEST_TTL_MS, node::NodeError}; + + type TestEnvironment = crate::environment::Test; + + /// Helper function to create a test RequestsScheduler with custom configuration + fn create_test_manager( + in_flight_timeout: Duration, + cache_ttl: Duration, + ) -> Arc> { + let mut manager = RequestsScheduler::with_config( + vec![], // No actual nodes needed for these tests + 10, + ScoringWeights::default(), + 0.1, + 1000.0, + cache_ttl, + 100, + in_flight_timeout, + ); + // Replace the tracker with one using the custom timeout + manager.in_flight_tracker = InFlightTracker::new(in_flight_timeout); + Arc::new(manager) + } + + /// Helper function to create a test result + fn test_result_ok() -> Result, NodeError> { + Ok(vec![]) + } + + /// Helper function to create a test request key + fn test_key() -> RequestKey { + RequestKey::Certificates { + chain_id: ChainId(CryptoHash::test_hash("test")), + heights: vec![BlockHeight(0), BlockHeight(1)], + } + } + + #[tokio::test] + async fn test_cache_hit_returns_cached_result() { + // Create a manager with standard cache TTL + let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60)); + let key = test_key(); + + // Track how many times the operation is executed + let execution_count = Arc::new(AtomicUsize::new(0)); + let execution_count_clone = execution_count.clone(); + + // First call - should execute the operation and cache the result + let result1: Result, NodeError> = manager + .deduplicated_request(key.clone(), (), |_| async move { + execution_count_clone.fetch_add(1, Ordering::SeqCst); + test_result_ok() + }) + .await; + + assert!(result1.is_ok()); + assert_eq!(execution_count.load(Ordering::SeqCst), 1); + + // Second call - should return cached result without executing the operation + let execution_count_clone2 = execution_count.clone(); + let result2: Result, NodeError> = manager + .deduplicated_request(key.clone(), (), |_| async move { + execution_count_clone2.fetch_add(1, Ordering::SeqCst); + test_result_ok() + }) + .await; + + assert_eq!(result1, result2); + // Operation should still only have been executed once (cache hit) + assert_eq!(execution_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_in_flight_request_deduplication() { + let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60)); + let key = test_key(); + + // Track how many times the operation is executed + let execution_count = Arc::new(AtomicUsize::new(0)); + + // Create a channel to control when the first operation completes + let (tx, rx) = oneshot::channel(); + + // Start first request (will be slow - waits for signal) + let manager_clone = Arc::clone(&manager); + let key_clone = key.clone(); + let execution_count_clone = execution_count.clone(); + let first_request = tokio::spawn(async move { + 
manager_clone + .deduplicated_request(key_clone, (), |_| async move { + execution_count_clone.fetch_add(1, Ordering::SeqCst); + // Wait for signal before completing + rx.await.unwrap(); + test_result_ok() + }) + .await + }); + + // Start second request - should deduplicate and wait for the first + let execution_count_clone2 = execution_count.clone(); + let second_request = tokio::spawn(async move { + manager + .deduplicated_request(key, (), |_| async move { + execution_count_clone2.fetch_add(1, Ordering::SeqCst); + test_result_ok() + }) + .await + }); + + // Signal the first request to complete + tx.send(()).unwrap(); + + // Both requests should complete successfully + let result1: Result, NodeError> = + first_request.await.unwrap(); + let result2: Result, NodeError> = + second_request.await.unwrap(); + + assert!(result1.is_ok()); + assert_eq!(result1, result2); + + // Operation should only have been executed once (deduplication worked) + assert_eq!(execution_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_multiple_subscribers_all_notified() { + let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60)); + let key = test_key(); + + // Track how many times the operation is executed + let execution_count = Arc::new(AtomicUsize::new(0)); + + // Create a channel to control when the operation completes + let (tx, rx) = oneshot::channel(); + + // Start first request (will be slow - waits for signal) + let manager_clone1 = Arc::clone(&manager); + let key_clone1 = key.clone(); + let execution_count_clone = execution_count.clone(); + let first_request = tokio::spawn(async move { + manager_clone1 + .deduplicated_request(key_clone1, (), |_| async move { + execution_count_clone.fetch_add(1, Ordering::SeqCst); + rx.await.unwrap(); + test_result_ok() + }) + .await + }); + + // Start multiple additional requests - all should deduplicate + let mut handles = vec![]; + for _ in 0..5 { + let manager_clone = Arc::clone(&manager); + let key_clone = key.clone(); + let execution_count_clone = execution_count.clone(); + let handle = tokio::spawn(async move { + manager_clone + .deduplicated_request(key_clone, (), |_| async move { + execution_count_clone.fetch_add(1, Ordering::SeqCst); + test_result_ok() + }) + .await + }); + handles.push(handle); + } + + // Signal the first request to complete + tx.send(()).unwrap(); + + // First request should complete successfully + let result: Result, NodeError> = + first_request.await.unwrap(); + assert!(result.is_ok()); + + // All subscriber requests should also complete successfully + for handle in handles { + assert_eq!(handle.await.unwrap(), result); + } + + // Operation should only have been executed once (all requests were deduplicated) + assert_eq!(execution_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_timeout_triggers_new_request() { + // Create a manager with a very short in-flight timeout + let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60)); + + let key = test_key(); + + // Track how many times the operation is executed + let execution_count = Arc::new(AtomicUsize::new(0)); + + // Create a channel to control when the first operation completes + let (tx, rx) = oneshot::channel(); + + // Start first request (will be slow - waits for signal) + let manager_clone = Arc::clone(&manager); + let key_clone = key.clone(); + let execution_count_clone = execution_count.clone(); + let first_request = tokio::spawn(async move { + manager_clone + 
.deduplicated_request(key_clone, (), |_| async move { + execution_count_clone.fetch_add(1, Ordering::SeqCst); + rx.await.unwrap(); + test_result_ok() + }) + .await + }); + + // Wait for the timeout to elapse + tokio::time::sleep(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await; + + // Start second request - should NOT deduplicate because first request exceeded timeout + let execution_count_clone2 = execution_count.clone(); + let second_request = tokio::spawn(async move { + manager + .deduplicated_request(key, (), |_| async move { + execution_count_clone2.fetch_add(1, Ordering::SeqCst); + test_result_ok() + }) + .await + }); + + // Wait for second request to complete + let result2: Result, NodeError> = + second_request.await.unwrap(); + assert!(result2.is_ok()); + + // Complete the first request + tx.send(()).unwrap(); + let result1: Result, NodeError> = + first_request.await.unwrap(); + assert!(result1.is_ok()); + + // Operation should have been executed twice (timeout triggered new request) + assert_eq!(execution_count.load(Ordering::SeqCst), 2); + } + + #[tokio::test] + async fn test_slot_limiting_blocks_excess_requests() { + // Tests the slot limiting mechanism: + // - Creates a RequestsScheduler with max_requests_per_node = 2 + // - Starts two slow requests that acquire both available slots + // - Starts a third request and verifies it's blocked waiting for a slot (execution count stays at 2) + // - Completes the first request to release a slot + // - Verifies the third request now acquires the freed slot and executes (execution count becomes 3) + // - Confirms all requests complete successfully + use linera_base::identifiers::BlobType; + + use crate::test_utils::{MemoryStorageBuilder, TestBuilder}; + + // Create a test environment with one validator + let mut builder = TestBuilder::new( + MemoryStorageBuilder::default(), + 1, + 0, + InMemorySigner::new(None), + ) + .await + .unwrap(); + + // Get the validator node + let validator_node = builder.node(0); + let validator_public_key = validator_node.name(); + + // Create a RemoteNode wrapper + let remote_node = RemoteNode { + public_key: validator_public_key, + node: validator_node, + }; + + // Create a RequestsScheduler with max_requests_per_node = 2 + let max_slots = 2; + let mut manager: RequestsScheduler = RequestsScheduler::with_config( + vec![remote_node.clone()], + max_slots, + ScoringWeights::default(), + 0.1, + 1000.0, + Duration::from_secs(60), + 100, + Duration::from_secs(60), + ); + // Replace the tracker with one using a longer timeout for this test + manager.in_flight_tracker = InFlightTracker::new(Duration::from_secs(60)); + let manager = Arc::new(manager); + + // Track execution state + let execution_count = Arc::new(AtomicUsize::new(0)); + let completion_count = Arc::new(AtomicUsize::new(0)); + + // Create channels to control when operations complete + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + // Start first request using with_peer (will block until signaled) + let manager_clone1 = Arc::clone(&manager); + let remote_node_clone1 = remote_node.clone(); + let execution_count_clone1 = execution_count.clone(); + let completion_count_clone1 = completion_count.clone(); + let key1 = RequestKey::Blob(BlobId::new(CryptoHash::test_hash("blob1"), BlobType::Data)); + + let first_request = tokio::spawn(async move { + manager_clone1 + .with_peer(key1, remote_node_clone1, |_peer| async move { + execution_count_clone1.fetch_add(1, Ordering::SeqCst); + // Simulate work by waiting for signal + let 
_ = rx1.await; + completion_count_clone1.fetch_add(1, Ordering::SeqCst); + Ok(None) // Return Option + }) + .await + }); + + // Give first request time to start and acquire a slot + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!(execution_count.load(Ordering::SeqCst), 1); + + // Start second request using with_peer (will block until signaled) + let manager_clone2 = Arc::clone(&manager); + let remote_node_clone2 = remote_node.clone(); + let execution_count_clone2 = execution_count.clone(); + let completion_count_clone2 = completion_count.clone(); + let key2 = RequestKey::Blob(BlobId::new(CryptoHash::test_hash("blob2"), BlobType::Data)); + + let second_request = tokio::spawn(async move { + manager_clone2 + .with_peer(key2, remote_node_clone2, |_peer| async move { + execution_count_clone2.fetch_add(1, Ordering::SeqCst); + // Simulate work by waiting for signal + let _ = rx2.await; + completion_count_clone2.fetch_add(1, Ordering::SeqCst); + Ok(None) // Return Option + }) + .await + }); + + // Give second request time to start and acquire the second slot + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!(execution_count.load(Ordering::SeqCst), 2); + + // Start third request - this should be blocked waiting for a slot + let remote_node_clone3 = remote_node.clone(); + let execution_count_clone3 = execution_count.clone(); + let completion_count_clone3 = completion_count.clone(); + let key3 = RequestKey::Blob(BlobId::new(CryptoHash::test_hash("blob3"), BlobType::Data)); + + let third_request = tokio::spawn(async move { + manager + .with_peer(key3, remote_node_clone3, |_peer| async move { + execution_count_clone3.fetch_add(1, Ordering::SeqCst); + completion_count_clone3.fetch_add(1, Ordering::SeqCst); + Ok(None) // Return Option + }) + .await + }); + + // Give third request time to try acquiring a slot + tokio::time::sleep(Duration::from_millis(100)).await; + + // Third request should still be waiting (not executed yet) + assert_eq!( + execution_count.load(Ordering::SeqCst), + 2, + "Third request should be waiting for a slot" + ); + + // Complete the first request to release a slot + tx1.send(()).unwrap(); + + // Wait for first request to complete and third request to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Now the third request should have acquired the freed slot and started executing + assert_eq!( + execution_count.load(Ordering::SeqCst), + 3, + "Third request should now be executing" + ); + + // Complete remaining requests + tx2.send(()).unwrap(); + + // Wait for all requests to complete + let _result1 = first_request.await.unwrap(); + let _result2 = second_request.await.unwrap(); + let _result3 = third_request.await.unwrap(); + + // Verify all completed + assert_eq!(completion_count.load(Ordering::SeqCst), 3); + } + + #[tokio::test] + async fn test_alternative_peers_registered_on_deduplication() { + use linera_base::identifiers::BlobType; + + use crate::test_utils::{MemoryStorageBuilder, TestBuilder}; + + // Create a test environment with three validators + let mut builder = TestBuilder::new( + MemoryStorageBuilder::default(), + 3, + 0, + InMemorySigner::new(None), + ) + .await + .unwrap(); + + // Get validator nodes + let nodes: Vec<_> = (0..3) + .map(|i| { + let node = builder.node(i); + let public_key = node.name(); + RemoteNode { public_key, node } + }) + .collect(); + + // Create a RequestsScheduler + let manager: Arc> = + Arc::new(RequestsScheduler::with_config( + nodes.clone(), + 1, + ScoringWeights::default(), + 0.1, + 
1000.0, + Duration::from_secs(60), + 100, + Duration::from_millis(MAX_REQUEST_TTL_MS), + )); + + let key = RequestKey::Blob(BlobId::new( + CryptoHash::test_hash("test_blob"), + BlobType::Data, + )); + + // Create a channel to control when first request completes + let (tx, rx) = oneshot::channel(); + + // Start first request with node 0 (will block until signaled) + let manager_clone = Arc::clone(&manager); + let node_clone = nodes[0].clone(); + let key_clone = key.clone(); + let first_request = tokio::spawn(async move { + manager_clone + .with_peer(key_clone, node_clone, |_peer| async move { + // Wait for signal + rx.await.unwrap(); + Ok(None) // Return Option + }) + .await + }); + + // Give first request time to start and become in-flight + tokio::time::sleep(Duration::from_millis(100)).await; + + // Start second and third requests with different nodes + // These should register as alternatives and wait for the first request + let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()] + .into_iter() + .map(|node| { + let manager_clone = Arc::clone(&manager); + let key_clone = key.clone(); + tokio::spawn(async move { + manager_clone + .with_peer(key_clone, node, |_peer| async move { + Ok(None) // Return Option + }) + .await + }) + }) + .collect(); + + // Give time for alternative peers to register + tokio::time::sleep(Duration::from_millis(100)).await; + + // Check that nodes 1 and 2 are registered as alternatives + let alt_peers = manager + .get_alternative_peers(&key) + .await + .expect("in-flight entry") + .into_iter() + .map(|p| p.public_key) + .collect::>(); + assert_eq!( + alt_peers, + vec![nodes[1].public_key, nodes[2].public_key], + "expected nodes 2 and 3 to be registered as alternative peers" + ); + + // Signal first request to complete + tx.send(()).unwrap(); + + // Wait for all requests to complete + let _result1 = first_request.await.unwrap(); + for handle in handles { + let _ = handle.await.unwrap(); + } + + // After completion, the in-flight entry should be removed + tokio::time::sleep(Duration::from_millis(50)).await; + let alt_peers = manager.get_alternative_peers(&key).await; + assert!( + alt_peers.is_none(), + "Expected in-flight entry to be removed after completion" + ); + } +} diff --git a/linera-core/src/client/requests_scheduler/scoring.rs b/linera-core/src/client/requests_scheduler/scoring.rs new file mode 100644 index 000000000000..6cdcae6e4472 --- /dev/null +++ b/linera-core/src/client/requests_scheduler/scoring.rs @@ -0,0 +1,44 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +/// Configurable weights for the scoring algorithm. +/// +/// These weights determine the relative importance of different metrics +/// when calculating a node's performance score. All weights should sum to 1.0. 
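A sketch of how these weights are assumed to combine into a single node score; the actual computation lives in `NodeInfo::calculate_score`, which is not part of this diff, and `combined_score` is a hypothetical helper.

```rust
// Hypothetical combination of the three normalized sub-scores (each in 0.0..=1.0).
fn combined_score(
    w: &ScoringWeights,
    latency_score: f64,
    success_rate: f64,
    load_score: f64,
) -> f64 {
    w.latency * latency_score + w.success * success_rate + w.load * load_score
}

// With weights 0.4 / 0.4 / 0.2 (the defaults shown below), a node with
// latency_score 0.8, success_rate 0.95 and load_score 0.5 scores
// 0.4 * 0.8 + 0.4 * 0.95 + 0.2 * 0.5 = 0.80.
```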
+/// +/// # Examples +/// +/// ```ignore +/// // Prioritize response time and success rate equally +/// let balanced_weights = ScoringWeights { +/// latency: 0.4, +/// success: 0.4, +/// load: 0.2, +/// }; +/// +/// // Prioritize low latency above all else +/// let latency_focused = ScoringWeights { +/// latency: 0.7, +/// success: 0.2, +/// load: 0.1, +/// }; +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct ScoringWeights { + /// Weight for latency metric (lower latency = higher score) + pub latency: f64, + /// Weight for success rate metric (higher success = higher score) + pub success: f64, + /// Weight for load metric (lower load = higher score) + pub load: f64, +} + +impl Default for ScoringWeights { + fn default() -> Self { + Self { + latency: 0.4, // 40% weight on response time + success: 0.4, // 40% weight on success rate + load: 0.2, // 20% weight on current load + } + } +} diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index 390517764235..bc778e55df84 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -51,6 +51,8 @@ pub trait ValidatorNode { #[cfg(web)] type NotificationStream: Stream + Unpin; + fn address(&self) -> String; + /// Proposes a new block. async fn handle_block_proposal( &self, diff --git a/linera-core/src/remote_node.rs b/linera-core/src/remote_node.rs index 1961685993da..893e26a263fc 100644 --- a/linera-core/src/remote_node.rs +++ b/linera-core/src/remote_node.rs @@ -1,13 +1,10 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{ - collections::{HashSet, VecDeque}, - time::Duration, -}; +use std::collections::{HashSet, VecDeque}; use custom_debug_derive::Debug; -use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt}; +use futures::future::try_join_all; use linera_base::{ crypto::ValidatorPublicKey, data_types::{Blob, BlockHeight}, @@ -21,7 +18,6 @@ use linera_chain::{ TimeoutCertificate, ValidatedBlockCertificate, }, }; -use rand::seq::SliceRandom as _; use tracing::{debug, info, instrument}; use crate::{ @@ -164,21 +160,6 @@ impl RemoteNode { Ok(response.info) } - #[instrument(level = "trace", skip_all)] - pub(crate) async fn query_certificates_from( - &self, - chain_id: ChainId, - start: BlockHeight, - limit: u64, - ) -> Result, NodeError> { - tracing::debug!(name = ?self.public_key, ?chain_id, ?start, ?limit, "Querying certificates"); - let heights = (start.0..start.0 + limit) - .map(BlockHeight) - .collect::>(); - self.download_certificates_by_heights(chain_id, heights) - .await - } - #[instrument(level = "trace")] pub(crate) async fn download_certificate_for_blob( &self, @@ -210,7 +191,7 @@ impl RemoteNode { } #[instrument(level = "trace")] - pub async fn try_download_blob(&self, blob_id: BlobId) -> Option { + pub async fn download_blob(&self, blob_id: BlobId) -> Result, NodeError> { match self.node.download_blob(blob_id).await { Ok(blob) => { let blob = Blob::new(blob); @@ -219,18 +200,20 @@ impl RemoteNode { "Validator {} sent an invalid blob {blob_id}.", self.public_key ); - None + Ok(None) } else { - Some(blob) + Ok(Some(blob)) } } - Err(error) => { + Err(NodeError::BlobsNotFound(_error)) => { tracing::debug!( - "Failed to fetch blob {blob_id} from validator {}: {error}", - self.public_key + ?blob_id, + validator=?self.public_key, + "validator is missing the blob", ); - None + Ok(None) } + Err(error) => Err(error), } } @@ -279,53 +262,6 @@ impl RemoteNode { Ok(certificates) } - /// Downloads a blob, but does not verify if it has actually been published and - /// 
accepted by a quorum of validators. - #[instrument(level = "trace", skip(validators))] - pub async fn download_blob( - validators: &[Self], - blob_id: BlobId, - timeout: Duration, - ) -> Option { - // Sequentially try each validator in random order. - let mut validators = validators.iter().collect::>(); - validators.shuffle(&mut rand::thread_rng()); - let mut stream = validators - .into_iter() - .zip(0..) - .map(|(remote_node, i)| async move { - linera_base::time::timer::sleep(timeout * i * i).await; - remote_node.try_download_blob(blob_id).await - }) - .collect::>(); - while let Some(maybe_blob) = stream.next().await { - if let Some(blob) = maybe_blob { - return Some(blob); - } - } - None - } - - /// Downloads the blobs with the given IDs. This is done in one concurrent task per block. - /// Each task goes through the validators sequentially in random order and tries to download - /// it. Returns `None` if it couldn't find all blobs. - #[instrument(level = "trace", skip(validators))] - pub async fn download_blobs( - blob_ids: &[BlobId], - validators: &[Self], - timeout: Duration, - ) -> Option> { - let mut stream = blob_ids - .iter() - .map(|blob_id| Self::download_blob(validators, *blob_id, timeout)) - .collect::>(); - let mut blobs = Vec::new(); - while let Some(maybe_blob) = stream.next().await { - blobs.push(maybe_blob?); - } - Some(blobs) - } - /// Checks that requesting these blobs when trying to handle this certificate is legitimate, /// i.e. that there are no duplicates and the blobs are actually required. pub fn check_blobs_not_found( @@ -350,3 +286,11 @@ impl RemoteNode { Ok(()) } } + +impl PartialEq for RemoteNode { + fn eq(&self, other: &Self) -> bool { + self.public_key == other.public_key + } +} + +impl Eq for RemoteNode {} diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index c1f6bf4b5b35..daeaa5c52a60 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -99,6 +99,10 @@ where { type NotificationStream = NotificationStream; + fn address(&self) -> String { + format!("local:{}", self.public_key) + } + async fn handle_block_proposal( &self, proposal: BlockProposal, @@ -1048,6 +1052,7 @@ where Duration::from_secs(30), Duration::from_secs(1), ChainClientOptions::test_default(), + crate::client::RequestsSchedulerConfig::default(), )); Ok(client.create_chain_client( chain_id, diff --git a/linera-rpc/src/client.rs b/linera-rpc/src/client.rs index 11a69db5ba18..ace917e078d8 100644 --- a/linera-rpc/src/client.rs +++ b/linera-rpc/src/client.rs @@ -44,6 +44,14 @@ impl From for Client { impl ValidatorNode for Client { type NotificationStream = NotificationStream; + fn address(&self) -> String { + match self { + Client::Grpc(grpc_client) => grpc_client.address().to_string(), + #[cfg(with_simple_network)] + Client::Simple(simple_client) => simple_client.address(), + } + } + async fn handle_block_proposal( &self, proposal: BlockProposal, diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs index 6f04be2aef02..280f6971c763 100644 --- a/linera-rpc/src/grpc/client.rs +++ b/linera-rpc/src/grpc/client.rs @@ -25,7 +25,7 @@ use linera_core::{ }; use linera_version::VersionInfo; use tonic::{Code, IntoRequest, Request, Status}; -use tracing::{debug, info, instrument, warn, Level}; +use tracing::{debug, instrument, trace, warn, Level}; use super::{ api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest}, @@ -71,11 +71,11 @@ impl GrpcClient { fn 
is_retryable(status: &Status) -> bool { match status.code() { Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => { - info!("gRPC request interrupted: {status:?}; retrying"); + trace!("gRPC request interrupted: {status:?}; retrying"); true } Code::Ok | Code::Cancelled | Code::ResourceExhausted => { - info!("Unexpected gRPC status: {status:?}; retrying"); + trace!("Unexpected gRPC status: {status:?}; retrying"); true } Code::NotFound => false, // This code is used if e.g. the validator is missing blobs. @@ -88,7 +88,7 @@ impl GrpcClient { | Code::Internal | Code::DataLoss | Code::Unauthenticated => { - info!("Unexpected gRPC status: {status:?}"); + trace!("Unexpected gRPC status: {status:?}"); false } } @@ -188,6 +188,10 @@ macro_rules! client_delegate { impl ValidatorNode for GrpcClient { type NotificationStream = NotificationStream; + fn address(&self) -> String { + self.address.clone() + } + #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))] async fn handle_block_proposal( &self, diff --git a/linera-rpc/src/simple/client.rs b/linera-rpc/src/simple/client.rs index a0fb417c3067..7de4827db4ad 100644 --- a/linera-rpc/src/simple/client.rs +++ b/linera-rpc/src/simple/client.rs @@ -77,6 +77,13 @@ impl SimpleClient { impl ValidatorNode for SimpleClient { type NotificationStream = NotificationStream; + fn address(&self) -> String { + format!( + "{}://{}:{}", + self.network.protocol, self.network.host, self.network.port + ) + } + /// Initiates a new block. async fn handle_block_proposal( &self, diff --git a/linera-service/src/schema_export.rs b/linera-service/src/schema_export.rs index 5bcf3e964728..c9936cc3510f 100644 --- a/linera-service/src/schema_export.rs +++ b/linera-service/src/schema_export.rs @@ -41,6 +41,10 @@ struct DummyValidatorNode; impl ValidatorNode for DummyValidatorNode { type NotificationStream = NotificationStream; + fn address(&self) -> String { + "dummy".to_string() + } + async fn handle_block_proposal( &self, _: BlockProposal, diff --git a/linera-web/src/lib.rs b/linera-web/src/lib.rs index 7c6bf3f7319f..99007975701f 100644 --- a/linera-web/src/lib.rs +++ b/linera-web/src/lib.rs @@ -93,6 +93,12 @@ pub const OPTIONS: ClientContextOptions = ClientContextOptions { sender_chain_worker_ttl: Duration::from_millis(200), grace_period: linera_core::DEFAULT_GRACE_PERIOD, max_joined_tasks: 100, + max_in_flight_requests: linera_core::client::requests_scheduler::MAX_IN_FLIGHT_REQUESTS, + max_accepted_latency_ms: linera_core::client::requests_scheduler::MAX_ACCEPTED_LATENCY_MS, + cache_ttl_ms: linera_core::client::requests_scheduler::CACHE_TTL_MS, + cache_max_size: linera_core::client::requests_scheduler::CACHE_MAX_SIZE, + max_request_ttl_ms: linera_core::client::requests_scheduler::MAX_REQUEST_TTL_MS, + alpha: linera_core::client::requests_scheduler::ALPHA_SMOOTHING_FACTOR, // TODO(linera-protocol#2944): separate these out from the // `ClientOptions` struct, since they apply only to the CLI/native