Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/freenet-ping/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions apps/freenet-ping/app/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub async fn base_node_test_config_with_rng(
network_port: public_port, // if None, node will pick a free one or use default
bandwidth_limit: None,
blocked_addresses,
transient_budget: None,
transient_ttl_secs: None,
},
config_paths: freenet::config::ConfigPathsArgs {
config_dir: Some(temp_dir.path().to_path_buf()),
Expand Down
22 changes: 14 additions & 8 deletions apps/freenet-ping/app/tests/run_app_blocked_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,23 +244,29 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
let (stream_node2, _) = connect_async(&uri_node2).await?;
let mut client_node2 = WebApi::start(stream_node2);

// Load contract code
// Compile/load contract code (same helper used by other app tests)
let path_to_code = std::path::PathBuf::from(PACKAGE_DIR).join(PATH_TO_CONTRACT);
tracing::info!(path=%path_to_code.display(), "Loading contract code");
let code = std::fs::read(path_to_code)
.ok()
.ok_or_else(|| anyhow!("Failed to read contract code"))?;
let code_hash = CodeHash::from_code(&code);
tracing::info!(path = %path_to_code.display(), "Loading contract code");

// First compile to compute the code hash, then rebuild options with the correct code_key
let temp_options = PingContractOptions {
frequency: Duration::from_secs(3),
ttl: Duration::from_secs(30),
tag: APP_TAG.to_string(),
code_key: String::new(),
};
let temp_params = Parameters::from(serde_json::to_vec(&temp_options).unwrap());
let temp_container = common::load_contract(&path_to_code, temp_params)?;
let code_hash = CodeHash::from_code(temp_container.data());

// Define contract options
let ping_options = PingContractOptions {
frequency: Duration::from_secs(3),
ttl: Duration::from_secs(30),
tag: APP_TAG.to_string(),
code_key: code_hash.to_string(),
};
let params = Parameters::from(serde_json::to_vec(&ping_options).unwrap());
let container = ContractContainer::try_from((code, &params))?;
let container = common::load_contract(&path_to_code, params)?;
let contract_key = container.key();

// Gateway puts the contract
Expand Down
39 changes: 39 additions & 0 deletions crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub(crate) const PCK_VERSION: &str = env!("CARGO_PKG_VERSION");
// Initialize the executor once.
static ASYNC_RT: LazyLock<Option<Runtime>> = LazyLock::new(GlobalExecutor::initialize_async_rt);

const DEFAULT_TRANSIENT_BUDGET: usize = 32;
const DEFAULT_TRANSIENT_TTL_SECS: u64 = 30;

const QUALIFIER: &str = "";
const ORGANIZATION: &str = "The Freenet Project Inc";
const APPLICATION: &str = "Freenet";
Expand Down Expand Up @@ -97,6 +100,8 @@ impl Default for ConfigArgs {
location: None,
bandwidth_limit: Some(3_000_000), // 3 MB/s default for streaming transfers only
blocked_addresses: None,
transient_budget: Some(DEFAULT_TRANSIENT_BUDGET),
transient_ttl_secs: Some(DEFAULT_TRANSIENT_TTL_SECS),
},
ws_api: WebsocketApiArgs {
address: Some(default_listening_address()),
Expand Down Expand Up @@ -361,6 +366,14 @@ impl ConfigArgs {
.network_api
.blocked_addresses
.map(|addrs| addrs.into_iter().collect()),
transient_budget: self
.network_api
.transient_budget
.unwrap_or(DEFAULT_TRANSIENT_BUDGET),
transient_ttl_secs: self
.network_api
.transient_ttl_secs
.unwrap_or(DEFAULT_TRANSIENT_TTL_SECS),
},
ws_api: WebsocketApiConfig {
// the websocket API is always local
Expand Down Expand Up @@ -542,6 +555,16 @@ pub struct NetworkArgs {
/// List of IP:port addresses to refuse connections to/from.
#[arg(long, num_args = 0..)]
pub blocked_addresses: Option<Vec<SocketAddr>>,

/// Maximum number of concurrent transient connections accepted by a gateway.
#[arg(long, env = "TRANSIENT_BUDGET")]
#[serde(rename = "transient-budget", skip_serializing_if = "Option::is_none")]
pub transient_budget: Option<usize>,

/// Time (in seconds) before an unpromoted transient connection is dropped.
#[arg(long, env = "TRANSIENT_TTL_SECS")]
#[serde(rename = "transient-ttl-secs", skip_serializing_if = "Option::is_none")]
pub transient_ttl_secs: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -608,6 +631,14 @@ pub struct NetworkApiConfig {
/// List of IP:port addresses to refuse connections to/from.
#[serde(skip_serializing_if = "Option::is_none")]
pub blocked_addresses: Option<HashSet<SocketAddr>>,

/// Maximum number of concurrent transient connections accepted by a gateway.
#[serde(default = "default_transient_budget", rename = "transient-budget")]
pub transient_budget: usize,

/// Time (in seconds) before an unpromoted transient connection is dropped.
#[serde(default = "default_transient_ttl_secs", rename = "transient-ttl-secs")]
pub transient_ttl_secs: u64,
}

mod port_allocation;
Expand All @@ -617,6 +648,14 @@ pub fn default_network_api_port() -> u16 {
find_available_port().unwrap_or(31337) // Fallback to 31337 if we can't find a random port
}

fn default_transient_budget() -> usize {
DEFAULT_TRANSIENT_BUDGET
}

fn default_transient_ttl_secs() -> u64 {
DEFAULT_TRANSIENT_TTL_SECS
}

#[derive(clap::Parser, Debug, Default, Copy, Clone, Serialize, Deserialize)]
pub struct WebsocketApiArgs {
/// Address to bind to for the websocket API, default is 0.0.0.0
Expand Down
60 changes: 40 additions & 20 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub struct NodeConfig {
pub(crate) max_upstream_bandwidth: Option<Rate>,
pub(crate) max_downstream_bandwidth: Option<Rate>,
pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
pub(crate) transient_budget: usize,
pub(crate) transient_ttl: Duration,
}

impl NodeConfig {
Expand Down Expand Up @@ -195,6 +197,8 @@ impl NodeConfig {
max_upstream_bandwidth: None,
max_downstream_bandwidth: None,
blocked_addresses: config.network_api.blocked_addresses.clone(),
transient_budget: config.network_api.transient_budget,
transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
})
}

Expand Down Expand Up @@ -1147,35 +1151,51 @@ async fn handle_aborted_op(
gateways: &[PeerKeyLocation],
) -> Result<(), OpError> {
use crate::util::IterExt;
if let TransactionType::Connect = tx.transaction_type() {
// attempt to establish a connection failed, this could be a fatal error since the node
// is useless without connecting to the network, we will retry with exponential backoff
// if necessary
match op_manager.pop(&tx) {
Ok(Some(OpEnum::Connect(op)))
if op.has_backoff()
&& op_manager.ring.open_connections()
< op_manager.ring.connection_manager.min_connections =>
{
let gateway = op.gateway().cloned();
if let Some(gateway) = gateway {
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
connect::join_ring_request(None, &gateway, op_manager).await?;
match tx.transaction_type() {
TransactionType::Connect => {
// attempt to establish a connection failed, this could be a fatal error since the node
// is useless without connecting to the network, we will retry with exponential backoff
// if necessary
match op_manager.pop(&tx) {
Ok(Some(OpEnum::Connect(op)))
if op.has_backoff()
&& op_manager.ring.open_connections()
< op_manager.ring.connection_manager.min_connections =>
{
let gateway = op.gateway().cloned();
if let Some(gateway) = gateway {
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
connect::join_ring_request(None, &gateway, op_manager).await?;
}
}
Ok(Some(OpEnum::Connect(_))) => {
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
tracing::warn!("Retrying joining the ring with an other gateway");
if let Some(gateway) = gateways.iter().shuffle().next() {
connect::join_ring_request(None, gateway, op_manager).await?
}
}
}
Ok(Some(other)) => {
op_manager.push(tx, other).await?;
}
_ => {}
}
Ok(Some(OpEnum::Connect(_))) => {
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
tracing::warn!("Retrying joining the ring with an other gateway");
if let Some(gateway) = gateways.iter().shuffle().next() {
connect::join_ring_request(None, gateway, op_manager).await?
}
TransactionType::Get => match op_manager.pop(&tx) {
Ok(Some(OpEnum::Get(op))) => {
if let Err(err) = op.handle_abort(op_manager).await {
if !matches!(err, OpError::StatePushed) {
return Err(err);
}
}
}
Ok(Some(other)) => {
op_manager.push(tx, other).await?;
}
_ => {}
}
},
_ => {}
}
Ok(())
}
Expand Down
Loading
Loading