Introduce ValidatorManager to track all requests and validators' scores #4752
base: testnet_conway
Conversation
Force-pushed from f922f82 to b345f8c
.with_peer(
    RequestKey::Certificates {
        chain_id,
        start: next_height,
        limit,
    },
    remote_node.clone(),
    async move |peer| {
        peer.download_certificates_from(chain_id, next_height, limit)
            .await
    },
We seem to be passing the same info twice: once in the RequestKey, once in the operation to be executed. Maybe it would be possible to make the RequestKey determine the operation?
I know, but I couldn't find a way to get rid of this duplication. The caller chooses the operation (the closure), but the code that executes the closure (track_request) and the caching logic (deduplicated_request) are separate functions. What I could do is merge those two, but then testing would be much more difficult.
What I was thinking was that maybe the operation could be chosen by matching on the RequestKey inside the ValidatorManager (or some method on the RequestKey, even?), instead of by passing a closure. But maybe that wouldn't work with some use cases? I haven't reviewed most of the code yet; those are loose ideas I'm getting while reading the PR.
Sure. Initially ValidatorManager had an API matching RemoteNode – i.e. it had methods like download_certificate, download_blob, download_certificate_for_blob – but over time I moved away from this and worked with the closure (although there are still methods for downloading blobs). I could revert to that approach, but I thought it was more limiting.
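
For illustration, here is a minimal sketch of that alternative, where the key dispatches the operation itself so the request parameters are only spelled out once. All types and method names below are simplified, hypothetical stand-ins, not the PR's actual API.

// Sketch: RequestKey chooses the operation, instead of the caller passing a closure.
type ChainId = u64; // stand-ins for the real Linera identifier types
type BlockHeight = u64;
type BlobId = u64;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum RequestKey {
    Certificates { chain_id: ChainId, start: BlockHeight, limit: u64 },
    Blob { blob_id: BlobId },
}

#[derive(Debug)]
enum RequestResult {
    Certificates(Vec<String>),
    Blob(Vec<u8>),
}

// Trimmed-down stand-in for RemoteNode.
struct Peer;

impl Peer {
    async fn download_certificates_from(
        &self,
        _chain_id: ChainId,
        _start: BlockHeight,
        _limit: u64,
    ) -> RequestResult {
        RequestResult::Certificates(Vec::new())
    }

    async fn download_blob(&self, _blob_id: BlobId) -> RequestResult {
        RequestResult::Blob(Vec::new())
    }
}

impl RequestKey {
    // The key dispatches to the matching peer operation, so the request's
    // parameters appear only once at the call site.
    async fn execute(&self, peer: &Peer) -> RequestResult {
        match self {
            RequestKey::Certificates { chain_id, start, limit } => {
                peer.download_certificates_from(*chain_id, *start, *limit).await
            }
            RequestKey::Blob { blob_id } => peer.download_blob(*blob_id).await,
        }
    }
}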
Force-pushed from a016601 to 0fa1c8a
…previous one is stalled
Force-pushed from 0fa1c8a to e7638a2
/// # Returns
/// - `Some((chain_id, heights))` for certificate requests, where heights are sorted
/// - `None` for non-certificate requests (Blob, PendingBlob, CertificateForBlob)
fn height_range(&self) -> Option<Vec<BlockHeight>> {
We could return a BTreeSet instead, to make contains more efficient in the subsumes implementation. Or, if they are always ordered anyway, we could avoid calling contains below at all, and just iterate over both?
I meant to change the impl to use the ordering and I forgot 🙈
I think I addressed this in a9c836a
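
As an aside, a small sketch of the "iterate over both" idea from the comment above, assuming both height lists are sorted ascending and contain unique heights; the function name and plain u64 heights are illustrative only.

// Sketch: check that `needed` is a subset of `available` with a single
// merge-style pass instead of repeated `contains` calls.
fn heights_subsumed(needed: &[u64], available: &[u64]) -> bool {
    let mut avail = available.iter();
    'outer: for height in needed {
        // Advance through `available` until we either find `height`
        // or overshoot it (both lists are sorted ascending).
        for candidate in avail.by_ref() {
            if candidate == height {
                continue 'outer;
            }
            if candidate > height {
                return false;
            }
        }
        return false; // `available` exhausted before finding `height`
    }
    true
}

#[test]
fn subset_of_sorted_heights() {
    assert!(heights_subsumed(&[10, 11, 12], &[10, 11, 12, 13, 20]));
    assert!(!heights_subsumed(&[10, 14], &[10, 11, 12, 13]));
}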
let certificates = match result {
    RequestResult::Certificates(certs) => certs,
    _ => return None,
};

if self.chain_id().is_none() || from.chain_id().is_none() {
    return None;
}
Instead of all this, shouldn't we just if !self.subsumes(result) { return None; }? Otherwise it isn't guaranteed that the filtered results below are complete.
Good point.
I think I addressed this in a9c836a
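
For context, a compact sketch of that guard: refuse to answer from a result that doesn't subsume the request, then filter down to exactly the requested heights, so the returned set is guaranteed complete. The types here (CertificateRequest, Certificate, RequestResult) are simplified stand-ins, not the PR's actual definitions.

#[derive(Clone, Debug, PartialEq)]
struct Certificate {
    height: u64,
}

#[derive(Debug, PartialEq)]
enum RequestResult {
    Certificates(Vec<Certificate>),
    Blob(Vec<u8>),
}

struct CertificateRequest {
    heights: Vec<u64>, // sorted ascending
}

impl CertificateRequest {
    // True if `result` contains every height this request asked for.
    fn subsumes(&self, result: &RequestResult) -> bool {
        match result {
            RequestResult::Certificates(certs) => self
                .heights
                .iter()
                .all(|h| certs.iter().any(|c| c.height == *h)),
            _ => false,
        }
    }

    // Guard first, then filter: the output is either complete or None.
    fn filter_result(&self, result: &RequestResult) -> Option<RequestResult> {
        if !self.subsumes(result) {
            return None;
        }
        let RequestResult::Certificates(certs) = result else {
            return None;
        };
        let filtered = certs
            .iter()
            .filter(|c| self.heights.contains(&c.height))
            .cloned()
            .collect();
        Some(RequestResult::Certificates(filtered))
    }
}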
    + (self.weights.success * success_score)
    + (self.weights.load * load_score);

// Apply confidence factor to penalize nodes with too few samples
Is that necessary? If a node has too few samples, this makes us avoid getting more samples, doesn't it?
The idea was to not prioritise nodes that haven't received enough requests to trust the score.
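
To make the trade-off concrete, here is one hedged sketch of such a confidence factor: the raw weighted score is blended towards a neutral value when a node has few samples, so an untested node is neither prioritised nor buried. The weights, the MIN_SAMPLES threshold, and all field names are made up for the example, not taken from the PR.

const MIN_SAMPLES: u64 = 20; // hypothetical threshold

struct NodeStats {
    total_requests: u64,
    successful_requests: u64,
    avg_latency_ms: f64,
}

fn score(stats: &NodeStats) -> f64 {
    let success_score = if stats.total_requests == 0 {
        0.5 // neutral prior when we know nothing
    } else {
        stats.successful_requests as f64 / stats.total_requests as f64
    };
    // Map latency into (0, 1]; lower latency scores higher.
    let latency_score = 1.0 / (1.0 + stats.avg_latency_ms / 100.0);
    let raw = 0.7 * success_score + 0.3 * latency_score;

    // Confidence grows linearly up to MIN_SAMPLES, then stays at 1.0.
    let confidence = (stats.total_requests as f64 / MIN_SAMPLES as f64).min(1.0);
    // Blend towards a neutral 0.5 when confidence is low.
    0.5 + (raw - 0.5) * confidence
}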
let elapsed = Instant::now().duration_since(entry.started_at);

let outcome = if elapsed > self.timeout {
    SubscribeOutcome::TimedOut(elapsed)
But there might still be a more recent, subsuming (non-exact) one?
You're right.
Done in bbbd690
"request completed; broadcasting result to waiters", | ||
); | ||
if waiter_count != 0 { | ||
let _ = entry.sender.send(result); |
Maybe log something if that fails. (In general, we are trying to avoid let _ =.)
Done in aab9839
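
A small sketch of what logging the failure could look like, assuming a tokio broadcast sender and the tracing crate; the function and field names here are illustrative, not the PR's code.

use tokio::sync::broadcast;

fn broadcast_result(sender: &broadcast::Sender<String>, result: String, waiter_count: u64) {
    if waiter_count != 0 {
        if let Err(err) = sender.send(result) {
            // `send` fails only if every waiter has already dropped its receiver.
            tracing::warn!(%err, waiter_count, "failed to broadcast request result to waiters");
        }
    }
}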
});

/// Counter for requests that were deduplicated (joined an in-flight request)
pub static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
Isn't this always the same as REQUEST_CACHE_HIT?
Not really – REQUESTS_CACHE_HIT is incremented when the request (key) is found in the response cache, and REQUESTS_CACHE_DEDUPLICATION is incremented when the request is deduplicated with another one in-flight.
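
To make the distinction concrete, a toy sketch of where the two counters diverge in a cache-then-dedup lookup; the structs and plain integer counters below stand in for the real Prometheus metrics and tracker types.

use std::collections::HashMap;

#[derive(Default)]
struct Counters {
    cache_hits: u64,      // REQUESTS_CACHE_HIT
    deduplications: u64,  // REQUESTS_CACHE_DEDUPLICATION
}

#[derive(Default)]
struct RequestTracker {
    completed: HashMap<String, String>, // key -> cached result
    in_flight: HashMap<String, ()>,     // key -> handle to a running request
    counters: Counters,
}

enum Lookup {
    Cached(String), // served from the response cache
    JoinInFlight,   // joined a request that is still running
    Miss,           // execute a new request
}

impl RequestTracker {
    fn lookup(&mut self, key: &str) -> Lookup {
        if let Some(result) = self.completed.get(key) {
            self.counters.cache_hits += 1;
            return Lookup::Cached(result.clone());
        }
        if self.in_flight.contains_key(key) {
            self.counters.deduplications += 1;
            return Lookup::JoinInFlight;
        }
        Lookup::Miss
    }
}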
});

// Give the first request time to register as in-flight
tokio::time::sleep(Duration::from_millis(10)).await;
Then why use spawn at all? In 10 ms the request will probably have completed?
assert_eq!(result1, result2);

// Operation should only have been executed once (deduplication worked)
assert_eq!(execution_count.load(Ordering::SeqCst), 1);
What does this test add compared to test_cache_hit_returns_cached_result (and vice versa)?
// Operation should only have been executed once (all requests were deduplicated)
assert_eq!(execution_count.load(Ordering::SeqCst), 1);
}
Doesn't this just extend the first two tests, i.e. make them obsolete?
tokio::time::sleep(Duration::from_millis(10)).await;

// Wait for the timeout to elapse
tokio::time::sleep(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await;
We're trying to avoid sleep in tests. I guess it's difficult in this case, but maybe there's a way to use the fake clock?
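
One possible shape of that, sketched with Tokio's paused test clock (requires the tokio test-util feature): the test jumps past the TTL instantly instead of sleeping for real. This only works if the timeout is measured with tokio::time rather than std::time; the constant value and the test name here are illustrative.

use std::time::Duration;

const MAX_REQUEST_TTL_MS: u64 = 500; // illustrative value

#[tokio::test(start_paused = true)]
async fn request_expires_after_ttl() {
    let started = tokio::time::Instant::now();

    // Let the request register as in-flight without real waiting.
    tokio::time::advance(Duration::from_millis(10)).await;

    // Jump straight past the TTL; no wall-clock time passes.
    tokio::time::advance(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await;

    assert!(started.elapsed() > Duration::from_millis(MAX_REQUEST_TTL_MS));
}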
// - Completes the first request to release a slot
// - Verifies the third request now acquires the freed slot and executes (execution count becomes 3)
// - Confirms all requests complete successfully
use linera_base::identifiers::BlobType;
Should probably be at the start of the file.
It's used in this single test 🤔
Looks great! No blockers so far. 👍
stream::iter(blob_ids.into_iter().map(|blob_id| {
    communicate_concurrently(
        remote_nodes,
        async move |remote_node| {
If this task gets canceled, does that cause any issues, e.g. with the in-flight tracker? (Maybe fixed if we use a Tokio semaphore anyway.)
Yes, hopefully when the task holding the permit is dropped, the semaphore permit is released.
But good point!
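
A self-contained sketch of that behaviour with a tokio Semaphore: the owned permit is released when the aborted task's future is dropped, so a cancelled download gives its in-flight slot back. Purely illustrative; it does not use the PR's types.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));

    let permit = semaphore.clone().acquire_owned().await.unwrap();
    let task = tokio::spawn(async move {
        let _permit = permit; // the task owns the only permit
        tokio::time::sleep(Duration::from_secs(3600)).await; // "stuck" request
    });

    task.abort(); // cancel the task; dropping its future drops the permit
    assert!(task.await.unwrap_err().is_cancelled());

    // The slot is available again.
    assert_eq!(semaphore.available_permits(), 1);
}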
// Filter nodes that can accept requests and calculate their scores
let mut scored_nodes = Vec::new();
for info in nodes.values() {
    if info.can_accept_request(self.max_requests_per_node).await {
Should we already try to acquire permits here? Otherwise there's no guarantee they can still accept requests below.
We don't always follow this call with a request, so I didn't want to do it. Maybe this method should be changed – it's not guaranteed that returned peers will be able to accept a request when the time comes (since the slots can be used in the meantime) – and instead we'd calculate the score for everyone, and the peers with higher current load would be scored lower due to lack of available slots:
let current_load =
    (self.max_in_flight as f64) - (self.in_flight_semaphore.available_permits() as f64);
let load_score =
    1.0 - (current_load.min(self.max_in_flight as f64) / self.max_in_flight as f64);
i.e. the load_score part would be 0 for them.
Done in "Remove can_accept_request".
Force-pushed from 59e2df1 to d4c0f2c
Force-pushed from 808b2b5 to 7fc2842
Force-pushed from 7fc2842 to bbbd690
Motivation
The Linera client needs to interact with multiple validator nodes efficiently. Previously, the client would make individual requests to validators without:
bandwidth and validator resources
This led to:
Proposal
This PR introduces ValidatorManager, a sophisticated request orchestration layer that provides intelligent peer selection, request deduplication, caching, and performance-based routing.
Key Features
contain the needed data (e.g., a request for blocks 10-12 can be satisfied by an in-flight request for blocks 10-20)
attempts
Created a new validator_manager module with clear separation of concerns:
API
High-level APIs:
Benefits
Metrics
In production usage, this should significantly reduce:
The following metrics have been added to Prometheus (when compiled with --features metrics):
validator_manager_response_time_ms - Response time for requests to validators in milliseconds
validator_manager_request_total - Total number of requests made to each validator
validator_manager_request_success - Number of successful requests to each validator ((validator_manager_request_total - validator_manager_request_success) / validator_manager_request_total is an error rate)
validator_manager_request_deduplication_total - Number of requests that were deduplicated by joining an in-flight request
validator_manager_request_cache_hit_total - Number of requests that were served from cache
Test Plan
Existing CI makes sure we maintain backwards compatibility. Some tests have been added to the new modules.
Release Plan
Links