Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
ebc7930
Introduce ValidatorManager to track all requests and validators' scores
deuszx Oct 7, 2025
28d17f0
Deduplicate requets for the same data
deuszx Oct 8, 2025
6d40c58
Cache responses to queries and resolve subsequent ones w/o going over…
deuszx Oct 8, 2025
20dc5ed
Expose remote node's address and use when logging
deuszx Oct 8, 2025
ba7691d
Simplify RequestKey enum
deuszx Oct 13, 2025
2a1976b
Add ValidatorManager::with_peer method
deuszx Oct 13, 2025
37b4a40
Simplify ValidatorManager::deduplicated_request
deuszx Oct 13, 2025
2cee3fb
Rearrange the methods
deuszx Oct 13, 2025
1d1288b
Add ValidatorManager::with_peer method
deuszx Oct 13, 2025
e196913
Evict all expired entries from cache at once
deuszx Oct 13, 2025
9cc3f6b
Fix compilation clippy
deuszx Oct 14, 2025
89d4d9d
Remove unused methods from ValidatorManager
deuszx Oct 14, 2025
8b5590e
Use specific peers when downloading data from.
deuszx Oct 14, 2025
dc71720
Download blobs using communicate_concurrently
deuszx Oct 15, 2025
b7946a6
Rename remote_nodes to validator_manager
deuszx Oct 15, 2025
5c552d1
Clean up the API of ValidatorManager
deuszx Oct 15, 2025
6e4676f
Cache only sucessful results
deuszx Oct 15, 2025
7ec8053
Fix doc examples on ValidatorManager
deuszx Oct 15, 2025
06ca6ce
Make slot acquisition an async await
deuszx Oct 15, 2025
da08874
Deduplicate requests if they are subset of each other
deuszx Oct 15, 2025
79f8d38
Split validator_manager.rs into module
deuszx Oct 15, 2025
84bc85c
Don't deduplicate slow inflight requests
deuszx Oct 15, 2025
4210ace
Move result.is_ok() check to store_in_cache
deuszx Oct 16, 2025
cc74a7f
Add unit tests for request deduplication
deuszx Oct 16, 2025
cdd68cd
Add a test for slot acquisition rate-limiting
deuszx Oct 16, 2025
de72645
Use alternative peer if in-flight is too slow
deuszx Oct 16, 2025
d953d59
Stop logging grpc error status at INFO
deuszx Oct 16, 2025
bf5f0a5
Extract in-flight map to a separate file
deuszx Oct 16, 2025
bab37b7
Add basic metrics to ValidatorManager
deuszx Oct 16, 2025
89401b1
Simplify register_alternative_and_check_timeout function
deuszx Oct 17, 2025
aa3a5cb
Refactor RequestResult type
deuszx Oct 17, 2025
1cb477f
Simplify code for registering new peers and issuing new queries when …
deuszx Oct 17, 2025
4e87f0f
Extract requests' result cache to a file
deuszx Oct 20, 2025
a3ecee2
Parameterize RequestsCache with types for key and result.
deuszx Oct 20, 2025
2a491e6
Use Sempahore to limit per-node requests in flight
deuszx Oct 21, 2025
c80e0e8
Harden subsumption logic
deuszx Oct 21, 2025
dd385ac
Limit visibility of fields in structs.
deuszx Oct 21, 2025
de5ddef
Move all ValidatorManager config consts to ClientContextOptions
deuszx Oct 21, 2025
eecb470
Remove Facebook copyright line from the new file
deuszx Oct 21, 2025
9fdb79a
Fix comment on REQUEST_CACHE_DEDUPLICATION
deuszx Oct 21, 2025
6e09717
Change visiblity of metrics statics
deuszx Oct 21, 2025
f0a3362
Drop default_ prefix from ValidatorManager fields
deuszx Oct 21, 2025
d092a22
Remove can_accept_request
deuszx Oct 21, 2025
2c1d752
Move alpha smoothing factor to client options
deuszx Oct 21, 2025
fc19004
Typos and small corrections
deuszx Oct 21, 2025
148dc02
Log a warning when a supposedly unreachable code is executed
deuszx Oct 21, 2025
aa0d481
Check if there's a subsuming in-flight request. Simplify types
deuszx Oct 21, 2025
e20f8af
Remove sleeps from the ValidatorManager tests
deuszx Oct 22, 2025
b871b6c
Simplify evict_expired to single loop
deuszx Oct 22, 2025
963f8ad
Make SubsumingKey impl even tighter
deuszx Oct 22, 2025
f8efd60
Use consistent approach when downloading blobs in the client
deuszx Oct 22, 2025
2a8d3fe
Introduce Cacheable trait for values in requests cache
deuszx Oct 22, 2025
d07c3c6
Add more tests for SubsumingKey trait and impl
deuszx Oct 22, 2025
5e6c51d
Add tests for cache (fix a bug).
deuszx Oct 22, 2025
ba357d3
Add RemoteNode API to ValidatorManager; move away from closures
deuszx Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ Client implementation and command-line tool for the Linera blockchain
* `--max-joined-tasks <MAX_JOINED_TASKS>` — Maximum number of tasks that can are joined concurrently in the client

