Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/freenet-ping/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions apps/freenet-ping/app/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub async fn base_node_test_config_with_rng(
network_port: public_port, // if None, node will pick a free one or use default
bandwidth_limit: None,
blocked_addresses,
transient_budget: None,
transient_ttl_secs: None,
},
config_paths: freenet::config::ConfigPathsArgs {
config_dir: Some(temp_dir.path().to_path_buf()),
Expand Down
22 changes: 14 additions & 8 deletions apps/freenet-ping/app/tests/run_app_blocked_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,23 +244,29 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
let (stream_node2, _) = connect_async(&uri_node2).await?;
let mut client_node2 = WebApi::start(stream_node2);

// Load contract code
// Compile/load contract code (same helper used by other app tests)
let path_to_code = std::path::PathBuf::from(PACKAGE_DIR).join(PATH_TO_CONTRACT);
tracing::info!(path=%path_to_code.display(), "Loading contract code");
let code = std::fs::read(path_to_code)
.ok()
.ok_or_else(|| anyhow!("Failed to read contract code"))?;
let code_hash = CodeHash::from_code(&code);
tracing::info!(path = %path_to_code.display(), "Loading contract code");

// First compile to compute the code hash, then rebuild options with the correct code_key
let temp_options = PingContractOptions {
frequency: Duration::from_secs(3),
ttl: Duration::from_secs(30),
tag: APP_TAG.to_string(),
code_key: String::new(),
};
let temp_params = Parameters::from(serde_json::to_vec(&temp_options).unwrap());
let temp_container = common::load_contract(&path_to_code, temp_params)?;
let code_hash = CodeHash::from_code(temp_container.data());

// Define contract options
let ping_options = PingContractOptions {
frequency: Duration::from_secs(3),
ttl: Duration::from_secs(30),
tag: APP_TAG.to_string(),
code_key: code_hash.to_string(),
};
let params = Parameters::from(serde_json::to_vec(&ping_options).unwrap());
let container = ContractContainer::try_from((code, &params))?;
let container = common::load_contract(&path_to_code, params)?;
let contract_key = container.key();

// Gateway puts the contract
Expand Down
39 changes: 39 additions & 0 deletions crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub(crate) const PCK_VERSION: &str = env!("CARGO_PKG_VERSION");
// Initialize the executor once.
static ASYNC_RT: LazyLock<Option<Runtime>> = LazyLock::new(GlobalExecutor::initialize_async_rt);

const DEFAULT_TRANSIENT_BUDGET: usize = 32;
const DEFAULT_TRANSIENT_TTL_SECS: u64 = 30;

const QUALIFIER: &str = "";
const ORGANIZATION: &str = "The Freenet Project Inc";
const APPLICATION: &str = "Freenet";
Expand Down Expand Up @@ -97,6 +100,8 @@ impl Default for ConfigArgs {
location: None,
bandwidth_limit: Some(3_000_000), // 3 MB/s default for streaming transfers only
blocked_addresses: None,
transient_budget: Some(DEFAULT_TRANSIENT_BUDGET),
transient_ttl_secs: Some(DEFAULT_TRANSIENT_TTL_SECS),
},
ws_api: WebsocketApiArgs {
address: Some(default_listening_address()),
Expand Down Expand Up @@ -361,6 +366,14 @@ impl ConfigArgs {
.network_api
.blocked_addresses
.map(|addrs| addrs.into_iter().collect()),
transient_budget: self
.network_api
.transient_budget
.unwrap_or(DEFAULT_TRANSIENT_BUDGET),
transient_ttl_secs: self
.network_api
.transient_ttl_secs
.unwrap_or(DEFAULT_TRANSIENT_TTL_SECS),
},
ws_api: WebsocketApiConfig {
// the websocket API is always local
Expand Down Expand Up @@ -542,6 +555,16 @@ pub struct NetworkArgs {
/// List of IP:port addresses to refuse connections to/from.
#[arg(long, num_args = 0..)]
pub blocked_addresses: Option<Vec<SocketAddr>>,

/// Maximum number of concurrent transient connections accepted by a gateway.
#[arg(long, env = "TRANSIENT_BUDGET")]
#[serde(rename = "transient-budget", skip_serializing_if = "Option::is_none")]
pub transient_budget: Option<usize>,

/// Time (in seconds) before an unpromoted transient connection is dropped.
#[arg(long, env = "TRANSIENT_TTL_SECS")]
#[serde(rename = "transient-ttl-secs", skip_serializing_if = "Option::is_none")]
pub transient_ttl_secs: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -608,6 +631,14 @@ pub struct NetworkApiConfig {
/// List of IP:port addresses to refuse connections to/from.
#[serde(skip_serializing_if = "Option::is_none")]
pub blocked_addresses: Option<HashSet<SocketAddr>>,

/// Maximum number of concurrent transient connections accepted by a gateway.
#[serde(default = "default_transient_budget", rename = "transient-budget")]
pub transient_budget: usize,

/// Time (in seconds) before an unpromoted transient connection is dropped.
#[serde(default = "default_transient_ttl_secs", rename = "transient-ttl-secs")]
pub transient_ttl_secs: u64,
}

mod port_allocation;
Expand All @@ -617,6 +648,14 @@ pub fn default_network_api_port() -> u16 {
find_available_port().unwrap_or(31337) // Fallback to 31337 if we can't find a random port
}

fn default_transient_budget() -> usize {
DEFAULT_TRANSIENT_BUDGET
}

fn default_transient_ttl_secs() -> u64 {
DEFAULT_TRANSIENT_TTL_SECS
}

#[derive(clap::Parser, Debug, Default, Copy, Clone, Serialize, Deserialize)]
pub struct WebsocketApiArgs {
/// Address to bind to for the websocket API, default is 0.0.0.0
Expand Down
60 changes: 40 additions & 20 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub struct NodeConfig {
pub(crate) max_upstream_bandwidth: Option<Rate>,
pub(crate) max_downstream_bandwidth: Option<Rate>,
pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
pub(crate) transient_budget: usize,
pub(crate) transient_ttl: Duration,
}

impl NodeConfig {
Expand Down Expand Up @@ -195,6 +197,8 @@ impl NodeConfig {
max_upstream_bandwidth: None,
max_downstream_bandwidth: None,
blocked_addresses: config.network_api.blocked_addresses.clone(),
transient_budget: config.network_api.transient_budget,
transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
})
}

Expand Down Expand Up @@ -1147,35 +1151,51 @@ async fn handle_aborted_op(
gateways: &[PeerKeyLocation],
) -> Result<(), OpError> {
use crate::util::IterExt;
if let TransactionType::Connect = tx.transaction_type() {
// attempt to establish a connection failed, this could be a fatal error since the node
// is useless without connecting to the network, we will retry with exponential backoff
// if necessary
match op_manager.pop(&tx) {
Ok(Some(OpEnum::Connect(op)))
if op.has_backoff()
&& op_manager.ring.open_connections()
< op_manager.ring.connection_manager.min_connections =>
{
let gateway = op.gateway().cloned();
if let Some(gateway) = gateway {
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
connect::join_ring_request(None, &gateway, op_manager).await?;
match tx.transaction_type() {
TransactionType::Connect => {
// attempt to establish a connection failed, this could be a fatal error since the node
// is useless without connecting to the network, we will retry with exponential backoff
// if necessary
match op_manager.pop(&tx) {
Ok(Some(OpEnum::Connect(op)))
if op.has_backoff()
&& op_manager.ring.open_connections()
< op_manager.ring.connection_manager.min_connections =>
{
let gateway = op.gateway().cloned();
if let Some(gateway) = gateway {
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
connect::join_ring_request(None, &gateway, op_manager).await?;
}
}
Ok(Some(OpEnum::Connect(_))) => {
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
tracing::warn!("Retrying joining the ring with an other gateway");
if let Some(gateway) = gateways.iter().shuffle().next() {
connect::join_ring_request(None, gateway, op_manager).await?
}
}
}
Ok(Some(other)) => {
op_manager.push(tx, other).await?;
}
_ => {}
}
Ok(Some(OpEnum::Connect(_))) => {
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
tracing::warn!("Retrying joining the ring with an other gateway");
if let Some(gateway) = gateways.iter().shuffle().next() {
connect::join_ring_request(None, gateway, op_manager).await?
}
TransactionType::Get => match op_manager.pop(&tx) {
Ok(Some(OpEnum::Get(op))) => {
if let Err(err) = op.handle_abort(op_manager).await {
if !matches!(err, OpError::StatePushed) {
return Err(err);
}
}
}
Ok(Some(other)) => {
op_manager.push(tx, other).await?;
}
_ => {}
}
},
_ => {}
}
Ok(())
}
Expand Down
Loading
Loading