diff --git a/apps/freenet-ping/Cargo.lock b/apps/freenet-ping/Cargo.lock index 32a378d2e..e377b7129 100644 --- a/apps/freenet-ping/Cargo.lock +++ b/apps/freenet-ping/Cargo.lock @@ -1281,7 +1281,7 @@ dependencies = [ [[package]] name = "freenet" -version = "0.1.36" +version = "0.1.37" dependencies = [ "aes-gcm", "ahash", diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index a2206671c..665962391 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -127,6 +127,8 @@ pub async fn base_node_test_config_with_rng( network_port: public_port, // if None, node will pick a free one or use default bandwidth_limit: None, blocked_addresses, + transient_budget: None, + transient_ttl_secs: None, }, config_paths: freenet::config::ConfigPathsArgs { config_dir: Some(temp_dir.path().to_path_buf()), diff --git a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs index f8c1d5452..653465526 100644 --- a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs +++ b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs @@ -244,15 +244,21 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { let (stream_node2, _) = connect_async(&uri_node2).await?; let mut client_node2 = WebApi::start(stream_node2); - // Load contract code + // Compile/load contract code (same helper used by other app tests) let path_to_code = std::path::PathBuf::from(PACKAGE_DIR).join(PATH_TO_CONTRACT); - tracing::info!(path=%path_to_code.display(), "Loading contract code"); - let code = std::fs::read(path_to_code) - .ok() - .ok_or_else(|| anyhow!("Failed to read contract code"))?; - let code_hash = CodeHash::from_code(&code); + tracing::info!(path = %path_to_code.display(), "Loading contract code"); + + // First compile to compute the code hash, then rebuild options with the correct code_key + let temp_options = PingContractOptions { + frequency: Duration::from_secs(3), + ttl: Duration::from_secs(30), + tag: APP_TAG.to_string(), + code_key: String::new(), + }; + let temp_params = Parameters::from(serde_json::to_vec(&temp_options).unwrap()); + let temp_container = common::load_contract(&path_to_code, temp_params)?; + let code_hash = CodeHash::from_code(temp_container.data()); - // Define contract options let ping_options = PingContractOptions { frequency: Duration::from_secs(3), ttl: Duration::from_secs(30), @@ -260,7 +266,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { code_key: code_hash.to_string(), }; let params = Parameters::from(serde_json::to_vec(&ping_options).unwrap()); - let container = ContractContainer::try_from((code, ¶ms))?; + let container = common::load_contract(&path_to_code, params)?; let contract_key = container.key(); // Gateway puts the contract diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index 18f943d8b..36072d01f 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -45,6 +45,9 @@ pub(crate) const PCK_VERSION: &str = env!("CARGO_PKG_VERSION"); // Initialize the executor once. static ASYNC_RT: LazyLock> = LazyLock::new(GlobalExecutor::initialize_async_rt); +const DEFAULT_TRANSIENT_BUDGET: usize = 32; +const DEFAULT_TRANSIENT_TTL_SECS: u64 = 30; + const QUALIFIER: &str = ""; const ORGANIZATION: &str = "The Freenet Project Inc"; const APPLICATION: &str = "Freenet"; @@ -97,6 +100,8 @@ impl Default for ConfigArgs { location: None, bandwidth_limit: Some(3_000_000), // 3 MB/s default for streaming transfers only blocked_addresses: None, + transient_budget: Some(DEFAULT_TRANSIENT_BUDGET), + transient_ttl_secs: Some(DEFAULT_TRANSIENT_TTL_SECS), }, ws_api: WebsocketApiArgs { address: Some(default_listening_address()), @@ -361,6 +366,14 @@ impl ConfigArgs { .network_api .blocked_addresses .map(|addrs| addrs.into_iter().collect()), + transient_budget: self + .network_api + .transient_budget + .unwrap_or(DEFAULT_TRANSIENT_BUDGET), + transient_ttl_secs: self + .network_api + .transient_ttl_secs + .unwrap_or(DEFAULT_TRANSIENT_TTL_SECS), }, ws_api: WebsocketApiConfig { // the websocket API is always local @@ -542,6 +555,16 @@ pub struct NetworkArgs { /// List of IP:port addresses to refuse connections to/from. #[arg(long, num_args = 0..)] pub blocked_addresses: Option>, + + /// Maximum number of concurrent transient connections accepted by a gateway. + #[arg(long, env = "TRANSIENT_BUDGET")] + #[serde(rename = "transient-budget", skip_serializing_if = "Option::is_none")] + pub transient_budget: Option, + + /// Time (in seconds) before an unpromoted transient connection is dropped. + #[arg(long, env = "TRANSIENT_TTL_SECS")] + #[serde(rename = "transient-ttl-secs", skip_serializing_if = "Option::is_none")] + pub transient_ttl_secs: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -608,6 +631,14 @@ pub struct NetworkApiConfig { /// List of IP:port addresses to refuse connections to/from. #[serde(skip_serializing_if = "Option::is_none")] pub blocked_addresses: Option>, + + /// Maximum number of concurrent transient connections accepted by a gateway. + #[serde(default = "default_transient_budget", rename = "transient-budget")] + pub transient_budget: usize, + + /// Time (in seconds) before an unpromoted transient connection is dropped. + #[serde(default = "default_transient_ttl_secs", rename = "transient-ttl-secs")] + pub transient_ttl_secs: u64, } mod port_allocation; @@ -617,6 +648,14 @@ pub fn default_network_api_port() -> u16 { find_available_port().unwrap_or(31337) // Fallback to 31337 if we can't find a random port } +fn default_transient_budget() -> usize { + DEFAULT_TRANSIENT_BUDGET +} + +fn default_transient_ttl_secs() -> u64 { + DEFAULT_TRANSIENT_TTL_SECS +} + #[derive(clap::Parser, Debug, Default, Copy, Clone, Serialize, Deserialize)] pub struct WebsocketApiArgs { /// Address to bind to for the websocket API, default is 0.0.0.0 diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index 3e85de0c6..e5446027b 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -128,6 +128,8 @@ pub struct NodeConfig { pub(crate) max_upstream_bandwidth: Option, pub(crate) max_downstream_bandwidth: Option, pub(crate) blocked_addresses: Option>, + pub(crate) transient_budget: usize, + pub(crate) transient_ttl: Duration, } impl NodeConfig { @@ -195,6 +197,8 @@ impl NodeConfig { max_upstream_bandwidth: None, max_downstream_bandwidth: None, blocked_addresses: config.network_api.blocked_addresses.clone(), + transient_budget: config.network_api.transient_budget, + transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs), }) } @@ -1147,27 +1151,42 @@ async fn handle_aborted_op( gateways: &[PeerKeyLocation], ) -> Result<(), OpError> { use crate::util::IterExt; - if let TransactionType::Connect = tx.transaction_type() { - // attempt to establish a connection failed, this could be a fatal error since the node - // is useless without connecting to the network, we will retry with exponential backoff - // if necessary - match op_manager.pop(&tx) { - Ok(Some(OpEnum::Connect(op))) - if op.has_backoff() - && op_manager.ring.open_connections() - < op_manager.ring.connection_manager.min_connections => - { - let gateway = op.gateway().cloned(); - if let Some(gateway) = gateway { - tracing::warn!("Retry connecting to gateway {}", gateway.peer); - connect::join_ring_request(None, &gateway, op_manager).await?; + match tx.transaction_type() { + TransactionType::Connect => { + // attempt to establish a connection failed, this could be a fatal error since the node + // is useless without connecting to the network, we will retry with exponential backoff + // if necessary + match op_manager.pop(&tx) { + Ok(Some(OpEnum::Connect(op))) + if op.has_backoff() + && op_manager.ring.open_connections() + < op_manager.ring.connection_manager.min_connections => + { + let gateway = op.gateway().cloned(); + if let Some(gateway) = gateway { + tracing::warn!("Retry connecting to gateway {}", gateway.peer); + connect::join_ring_request(None, &gateway, op_manager).await?; + } + } + Ok(Some(OpEnum::Connect(_))) => { + if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() { + tracing::warn!("Retrying joining the ring with an other gateway"); + if let Some(gateway) = gateways.iter().shuffle().next() { + connect::join_ring_request(None, gateway, op_manager).await? + } + } + } + Ok(Some(other)) => { + op_manager.push(tx, other).await?; } + _ => {} } - Ok(Some(OpEnum::Connect(_))) => { - if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() { - tracing::warn!("Retrying joining the ring with an other gateway"); - if let Some(gateway) = gateways.iter().shuffle().next() { - connect::join_ring_request(None, gateway, op_manager).await? + } + TransactionType::Get => match op_manager.pop(&tx) { + Ok(Some(OpEnum::Get(op))) => { + if let Err(err) = op.handle_abort(op_manager).await { + if !matches!(err, OpError::StatePushed) { + return Err(err); } } } @@ -1175,7 +1194,8 @@ async fn handle_aborted_op( op_manager.push(tx, other).await?; } _ => {} - } + }, + _ => {} } Ok(()) } diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index a8b10ccce..1284e2978 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -441,6 +441,8 @@ impl P2pConnManager { .ring .connection_manager .get_peer_key(); + let op_manager = ctx.bridge.op_manager.clone(); + let gateways = ctx.gateways.clone(); // Initiate connection to the peer ctx.bridge @@ -491,6 +493,17 @@ impl P2pConnManager { "connect_peer: connection attempt returned error: {:?}", e ); + if let Err(err) = + handle_aborted_op(tx, &op_manager, &gateways) + .await + { + tracing::warn!( + tx = %tx, + target = %target_peer_id, + ?err, + "connect_peer: failed to propagate aborted operation" + ); + } } Ok(None) => { tracing::error!( @@ -498,6 +511,17 @@ impl P2pConnManager { target = %target_peer_id, "connect_peer: response channel closed before connection result" ); + if let Err(err) = + handle_aborted_op(tx, &op_manager, &gateways) + .await + { + tracing::warn!( + tx = %tx, + target = %target_peer_id, + ?err, + "connect_peer: failed to propagate aborted operation" + ); + } } Err(_) => { tracing::error!( @@ -505,6 +529,17 @@ impl P2pConnManager { target = %target_peer_id, "connect_peer: timeout waiting for connection result" ); + if let Err(err) = + handle_aborted_op(tx, &op_manager, &gateways) + .await + { + tracing::warn!( + tx = %tx, + target = %target_peer_id, + ?err, + "connect_peer: failed to propagate aborted operation" + ); + } } } }); @@ -1233,6 +1268,43 @@ impl P2pConnManager { ); } + // If we already have a transport channel, reuse it instead of dialing again. This covers + // transient->normal promotion without tripping duplicate connection errors. + if self.connections.contains_key(&peer) { + tracing::info!( + tx = %tx, + remote = %peer, + courtesy, + "connect_peer: reusing existing transport" + ); + let connection_manager = &self.bridge.op_manager.ring.connection_manager; + if let Some(entry) = connection_manager.drop_transient(&peer) { + let loc = entry + .location + .unwrap_or_else(|| Location::from_address(&peer.addr)); + self.bridge + .op_manager + .ring + .add_connection(loc, peer.clone(), false) + .await; + tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); + } + + callback + .send_result(Ok((peer.clone(), None))) + .await + .inspect_err(|err| { + tracing::debug!( + tx = %tx, + remote = %peer, + ?err, + "connect_peer: failed to notify existing-connection callback" + ); + }) + .ok(); + return Ok(()); + } + match state.awaiting_connection.entry(peer_addr) { std::collections::hash_map::Entry::Occupied(mut callbacks) => { let txs_entry = state.awaiting_connection_txs.entry(peer_addr).or_default(); @@ -1353,6 +1425,7 @@ impl P2pConnManager { connection, courtesy, } => { + let conn_manager = &self.bridge.op_manager.ring.connection_manager; let remote_addr = connection.remote_addr(); if let Some(blocked_addrs) = &self.blocked_addresses { @@ -1367,6 +1440,7 @@ impl P2pConnManager { } } + let provided_peer = peer.clone(); let peer_id = peer.unwrap_or_else(|| { tracing::info!( remote = %remote_addr, @@ -1393,7 +1467,10 @@ impl P2pConnManager { "Inbound connection established" ); - self.handle_successful_connection(peer_id, connection, state, None) + let is_transient = + conn_manager.is_gateway() && provided_peer.is_none() && transaction.is_none(); + + self.handle_successful_connection(peer_id, connection, state, None, is_transient) .await?; } HandshakeEvent::OutboundEstablished { @@ -1408,7 +1485,7 @@ impl P2pConnManager { transaction = %transaction, "Outbound connection established" ); - self.handle_successful_connection(peer, connection, state, None) + self.handle_successful_connection(peer, connection, state, None, false) .await?; } HandshakeEvent::OutboundFailed { @@ -1523,13 +1600,24 @@ impl P2pConnManager { connection: PeerConnection, state: &mut EventListenerState, remaining_checks: Option, + is_transient: bool, ) -> anyhow::Result<()> { + let connection_manager = &self.bridge.op_manager.ring.connection_manager; + if is_transient && !connection_manager.try_register_transient(peer_id.clone(), None) { + tracing::warn!( + remote = %peer_id.addr, + budget = connection_manager.transient_budget(), + current = connection_manager.transient_count(), + "Transient connection budget exhausted; dropping inbound connection" + ); + return Ok(()); + } + let pending_txs = state .awaiting_connection_txs .remove(&peer_id.addr) .unwrap_or_default(); if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) { - let connection_manager = &self.bridge.op_manager.ring.connection_manager; let resolved_peer_id = if let Some(peer_id) = connection_manager.get_peer_key() { peer_id } else { @@ -1603,18 +1691,41 @@ impl P2pConnManager { } if newly_inserted { - let pending_loc = self - .bridge - .op_manager - .ring - .connection_manager - .prune_in_transit_connection(&peer_id); - let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr)); - self.bridge - .op_manager - .ring - .add_connection(loc, peer_id.clone(), false) - .await; + let pending_loc = connection_manager.prune_in_transit_connection(&peer_id); + if !is_transient { + let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr)); + self.bridge + .op_manager + .ring + .add_connection(loc, peer_id.clone(), false) + .await; + } else { + // Update location now that we know it; budget was reserved before any work. + connection_manager.try_register_transient(peer_id.clone(), pending_loc); + tracing::info!( + peer = %peer_id, + "Registered transient connection (not added to ring topology)" + ); + let ttl = connection_manager.transient_ttl(); + let drop_tx = self.bridge.ev_listener_tx.clone(); + let cm = connection_manager.clone(); + let peer = peer_id.clone(); + tokio::spawn(async move { + tokio::time::sleep(ttl).await; + if cm.drop_transient(&peer).is_some() { + tracing::info!(%peer, "Transient connection expired; dropping"); + if let Err(err) = drop_tx + .send(Right(NodeEvent::DropConnection(peer.clone()))) + .await + { + tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient"); + } + } + }); + } + } else if is_transient { + // We reserved budget earlier, but didn't take ownership of the connection. + connection_manager.drop_transient(&peer_id); } Ok(()) } diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 1963e87b3..8e00f2de1 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -173,14 +173,17 @@ pub(crate) async fn request_get( tried_peers.insert(target.peer.clone()); let new_state = Some(GetState::AwaitingResponse { + key: key_val, retries: 0, fetch_contract, requester: None, current_hop: op_manager.ring.max_hops_to_live, subscribe, + current_target: target.clone(), tried_peers, alternatives: candidates, attempts_at_hop: 1, + skip_list: skip_list.clone(), }); let msg = GetMsg::RequestGet { @@ -212,6 +215,7 @@ pub(crate) async fn request_get( } #[derive(Debug)] +#[allow(clippy::large_enum_variant)] enum GetState { /// A new petition for a get op received from another peer. /// The requester field stores who sent us this request, so we can send the result back. @@ -225,18 +229,24 @@ enum GetState { }, /// Awaiting response from petition. AwaitingResponse { + /// Contract being fetched + key: ContractKey, /// If specified the peer waiting for the response upstream requester: Option, fetch_contract: bool, retries: usize, current_hop: usize, subscribe: bool, + /// Peer we are currently trying to reach + current_target: PeerKeyLocation, /// Peers we've already tried at this hop level tried_peers: HashSet, /// Alternative peers we could still try at this hop alternatives: Vec, /// How many peers we've tried at this hop attempts_at_hop: usize, + /// Skip list used for the current hop + skip_list: HashSet, }, /// Operation completed successfully Finished { key: ContractKey }, @@ -263,9 +273,7 @@ impl Display for GetState { retries, current_hop, subscribe, - tried_peers: _, - alternatives: _, - attempts_at_hop: _, + .. } => { write!(f, "AwaitingResponse(requester: {requester:?}, fetch_contract: {fetch_contract}, retries: {retries}, current_hop: {current_hop}, subscribe: {subscribe})") } @@ -341,6 +349,43 @@ impl GetOp { } } + /// Handle aborted outbound connections by reusing the existing retry logic. + pub(crate) async fn handle_abort(self, op_manager: &OpManager) -> Result<(), OpError> { + if let Some(GetState::AwaitingResponse { + key, + current_target, + skip_list, + .. + }) = &self.state + { + // We synthesize an empty ReturnGet back to ourselves to reuse the existing + // fallback path that tries the next candidate. The state stays + // AwaitingResponse so the retry logic can pick up from the stored + // alternatives/skip list. + let return_msg = GetMsg::ReturnGet { + id: self.id, + key: *key, + value: StoreResponse { + state: None, + contract: None, + }, + sender: current_target.clone(), + target: op_manager.ring.connection_manager.own_location(), + skip_list: skip_list.clone(), + }; + + op_manager + .notify_op_change(NetMessage::from(return_msg), OpEnum::Get(self)) + .await?; + return Err(OpError::StatePushed); + } + + // If we weren't awaiting a response, just put the op back. + // No retry needed; another handler may pick it up later. + op_manager.push(self.id, OpEnum::Get(self)).await?; + Ok(()) + } + pub(super) fn finalized(&self) -> bool { self.result.is_some() && matches!(self.state, Some(GetState::Finished { .. })) } @@ -736,6 +781,8 @@ impl Operation for GetOp { target, skip_list, } => { + let id = *id; + let key = *key; tracing::info!( tx = %id, %key, @@ -765,6 +812,9 @@ impl Operation for GetOp { mut tried_peers, mut alternatives, attempts_at_hop, + current_target: _, + skip_list, + .. }) => { // todo: register in the stats for the outcome of the op that failed to get a response from this peer @@ -789,8 +839,8 @@ impl Operation for GetOp { ); return_msg = Some(GetMsg::SeekNode { - id: *id, - key: *key, + id, + key, target: next_target.clone(), sender: this_peer.clone(), fetch_contract, @@ -800,15 +850,22 @@ impl Operation for GetOp { // Update state with the new alternative being tried tried_peers.insert(next_target.peer.clone()); + let updated_tried_peers = tried_peers.clone(); new_state = Some(GetState::AwaitingResponse { retries, fetch_contract, requester: requester.clone(), current_hop, subscribe, - tried_peers, + tried_peers: updated_tried_peers.clone(), alternatives, attempts_at_hop: attempts_at_hop + 1, + key, + current_target: next_target, + // Preserve the accumulated skip_list so future candidate + // selection still avoids already-specified peers; tried_peers + // tracks attempts at this hop. + skip_list: skip_list.clone(), }); } else if retries < MAX_RETRIES { // No more alternatives at this hop, try finding new peers @@ -818,27 +875,27 @@ impl Operation for GetOp { // Get new candidates excluding all tried peers let mut new_candidates = op_manager.ring.k_closest_potentially_caching( - key, + &key, &new_skip_list, DEFAULT_MAX_BREADTH, ); tracing::info!( - tx = %id, - %key, - new_candidates = ?new_candidates, - skip = ?new_skip_list, - hop = current_hop, - retries = retries + 1, - "GET seeking new candidates after exhausted alternatives" + tx = %id, + %key, + new_candidates = ?new_candidates, + skip = ?new_skip_list, + hop = current_hop, + retries = retries + 1, + "GET seeking new candidates after exhausted alternatives" ); if !new_candidates.is_empty() { // Try with the best new peer let target = new_candidates.remove(0); return_msg = Some(GetMsg::SeekNode { - id: *id, - key: *key, + id, + key, target: target.clone(), sender: this_peer.clone(), fetch_contract, @@ -859,6 +916,9 @@ impl Operation for GetOp { tried_peers: new_tried_peers, alternatives: new_candidates, attempts_at_hop: 1, + key, + current_target: target, + skip_list: new_skip_list.clone(), }); } else if let Some(requester_peer) = requester.clone() { // No more peers to try, return failure to requester @@ -872,8 +932,8 @@ impl Operation for GetOp { "No other peers found while trying to get the contract, returning response to requester" ); return_msg = Some(GetMsg::ReturnGet { - id: *id, - key: *key, + id, + key, value: StoreResponse { state: None, contract: None, @@ -895,7 +955,7 @@ impl Operation for GetOp { return_msg = None; new_state = None; result = Some(GetResult { - key: *key, + key, state: WrappedState::new(vec![]), contract: None, }); @@ -920,8 +980,8 @@ impl Operation for GetOp { "No other peers found while trying to get the contract, returning response to requester" ); return_msg = Some(GetMsg::ReturnGet { - id: *id, - key: *key, + id, + key, value: StoreResponse { state: None, contract: None, @@ -941,7 +1001,7 @@ impl Operation for GetOp { return_msg = None; new_state = None; result = Some(GetResult { - key: *key, + key, state: WrappedState::new(vec![]), contract: None, }); @@ -953,8 +1013,8 @@ impl Operation for GetOp { tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer); new_state = None; return_msg = Some(GetMsg::ReturnGet { - id: *id, - key: *key, + id, + key, value: StoreResponse { state: None, contract: None, @@ -1314,14 +1374,17 @@ async fn try_forward_or_return( build_op_result( id, Some(GetState::AwaitingResponse { + key, requester: Some(sender), retries: 0, fetch_contract, current_hop: new_htl, subscribe: false, + current_target: target.clone(), tried_peers, alternatives, attempts_at_hop: 1, + skip_list: new_skip_list.clone(), }), Some(GetMsg::SeekNode { id, diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 9af80bc84..adce1165d 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -198,6 +198,7 @@ where Ok(None) } +#[allow(clippy::large_enum_variant)] pub(crate) enum OpEnum { Connect(Box), Put(put::PutOp), diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 4f1d7023c..5e3f19240 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -1,10 +1,20 @@ +use dashmap::DashMap; use parking_lot::Mutex; use rand::prelude::IndexedRandom; use std::collections::{btree_map::Entry, BTreeMap}; +use std::sync::atomic::{AtomicUsize, Ordering}; use crate::topology::{Limits, TopologyManager}; use super::*; +use std::time::{Duration, Instant}; + +#[derive(Clone)] +pub(crate) struct TransientEntry { + #[allow(dead_code)] + pub opened_at: Instant, + pub location: Option, +} #[derive(Clone)] pub(crate) struct ConnectionManager { @@ -18,6 +28,10 @@ pub(crate) struct ConnectionManager { own_location: Arc, peer_key: Arc>>, is_gateway: bool, + transient_connections: Arc>, + transient_in_use: Arc, + transient_budget: usize, + transient_ttl: Duration, pub min_connections: usize, pub max_connections: usize, pub rnd_if_htl_above: usize, @@ -79,9 +93,12 @@ impl ConnectionManager { own_location, ), config.is_gateway, + config.transient_budget, + config.transient_ttl, ) } + #[allow(clippy::too_many_arguments)] fn init( max_upstream_bandwidth: Rate, max_downstream_bandwidth: Rate, @@ -90,6 +107,8 @@ impl ConnectionManager { rnd_if_htl_above: usize, (pub_key, peer_id, own_location): (TransportPublicKey, Option, AtomicU64), is_gateway: bool, + transient_budget: usize, + transient_ttl: Duration, ) -> Self { let topology_manager = Arc::new(RwLock::new(TopologyManager::new(Limits { max_upstream_bandwidth, @@ -107,6 +126,10 @@ impl ConnectionManager { own_location: own_location.into(), peer_key: Arc::new(Mutex::new(peer_id)), is_gateway, + transient_connections: Arc::new(DashMap::new()), + transient_in_use: Arc::new(AtomicUsize::new(0)), + transient_budget, + transient_ttl, min_connections, max_connections, rnd_if_htl_above, @@ -331,6 +354,74 @@ impl ConnectionManager { self.peer_key.lock().clone() } + pub fn is_gateway(&self) -> bool { + self.is_gateway + } + + /// Attempts to register a transient connection, enforcing the configured budget. + /// Returns `false` when the budget is exhausted, leaving the map unchanged. + pub fn try_register_transient(&self, peer: PeerId, location: Option) -> bool { + if self.transient_connections.contains_key(&peer) { + if let Some(mut entry) = self.transient_connections.get_mut(&peer) { + entry.location = location; + } + return true; + } + + let current = self.transient_in_use.load(Ordering::Acquire); + if current >= self.transient_budget { + return false; + } + + let key = peer.clone(); + self.transient_connections.insert( + peer, + TransientEntry { + opened_at: Instant::now(), + location, + }, + ); + let prev = self.transient_in_use.fetch_add(1, Ordering::SeqCst); + if prev >= self.transient_budget { + // Undo if we raced past the budget. + self.transient_connections.remove(&key); + self.transient_in_use.fetch_sub(1, Ordering::SeqCst); + return false; + } + + true + } + + /// Drops a transient connection and returns its metadata, if it existed. + /// Also decrements the transient budget counter. + pub fn drop_transient(&self, peer: &PeerId) -> Option { + let removed = self + .transient_connections + .remove(peer) + .map(|(_, entry)| entry); + if removed.is_some() { + self.transient_in_use.fetch_sub(1, Ordering::SeqCst); + } + removed + } + + #[allow(dead_code)] + pub fn is_transient(&self, peer: &PeerId) -> bool { + self.transient_connections.contains_key(peer) + } + + pub fn transient_count(&self) -> usize { + self.transient_in_use.load(Ordering::Acquire) + } + + pub fn transient_budget(&self) -> usize { + self.transient_budget + } + + pub fn transient_ttl(&self) -> Duration { + self.transient_ttl + } + /// Sets the peer id if is not already set, or returns the current peer id. pub fn try_set_peer_key(&self, addr: SocketAddr) -> Option { let mut this_peer = self.peer_key.lock(); diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 16ce71be8..85a875bdd 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -6,10 +6,7 @@ use std::collections::{BTreeSet, HashSet}; use std::net::SocketAddr; use std::{ - sync::{ - atomic::{AtomicU64, AtomicUsize}, - Arc, Weak, - }, + sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, }; use tracing::Instrument; diff --git a/crates/core/tests/error_notification.rs b/crates/core/tests/error_notification.rs index 2a111ee5e..7f59a642c 100644 --- a/crates/core/tests/error_notification.rs +++ b/crates/core/tests/error_notification.rs @@ -300,6 +300,8 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> { network_port: Some(gateway_port), bandwidth_limit: None, blocked_addresses: None, + transient_budget: None, + transient_ttl_secs: None, }, config_paths: freenet::config::ConfigPathsArgs { config_dir: Some(temp_dir_gw.path().to_path_buf()), @@ -349,6 +351,8 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> { network_port: None, bandwidth_limit: None, blocked_addresses: None, + transient_budget: None, + transient_ttl_secs: None, }, config_paths: freenet::config::ConfigPathsArgs { config_dir: Some(temp_dir_peer.path().to_path_buf()), diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index 2b4f542ed..e200a2e6b 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -95,6 +95,8 @@ async fn base_node_test_config( network_port: public_port, bandwidth_limit: None, blocked_addresses: None, + transient_budget: None, + transient_ttl_secs: None, }, config_paths: { freenet::config::ConfigPathsArgs { diff --git a/crates/core/tests/ubertest.rs b/crates/core/tests/ubertest.rs index 3579fb6e5..5ed7fc2ef 100644 --- a/crates/core/tests/ubertest.rs +++ b/crates/core/tests/ubertest.rs @@ -188,6 +188,8 @@ async fn create_peer_config( network_port: Some(network_port), bandwidth_limit: None, blocked_addresses: None, + transient_budget: None, + transient_ttl_secs: None, }, config_paths: freenet::config::ConfigPathsArgs { config_dir: Some(temp_dir.path().to_path_buf()), diff --git a/crates/freenet-macros/src/codegen.rs b/crates/freenet-macros/src/codegen.rs index d2ae2ecee..e7a7f76a7 100644 --- a/crates/freenet-macros/src/codegen.rs +++ b/crates/freenet-macros/src/codegen.rs @@ -149,6 +149,8 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { network_port: Some(network_port), bandwidth_limit: None, blocked_addresses: None, + transient_budget: None, + transient_ttl_secs: None, }, config_paths: freenet::config::ConfigPathsArgs { config_dir: Some(temp_dir.path().to_path_buf()), @@ -259,6 +261,8 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { network_port: Some(network_port), bandwidth_limit: None, blocked_addresses: None, + transient_budget: None, + transient_ttl_secs: None, }, config_paths: freenet::config::ConfigPathsArgs { config_dir: Some(temp_dir.path().to_path_buf()),