Default value: `100`
* `--max-in-flight-requests <MAX_IN_FLIGHT_REQUESTS>` — Maximum concurrent requests per validator node

Default value: `100`
* `--max-accepted-latency-ms <MAX_ACCEPTED_LATENCY_MS>` — Maximum expected latency in milliseconds for score normalization

Default value: `5000`
* `--cache-ttl-sec <CACHE_TTL_SEC>` — Time-to-live for cached responses in seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make that milliseconds, too? We use ms for almost all timing options.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we'd never need anything less than 1 second but I agree with you we should be consistent. Will change.


Default value: `2`
* `--cache-max-size <CACHE_MAX_SIZE>` — Maximum number of entries in the cache

Default value: `1000`
* `--max-request-ttl-ms <MAX_REQUEST_TTL_MS>` — Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds)

Default value: `200`
* `--alpha <ALPHA>` — Smoothing factor for Exponential Moving Averages (0 < alpha < 1) Higher values give more weight to recent observations Typical values are between 0.01 and 0.5 A value of 0.1 means that 10% of the new observation is considered and 90% of the previous average is retained

Default value: `0.1`
* `--storage <STORAGE_CONFIG>` — Storage configuration for the blockchain history
* `--storage-max-concurrent-queries <STORAGE_MAX_CONCURRENT_QUERIES>` — The maximal number of simultaneous queries to the database
* `--storage-max-stream-queries <STORAGE_MAX_STREAM_QUERIES>` — The maximal number of simultaneous stream queries to the database
Expand Down
2 changes: 2 additions & 0 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ where
options.chain_worker_ttl,
options.sender_chain_worker_ttl,
options.to_chain_client_options(),
options.to_validator_manager_config(),
);

#[cfg(not(web))]
Expand Down Expand Up @@ -351,6 +352,7 @@ where
cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
..ChainClientOptions::test_default()
},
linera_core::client::ValidatorManagerConfig::default(),
);

ClientContext {
Expand Down
66 changes: 66 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,58 @@ pub struct ClientContextOptions {
/// Maximum number of tasks that can are joined concurrently in the client.
#[arg(long, default_value = "100")]
pub max_joined_tasks: usize,

/// Maximum concurrent requests per validator node
#[arg(
long,
default_value_t = linera_core::client::validator_manager::MAX_IN_FLIGHT_REQUESTS,
env = "LINERA_VALIDATOR_MANAGER_MAX_IN_FLIGHT_REQUESTS"
)]
pub max_in_flight_requests: usize,

/// Maximum expected latency in milliseconds for score normalization
#[arg(
long,
default_value_t = linera_core::client::validator_manager::MAX_ACCEPTED_LATENCY_MS,
env = "LINERA_VALIDATOR_MANAGER_MAX_ACCEPTED_LATENCY_MS"
)]
pub max_accepted_latency_ms: f64,

