-
-
Notifications
You must be signed in to change notification settings - Fork 107
fix: propagate connection errors for blocked peer fallback (#2021) #2105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add handle_aborted_op call when connection establishment fails - Inject empty ReturnGet to trigger existing retry logic for GET ops - Fix contract loading in run_app_blocked_peers test - Test remains #[ignore] until PUT/subscribe bug fixed separately Implements Option A from issue #2021
| .. | ||
| }) = &self.state | ||
| { | ||
| let return_msg = GetMsg::ReturnGet { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the next state after doing? what do we achieve?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This injects an empty ReturnGet while staying in AwaitingResponse, so the existing fallback will pop the next candidate from the stored alternatives/skip list and retry with another peer instead of hanging on the failed connection.
| let return_msg = GetMsg::ReturnGet { | ||
| id: self.id, | ||
| key: *key, | ||
| value: StoreResponse { | ||
| state: None, | ||
| contract: None, | ||
| }, | ||
| sender: current_target.clone(), | ||
| target: op_manager.ring.connection_manager.own_location(), | ||
| skip_list: skip_list.clone(), | ||
| }; | ||
|
|
||
| op_manager | ||
| .notify_op_change(NetMessage::from(return_msg), OpEnum::Get(self)) | ||
| .await?; | ||
| return Err(OpError::StatePushed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@claude what happens after doing this, what is achieved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same intent as above: pushing an empty ReturnGet back into the GET state machine so it reuses the existing retry path (alternatives/skip list) and moves on to another peer instead of waiting on the failed connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes issue #2021 by implementing connection error propagation to enable blocked peer fallback, adds a transient connection management system for gateways, and fixes contract loading in tests.
Key changes:
- Propagates connection failures through
handle_aborted_opto allow GET operations to retry with alternative peers - Adds transient connection tracking with configurable budget and TTL for gateway resource management
- Fixes contract loading in the blocked peers test by using the
load_contracthelper
Reviewed Changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/core/src/operations/get.rs | Adds handle_abort method to retry GET operations on connection failures; extends state to track current target and skip list |
| crates/core/src/node/network_bridge/p2p_protoc.rs | Implements connection error propagation, transient connection handling, and connection reuse logic |
| crates/core/src/node/mod.rs | Extends handle_aborted_op to process GET operation failures and adds transient configuration fields |
| crates/core/src/ring/connection_manager.rs | Adds transient connection tracking with registration, budget management, and TTL expiration |
| crates/core/src/config/mod.rs | Introduces transient_budget and transient_ttl_secs configuration options with defaults |
| apps/freenet-ping/app/tests/run_app_blocked_peers.rs | Fixes contract loading by properly computing code hash before creating container |
| crates/freenet-macros/src/codegen.rs | Updates generated test configs to include new transient fields |
| crates/core/tests/*.rs | Updates test configurations with new transient connection parameters |
| apps/freenet-ping/app/tests/common/mod.rs | Adds transient config fields to test helper functions |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| .get_peer_key() | ||
| .expect("peer key should be set when connection exists"); | ||
| callback | ||
| .send_result(Ok((resolved_peer_id, None))) |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code incorrectly returns the local node's own peer ID instead of the remote peer's ID. When reusing an existing connection, the callback should receive the peer ID of the connection being reused (peer), not the local node's peer key. This would cause the operation to incorrectly believe it has connected to itself rather than to the target peer.
The correct code should be:
callback
.send_result(Ok((peer.clone(), None)))
.await| .send_result(Ok((resolved_peer_id, None))) | |
| .send_result(Ok((peer.clone(), None))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed: when reusing an existing connection we now return the remote peer ID (peer.clone()) instead of the local peer key so the requester gets the correct peer back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this was weird, was wondering the same, good catch copilot
| if is_transient { | ||
| let connection_manager = &self.bridge.op_manager.ring.connection_manager; | ||
| let current = connection_manager.transient_count(); | ||
| if current >= connection_manager.transient_budget() { | ||
| tracing::warn!( | ||
| remote = %peer_id.addr, | ||
| budget = connection_manager.transient_budget(), | ||
| current, | ||
| "Transient connection budget exhausted; dropping inbound connection" | ||
| ); | ||
| return Ok(()); |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a potential race condition in the transient connection budget check. The check at line 1611 reads the current count, but the transient connection is only registered later at line 1711 (after the connection is inserted). Between these two points, multiple concurrent connections could pass the budget check before any of them are registered, allowing the budget to be exceeded.
Consider checking and atomically incrementing a counter, or moving the registration earlier in the flow before inserting the connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added an early transient reservation that atomically bumps the budget before notifying callbacks or inserting the connection; if the budget is exhausted we drop the inbound connection immediately and release the reservation when we skip insertion. The location gets updated later once we know it.
crates/core/src/operations/get.rs
Outdated
| attempts_at_hop: attempts_at_hop + 1, | ||
| key, | ||
| current_target: next_target, | ||
| skip_list: updated_tried_peers, |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The skip_list at line 865 is incorrectly set to updated_tried_peers, which discards the original skip_list parameter. The skip_list should accumulate peers to avoid across retries, not be replaced. When trying an alternative peer at the same hop, the skip_list should remain unchanged (it represents peers to skip when finding new candidates), while tried_peers tracks what we've tried at this specific hop.
The correct assignment should be:
skip_list: skip_list.clone(),Or if you want to merge both:
skip_list: {
let mut merged = skip_list.clone();
merged.extend(updated_tried_peers.iter().cloned());
merged
},| skip_list: updated_tried_peers, | |
| skip_list: skip_list.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to keep the incoming skip_list intact (tried_peers handles per-hop attempts) and added a brief comment clarifying the separation between skip_list and tried_peers.
| pub fn register_transient(&self, peer: PeerId, location: Option<Location>) { | ||
| self.transient_connections.write().insert( | ||
| peer, | ||
| TransientEntry { | ||
| opened_at: Instant::now(), | ||
| location, | ||
| }, | ||
| ); | ||
| } |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The public method register_transient lacks documentation. Consider adding a doc comment explaining:
- What a transient connection is
- When and why this method should be called
- The parameters and their purpose
- Any side effects or invariants
Example:
/// Registers a new transient connection for the given peer.
///
/// Transient connections are temporary connections accepted by gateways
/// that are not yet added to the ring topology. They have a limited TTL
/// and budget to prevent resource exhaustion.
///
/// # Arguments
/// * `peer` - The peer ID of the transient connection
/// * `location` - Optional location of the peer in the ring
pub fn register_transient(&self, peer: PeerId, location: Option<Location>) {There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added doc comments and renamed to try_register_transient to make the budgeted, fail-fast behavior explicit. Also tracks the count atomically so we respect the configured quota even under concurrency.
| }, | ||
| ); | ||
| } | ||
|
|
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The public method drop_transient lacks documentation. Consider adding a doc comment explaining:
- What this method does
- When it should be called
- What the return value represents
- Any side effects
Example:
/// Removes a transient connection from tracking.
///
/// This should be called when a transient connection is either promoted
/// to a normal connection or when it expires and needs to be dropped.
///
/// # Arguments
/// * `peer` - The peer ID of the transient connection to drop
///
/// # Returns
/// The `TransientEntry` if it existed, `None` otherwise
pub fn drop_transient(&self, peer: &PeerId) -> Option<TransientEntry> {| /// Removes a transient connection from tracking. | |
| /// | |
| /// This should be called when a transient connection is either promoted | |
| /// to a normal connection or when it expires and needs to be dropped. | |
| /// | |
| /// # Arguments | |
| /// * `peer` - The peer ID of the transient connection to drop | |
| /// | |
| /// # Returns | |
| /// The `TransientEntry` if it existed, `None` otherwise. | |
| /// | |
| /// # Side Effects | |
| /// Removes the transient connection from the internal tracking map. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documented drop_transient and made it decrement the transient counter when removing an entry so the budget remains accurate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can handle transient connections at the type level and that would make all this stuff safer and elss error prone, but that will require some further changes/refactoring so will do that in a separate PR in the future
| pub(crate) async fn handle_abort(self, op_manager: &OpManager) -> Result<(), OpError> { | ||
| if let Some(GetState::AwaitingResponse { | ||
| key, | ||
| current_target, | ||
| skip_list, | ||
| .. | ||
| }) = &self.state | ||
| { | ||
| // We synthesize an empty ReturnGet back to ourselves to reuse the existing | ||
| // fallback path that tries the next candidate. The state stays | ||
| // AwaitingResponse so the retry logic can pick up from the stored | ||
| // alternatives/skip list. | ||
| let return_msg = GetMsg::ReturnGet { | ||
| id: self.id, | ||
| key: *key, | ||
| value: StoreResponse { | ||
| state: None, | ||
| contract: None, | ||
| }, | ||
| sender: current_target.clone(), | ||
| target: op_manager.ring.connection_manager.own_location(), | ||
| skip_list: skip_list.clone(), | ||
| }; | ||
|
|
||
| op_manager | ||
| .notify_op_change(NetMessage::from(return_msg), OpEnum::Get(self)) | ||
| .await?; | ||
| return Err(OpError::StatePushed); | ||
| } | ||
|
|
||
| // If we weren't awaiting a response, just put the op back. | ||
| // No retry needed; another handler may pick it up later. | ||
| op_manager.push(self.id, OpEnum::Get(self)).await?; | ||
| Ok(()) | ||
| } |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The public method handle_abort lacks documentation. Consider adding a doc comment explaining:
- What scenarios trigger an abort
- How the retry mechanism works
- What the error return values mean (especially
OpError::StatePushed) - The relationship with
handle_aborted_op
Example:
/// Handles an aborted outbound connection attempt by triggering retry logic.
///
/// When a connection to a peer fails, this method synthesizes an empty
/// `ReturnGet` message to trigger the existing fallback path that tries
/// alternative peers from the stored candidates list.
///
/// # Arguments
/// * `op_manager` - The operation manager for accessing ring state
///
/// # Returns
/// * `Err(OpError::StatePushed)` - Successfully initiated retry (not an error)
/// * `Ok(())` - Operation was not awaiting a response, pushed back to queue
/// * Other errors - Failed to process the abort
pub(crate) async fn handle_abort(self, op_manager: &OpManager) -> Result<(), OpError> {There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a doc comment explaining that this synthesizes an empty ReturnGet to reuse the retry path, when it returns OpError::StatePushed, and why the op is returned to the queue if it was not waiting on a response.
iduartgomez
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GET retry logic: Inject empty ReturnGet to trigger existing fallback mechanism
Gotta say is a bit cheese we need to do this, but is a shortcoming of the current callback mechanisms. This also can be improved in the future when op state machines are refactored once we are stable.
Problem
Issue #2021: Blocked peer mechanism not working - when connection to blocked peer fails, operations don't fall back to alternative peers.
Solution (Option A)
Propagate connection errors to operation handlers so they can retry with different peers:
Status
Main branch can't even run this test (contract loading fails). This PR is an improvement.
Next Steps
Separate stacked PR will fix PUT/subscribe parent/child completion bug.
Closes #2021