/// Time-to-live for cached responses in seconds
#[arg(
long,
default_value_t = linera_core::client::validator_manager::CACHE_TTL_SEC,
env = "LINERA_VALIDATOR_MANAGER_CACHE_TTL_SEC"
)]
pub cache_ttl_sec: u64,

/// Maximum number of entries in the cache
#[arg(
long,
default_value_t = linera_core::client::validator_manager::CACHE_MAX_SIZE,
env = "LINERA_VALIDATOR_MANAGER_CACHE_MAX_SIZE"
)]
pub cache_max_size: usize,

/// Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds)
#[arg(
long,
default_value_t = linera_core::client::validator_manager::MAX_REQUEST_TTL_MS,
env = "LINERA_VALIDATOR_MANAGER_MAX_REQUEST_TTL_MS"
)]
pub max_request_ttl_ms: u64,

/// Smoothing factor for Exponential Moving Averages (0 < alpha < 1)
/// Higher values give more weight to recent observations
/// Typical values are between 0.01 and 0.5
/// A value of 0.1 means that 10% of the new observation is considered
/// and 90% of the previous average is retained
Comment on lines +230 to +234
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Smoothing factor for Exponential Moving Averages (0 < alpha < 1)
/// Higher values give more weight to recent observations
/// Typical values are between 0.01 and 0.5
/// A value of 0.1 means that 10% of the new observation is considered
/// and 90% of the previous average is retained
/// Smoothing factor for Exponential Moving Averages (0 < alpha < 1).
/// Higher values give more weight to recent observations.
/// Typical values are between 0.01 and 0.5.
/// A value of 0.1 means that 10% of the new observation is considered
/// and 90% of the previous average is retained.

#[arg(
long,
default_value_t = linera_core::client::validator_manager::ALPHA_SMOOTHING_FACTOR,
env = "LINERA_VALIDATOR_MANAGER_ALPHA"
)]
pub alpha: f64,
}

impl ClientContextOptions {
Expand Down Expand Up @@ -218,6 +270,20 @@ impl ClientContextOptions {
report_interval_secs: self.timing_interval,
}
}

/// Creates [`ValidatorManagerConfig`] with the corresponding values.
pub(crate) fn to_validator_manager_config(
&self,
) -> linera_core::client::ValidatorManagerConfig {
linera_core::client::ValidatorManagerConfig {
max_in_flight_requests: self.max_in_flight_requests,
max_accepted_latency_ms: self.max_accepted_latency_ms,
cache_ttl_sec: self.cache_ttl_sec,
cache_max_size: self.cache_max_size,
max_request_ttl_ms: self.max_request_ttl_ms,
alpha: self.alpha,
}
}
}

#[derive(Debug, Clone, clap::Args)]
Expand Down
2 changes: 2 additions & 0 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
Duration::from_secs(30),
Duration::from_secs(1),
ChainClientOptions::test_default(),
linera_core::client::ValidatorManagerConfig::default(),
)),
};
context
Expand Down Expand Up @@ -204,6 +205,7 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
Duration::from_secs(30),
Duration::from_secs(1),
ChainClientOptions::test_default(),
linera_core::client::ValidatorManagerConfig::default(),
)),
};
let context = Arc::new(Mutex::new(context));
Expand Down
99 changes: 57 additions & 42 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ mod chain_client_state;
#[cfg(test)]
#[path = "../unit_tests/client_tests.rs"]
mod client_tests;
pub mod validator_manager;

pub use validator_manager::{ScoringWeights, ValidatorManager, ValidatorManagerConfig};
mod received_log;
mod validator_trackers;

Expand Down Expand Up @@ -149,6 +152,8 @@ pub struct Client<Env: Environment> {
/// Local node to manage the execution state and the local storage of the chains that we are
/// tracking.
local_node: LocalNodeClient<Env::Storage>,
/// Manages the requests sent to validator nodes.
validator_manager: ValidatorManager<Env>,
/// The admin chain ID.
admin_id: ChainId,
/// Chains that should be tracked by the client.
Expand All @@ -175,6 +180,7 @@ impl<Env: Environment> Client<Env> {
chain_worker_ttl: Duration,
sender_chain_worker_ttl: Duration,
options: ChainClientOptions,
validator_manager_config: validator_manager::ValidatorManagerConfig,
) -> Self {
let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
let state = WorkerState::new_for_client(
Expand All @@ -188,10 +194,12 @@ impl<Env: Environment> Client<Env> {
.with_chain_worker_ttl(chain_worker_ttl)
.with_sender_chain_worker_ttl(sender_chain_worker_ttl);
let local_node = LocalNodeClient::new(state);
let validator_manager = ValidatorManager::new(vec![], validator_manager_config);

Self {
environment,
local_node,
validator_manager,
chains: papaya::HashMap::new(),
admin_id,
tracked_chains,
Expand Down Expand Up @@ -347,8 +355,10 @@ impl<Env: Environment> Client<Env> {
.checked_sub(u64::from(next_height))
.ok_or(ArithmeticError::Overflow)?
.min(self.options.certificate_download_batch_size);
let certificates = remote_node
.query_certificates_from(chain_id, next_height, limit)

let certificates = self
.validator_manager
.download_certificates(remote_node, chain_id, next_height, limit)
.await?;
let Some(info) = self.process_certificates(remote_node, certificates).await? else {
break;
Expand All @@ -362,17 +372,20 @@ impl<Env: Environment> Client<Env> {

async fn download_blobs(
&self,
remote_node: &RemoteNode<impl ValidatorNode>,
blob_ids: impl IntoIterator<Item = BlobId>,
remote_nodes: &[RemoteNode<Env::ValidatorNode>],
blob_ids: &[BlobId],
) -> Result<(), ChainClientError> {
self.local_node
.store_blobs(
&futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move {
remote_node.try_download_blob(blob_id).await.unwrap()
}))
.buffer_unordered(self.options.max_joined_tasks)
.collect::<Vec<_>>()
.await,
&self
.validator_manager
.download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
.await?
.ok_or_else(|| {
ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(
blob_ids.to_vec(),
))
})?,
Comment on lines +380 to +388
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd put that in a local variable before we call store_blobs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, but that was already here – I just moved it around.

)
.await
.map_err(Into::into)
Expand All @@ -383,7 +396,7 @@ impl<Env: Environment> Client<Env> {
#[instrument(level = "trace", skip_all)]
async fn process_certificates(
&self,
remote_node: &RemoteNode<impl ValidatorNode>,
remote_node: &RemoteNode<Env::ValidatorNode>,
certificates: Vec<ConfirmedBlockCertificate>,
) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
let mut info = None;
Expand All @@ -398,7 +411,8 @@ impl<Env: Environment> Client<Env> {
.await
{
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
self.download_blobs(remote_node, blob_ids).await?;
self.download_blobs(&[remote_node.clone()], &blob_ids)
.await?;
}
x => {
x?;
Expand All @@ -409,7 +423,8 @@ impl<Env: Environment> Client<Env> {
info = Some(
match self.handle_certificate(certificate.clone()).await {
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
self.download_blobs(remote_node, blob_ids).await?;
self.download_blobs(&[remote_node.clone()], &blob_ids)
.await?;
self.handle_certificate(certificate).await?
}
x => x?,
Expand Down Expand Up @@ -663,7 +678,6 @@ impl<Env: Environment> Client<Env> {
) -> Result<(), ChainClientError> {
let certificate = Box::new(certificate);
let block = certificate.block();

// Recover history from the network.
self.download_certificates(block.header.chain_id, block.header.height)
.await?;
Expand All @@ -672,14 +686,9 @@ impl<Env: Environment> Client<Env> {
if let Err(err) = self.process_certificate(certificate.clone()).await {
match &err {
LocalNodeError::BlobsNotFound(blob_ids) => {
let blobs = RemoteNode::download_blobs(
blob_ids,
&self.validator_nodes().await?,
self.options.blob_download_timeout,
)
.await
.ok_or(err)?;
self.local_node.store_blobs(&blobs).await?;
self.download_blobs(&self.validator_nodes().await?, blob_ids)
.await
.map_err(|_| err)?;
self.process_certificate(certificate).await?;
}
_ => {
Expand Down Expand Up @@ -716,14 +725,7 @@ impl<Env: Environment> Client<Env> {
if let Err(err) = self.handle_certificate(certificate.clone()).await {
match &err {
LocalNodeError::BlobsNotFound(blob_ids) => {
let blobs = RemoteNode::download_blobs(
blob_ids,
&nodes,
self.options.blob_download_timeout,
)
.await
.ok_or(err)?;
self.local_node.store_blobs(&blobs).await?;
self.download_blobs(&nodes, blob_ids).await?;
self.handle_certificate(certificate.clone()).await?;
}
_ => {
Expand Down Expand Up @@ -777,8 +779,13 @@ impl<Env: Environment> Client<Env> {
// anything from the validator - let the function try the other validators
return Err(());
}
let certificates = remote_node
.download_certificates_by_heights(sender_chain_id, remote_heights)
let certificates = self
.validator_manager
.download_certificates_by_heights(
&remote_node,
sender_chain_id,
remote_heights,
)
.await
.map_err(|_| ())?;
let mut certificates_with_check_results = vec![];
Expand Down Expand Up @@ -934,8 +941,13 @@ impl<Env: Environment> Client<Env> {
// Stop if we've reached the height we've already processed.
while current_height >= next_outbox_height {
// Download the certificate for this height.
let downloaded = remote_node
.download_certificates_by_heights(sender_chain_id, vec![current_height])
let downloaded = self
.validator_manager
.download_certificates_by_heights(
remote_node,
sender_chain_id,
vec![current_height],
)
.await?;
let Some(certificate) = downloaded.into_iter().next() else {
return Err(ChainClientError::CannotDownloadMissingSenderBlock {
Expand Down Expand Up @@ -1119,9 +1131,9 @@ impl<Env: Environment> Client<Env> {
if !required_blob_ids.is_empty() {
let mut blobs = Vec::new();
for blob_id in required_blob_ids {
let blob_content = match remote_node
.node
.download_pending_blob(chain_id, blob_id)
let blob_content = match self
.validator_manager
.download_pending_blob(remote_node, chain_id, blob_id)
.await
{
Ok(content) => content,
Expand Down Expand Up @@ -1217,9 +1229,9 @@ impl<Env: Environment> Client<Env> {
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
let mut blobs = Vec::new();
for blob_id in blob_ids {
let blob_content = remote_node
.node
.download_pending_blob(chain_id, blob_id)
let blob_content = self
.validator_manager
.download_pending_blob(remote_node, chain_id, blob_id)
.await?;
blobs.push(Blob::new(blob_content));
}
Expand Down Expand Up @@ -1248,7 +1260,10 @@ impl<Env: Environment> Client<Env> {
communicate_concurrently(
remote_nodes,
async move |remote_node| {
let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
let certificate = self
.validator_manager
.download_certificate_for_blob(&remote_node, blob_id)
.await?;
self.receive_sender_certificate(
certificate,
ReceiveCertificateMode::NeedsCheck,
Expand Down Expand Up @@ -4123,7 +4138,7 @@ impl<Env: Environment> ChainClient<Env> {
}

/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
/// each subsequent node. Returns error `err` is all of the nodes fail.
/// each subsequent node. Returns error `err` if all of the nodes fail.
async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
nodes: &[RemoteNode<A>],
f: F,
Expand Down
Loading
Loading