diff --git a/Cargo.lock b/Cargo.lock index 9acd99c56e6..7036a24a60e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1042,12 +1042,6 @@ dependencies = [ "litrs", ] -[[package]] -name = "dtoa" -version = "1.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" - [[package]] name = "dyn-clone" version = "1.0.18" @@ -1151,12 +1145,6 @@ dependencies = [ "serde", ] -[[package]] -name = "erased_set" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a02a5d186d7bf1cb21f1f95e1a9cfa5c1f2dcd803a47aad454423ceec13525c5" - [[package]] name = "errno" version = "0.3.10" @@ -1244,14 +1232,15 @@ dependencies = [ [[package]] name = "futures-buffered" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34acda8ae8b63fbe0b2195c998b180cff89a8212fb2622a78b572a9f1c6f7684" +checksum = "fe940397c8b744b9c2c974791c2c08bca2c3242ce0290393249e98f215a00472" dependencies = [ "cordyceps", "diatomic-waker", "futures-core", "pin-project-lite", + "spin", ] [[package]] @@ -1962,9 +1951,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" dependencies = [ "bytes", "futures-channel", @@ -1972,6 +1961,7 @@ dependencies = [ "http 1.2.0", "http-body", "hyper", + "libc", "pin-project-lite", "socket2", "tokio", @@ -2143,9 +2133,9 @@ dependencies = [ [[package]] name = "igd-next" -version = "0.15.1" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76b0d7d4541def58a37bf8efc559683f21edce7c82f0d866c93ac21f7e098f93" +checksum = "d06464e726471718db9ad3fefc020529fabcde03313a0fc3967510e2db5add12" dependencies = [ "async-trait", "attohttpc", @@ -2156,7 +2146,7 @@ dependencies = [ "hyper", "hyper-util", "log", - "rand 0.8.5", + "rand 0.9.0", "tokio", "url", "xmltree", @@ -2261,11 +2251,12 @@ dependencies = [ "iroh-relay", "n0-future", "netdev", - "netwatch 0.4.0", + "netwatch", "parse-size", "pin-project", "pkarr", "portmapper", + "postcard", "pretty_assertions", "rand 0.8.5", "rand_chacha 0.3.1", @@ -2391,23 +2382,34 @@ dependencies = [ [[package]] name = "iroh-metrics" -version = "0.32.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f7cd1ffe3b152a5f4f4c1880e01e07d96001f20e02cc143cb7842987c616b3" +checksum = "f70466f14caff7420a14373676947e25e2917af6a5b1bec45825beb2bf1eb6a7" dependencies = [ - "erased_set", "http-body-util", "hyper", "hyper-util", - "prometheus-client", + "iroh-metrics-derive", + "itoa", "reqwest", "serde", - "struct_iterable", - "thiserror 2.0.11", + "snafu", "tokio", "tracing", ] +[[package]] +name = "iroh-metrics-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d12f5c45c4ed2436302a4e03cad9a0ad34b2962ad0c5791e1019c0ee30eeb09" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "iroh-quinn" version = "0.13.0" @@ -2599,9 +2601,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libredox" @@ -2812,9 +2814,9 @@ dependencies = [ [[package]] name = "n0-future" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399e11dc3b0e8d9d65b27170d22f5d779d52d9bed888db70d7e0c2c7ce3dfc52" +checksum = "7bb0e5d99e681ab3c938842b96fcb41bf8a7bb4bfdb11ccbd653a7e83e06c794" dependencies = [ "cfg_aliases", "derive_more", @@ -2831,6 +2833,18 @@ dependencies = [ "web-time", ] +[[package]] +name = "nested_enum_utils" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43fa9161ed44d30e9702fe42bd78693bceac0fed02f647da749f36109023d3a3" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "netdev" version = "0.31.0" @@ -2875,11 +2889,12 @@ dependencies = [ [[package]] name = "netlink-packet-route" -version = "0.19.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74c171cd77b4ee8c7708da746ce392440cb7bcf618d122ec9ecc607b12938bf4" +checksum = "0800eae8638a299eaa67476e1c6b6692922273e0f7939fd188fc861c837b9cd2" dependencies = [ "anyhow", + "bitflags 2.8.0", "byteorder", "libc", "log", @@ -2928,42 +2943,9 @@ dependencies = [ [[package]] name = "netwatch" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64da82edf903649e6cb6a77b5a6f7fe01387d8865065d411d139018510880302" -dependencies = [ - "anyhow", - "atomic-waker", - "bytes", - "derive_more", - "futures-lite", - "futures-sink", - "futures-util", - "iroh-quinn-udp", - "libc", - "netdev", - "netlink-packet-core", - "netlink-packet-route 0.19.0", - "netlink-sys", - "once_cell", - "rtnetlink 0.13.1", - "rtnetlink 0.14.1", - "serde", - "socket2", - "thiserror 2.0.11", - "time", - "tokio", - "tokio-util", - "tracing", - "windows 0.58.0", - "wmi", -] - -[[package]] -name = "netwatch" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7879c2cfdf30d92f2be89efa3169b3d78107e3ab7f7b9a37157782569314e1" +checksum = "67eeaa5f7505c93c5a9b35ba84fd21fb8aa3f24678c76acfe8716af7862fb07a" dependencies = [ "atomic-waker", "bytes", @@ -2973,15 +2955,15 @@ dependencies = [ "js-sys", "libc", "n0-future", + "nested_enum_utils", "netdev", "netlink-packet-core", - "netlink-packet-route 0.19.0", + "netlink-packet-route 0.23.0", + "netlink-proto", "netlink-sys", - "rtnetlink 0.13.1", - "rtnetlink 0.14.1", "serde", + "snafu", "socket2", - "thiserror 2.0.11", "time", "tokio", "tokio-util", @@ -2992,28 +2974,6 @@ dependencies = [ "wmi", ] -[[package]] -name = "nix" -version = "0.26.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" -dependencies = [ - "bitflags 1.3.2", - "cfg-if", - "libc", -] - -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.8.0", - "cfg-if", - "libc", -] - [[package]] name = "no-std-compat" version = "0.4.1" @@ -3479,28 +3439,31 @@ checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" [[package]] name = "portmapper" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b715da165f399be093fecb2ca774b00713a3b32f6b27e0752fbf255e3be622af" +checksum = "7d6db66007eac4a0ec8331d0d20c734bd64f6445d64bbaf0d0a27fea7a054e36" dependencies = [ "base64", "bytes", "derive_more", "futures-lite", "futures-util", + "hyper-util", "igd-next", "iroh-metrics", "libc", - "netwatch 0.3.0", + "nested_enum_utils", + "netwatch", "num_enum", "rand 0.8.5", "serde", "smallvec", + "snafu", "socket2", - "thiserror 2.0.11", "time", "tokio", "tokio-util", + "tower-layer", "tracing", "url", ] @@ -3617,29 +3580,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prometheus-client" -version = "0.22.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504ee9ff529add891127c4827eb481bd69dc0ebc72e9a682e187db4caa60c3ca" -dependencies = [ - "dtoa", - "itoa", - "parking_lot", - "prometheus-client-derive-encode", -] - -[[package]] -name = "prometheus-client-derive-encode" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.98", -] - [[package]] name = "proptest" version = "1.6.0" @@ -4017,9 +3957,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.13" +version = "0.17.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", @@ -4029,42 +3969,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rtnetlink" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a552eb82d19f38c3beed3f786bd23aa434ceb9ac43ab44419ca6d67a7e186c0" -dependencies = [ - "futures", - "log", - "netlink-packet-core", - "netlink-packet-route 0.17.1", - "netlink-packet-utils", - "netlink-proto", - "netlink-sys", - "nix 0.26.4", - "thiserror 1.0.69", - "tokio", -] - -[[package]] -name = "rtnetlink" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b684475344d8df1859ddb2d395dd3dac4f8f3422a1aa0725993cb375fc5caba5" -dependencies = [ - "futures", - "log", - "netlink-packet-core", - "netlink-packet-route 0.19.0", - "netlink-packet-utils", - "netlink-proto", - "netlink-sys", - "nix 0.27.1", - "thiserror 1.0.69", - "tokio", -] - [[package]] name = "rustc-demangle" version = "0.1.24" @@ -4539,11 +4443,32 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fad6c857cbab2627dcf01ec85a623ca4e7dcb5691cbaa3d7fb7653671f0d09c9" +[[package]] +name = "snafu" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "socket2" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" dependencies = [ "libc", "windows-sys 0.52.0", @@ -4991,15 +4916,15 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", "futures-sink", "futures-util", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "pin-project-lite", "tokio", ] diff --git a/iroh-dns-server/Cargo.toml b/iroh-dns-server/Cargo.toml index 22f45e68aef..c56e2d24fbd 100644 --- a/iroh-dns-server/Cargo.toml +++ b/iroh-dns-server/Cargo.toml @@ -28,7 +28,7 @@ governor = "0.8" hickory-server = { version = "0.25.1", features = ["https-ring"] } http = "1.0.0" humantime-serde = "1.1.1" -iroh-metrics = { version = "0.32.0", features = ["metrics", "service"] } +iroh-metrics = { version = "0.34", features = ["service"] } lru = "0.12.3" n0-future = "0.1.2" pkarr = { version = "3.7", features = ["relays", "dht"], default-features = false } diff --git a/iroh-dns-server/benches/write.rs b/iroh-dns-server/benches/write.rs index c0f6df6104f..771b673c0f3 100644 --- a/iroh-dns-server/benches/write.rs +++ b/iroh-dns-server/benches/write.rs @@ -1,15 +1,22 @@ +use std::sync::Arc; + use anyhow::Result; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use iroh::{discovery::pkarr::PkarrRelayClient, node_info::NodeInfo, SecretKey}; -use iroh_dns_server::{config::Config, server::Server, ZoneStore}; +use iroh_dns_server::{config::Config, metrics::Metrics, server::Server, ZoneStore}; use rand_chacha::rand_core::SeedableRng; use tokio::runtime::Runtime; const LOCALHOST_PKARR: &str = "http://localhost:8080/pkarr"; async fn start_dns_server(config: Config) -> Result { - let store = ZoneStore::persistent(Config::signed_packet_store_path()?, Default::default())?; - Server::spawn(config, store).await + let metrics = Arc::new(Metrics::default()); + let store = ZoneStore::persistent( + Config::signed_packet_store_path()?, + Default::default(), + metrics.clone(), + )?; + Server::spawn(config, store, metrics).await } fn benchmark_dns_server(c: &mut Criterion) { diff --git a/iroh-dns-server/src/dns.rs b/iroh-dns-server/src/dns.rs index bf07edaece8..eb8d27ea17c 100644 --- a/iroh-dns-server/src/dns.rs +++ b/iroh-dns-server/src/dns.rs @@ -26,7 +26,6 @@ use hickory_server::{ server::{Request, RequestHandler, ResponseHandler, ResponseInfo}, store::in_memory::InMemoryAuthority, }; -use iroh_metrics::inc; use serde::{Deserialize, Serialize}; use tokio::{ net::{TcpListener, UdpSocket}, @@ -122,12 +121,13 @@ impl DnsServer { pub struct DnsHandler { #[debug("Catalog")] catalog: Arc, + metrics: Arc, } impl DnsHandler { /// Create a DNS server given some settings, a connection to the DB for DID-by-username lookups /// and the server DID to serve under `_did.`. - pub fn new(zone_store: ZoneStore, config: &DnsConfig) -> Result { + pub fn new(zone_store: ZoneStore, config: &DnsConfig, metrics: Arc) -> Result { let origins = config .origins .iter() @@ -149,6 +149,7 @@ impl DnsHandler { Ok(Self { catalog: Arc::new(catalog), + metrics, }) } @@ -168,10 +169,14 @@ impl RequestHandler for DnsHandler { request: &Request, response_handle: R, ) -> ResponseInfo { - inc!(Metrics, dns_requests); + self.metrics.dns_requests.inc(); match request.protocol() { - Protocol::Udp => inc!(Metrics, dns_requests_udp), - Protocol::Https => inc!(Metrics, dns_requests_https), + Protocol::Udp => { + self.metrics.dns_requests_udp.inc(); + } + Protocol::Https => { + self.metrics.dns_requests_https.inc(); + } _ => {} } debug!(protocol=%request.protocol(), queries=?request.queries(), "incoming DNS request"); @@ -179,12 +184,12 @@ impl RequestHandler for DnsHandler { let res = self.catalog.handle_request(request, response_handle).await; match &res.response_code() { ResponseCode::NoError => match res.answer_count() { - 0 => inc!(Metrics, dns_lookup_notfound), - _ => inc!(Metrics, dns_lookup_success), + 0 => self.metrics.dns_lookup_notfound.inc(), + _ => self.metrics.dns_lookup_success.inc(), }, - ResponseCode::NXDomain => inc!(Metrics, dns_lookup_notfound), - _ => inc!(Metrics, dns_lookup_error), - } + ResponseCode::NXDomain => self.metrics.dns_lookup_notfound.inc(), + _ => self.metrics.dns_lookup_error.inc(), + }; res } } diff --git a/iroh-dns-server/src/http.rs b/iroh-dns-server/src/http.rs index d0ed7a876e6..00df72eac84 100644 --- a/iroh-dns-server/src/http.rs +++ b/iroh-dns-server/src/http.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::{bail, Context, Result}; use axum::{ - extract::{ConnectInfo, Request}, + extract::{ConnectInfo, Request, State}, handler::Handler, http::Method, middleware::{self, Next}, @@ -15,7 +15,6 @@ use axum::{ routing::get, Router, }; -use iroh_metrics::{inc, inc_by}; use serde::{Deserialize, Serialize}; use tokio::{net::TcpListener, task::JoinSet}; use tower_http::{ @@ -31,7 +30,7 @@ mod rate_limiting; mod tls; pub use self::{rate_limiting::RateLimitConfig, tls::CertMode}; -use crate::{config::Config, metrics::Metrics, state::AppState}; +use crate::{config::Config, state::AppState}; /// Config for the HTTP server #[derive(Debug, Serialize, Deserialize, Clone)] @@ -227,13 +226,13 @@ pub(crate) fn create_app(state: AppState, rate_limit_config: &RateLimitConfig) - ) .route("/healthcheck", get(|| async { "OK" })) .route("/", get(|| async { "Hi!" })) - .with_state(state); + .with_state(state.clone()); // configure app router .layer(cors) .layer(trace) - .route_layer(middleware::from_fn(metrics_middleware)) + .route_layer(middleware::from_fn_with_state(state, metrics_middleware)) } /// Record request metrics. @@ -244,17 +243,24 @@ pub(crate) fn create_app(state: AppState, rate_limit_config: &RateLimitConfig) - // // See also // https://github.com/tokio-rs/axum/blob/main/examples/prometheus-metrics/src/main.rs#L114 -async fn metrics_middleware(req: Request, next: Next) -> impl IntoResponse { +async fn metrics_middleware( + State(state): State, + req: Request, + next: Next, +) -> impl IntoResponse { let start = Instant::now(); let response = next.run(req).await; let latency = start.elapsed().as_millis(); let status = response.status(); - inc_by!(Metrics, http_requests_duration_ms, latency as u64); - inc!(Metrics, http_requests); + state + .metrics + .http_requests_duration_ms + .inc_by(latency as u64); + state.metrics.http_requests.inc(); if status.is_success() { - inc!(Metrics, http_requests_success); + state.metrics.http_requests_success.inc(); } else { - inc!(Metrics, http_requests_error); + state.metrics.http_requests_error.inc(); } response } diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index 1c2775e5de1..25708fabec6 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -194,7 +194,7 @@ mod tests { max_batch_time: Duration::from_millis(100), ..Default::default() }; - let store = ZoneStore::in_memory(options)?; + let store = ZoneStore::in_memory(options, Default::default())?; // create a signed packet let signed_packet = random_signed_packet()?; diff --git a/iroh-dns-server/src/main.rs b/iroh-dns-server/src/main.rs index e9575ff4164..05f18bc4d35 100644 --- a/iroh-dns-server/src/main.rs +++ b/iroh-dns-server/src/main.rs @@ -2,9 +2,7 @@ use std::path::PathBuf; use anyhow::Result; use clap::Parser; -use iroh_dns_server::{ - config::Config, metrics::init_metrics, server::run_with_config_until_ctrl_c, -}; +use iroh_dns_server::{config::Config, server::run_with_config_until_ctrl_c}; use tracing::debug; #[derive(Parser, Debug)] @@ -27,6 +25,5 @@ async fn main() -> Result<()> { Config::default() }; - init_metrics(); run_with_config_until_ctrl_c(config).await } diff --git a/iroh-dns-server/src/metrics.rs b/iroh-dns-server/src/metrics.rs index 1e0cc21088a..301336e8f10 100644 --- a/iroh-dns-server/src/metrics.rs +++ b/iroh-dns-server/src/metrics.rs @@ -1,64 +1,41 @@ //! Metrics support for the server -use iroh_metrics::core::{Core, Counter, Metric}; -use struct_iterable::Iterable; +use iroh_metrics::{Counter, MetricsGroup}; /// Metrics for iroh-dns-server -#[derive(Debug, Clone, Iterable)] -#[allow(missing_docs)] +#[derive(Debug, Default, MetricsGroup)] +#[metrics(name = "dns_server")] pub struct Metrics { + /// Number of pkarr relay puts that updated the state pub pkarr_publish_update: Counter, + /// Number of pkarr relay puts that did not update the state pub pkarr_publish_noop: Counter, + /// DNS requests (total) pub dns_requests: Counter, + /// DNS requests via UDP pub dns_requests_udp: Counter, + /// DNS requests via HTTPS (DoH) pub dns_requests_https: Counter, + /// DNS lookup responses with at least one answer pub dns_lookup_success: Counter, + /// DNS lookup responses with no answers pub dns_lookup_notfound: Counter, + /// DNS lookup responses which failed pub dns_lookup_error: Counter, + /// Number of HTTP requests pub http_requests: Counter, + /// Number of HTTP requests with a 2xx status code pub http_requests_success: Counter, + /// Number of HTTP requests with a non-2xx status code pub http_requests_error: Counter, + /// Total duration of all HTTP requests pub http_requests_duration_ms: Counter, + /// Signed packets inserted into the store pub store_packets_inserted: Counter, + /// Signed packets removed from the store pub store_packets_removed: Counter, + /// Number of updates to existing packets pub store_packets_updated: Counter, + /// Number of expired packets pub store_packets_expired: Counter, } - -impl Default for Metrics { - fn default() -> Self { - Self { - pkarr_publish_update: Counter::new("Number of pkarr relay puts that updated the state"), - pkarr_publish_noop: Counter::new( - "Number of pkarr relay puts that did not update the state", - ), - dns_requests: Counter::new("DNS requests (total)"), - dns_requests_udp: Counter::new("DNS requests via UDP"), - dns_requests_https: Counter::new("DNS requests via HTTPS (DoH)"), - dns_lookup_success: Counter::new("DNS lookup responses with at least one answer"), - dns_lookup_notfound: Counter::new("DNS lookup responses with no answers"), - dns_lookup_error: Counter::new("DNS lookup responses which failed"), - http_requests: Counter::new("Number of HTTP requests"), - http_requests_success: Counter::new("Number of HTTP requests with a 2xx status code"), - http_requests_error: Counter::new("Number of HTTP requests with a non-2xx status code"), - http_requests_duration_ms: Counter::new("Total duration of all HTTP requests"), - store_packets_inserted: Counter::new("Signed packets inserted into the store"), - store_packets_removed: Counter::new("Signed packets removed from the store"), - store_packets_updated: Counter::new("Number of updates to existing packets"), - store_packets_expired: Counter::new("Number of expired packets"), - } - } -} - -impl Metric for Metrics { - fn name() -> &'static str { - "dns_server" - } -} - -/// Init the metrics collection core. -pub fn init_metrics() { - Core::init(|reg, metrics| { - metrics.insert(Metrics::new(reg)); - }); -} diff --git a/iroh-dns-server/src/server.rs b/iroh-dns-server/src/server.rs index e40e3de1fd4..8ee9ceb9e93 100644 --- a/iroh-dns-server/src/server.rs +++ b/iroh-dns-server/src/server.rs @@ -1,29 +1,34 @@ //! The main server which combines the DNS and HTTP(S) servers. +use std::sync::Arc; + use anyhow::Result; -use iroh_metrics::metrics::start_metrics_server; +use iroh_metrics::service::start_metrics_server; use tracing::info; use crate::{ config::Config, dns::{DnsHandler, DnsServer}, http::HttpServer, + metrics::Metrics, state::AppState, store::ZoneStore, }; /// Spawn the server and run until the `Ctrl-C` signal is received, then shutdown. pub async fn run_with_config_until_ctrl_c(config: Config) -> Result<()> { + let metrics = Arc::new(Metrics::default()); let zone_store_options = config.zone_store.clone().unwrap_or_default(); let mut store = ZoneStore::persistent( Config::signed_packet_store_path()?, zone_store_options.into(), + metrics.clone(), )?; if let Some(bootstrap) = config.mainline_enabled() { info!("mainline fallback enabled"); store = store.with_mainline_fallback(bootstrap); }; - let server = Server::spawn(config, store).await?; + let server = Server::spawn(config, store, metrics).await?; tokio::signal::ctrl_c().await?; info!("shutdown"); server.shutdown().await?; @@ -44,15 +49,21 @@ impl Server { /// * A DNS server task /// * A HTTP server task, if `config.http` is not empty /// * A HTTPS server task, if `config.https` is not empty - pub async fn spawn(config: Config, store: ZoneStore) -> Result { - let dns_handler = DnsHandler::new(store.clone(), &config.dns)?; + pub async fn spawn(config: Config, store: ZoneStore, metrics: Arc) -> Result { + let dns_handler = DnsHandler::new(store.clone(), &config.dns, metrics.clone())?; - let state = AppState { store, dns_handler }; + let state = AppState { + store, + dns_handler, + metrics: metrics.clone(), + }; let metrics_addr = config.metrics_addr(); let metrics_task = tokio::task::spawn(async move { if let Some(addr) = metrics_addr { - start_metrics_server(addr).await?; + let mut registry = iroh_metrics::Registry::default(); + registry.register(metrics); + start_metrics_server(addr, Arc::new(registry)).await?; } Ok(()) }); @@ -122,12 +133,12 @@ impl Server { config.https = None; config.metrics = Some(MetricsConfig::disabled()); - let mut store = ZoneStore::in_memory(options.unwrap_or_default())?; + let mut store = ZoneStore::in_memory(options.unwrap_or_default(), Default::default())?; if let Some(bootstrap) = mainline { info!("mainline fallback enabled"); store = store.with_mainline_fallback(bootstrap); } - let server = Self::spawn(config, store).await?; + let server = Self::spawn(config, store, Default::default()).await?; let dns_addr = server.dns_server.local_addr(); let http_addr = server.http_server.http_addr().expect("http is set"); let http_url = format!("http://{http_addr}").parse()?; diff --git a/iroh-dns-server/src/state.rs b/iroh-dns-server/src/state.rs index 9063d40b3d0..a2d1513cafd 100644 --- a/iroh-dns-server/src/state.rs +++ b/iroh-dns-server/src/state.rs @@ -1,6 +1,8 @@ //! Shared state and store for the iroh-dns-server -use crate::{dns::DnsHandler, store::ZoneStore}; +use std::sync::Arc; + +use crate::{dns::DnsHandler, metrics::Metrics, store::ZoneStore}; /// The shared app state. #[derive(Clone)] @@ -9,4 +11,6 @@ pub struct AppState { pub store: ZoneStore, /// Handler for DNS requests pub dns_handler: DnsHandler, + /// Metrics collector. + pub metrics: Arc, } diff --git a/iroh-dns-server/src/store.rs b/iroh-dns-server/src/store.rs index 503faf2ceee..2dc129d1abc 100644 --- a/iroh-dns-server/src/store.rs +++ b/iroh-dns-server/src/store.rs @@ -4,7 +4,6 @@ use std::{collections::BTreeMap, num::NonZeroUsize, path::Path, sync::Arc, time: use anyhow::Result; use hickory_server::proto::rr::{Name, RecordSet, RecordType, RrKey}; -use iroh_metrics::inc; use lru::LruCache; use pkarr::{Client as PkarrClient, SignedPacket}; use tokio::sync::Mutex; @@ -41,19 +40,24 @@ pub struct ZoneStore { cache: Arc>, store: Arc, pkarr: Option>, + metrics: Arc, } impl ZoneStore { /// Create a persistent store - pub fn persistent(path: impl AsRef, options: ZoneStoreOptions) -> Result { - let packet_store = SignedPacketStore::persistent(path, options)?; - Ok(Self::new(packet_store)) + pub fn persistent( + path: impl AsRef, + options: ZoneStoreOptions, + metrics: Arc, + ) -> Result { + let packet_store = SignedPacketStore::persistent(path, options, metrics.clone())?; + Ok(Self::new(packet_store, metrics)) } /// Create an in-memory store. - pub fn in_memory(options: ZoneStoreOptions) -> Result { - let packet_store = SignedPacketStore::in_memory(options)?; - Ok(Self::new(packet_store)) + pub fn in_memory(options: ZoneStoreOptions, metrics: Arc) -> Result { + let packet_store = SignedPacketStore::in_memory(options, metrics.clone())?; + Ok(Self::new(packet_store, metrics)) } /// Configure a pkarr client for resolution of packets from the bittorrent mainline DHT. @@ -77,12 +81,13 @@ impl ZoneStore { } /// Create a new zone store. - pub fn new(store: SignedPacketStore) -> Self { + pub fn new(store: SignedPacketStore, metrics: Arc) -> Self { let zone_cache = ZoneCache::new(DEFAULT_CACHE_CAPACITY); Self { store: Arc::new(store), cache: Arc::new(Mutex::new(zone_cache)), pkarr: None, + metrics, } } @@ -144,11 +149,11 @@ impl ZoneStore { pub async fn insert(&self, signed_packet: SignedPacket, _source: PacketSource) -> Result { let pubkey = PublicKeyBytes::from_signed_packet(&signed_packet); if self.store.upsert(signed_packet).await? { - inc!(Metrics, pkarr_publish_update); + self.metrics.pkarr_publish_update.inc(); self.cache.lock().await.remove(&pubkey); Ok(true) } else { - inc!(Metrics, pkarr_publish_noop); + self.metrics.pkarr_publish_noop.inc(); Ok(false) } } diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index d01836567a2..a77d2516a46 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,7 +1,6 @@ -use std::{future::Future, path::Path, result, time::Duration}; +use std::{future::Future, path::Path, result, sync::Arc, time::Duration}; use anyhow::{Context, Result}; -use iroh_metrics::inc; use pkarr::{SignedPacket, Timestamp}; use redb::{ backends::InMemoryBackend, Database, MultimapTableDefinition, ReadableTable, TableDefinition, @@ -63,6 +62,7 @@ struct Actor { recv: PeekableReceiver, cancel: CancellationToken, options: Options, + metrics: Arc, } #[derive(Debug, Clone, Copy)] @@ -155,9 +155,9 @@ impl Actor { tables.signed_packets.insert(key.as_bytes(), &value[..])?; tables.update_time.insert(&packet.timestamp().to_bytes(), key.as_bytes())?; if replaced { - inc!(Metrics, store_packets_updated); + self.metrics.store_packets_updated.inc(); } else { - inc!(Metrics, store_packets_inserted); + self.metrics.store_packets_inserted.inc(); } res.send(true).ok(); } @@ -166,14 +166,11 @@ impl Actor { let updated = if let Some(row) = tables.signed_packets.remove(key.as_bytes())? { let packet = SignedPacket::deserialize(row.value())?; tables.update_time.remove(&packet.timestamp().to_bytes(), key.as_bytes())?; - inc!(Metrics, store_packets_removed); + self.metrics.store_packets_removed.inc(); true } else { false }; - if updated { - inc!(Metrics, store_packets_removed); - } res.send(updated).ok(); } Message::Snapshot { res } => { @@ -186,7 +183,7 @@ impl Actor { if packet.timestamp() < expired { tables.update_time.remove(&time, key.as_bytes())?; let _ = tables.signed_packets.remove(key.as_bytes())?; - inc!(Metrics, store_packets_expired); + self.metrics.store_packets_expired.inc(); } } } @@ -234,7 +231,11 @@ impl Snapshot { } impl SignedPacketStore { - pub fn persistent(path: impl AsRef, options: Options) -> Result { + pub fn persistent( + path: impl AsRef, + options: Options, + metrics: Arc, + ) -> Result { let path = path.as_ref(); info!("loading packet database from {}", path.to_string_lossy()); if let Some(parent) = path.parent() { @@ -248,16 +249,16 @@ impl SignedPacketStore { let db = Database::builder() .create(path) .context("failed to open packet database")?; - Self::open(db, options) + Self::open(db, options, metrics) } - pub fn in_memory(options: Options) -> Result { + pub fn in_memory(options: Options, metrics: Arc) -> Result { info!("using in-memory packet database"); let db = Database::builder().create_with_backend(InMemoryBackend::new())?; - Self::open(db, options) + Self::open(db, options, metrics) } - pub fn open(db: Database, options: Options) -> Result { + pub fn open(db: Database, options: Options, metrics: Arc) -> Result { // create tables let write_tx = db.begin_write()?; let _ = Tables::new(&write_tx)?; @@ -272,6 +273,7 @@ impl SignedPacketStore { recv: PeekableReceiver::new(recv), cancel: cancel2, options, + metrics, }; // start an io thread and donate it to the tokio runtime so we can do blocking IO // inside the thread despite being in a tokio runtime diff --git a/iroh-relay/Cargo.toml b/iroh-relay/Cargo.toml index 4c0c5f9a70b..95355a27cce 100644 --- a/iroh-relay/Cargo.toml +++ b/iroh-relay/Cargo.toml @@ -33,7 +33,7 @@ http-body-util = "0.1.0" hyper = { version = "1", features = ["server", "client", "http1"] } hyper-util = "0.1.1" iroh-base = { version = "0.34.1", path = "../iroh-base", default-features = false, features = ["key", "relay"] } -iroh-metrics = { version = "0.32", default-features = false } +iroh-metrics = { version = "0.34", default-features = false } n0-future = "0.1.2" num_enum = "0.7" pin-project = "1" @@ -139,6 +139,7 @@ cfg_aliases = { version = "0.2" } [features] default = ["metrics"] server = [ + "metrics", "dep:clap", "dep:dashmap", "dep:governor", diff --git a/iroh-relay/src/main.rs b/iroh-relay/src/main.rs index 5f2f0ccb26f..6771b3613d9 100644 --- a/iroh-relay/src/main.rs +++ b/iroh-relay/src/main.rs @@ -724,54 +724,6 @@ async fn build_relay_config(cfg: Config) -> Result Self { - Self { - /* - * Metrics about STUN requests - */ - requests: Counter::new("Number of STUN requests made to the server."), - ipv4_success: Counter::new("Number of successful ipv4 STUN requests served."), - ipv6_success: Counter::new("Number of successful ipv6 STUN requests served."), - bad_requests: Counter::new("Number of bad requests made to the STUN endpoint."), - failures: Counter::new("Number of STUN requests that end in failure."), - } - } - } - - impl Metric for StunMetrics { - fn name() -> &'static str { - "stun" - } - } -} - #[cfg(test)] mod tests { use std::num::NonZeroU32; diff --git a/iroh-relay/src/server.rs b/iroh-relay/src/server.rs index 4c13b135de2..3608877d531 100644 --- a/iroh-relay/src/server.rs +++ b/iroh-relay/src/server.rs @@ -27,7 +27,6 @@ use hyper::body::Incoming; use iroh_base::NodeId; #[cfg(feature = "test-utils")] use iroh_base::RelayUrl; -use iroh_metrics::inc; use n0_future::{future::Boxed, StreamExt}; use tokio::{ net::{TcpListener, UdpSocket}, @@ -53,7 +52,7 @@ pub(crate) mod streams; pub mod testing; pub use self::{ - metrics::{Metrics, StunMetrics}, + metrics::{Metrics, RelayMetrics, StunMetrics}, resolver::{ReloadingResolver, DEFAULT_CERT_RELOAD_INTERVAL}, }; @@ -269,6 +268,7 @@ pub struct Server { /// If the server has manual certificates configured the certificate chain will be /// available here, this can be used by a client to authenticate the server. certificates: Option>>, + metrics: RelayMetrics, } impl Server { @@ -280,18 +280,16 @@ impl Server { { let mut tasks = JoinSet::new(); + let metrics = RelayMetrics::default(); + #[cfg(feature = "metrics")] if let Some(addr) = config.metrics_addr { debug!("Starting metrics server"); - use iroh_metrics::core::Metric; - - iroh_metrics::core::Core::init(|reg, metrics| { - metrics.insert(metrics::Metrics::new(reg)); - metrics.insert(StunMetrics::new(reg)); - }); + let mut registry = iroh_metrics::Registry::default(); + registry.register_all(&metrics); tasks.spawn( async move { - iroh_metrics::metrics::start_metrics_server(addr).await?; + iroh_metrics::service::start_metrics_server(addr, Arc::new(registry)).await?; anyhow::Ok(()) } .instrument(info_span!("metrics-server")), @@ -307,7 +305,8 @@ impl Server { let addr = sock.local_addr()?; info!("STUN server listening on {addr}"); tasks.spawn( - server_stun_listener(sock).instrument(info_span!("stun-server", %addr)), + server_stun_listener(sock, metrics.stun.clone()) + .instrument(info_span!("stun-server", %addr)), ); Some(addr) } @@ -351,6 +350,7 @@ impl Server { .key_cache_capacity .unwrap_or(DEFAULT_KEY_CACHE_CAPACITY); let mut builder = http_server::ServerBuilder::new(relay_bind_addr) + .metrics(metrics.server.clone()) .headers(headers) .key_cache_capacity(key_cache_capacity) .access(relay_config.access) @@ -440,6 +440,7 @@ impl Server { quic_handle, supervisor: AbortOnDropHandle::new(task), certificates, + metrics, }) } @@ -514,6 +515,11 @@ impl Server { .into() }) } + + /// Returns the metrics collected in the relay server. + pub fn metrics(&self) -> &RelayMetrics { + &self.metrics + } } /// Supervisor for the relay server tasks. @@ -582,7 +588,7 @@ async fn relay_supervisor( /// Runs a STUN server. /// /// When the future is dropped, the server stops. -async fn server_stun_listener(sock: UdpSocket) -> Result<()> { +async fn server_stun_listener(sock: UdpSocket, metrics: Arc) -> Result<()> { info!(addr = ?sock.local_addr().ok(), "running STUN server"); let sock = Arc::new(sock); let mut buffer = vec![0u8; 64 << 10]; @@ -601,18 +607,18 @@ async fn server_stun_listener(sock: UdpSocket) -> Result<()> { res = sock.recv_from(&mut buffer) => { match res { Ok((n, src_addr)) => { - inc!(StunMetrics, requests); + metrics.requests.inc(); let pkt = &buffer[..n]; if !protos::stun::is(pkt) { debug!(%src_addr, "STUN: ignoring non stun packet"); - inc!(StunMetrics, bad_requests); + metrics.bad_requests.inc(); continue; } let pkt = pkt.to_vec(); - tasks.spawn(handle_stun_request(src_addr, pkt, sock.clone())); + tasks.spawn(handle_stun_request(src_addr, pkt, sock.clone(), metrics.clone())); } Err(err) => { - inc!(StunMetrics, failures); + metrics.failures.inc(); warn!("failed to recv: {err:#}"); } } @@ -622,14 +628,19 @@ async fn server_stun_listener(sock: UdpSocket) -> Result<()> { } /// Handles a single STUN request, doing all logging required. -async fn handle_stun_request(src_addr: SocketAddr, pkt: Vec, sock: Arc) { +async fn handle_stun_request( + src_addr: SocketAddr, + pkt: Vec, + sock: Arc, + metrics: Arc, +) { let (txid, response) = match protos::stun::parse_binding_request(&pkt) { Ok(txid) => { debug!(%src_addr, %txid, "STUN: received binding request"); (txid, protos::stun::response(txid, src_addr)) } Err(err) => { - inc!(StunMetrics, bad_requests); + metrics.bad_requests.inc(); warn!(%src_addr, "STUN: invalid binding request: {:?}", err); return; } @@ -646,14 +657,14 @@ async fn handle_stun_request(src_addr: SocketAddr, pkt: Vec, sock: Arc inc!(StunMetrics, ipv4_success), - SocketAddr::V6(_) => inc!(StunMetrics, ipv6_success), - } + SocketAddr::V4(_) => metrics.ipv4_success.inc(), + SocketAddr::V6(_) => metrics.ipv6_success.inc(), + }; } trace!(%src_addr, %txid, "sent {len} bytes"); } Err(err) => { - inc!(StunMetrics, failures); + metrics.failures.inc(); warn!(%src_addr, %txid, "failed to write response: {err:#}"); } } diff --git a/iroh-relay/src/server/client.rs b/iroh-relay/src/server/client.rs index ed42e55d7e7..ec6832c4243 100644 --- a/iroh-relay/src/server/client.rs +++ b/iroh-relay/src/server/client.rs @@ -8,7 +8,6 @@ use std::{ use anyhow::{bail, Context, Result}; use bytes::Bytes; use iroh_base::NodeId; -use iroh_metrics::{inc, inc_by}; use n0_future::{FutureExt, Sink, SinkExt, Stream, StreamExt}; use rand::Rng; use time::{Date, OffsetDateTime}; @@ -73,7 +72,12 @@ impl Client { /// Creates a client from a connection & starts a read and write loop to handle io to and from /// the client /// Call [`Client::shutdown`] to close the read and write loops before dropping the [`Client`] - pub(super) fn new(config: Config, connection_id: u64, clients: &Clients) -> Client { + pub(super) fn new( + config: Config, + connection_id: u64, + clients: &Clients, + metrics: Arc, + ) -> Client { let Config { node_id, stream: io, @@ -89,9 +93,9 @@ impl Client { quota = quota.allow_burst(max_burst); } let limiter = governor::RateLimiter::direct(quota); - RateLimitedRelayedStream::new(io, limiter) + RateLimitedRelayedStream::new(io, limiter, metrics.clone()) } - None => RateLimitedRelayedStream::unlimited(io), + None => RateLimitedRelayedStream::unlimited(io, metrics.clone()), }; let done = CancellationToken::new(); @@ -111,6 +115,7 @@ impl Client { clients: clients.clone(), client_counter: ClientCounter::default(), ping_tracker: PingTracker::default(), + metrics, }; // start io loop @@ -213,6 +218,7 @@ struct Actor { /// Statistics about the connected clients client_counter: ClientCounter, ping_tracker: PingTracker, + metrics: Arc, } impl Actor { @@ -220,9 +226,9 @@ impl Actor { // Note the accept and disconnects metrics must be in a pair. Technically the // connection is accepted long before this in the HTTP server, but it is clearer to // handle the metric here. - inc!(Metrics, accepts); + self.metrics.accepts.inc(); if self.client_counter.update(self.node_id) { - inc!(Metrics, unique_client_keys); + self.metrics.unique_client_keys.inc(); } match self.run_inner(done).await { Err(e) => { @@ -234,7 +240,7 @@ impl Actor { } self.clients.unregister(self.connection_id, self.node_id); - inc!(Metrics, disconnects); + self.metrics.disconnects.inc(); } async fn run_inner(&mut self, done: CancellationToken) -> Result<()> { @@ -314,7 +320,7 @@ impl Actor { let content = packet.data; if let Ok(len) = content.len().try_into() { - inc_by!(Metrics, bytes_sent, len); + self.metrics.bytes_sent.inc_by(len); } self.write_frame(Frame::RecvPacket { src_key, content }) .await @@ -324,11 +330,11 @@ impl Actor { trace!("send packet"); match self.send_raw(packet).await { Ok(()) => { - inc!(Metrics, send_packets_sent); + self.metrics.send_packets_sent.inc(); Ok(()) } Err(err) => { - inc!(Metrics, send_packets_dropped); + self.metrics.send_packets_dropped.inc(); Err(err) } } @@ -338,11 +344,11 @@ impl Actor { trace!("send disco packet"); match self.send_raw(packet).await { Ok(()) => { - inc!(Metrics, disco_packets_sent); + self.metrics.disco_packets_sent.inc(); Ok(()) } Err(err) => { - inc!(Metrics, disco_packets_dropped); + self.metrics.disco_packets_dropped.inc(); Err(err) } } @@ -360,13 +366,13 @@ impl Actor { Frame::SendPacket { dst_key, packet } => { let packet_len = packet.len(); self.handle_frame_send_packet(dst_key, packet)?; - inc_by!(Metrics, bytes_recv, packet_len as u64); + self.metrics.bytes_recv.inc_by(packet_len as u64); } Frame::Ping { data } => { - inc!(Metrics, got_ping); + self.metrics.got_ping.inc(); // TODO: add rate limiter self.write_frame(Frame::Pong { data }).await?; - inc!(Metrics, sent_pong); + self.metrics.sent_pong.inc(); } Frame::Pong { data } => { self.ping_tracker.pong_received(data); @@ -375,7 +381,7 @@ impl Actor { bail!("server issue: {:?}", problem); } _ => { - inc!(Metrics, unknown_frames); + self.metrics.unknown_frames.inc(); } } Ok(()) @@ -383,11 +389,13 @@ impl Actor { fn handle_frame_send_packet(&self, dst: NodeId, data: Bytes) -> Result<()> { if disco::looks_like_disco_wrapper(&data) { - inc!(Metrics, disco_packets_recv); - self.clients.send_disco_packet(dst, data, self.node_id)?; + self.metrics.disco_packets_recv.inc(); + self.clients + .send_disco_packet(dst, data, self.node_id, &self.metrics)?; } else { - inc!(Metrics, send_packets_recv); - self.clients.send_packet(dst, data, self.node_id)?; + self.metrics.send_packets_recv.inc(); + self.clients + .send_packet(dst, data, self.node_id, &self.metrics)?; } Ok(()) } @@ -406,6 +414,7 @@ struct RateLimitedRelayedStream { state: State, /// Keeps track if this stream was ever rate-limited. limited_once: bool, + metrics: Arc, } #[derive(derive_more::Debug)] @@ -421,21 +430,27 @@ enum State { } impl RateLimitedRelayedStream { - fn new(inner: RelayedStream, limiter: governor::DefaultDirectRateLimiter) -> Self { + fn new( + inner: RelayedStream, + limiter: governor::DefaultDirectRateLimiter, + metrics: Arc, + ) -> Self { Self { inner, limiter: Some(Arc::new(limiter)), state: State::Ready, limited_once: false, + metrics, } } - fn unlimited(inner: RelayedStream) -> Self { + fn unlimited(inner: RelayedStream, metrics: Arc) -> Self { Self { inner, limiter: None, state: State::Ready, limited_once: false, + metrics, } } } @@ -444,9 +459,9 @@ impl RateLimitedRelayedStream { /// Records metrics about being rate-limited. fn record_rate_limited(&mut self) { // TODO: add a label for the frame type. - inc!(Metrics, frames_rx_ratelimited_total); + self.metrics.frames_rx_ratelimited_total.inc(); if !self.limited_once { - inc!(Metrics, conns_rx_ratelimited_total); + self.metrics.conns_rx_ratelimited_total.inc(); self.limited_once = true; } } @@ -628,8 +643,9 @@ mod tests { RelayedStream::Relay(Framed::new(MaybeTlsStream::Test(io), RelayCodec::test())); let clients = Clients::default(); + let metrics = Arc::new(Metrics::default()); let actor = Actor { - stream: RateLimitedRelayedStream::unlimited(stream), + stream: RateLimitedRelayedStream::unlimited(stream, metrics.clone()), timeout: Duration::from_secs(1), send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -639,6 +655,7 @@ mod tests { clients: clients.clone(), client_counter: ClientCounter::default(), ping_tracker: PingTracker::default(), + metrics, }; let done = CancellationToken::new(); @@ -742,7 +759,7 @@ mod tests { MaybeTlsStream::Test(io_read), RelayCodec::test(), )); - let mut stream = RateLimitedRelayedStream::new(stream, limiter); + let mut stream = RateLimitedRelayedStream::new(stream, limiter, Default::default()); // Prepare a frame to send, assert its size. let data = Bytes::from_static(b"hello world!!"); diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 6713dd06872..ed71e93dd7b 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -13,7 +13,6 @@ use anyhow::{bail, Result}; use bytes::Bytes; use dashmap::DashMap; use iroh_base::NodeId; -use iroh_metrics::inc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, trace}; @@ -45,12 +44,12 @@ impl Clients { } /// Builds the client handler and starts the read & write loops for the connection. - pub async fn register(&self, client_config: Config) { + pub async fn register(&self, client_config: Config, metrics: Arc) { let node_id = client_config.node_id; let connection_id = self.get_connection_id(); trace!(remote_node = node_id.fmt_short(), "registering client"); - let client = Client::new(client_config, connection_id, self); + let client = Client::new(client_config, connection_id, self, metrics); if let Some(old_client) = self.0.clients.insert(node_id, client) { debug!( remote_node = node_id.fmt_short(), @@ -104,10 +103,16 @@ impl Clients { } /// Attempt to send a packet to client with [`NodeId`] `dst`. - pub(super) fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> { + pub(super) fn send_packet( + &self, + dst: NodeId, + data: Bytes, + src: NodeId, + metrics: &Metrics, + ) -> Result<()> { let Some(client) = self.0.clients.get(&dst) else { debug!(dst = dst.fmt_short(), "no connected client, dropped packet"); - inc!(Metrics, send_packets_dropped); + metrics.send_packets_dropped.inc(); return Ok(()); }; match client.try_send_packet(src, data) { @@ -135,13 +140,19 @@ impl Clients { } /// Attempt to send a disco packet to client with [`NodeId`] `dst`. - pub(super) fn send_disco_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> { + pub(super) fn send_disco_packet( + &self, + dst: NodeId, + data: Bytes, + src: NodeId, + metrics: &Metrics, + ) -> Result<()> { let Some(client) = self.0.clients.get(&dst) else { debug!( dst = dst.fmt_short(), "no connected client, dropped disco packet" ); - inc!(Metrics, disco_packets_dropped); + metrics.disco_packets_dropped.inc(); return Ok(()); }; match client.try_send_disco_packet(src, data) { @@ -209,11 +220,12 @@ mod tests { let (builder_a, mut a_rw) = test_client_builder(a_key); let clients = Clients::default(); - clients.register(builder_a).await; + let metrics = Arc::new(Metrics::default()); + clients.register(builder_a, metrics.clone()).await; // send packet let data = b"hello world!"; - clients.send_packet(a_key, Bytes::from(&data[..]), b_key)?; + clients.send_packet(a_key, Bytes::from(&data[..]), b_key, &metrics)?; let frame = recv_frame(FrameType::RecvPacket, &mut a_rw).await?; assert_eq!( frame, @@ -224,7 +236,7 @@ mod tests { ); // send disco packet - clients.send_disco_packet(a_key, Bytes::from(&data[..]), b_key)?; + clients.send_disco_packet(a_key, Bytes::from(&data[..]), b_key, &metrics)?; let frame = recv_frame(FrameType::RecvPacket, &mut a_rw).await?; assert_eq!( frame, diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index cead3ebe0b2..ca4ea755edb 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -13,7 +13,6 @@ use hyper::{ upgrade::Upgraded, HeaderMap, Method, Request, Response, StatusCode, }; -use iroh_metrics::inc; use n0_future::{FutureExt, SinkExt}; use tokio::net::{TcpListener, TcpStream}; use tokio_rustls_acme::AcmeAcceptor; @@ -176,6 +175,7 @@ pub(super) struct ServerBuilder { key_cache_capacity: usize, /// Access config for nodes. access: AccessConfig, + metrics: Option>, } impl ServerBuilder { @@ -189,9 +189,16 @@ impl ServerBuilder { client_rx_ratelimit: None, key_cache_capacity: DEFAULT_KEY_CACHE_CAPACITY, access: AccessConfig::Everyone, + metrics: None, } } + /// Sets the metrics collector. + pub(super) fn metrics(mut self, metrics: Arc) -> Self { + self.metrics = Some(metrics); + self + } + /// Set the access configuration. pub(super) fn access(mut self, access: AccessConfig) -> Self { self.access = access; @@ -248,6 +255,7 @@ impl ServerBuilder { self.client_rx_ratelimit, KeyCache::new(self.key_cache_capacity), self.access, + self.metrics.unwrap_or_default(), ); let addr = self.addr; @@ -327,6 +335,7 @@ struct Inner { rate_limit: Option, key_cache: KeyCache, access: AccessConfig, + metrics: Arc, } impl RelayService { @@ -517,11 +526,11 @@ impl Inner { trace!(?protocol, "accept: start"); let mut io = match protocol { Protocol::Relay => { - inc!(Metrics, relay_accepts); + self.metrics.relay_accepts.inc(); RelayedStream::Relay(Framed::new(io, RelayCodec::new(self.key_cache.clone()))) } Protocol::Websocket => { - inc!(Metrics, websocket_accepts); + self.metrics.websocket_accepts.inc(); // Since we already did the HTTP upgrade in the previous step, // we use tokio-websockets to handle this connection // Create a server builder with default config @@ -569,7 +578,9 @@ impl Inner { // build and register client, starting up read & write loops for the client // connection - self.clients.register(client_conn_builder).await; + self.clients + .register(client_conn_builder, self.metrics.clone()) + .await; Ok(()) } } @@ -591,6 +602,7 @@ impl RelayService { rate_limit: Option, key_cache: KeyCache, access: AccessConfig, + metrics: Arc, ) -> Self { Self(Arc::new(Inner { handlers, @@ -600,6 +612,7 @@ impl RelayService { rate_limit, key_cache, access, + metrics, })) } @@ -952,6 +965,7 @@ mod tests { None, KeyCache::test(), AccessConfig::Everyone, + Default::default(), ); info!("Create client A and connect it to the server."); @@ -1039,6 +1053,7 @@ mod tests { None, KeyCache::test(), AccessConfig::Everyone, + Default::default(), ); info!("Create client A and connect it to the server."); diff --git a/iroh-relay/src/server/metrics.rs b/iroh-relay/src/server/metrics.rs index c552b278b17..bf9e52df125 100644 --- a/iroh-relay/src/server/metrics.rs +++ b/iroh-relay/src/server/metrics.rs @@ -1,45 +1,59 @@ -use iroh_metrics::{ - core::{Counter, Metric}, - struct_iterable::Iterable, -}; +use std::sync::Arc; + +use iroh_metrics::{Counter, MetricsGroup, MetricsGroupSet}; /// Metrics tracked for the relay server -#[derive(Debug, Clone, Iterable)] +#[derive(Debug, Default, MetricsGroup)] +#[metrics(name = "relayserver")] pub struct Metrics { /* * Metrics about packets */ /// Bytes sent from a `FrameType::SendPacket` + #[metrics(help = "Number of bytes sent.")] pub bytes_sent: Counter, /// Bytes received from a `FrameType::SendPacket` + #[metrics(help = "Number of bytes received.")] pub bytes_recv: Counter, /// `FrameType::SendPacket` sent, that are not disco messages + #[metrics(help = "Number of 'send' packets relayed.")] pub send_packets_sent: Counter, /// `FrameType::SendPacket` received, that are not disco messages + #[metrics(help = "Number of 'send' packets received.")] pub send_packets_recv: Counter, /// `FrameType::SendPacket` dropped, that are not disco messages + #[metrics(help = "Number of 'send' packets dropped.")] pub send_packets_dropped: Counter, /// `FrameType::SendPacket` sent that are disco messages + #[metrics(help = "Number of disco packets sent.")] pub disco_packets_sent: Counter, /// `FrameType::SendPacket` received that are disco messages + #[metrics(help = "Number of disco packets received.")] pub disco_packets_recv: Counter, /// `FrameType::SendPacket` dropped that are disco messages + #[metrics(help = "Number of disco packets dropped.")] pub disco_packets_dropped: Counter, /// Packets of other `FrameType`s sent + #[metrics(help = "Number of packets sent that were not disco packets or 'send' packets")] pub other_packets_sent: Counter, /// Packets of other `FrameType`s received + #[metrics(help = "Number of packets received that were not disco packets or 'send' packets")] pub other_packets_recv: Counter, /// Packets of other `FrameType`s dropped + #[metrics(help = "Number of times a non-disco, non-send packet was dropped.")] pub other_packets_dropped: Counter, /// Number of `FrameType::Ping`s received + #[metrics(help = "Number of times the server has received a Ping from a client.")] pub got_ping: Counter, /// Number of `FrameType::Pong`s sent + #[metrics(help = "Number of times the server has sent a Pong to a client.")] pub sent_pong: Counter, /// Number of `FrameType::Unknown` received + #[metrics(help = "Number of unknown frames sent to this server.")] pub unknown_frames: Counter, /// Number of frames received from client connection which have been rate-limited. @@ -50,9 +64,10 @@ pub struct Metrics { /* * Metrics about peers */ - /// Number of connections we have accepted + /// Number of times this server has accepted a connection. pub accepts: Counter, /// Number of connections we have removed because of an error + #[metrics(help = "Number of clients that have then disconnected.")] pub disconnects: Counter, /// Number of unique client keys per day @@ -69,101 +84,28 @@ pub struct Metrics { // pub average_queue_duration: } -impl Default for Metrics { - fn default() -> Self { - Self { - /* - * Metrics about packets - */ - send_packets_sent: Counter::new("Number of 'send' packets relayed."), - bytes_sent: Counter::new("Number of bytes sent."), - send_packets_recv: Counter::new("Number of 'send' packets received."), - bytes_recv: Counter::new("Number of bytes received."), - send_packets_dropped: Counter::new("Number of 'send' packets dropped."), - disco_packets_sent: Counter::new("Number of disco packets sent."), - disco_packets_recv: Counter::new("Number of disco packets received."), - disco_packets_dropped: Counter::new("Number of disco packets dropped."), - - other_packets_sent: Counter::new( - "Number of packets sent that were not disco packets or 'send' packets", - ), - other_packets_recv: Counter::new( - "Number of packets received that were not disco packets or 'send' packets", - ), - other_packets_dropped: Counter::new( - "Number of times a non-disco, non-'send; packet was dropped.", - ), - got_ping: Counter::new("Number of times the server has received a Ping from a client."), - sent_pong: Counter::new("Number of times the server has sent a Pong to a client."), - unknown_frames: Counter::new("Number of unknown frames sent to this server."), - frames_rx_ratelimited_total: Counter::new( - "Number of frames received from client connection which have been rate-limited.", - ), - conns_rx_ratelimited_total: Counter::new( - "Number of client connections which have had any frames rate-limited.", - ), - - /* - * Metrics about peers - */ - accepts: Counter::new("Number of times this server has accepted a connection."), - disconnects: Counter::new("Number of clients that have then disconnected."), - - unique_client_keys: Counter::new("Number of unique client keys per day."), - - websocket_accepts: Counter::new("Number of accepted websocket connections"), - relay_accepts: Counter::new("Number of accepted 'iroh derp http' connection upgrades"), - // TODO: enable when we can have multiple connections for one node id - // pub duplicate_client_keys: Counter::new("Number of duplicate client keys."), - // pub duplicate_client_conns: Counter::new("Number of duplicate client connections."), - // TODO: only important stat that we cannot track right now - // pub average_queue_duration: - } - } -} - -impl Metric for Metrics { - fn name() -> &'static str { - "relayserver" - } -} - -/// StunMetrics tracked for the relay server -#[derive(Debug, Clone, Iterable)] +/// Metrics tracked for the STUN server. +#[derive(Debug, Default, MetricsGroup)] +#[metrics(name = "stun")] pub struct StunMetrics { - /* - * Metrics about STUN requests - */ - /// Number of stun requests made + /// Number of STUN requests made to the server. pub requests: Counter, - /// Number of successful requests over ipv4 + /// Number of successful ipv4 STUN requests served. pub ipv4_success: Counter, - /// Number of successful requests over ipv6 + /// Number of successful ipv6 STUN requests served. pub ipv6_success: Counter, - - /// Number of bad requests, either non-stun packets or incorrect binding request + /// Number of bad requests made to the STUN endpoint. pub bad_requests: Counter, - /// Number of failures + /// Number of STUN requests that end in failure. pub failures: Counter, } -impl Default for StunMetrics { - fn default() -> Self { - Self { - /* - * Metrics about STUN requests - */ - requests: Counter::new("Number of STUN requests made to the server."), - ipv4_success: Counter::new("Number of successful ipv4 STUN requests served."), - ipv6_success: Counter::new("Number of successful ipv6 STUN requests served."), - bad_requests: Counter::new("Number of bad requests made to the STUN endpoint."), - failures: Counter::new("Number of STUN requests that end in failure."), - } - } -} - -impl Metric for StunMetrics { - fn name() -> &'static str { - "stun" - } +/// All metrics tracked in the relay server. +#[derive(Debug, Default, Clone, MetricsGroupSet)] +#[metrics(name = "relay")] +pub struct RelayMetrics { + /// Metrics tracked for the STUN server. + pub stun: Arc, + /// Metrics tracked for the relay server. + pub server: Arc, } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index c6248de1ce4..b8bba068879 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -43,7 +43,7 @@ http = "1" iroh-base = { version = "0.34.1", default-features = false, features = ["key", "relay"], path = "../iroh-base" } iroh-relay = { version = "0.34", path = "../iroh-relay", default-features = false } n0-future = "0.1.2" -netwatch = { version = "0.4" } +netwatch = { version = "0.5" } pin-project = "1" pkarr = { version = "3.7", default-features = false, features = [ "relays", @@ -80,7 +80,7 @@ x509-parser = "0.16" z32 = "1.0.3" # metrics -iroh-metrics = { version = "0.32", default-features = false } +iroh-metrics = { version = "0.34", default-features = false } # local-swarm-discovery swarm-discovery = { version = "0.3.1", optional = true } @@ -100,9 +100,9 @@ parse-size = { version = "=1.0.0", optional = true } # pinned version to avoid b # non-wasm-in-browser dependencies [target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies] hickory-resolver = "0.25.1" -igd-next = { version = "0.15.1", features = ["aio_tokio"] } +igd-next = { version = "0.16", features = ["aio_tokio"] } netdev = { version = "0.31.0" } -portmapper = { version = "0.4", default-features = false } +portmapper = { version = "0.5.0", default-features = false } quinn = { package = "iroh-quinn", version = "0.13.0", default-features = false, features = ["runtime-tokio", "rustls-ring"] } tokio = { version = "1", features = [ "io-util", @@ -126,6 +126,7 @@ time = { version = "0.3", features = ["wasm-bindgen"] } # target-common test/dev dependencies [dev-dependencies] +postcard = { version = "1.1.1", features = ["use-std"] } testresult = "0.4.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/iroh/bench/Cargo.toml b/iroh/bench/Cargo.toml index 14542557d50..df38174a412 100644 --- a/iroh/bench/Cargo.toml +++ b/iroh/bench/Cargo.toml @@ -10,7 +10,7 @@ anyhow = "1.0.22" bytes = "1.7" hdrhistogram = { version = "7.2", default-features = false } iroh = { path = ".." } -iroh-metrics = "0.32" +iroh-metrics = "0.34" n0-future = "0.1.1" quinn = { package = "iroh-quinn", version = "0.13" } rand = "0.8" diff --git a/iroh/bench/src/bin/bulk.rs b/iroh/bench/src/bin/bulk.rs index 85514bef489..ac7ca7d982b 100644 --- a/iroh/bench/src/bin/bulk.rs +++ b/iroh/bench/src/bin/bulk.rs @@ -5,6 +5,7 @@ use clap::Parser; #[cfg(not(any(target_os = "freebsd", target_os = "openbsd", target_os = "netbsd")))] use iroh_bench::quinn; use iroh_bench::{configure_tracing_subscriber, iroh, rt, s2n, Commands, Opt}; +use iroh_metrics::{MetricValue, MetricsGroup}; fn main() { let cmd = Commands::parse(); @@ -31,28 +32,15 @@ fn main() { } pub fn run_iroh(opt: Opt) -> Result<()> { - if opt.metrics { - // enable recording metrics - iroh_metrics::core::Core::try_init(|reg, metrics| { - use iroh_metrics::core::Metric; - metrics.insert(::iroh::metrics::MagicsockMetrics::new(reg)); - metrics.insert(::iroh::metrics::NetReportMetrics::new(reg)); - metrics.insert(::iroh::metrics::PortmapMetrics::new(reg)); - #[cfg(feature = "local-relay")] - if opt.only_relay { - metrics.insert(::iroh::metrics::RelayMetrics::new(reg)); - } - })?; - } - let server_span = tracing::error_span!("server"); let runtime = rt(); #[cfg(feature = "local-relay")] - let (relay_url, _guard) = if opt.only_relay { - let (_, relay_url, _guard) = runtime.block_on(::iroh::test_utils::run_relay_server())?; + let (relay_url, relay_server) = if opt.only_relay { + let (_, relay_url, relay_server) = + runtime.block_on(::iroh::test_utils::run_relay_server())?; - (Some(relay_url), Some(_guard)) + (Some(relay_url), Some(relay_server)) } else { (None, None) }; @@ -64,6 +52,8 @@ pub fn run_iroh(opt: Opt) -> Result<()> { iroh::server_endpoint(&runtime, &relay_url, &opt) }; + let endpoint_metrics = endpoint.metrics().clone(); + let server_thread = std::thread::spawn(move || { let _guard = server_span.entered(); if let Err(e) = runtime.block_on(iroh::server(endpoint, opt)) { @@ -98,27 +88,15 @@ pub fn run_iroh(opt: Opt) -> Result<()> { if opt.metrics { // print metrics - let core = - iroh_metrics::core::Core::get().ok_or_else(|| anyhow::anyhow!("Missing metrics"))?; println!("\nMetrics:"); - collect_and_print( - "MagicsockMetrics", - core.get_collector::<::iroh::metrics::MagicsockMetrics>(), - ); - collect_and_print( - "NetReportMetrics", - core.get_collector::<::iroh::metrics::NetReportMetrics>(), - ); - collect_and_print( - "PortmapMetrics", - core.get_collector::<::iroh::metrics::PortmapMetrics>(), - ); - // if None, (this is the case if opt.only_relay is false), then this is skipped internally: + collect_and_print("MagicsockMetrics", &*endpoint_metrics.magicsock); + collect_and_print("NetReportMetrics", &*endpoint_metrics.net_report); + collect_and_print("PortmapMetrics", &*endpoint_metrics.portmapper); #[cfg(feature = "local-relay")] - collect_and_print( - "RelayMetrics", - core.get_collector::<::iroh::metrics::RelayMetrics>(), - ); + if let Some(relay_server) = relay_server.as_ref() { + collect_and_print("RelayServerMetrics", &*relay_server.metrics().server); + collect_and_print("RelayStunMetrics", &*relay_server.metrics().stun); + } } server_thread.join().expect("server thread"); @@ -181,18 +159,15 @@ pub fn run_s2n(_opt: s2n::Opt) -> Result<()> { unimplemented!() } -fn collect_and_print( - category: &'static str, - metrics: Option<&impl iroh_metrics::struct_iterable::Iterable>, -) { - let Some(metrics) = metrics else { - return; - }; +fn collect_and_print(category: &'static str, metrics: &dyn MetricsGroup) { let mut map = BTreeMap::new(); - for (name, counter) in metrics.iter() { - if let Some(counter) = counter.downcast_ref::() { - map.insert(name.to_string(), counter.get()); - } + for item in metrics.iter() { + let value: i64 = match item.value() { + MetricValue::Counter(v) => v as i64, + MetricValue::Gauge(v) => v, + _ => continue, + }; + map.insert(item.name().to_string(), value); } println!("{category}: {map:#?}"); } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 2f951680768..55d199a1800 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -41,6 +41,7 @@ use crate::{ DiscoveryTask, Lagged, UserData, }, magicsock::{self, Handle, NodeIdMappedAddr}, + metrics::EndpointMetrics, tls, watchable::Watcher, RelayProtocol, @@ -192,6 +193,8 @@ impl Builder { }; let server_config = static_config.create_server_config(self.alpn_protocols)?; + let metrics = EndpointMetrics::default(); + let msock_opts = magicsock::Options { addr_v4: self.addr_v4, addr_v6: self.addr_v6, @@ -209,6 +212,7 @@ impl Builder { insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] path_selection: self.path_selection, + metrics, }; Endpoint::bind(static_config, msock_opts).await } @@ -620,7 +624,7 @@ impl Endpoint { let ep = Self { msock: msock.clone(), - rtt_actor: Arc::new(rtt_actor::RttHandle::new()), + rtt_actor: Arc::new(rtt_actor::RttHandle::new(msock.metrics.magicsock.clone())), static_config: Arc::new(static_config), session_store: Arc::new(rustls::client::ClientSessionMemoryCache::new( MAX_TLS_TICKETS, @@ -1104,6 +1108,122 @@ impl Endpoint { self.msock.discovery() } + /// Returns metrics collected for this endpoint. + /// + /// The endpoint internally collects various metrics about its operation. + /// The returned [`EndpointMetrics`] struct contains all of these metrics. + /// + /// You can access individual metrics directly by using the public fields: + /// ```rust + /// # use std::collections::BTreeMap; + /// # use iroh::endpoint::Endpoint; + /// # async fn wrapper() -> testresult::TestResult { + /// let endpoint = Endpoint::builder().bind().await?; + /// assert_eq!(endpoint.metrics().magicsock.recv_datagrams.get(), 0); + /// # Ok(()) + /// # } + /// ``` + /// + /// [`EndpointMetrics`] implements [`MetricsGroupSet`], and each field + /// implements [`MetricsGroup`]. These traits provide methods to iterate over + /// the groups in the set, and over the individual metrics in each group, without having + /// to access each field manually. With these methods, it is straightforward to collect + /// all metrics into a map or push their values to a metrics collector. + /// + /// For example, the following snippet collects all metrics into a map: + /// ```rust + /// # use std::collections::BTreeMap; + /// # use iroh_metrics::{Metric, MetricsGroup, MetricValue, MetricsGroupSet}; + /// # use iroh::endpoint::Endpoint; + /// # async fn wrapper() -> testresult::TestResult { + /// let endpoint = Endpoint::builder().bind().await?; + /// let metrics: BTreeMap = endpoint + /// .metrics() + /// .iter() + /// .map(|(group, metric)| { + /// let name = [group, metric.name()].join(":"); + /// (name, metric.value()) + /// }) + /// .collect(); + /// + /// assert_eq!(metrics["magicsock:recv_datagrams"], MetricValue::Counter(0)); + /// # Ok(()) + /// # } + /// ``` + /// + /// The metrics can also be encoded into the OpenMetrics text format, as used by Prometheus. + /// To do so, use the [`iroh_metrics::Registry`], add the endpoint metrics to the + /// registry with [`Registry::register_all`], and encode the metrics to a string with + /// [`encode_openmetrics_to_string`]: + /// ```rust + /// # use iroh_metrics::{Registry, MetricsSource}; + /// # use iroh::endpoint::Endpoint; + /// # async fn wrapper() -> testresult::TestResult { + /// let endpoint = Endpoint::builder().bind().await?; + /// let mut registry = Registry::default(); + /// registry.register_all(endpoint.metrics()); + /// let s = registry.encode_openmetrics_to_string()?; + /// assert!(s.contains(r#"TYPE magicsock_recv_datagrams counter"#)); + /// assert!(s.contains(r#"magicsock_recv_datagrams_total 0"#)); + /// # Ok(()) + /// # } + /// ``` + /// + /// Through a registry, you can also add labels or prefixes to metrics with + /// [`Registry::sub_registry_with_label`] or [`Registry::sub_registry_with_prefix`]. + /// Furthermore, [`iroh_metrics::service`] provides functions to easily start services + /// to serve the metrics with a HTTP server, dump them to a file, or push them + /// to a Prometheus gateway. + /// + /// For example, the following snippet launches an HTTP server that serves the metrics in the + /// OpenMetrics text format: + /// ```no_run + /// # use std::{sync::{Arc, RwLock}, time::Duration}; + /// # use iroh_metrics::{Registry, MetricsSource}; + /// # use iroh::endpoint::Endpoint; + /// # async fn wrapper() -> testresult::TestResult { + /// // Create a registry, wrapped in a read-write lock so that we can register and serve + /// // the metrics independently. + /// let registry = Arc::new(RwLock::new(Registry::default())); + /// // Spawn a task to serve the metrics on an OpenMetrics HTTP endpoint. + /// let metrics_task = tokio::task::spawn({ + /// let registry = registry.clone(); + /// async move { + /// let addr = "0.0.0.0:9100".parse().unwrap(); + /// iroh_metrics::service::start_metrics_server(addr, registry).await + /// } + /// }); + /// + /// // Spawn an endpoint and add the metrics to the registry. + /// let endpoint = Endpoint::builder().bind().await?; + /// registry.write().unwrap().register_all(endpoint.metrics()); + /// + /// // Wait for the metrics server to bind, then fetch the metrics via HTTP. + /// tokio::time::sleep(Duration::from_millis(500)); + /// let res = reqwest::get("http://localhost:9100/metrics") + /// .await? + /// .text() + /// .await?; + /// + /// assert!(res.contains(r#"TYPE magicsock_recv_datagrams counter"#)); + /// assert!(res.contains(r#"magicsock_recv_datagrams_total 0"#)); + /// # metrics_task.abort(); + /// # Ok(()) + /// # } + /// ``` + /// + /// [`Registry`]: iroh_metrics::Registry + /// [`Registry::register_all`]: iroh_metrics::Registry::register_all + /// [`Registry::sub_registry_with_label`]: iroh_metrics::Registry::sub_registry_with_label + /// [`Registry::sub_registry_with_prefix`]: iroh_metrics::Registry::sub_registry_with_prefix + /// [`encode_openmetrics_to_string`]: iroh_metrics::MetricsSource::encode_openmetrics_to_string + /// [`MetricsGroup`]: iroh_metrics::MetricsGroup + /// [`MetricsGroupSet`]: iroh_metrics::MetricsGroupSet + #[cfg(feature = "metrics")] + pub fn metrics(&self) -> &EndpointMetrics { + &self.msock.metrics + } + // # Methods for less common state updates. /// Notifies the system of potential network changes. @@ -2053,6 +2173,7 @@ mod tests { use std::time::Instant; + use iroh_metrics::MetricsSource; use iroh_relay::http::Protocol; use n0_future::StreamExt; use rand::SeedableRng; @@ -2807,6 +2928,74 @@ mod tests { Ok(()) } + #[tokio::test] + #[traced_test] + async fn metrics_smoke() -> testresult::TestResult { + use iroh_metrics::Registry; + + let secret_key = SecretKey::from_bytes(&[0u8; 32]); + let client = Endpoint::builder() + .secret_key(secret_key) + .relay_mode(RelayMode::Disabled) + .bind() + .await?; + let secret_key = SecretKey::from_bytes(&[1u8; 32]); + let server = Endpoint::builder() + .secret_key(secret_key) + .relay_mode(RelayMode::Disabled) + .alpns(vec![TEST_ALPN.to_vec()]) + .bind() + .await?; + let server_addr = server.node_addr().await?; + let server_task = tokio::task::spawn(async move { + let conn = server + .accept() + .await + .context("expected conn")? + .accept()? + .await?; + let mut uni = conn.accept_uni().await?; + uni.read_to_end(10).await?; + drop(conn); + anyhow::Ok(server) + }); + let conn = client.connect(server_addr, TEST_ALPN).await?; + let mut uni = conn.open_uni().await?; + uni.write_all(b"helloworld").await?; + uni.finish()?; + conn.closed().await; + drop(conn); + let server = server_task.await??; + + let m = client.metrics(); + assert_eq!(m.magicsock.num_direct_conns_added.get(), 1); + assert_eq!(m.magicsock.connection_became_direct.get(), 1); + assert_eq!(m.magicsock.connection_handshake_success.get(), 1); + assert_eq!(m.magicsock.nodes_contacted_directly.get(), 1); + assert!(m.magicsock.recv_datagrams.get() > 0); + + let m = server.metrics(); + assert_eq!(m.magicsock.num_direct_conns_added.get(), 1); + assert_eq!(m.magicsock.connection_became_direct.get(), 1); + assert_eq!(m.magicsock.nodes_contacted_directly.get(), 1); + assert_eq!(m.magicsock.connection_handshake_success.get(), 1); + assert!(m.magicsock.recv_datagrams.get() > 0); + + // test openmetrics encoding with labeled subregistries per endpoint + fn register_endpoint(registry: &mut Registry, endpoint: &Endpoint) { + let id = endpoint.node_id().fmt_short(); + let sub_registry = registry.sub_registry_with_label("id", id); + sub_registry.register_all(endpoint.metrics()); + } + let mut registry = Registry::default(); + register_endpoint(&mut registry, &client); + register_endpoint(&mut registry, &server); + let s = registry.encode_openmetrics_to_string()?; + assert!(s.contains(r#"magicsock_nodes_contacted_directly_total{id="3b6a27bcce"} 1"#)); + assert!(s.contains(r#"magicsock_nodes_contacted_directly_total{id="8a88e3dd74"} 1"#)); + Ok(()) + } + /// Configures the accept side to take `accept_alpns` ALPNs, then connects to it with `primary_connect_alpn` /// with `secondary_connect_alpns` set, and finally returns the negotiated ALPN. async fn alpn_connection_test( diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs index 6fe4e0ac38f..51fc433c467 100644 --- a/iroh/src/endpoint/rtt_actor.rs +++ b/iroh/src/endpoint/rtt_actor.rs @@ -1,9 +1,8 @@ //! Actor which coordinates the congestion controller for the magic socket -use std::{pin::Pin, task::Poll}; +use std::{pin::Pin, sync::Arc, task::Poll}; use iroh_base::NodeId; -use iroh_metrics::inc; use n0_future::{ task::{self, AbortOnDropHandle}, MergeUnbounded, Stream, StreamExt, @@ -21,9 +20,10 @@ pub(super) struct RttHandle { } impl RttHandle { - pub(super) fn new() -> Self { + pub(super) fn new(metrics: Arc) -> Self { let mut actor = RttActor { connection_events: Default::default(), + metrics, }; let (msg_tx, msg_rx) = mpsc::channel(16); let handle = task::spawn( @@ -62,6 +62,7 @@ struct RttActor { /// Stream of connection type changes. #[debug("MergeUnbounded>")] connection_events: MergeUnbounded, + metrics: Arc, } #[derive(Debug)] @@ -75,8 +76,12 @@ struct MappedStream { was_direct_before: bool, } +struct ConnectionEvent { + became_direct: bool, +} + impl Stream for MappedStream { - type Item = ConnectionType; + type Item = ConnectionEvent; /// Performs the congestion controller reset for a magic socket path change. /// @@ -90,6 +95,7 @@ impl Stream for MappedStream { ) -> Poll> { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(Some(new_conn_type)) => { + let mut became_direct = false; if self.connection.network_path_changed() { debug!( node_id = %self.node_id.fmt_short(), @@ -99,10 +105,10 @@ impl Stream for MappedStream { if !self.was_direct_before && matches!(new_conn_type, ConnectionType::Direct(_)) { self.was_direct_before = true; - inc!(MagicsockMetrics, connection_became_direct); + became_direct = true } - } - Poll::Ready(Some(new_conn_type)) + }; + Poll::Ready(Some(ConnectionEvent { became_direct })) } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -124,7 +130,11 @@ impl RttActor { None => break, } } - _item = self.connection_events.next(), if !self.connection_events.is_empty() => {} + event = self.connection_events.next(), if !self.connection_events.is_empty() => { + if event.map(|e| e.became_direct).unwrap_or(false) { + self.metrics.connection_became_direct.inc(); + } + } } } debug!("rtt-actor finished"); @@ -156,6 +166,6 @@ impl RttActor { node_id, was_direct_before: false, }); - inc!(MagicsockMetrics, connection_handshake_success); + self.metrics.connection_handshake_success.inc(); } } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index c54d9e20fdf..5b051840eac 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -34,7 +34,6 @@ use bytes::Bytes; use concurrent_queue::ConcurrentQueue; use data_encoding::HEXLOWER; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; -use iroh_metrics::{inc, inc_by}; use iroh_relay::{protos::stun, RelayMap}; use n0_future::{ boxed::BoxStream, @@ -75,6 +74,7 @@ use crate::{ disco::{self, CallMeMaybe, SendAddr}, discovery::{Discovery, DiscoveryItem, DiscoverySubscribers, NodeData, UserData}, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, + metrics::EndpointMetrics, net_report::{self, IpMappedAddresses}, watchable::{Watchable, Watcher}, }; @@ -150,6 +150,8 @@ pub(crate) struct Options { /// Configuration for what path selection to use #[cfg(any(test, feature = "test-utils"))] pub(crate) path_selection: PathSelection, + + pub(crate) metrics: EndpointMetrics, } /// Contents of a relay message. Use a SmallVec to avoid allocations for the very @@ -257,6 +259,8 @@ pub(crate) struct MagicSock { /// Broadcast channel for listening to discovery updates. discovery_subscribers: DiscoverySubscribers, + + pub(crate) metrics: EndpointMetrics, } /// Sockets and related state, grouped together so we can cfg them out for browsers. @@ -387,7 +391,8 @@ impl MagicSock { } } if !addr.is_empty() { - self.node_map.add_node_addr(addr, source); + self.node_map + .add_node_addr(addr, source, &self.metrics.magicsock); Ok(()) } else if pruned != 0 { Err(anyhow::anyhow!( @@ -464,14 +469,16 @@ impl MagicSock { /// Implementation for AsyncUdpSocket::try_send #[instrument(skip_all)] fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { - inc_by!(MagicsockMetrics, send_data, transmit.contents.len() as _); + self.metrics + .magicsock + .send_data + .inc_by(transmit.contents.len() as _); if self.is_closed() { - inc_by!( - MagicsockMetrics, - send_data_network_down, - transmit.contents.len() as _ - ); + self.metrics + .magicsock + .send_data_network_down + .inc_by(transmit.contents.len() as _); return Err(io::Error::new( io::ErrorKind::NotConnected, "connection closed", @@ -499,10 +506,11 @@ impl MagicSock { // Get the node's relay address and best direct address, as well // as any pings that need to be sent for hole-punching purposes. let mut transmit = transmit.clone(); - match self - .node_map - .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed)) - { + match self.node_map.get_send_addrs( + dest, + self.ipv6_reported.load(Ordering::Relaxed), + &self.metrics.magicsock, + ) { Some((node_id, udp_addr, relay_url, msgs)) => { let mut pings_sent = false; // If we have pings to send, we *have* to send them out first. @@ -722,9 +730,9 @@ impl MagicSock { conn.try_send(transmit)?; let total_bytes: u64 = transmit.contents.len() as u64; if addr.is_ipv6() { - inc_by!(MagicsockMetrics, send_ipv6, total_bytes); + self.metrics.magicsock.send_ipv6.inc_by(total_bytes); } else { - inc_by!(MagicsockMetrics, send_ipv4, total_bytes); + self.metrics.magicsock.send_ipv4.inc_by(total_bytes); } Ok(()) } @@ -869,7 +877,7 @@ impl MagicSock { let mut quic_datagram_count = 0; if meta.len > meta.stride { trace!(%meta.len, %meta.stride, "GRO datagram received"); - inc!(MagicsockMetrics, recv_gro_datagrams); + self.metrics.magicsock.recv_gro_datagrams.inc(); } // Chunk through the datagrams in this GRO payload to find disco and stun @@ -903,9 +911,15 @@ impl MagicSock { } else { trace!(src = %meta.addr, len = %meta.stride, "UDP recv: quic packet"); if from_ipv4 { - inc_by!(MagicsockMetrics, recv_data_ipv4, datagram.len() as _); + self.metrics + .magicsock + .recv_data_ipv4 + .inc_by(datagram.len() as _); } else { - inc_by!(MagicsockMetrics, recv_data_ipv6, datagram.len() as _); + self.metrics + .magicsock + .recv_data_ipv6 + .inc_by(datagram.len() as _); } quic_datagram_count += 1; buf_contains_quic_datagrams = true; @@ -962,7 +976,10 @@ impl MagicSock { } if quic_packets_total > 0 { - inc_by!(MagicsockMetrics, recv_datagrams, quic_packets_total as _); + self.metrics + .magicsock + .recv_datagrams + .inc_by(quic_packets_total as _); trace!("UDP recv: {} packets", quic_packets_total); } } @@ -1002,7 +1019,10 @@ impl MagicSock { continue; } Some((node_id, meta, buf)) => { - inc_by!(MagicsockMetrics, recv_data_relay, buf.len() as _); + self.metrics + .magicsock + .recv_data_relay + .inc_by(buf.len() as _); trace!( src = %meta.addr, node = %node_id.fmt_short(), @@ -1021,7 +1041,7 @@ impl MagicSock { // If we have any msgs to report, they are in the first `num_msgs_total` slots if num_msgs > 0 { - inc_by!(MagicsockMetrics, recv_datagrams, num_msgs as _); + self.metrics.magicsock.recv_datagrams.inc_by(num_msgs as _); Poll::Ready(Ok(num_msgs)) } else { Poll::Pending @@ -1114,7 +1134,7 @@ impl MagicSock { Ok(dm) => dm, Err(DiscoBoxError::Open(err)) => { warn!(?err, "failed to open disco box"); - inc!(MagicsockMetrics, recv_disco_bad_key); + self.metrics.magicsock.recv_disco_bad_key.inc(); return; } Err(DiscoBoxError::Parse(err)) => { @@ -1124,16 +1144,16 @@ impl MagicSock { // understand. Not even worth logging about, lest it // be too spammy for old clients. - inc!(MagicsockMetrics, recv_disco_bad_parse); + self.metrics.magicsock.recv_disco_bad_parse.inc(); debug!(?err, "failed to parse disco message"); return; } }; if src.is_relay() { - inc!(MagicsockMetrics, recv_disco_relay); + self.metrics.magicsock.recv_disco_relay.inc(); } else { - inc!(MagicsockMetrics, recv_disco_udp); + self.metrics.magicsock.recv_disco_udp.inc(); } let span = trace_span!("handle_disco", ?dm); @@ -1141,15 +1161,15 @@ impl MagicSock { trace!("receive disco message"); match dm { disco::Message::Ping(ping) => { - inc!(MagicsockMetrics, recv_disco_ping); + self.metrics.magicsock.recv_disco_ping.inc(); self.handle_ping(ping, sender, src); } disco::Message::Pong(pong) => { - inc!(MagicsockMetrics, recv_disco_pong); + self.metrics.magicsock.recv_disco_pong.inc(); self.node_map.handle_pong(sender, &src, pong); } disco::Message::CallMeMaybe(cm) => { - inc!(MagicsockMetrics, recv_disco_call_me_maybe); + self.metrics.magicsock.recv_disco_call_me_maybe.inc(); match src { DiscoMessageSource::Relay { url, .. } => { event!( @@ -1165,7 +1185,9 @@ impl MagicSock { return; } } - let ping_actions = self.node_map.handle_call_me_maybe(sender, cm); + let ping_actions = + self.node_map + .handle_call_me_maybe(sender, cm, &self.metrics.magicsock); for action in ping_actions { match action { PingAction::SendCallMeMaybe { .. } => { @@ -1347,7 +1369,7 @@ impl MagicSock { fn send_disco_message_relay(&self, url: &RelayUrl, dst: NodeId, msg: disco::Message) -> bool { debug!(node = %dst.fmt_short(), %url, %msg, "send disco message (relay)"); let pkt = self.encode_disco_message(dst, &msg); - inc!(MagicsockMetrics, send_disco_relay); + self.metrics.magicsock.send_disco_relay.inc(); match self.try_send_relay(url, dst, smallvec![pkt]) { Ok(()) => { if let disco::Message::CallMeMaybe(CallMeMaybe { ref my_numbers }) = msg { @@ -1359,8 +1381,8 @@ impl MagicSock { addrs = ?my_numbers, ); } - inc!(MagicsockMetrics, sent_disco_relay); - disco_message_sent(&msg); + self.metrics.magicsock.sent_disco_relay.inc(); + disco_message_sent(&msg, &self.metrics.magicsock); true } Err(_) => false, @@ -1411,7 +1433,7 @@ impl MagicSock { let pkt = self.encode_disco_message(dst_node, msg); // TODO: These metrics will be wrong with the poll impl // Also - do we need it? I'd say the `sent_disco_udp` below is enough. - inc!(MagicsockMetrics, send_disco_udp); + self.metrics.magicsock.send_disco_udp.inc(); let transmit = quinn_udp::Transmit { destination: dst, contents: &pkt, @@ -1423,8 +1445,8 @@ impl MagicSock { match sent { Ok(()) => { trace!(%dst, node = %dst_node.fmt_short(), %msg, "sent disco message"); - inc!(MagicsockMetrics, sent_disco_udp); - disco_message_sent(msg); + self.metrics.magicsock.sent_disco_udp.inc(); + disco_message_sent(msg, &self.metrics.magicsock); Ok(()) } Err(err) => { @@ -1516,7 +1538,7 @@ impl MagicSock { #[instrument(skip_all)] fn re_stun(&self, why: &'static str) { debug!("re_stun: {}", why); - inc!(MagicsockMetrics, re_stun_calls); + self.metrics.magicsock.re_stun_calls.inc(); self.direct_addr_update_state.schedule_run(why); } @@ -1685,10 +1707,11 @@ impl Handle { insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] path_selection, + metrics, } = opts; #[cfg(not(wasm_browser))] - let actor_sockets = ActorSocketState::bind(addr_v4, addr_v6)?; + let actor_sockets = ActorSocketState::bind(addr_v4, addr_v6, metrics.portmapper.clone())?; #[cfg(not(wasm_browser))] let sockets = actor_sockets.msock_socket_state()?; @@ -1702,6 +1725,7 @@ impl Handle { dns_resolver.clone(), #[cfg(not(wasm_browser))] Some(ip_mapped_addrs.clone()), + metrics.net_report.clone(), )?; let (actor_sender, actor_receiver) = mpsc::channel(256); @@ -1713,9 +1737,9 @@ impl Handle { // load the node data let node_map = node_map.unwrap_or_default(); #[cfg(any(test, feature = "test-utils"))] - let node_map = NodeMap::load_from_vec(node_map, path_selection); + let node_map = NodeMap::load_from_vec(node_map, path_selection, &metrics.magicsock); #[cfg(not(any(test, feature = "test-utils")))] - let node_map = NodeMap::load_from_vec(node_map); + let node_map = NodeMap::load_from_vec(node_map, &metrics.magicsock); let secret_encryption_key = secret_ed_box(secret_key.secret()); @@ -1750,6 +1774,7 @@ impl Handle { #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, discovery_subscribers: DiscoverySubscribers::new(), + metrics, }); let mut endpoint_config = quinn::EndpointConfig::default(); @@ -2329,8 +2354,12 @@ struct ActorSocketState { #[cfg(not(wasm_browser))] impl ActorSocketState { - fn bind(addr_v4: Option, addr_v6: Option) -> Result { - let port_mapper = portmapper::Client::default(); + fn bind( + addr_v4: Option, + addr_v6: Option, + metrics: Arc, + ) -> Result { + let port_mapper = portmapper::Client::with_metrics(Default::default(), metrics); let (v4, v6) = Self::bind_sockets(addr_v4, addr_v6)?; let this = Self { @@ -2432,7 +2461,7 @@ impl Actor { let mut portmap_watcher_closed = false; let mut link_change_closed = false; loop { - inc!(Metrics, actor_tick_main); + self.msock.metrics.magicsock.actor_tick_main.inc(); #[cfg(not(wasm_browser))] let portmap_watcher_changed = portmap_watcher.changed(); #[cfg(wasm_browser)] @@ -2447,21 +2476,21 @@ impl Actor { msg = self.msg_receiver.recv(), if !receiver_closed => { let Some(msg) = msg else { trace!("tick: magicsock receiver closed"); - inc!(Metrics, actor_tick_other); + self.msock.metrics.magicsock.actor_tick_other.inc(); receiver_closed = true; continue; }; trace!(?msg, "tick: msg"); - inc!(Metrics, actor_tick_msg); + self.msock.metrics.magicsock.actor_tick_msg.inc(); if self.handle_actor_message(msg).await { return Ok(()); } } tick = self.periodic_re_stun_timer.tick() => { trace!("tick: re_stun {:?}", tick); - inc!(Metrics, actor_tick_re_stun); + self.msock.metrics.magicsock.actor_tick_re_stun.inc(); self.msock.re_stun("periodic"); } change = portmap_watcher_changed, if !portmap_watcher_closed => { @@ -2469,14 +2498,14 @@ impl Actor { { if change.is_err() { trace!("tick: portmap watcher closed"); - inc!(Metrics, actor_tick_other); + self.msock.metrics.magicsock.actor_tick_other.inc(); portmap_watcher_closed = true; continue; } trace!("tick: portmap changed"); - inc!(Metrics, actor_tick_portmap_changed); + self.msock.metrics.magicsock.actor_tick_portmap_changed.inc(); let new_external_address = *portmap_watcher.borrow(); debug!("external address updated: {new_external_address:?}"); self.msock.re_stun("portmap_updated"); @@ -2491,7 +2520,7 @@ impl Actor { "tick: direct addr heartbeat {} direct addrs", self.msock.node_map.node_count(), ); - inc!(Metrics, actor_tick_direct_addr_heartbeat); + self.msock.metrics.magicsock.actor_tick_direct_addr_heartbeat.inc(); // TODO: this might trigger too many packets at once, pace this self.msock.node_map.prune_inactive(); @@ -2502,7 +2531,7 @@ impl Actor { _ = direct_addr_update_receiver.changed() => { let reason = *direct_addr_update_receiver.borrow(); trace!("tick: direct addr update receiver {:?}", reason); - inc!(Metrics, actor_tick_direct_addr_update_receiver); + self.msock.metrics.magicsock.actor_tick_direct_addr_update_receiver.inc(); if let Some(reason) = reason { self.refresh_direct_addrs(reason).await; } @@ -2510,14 +2539,14 @@ impl Actor { is_major = link_change_r.recv(), if !link_change_closed => { let Some(is_major) = is_major else { trace!("tick: link change receiver closed"); - inc!(Metrics, actor_tick_other); + self.msock.metrics.magicsock.actor_tick_other.inc(); link_change_closed = true; continue; }; trace!("tick: link change {}", is_major); - inc!(Metrics, actor_link_change); + self.msock.metrics.magicsock.actor_link_change.inc(); self.handle_network_change(is_major).await; } // Even if `discovery_events` yields `None`, it could begin to yield @@ -2631,7 +2660,7 @@ impl Actor { /// mistake to be made. #[instrument(level = "debug", skip_all)] async fn refresh_direct_addrs(&mut self, why: &'static str) { - inc!(MagicsockMetrics, update_direct_addrs); + self.msock.metrics.magicsock.update_direct_addrs.inc(); debug!("starting direct addr update ({})", why); #[cfg(not(wasm_browser))] @@ -2938,7 +2967,7 @@ impl Actor { let old_relay = self.msock.set_my_relay(relay_url.clone()); if let Some(ref relay_url) = relay_url { - inc!(MagicsockMetrics, relay_home_change); + self.msock.metrics.magicsock.relay_home_change.inc(); // On change, notify all currently connected relay servers and // start connecting to our home relay if we are not already. @@ -3238,16 +3267,16 @@ impl std::fmt::Display for NodeIdMappedAddr { } } -fn disco_message_sent(msg: &disco::Message) { +fn disco_message_sent(msg: &disco::Message, metrics: &MagicsockMetrics) { match msg { disco::Message::Ping(_) => { - inc!(MagicsockMetrics, sent_disco_ping); + metrics.sent_disco_ping.inc(); } disco::Message::Pong(_) => { - inc!(MagicsockMetrics, sent_disco_pong); + metrics.sent_disco_pong.inc(); } disco::Message::CallMeMaybe(_) => { - inc!(MagicsockMetrics, sent_disco_call_me_maybe); + metrics.sent_disco_call_me_maybe.inc(); } } } @@ -3421,6 +3450,7 @@ mod tests { #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), discovery_user_data: None, + metrics: Default::default(), } } } @@ -4032,6 +4062,7 @@ mod tests { server_config, insecure_skip_relay_cert_verify: true, path_selection: PathSelection::default(), + metrics: Default::default(), }; let msock = MagicSock::spawn(opts).await?; Ok(msock) @@ -4237,6 +4268,7 @@ mod tests { Source::NamedApp { name: "test".into(), }, + &msock_1.metrics.magicsock, ); let addr_2 = msock_1.get_mapping_addr(node_id_2).unwrap(); @@ -4281,6 +4313,7 @@ mod tests { Source::NamedApp { name: "test".into(), }, + &msock_1.metrics.magicsock, ); // We can now connect diff --git a/iroh/src/magicsock/metrics.rs b/iroh/src/magicsock/metrics.rs index 90b5ae9d471..b6d7fe5d44c 100644 --- a/iroh/src/magicsock/metrics.rs +++ b/iroh/src/magicsock/metrics.rs @@ -1,12 +1,12 @@ -use iroh_metrics::{ - core::{Counter, Metric}, - struct_iterable::Iterable, -}; +use iroh_metrics::{Counter, MetricsGroup}; +use serde::{Deserialize, Serialize}; /// Enum of metrics for the module +// TODO(frando): Add description doc strings for each metric. #[allow(missing_docs)] -#[derive(Debug, Clone, Iterable)] +#[derive(Debug, Default, Serialize, Deserialize, MetricsGroup)] #[non_exhaustive] +#[metrics(name = "magicsock")] pub struct Metrics { pub re_stun_calls: Counter, pub update_direct_addrs: Counter, @@ -80,81 +80,3 @@ pub struct Metrics { /// Number of connections with a successful handshake that became direct. pub connection_became_direct: Counter, } - -impl Default for Metrics { - fn default() -> Self { - Self { - num_relay_conns_added: Counter::new("num_relay_conns added"), - num_relay_conns_removed: Counter::new("num_relay_conns removed"), - - re_stun_calls: Counter::new("restun_calls"), - update_direct_addrs: Counter::new("update_endpoints"), - - // Sends (data or disco) - send_ipv4: Counter::new("send_ipv4"), - send_ipv6: Counter::new("send_ipv6"), - send_relay: Counter::new("send_relay"), - send_relay_error: Counter::new("send_relay_error"), - - // Data packets (non-disco) - send_data: Counter::new("send_data"), - send_data_network_down: Counter::new("send_data_network_down"), - recv_data_relay: Counter::new("recv_data_relay"), - recv_data_ipv4: Counter::new("recv_data_ipv4"), - recv_data_ipv6: Counter::new("recv_data_ipv6"), - recv_datagrams: Counter::new("recv_datagrams"), - recv_gro_datagrams: Counter::new("recv_gro_packets"), - - // Disco packets - send_disco_udp: Counter::new("disco_send_udp"), - send_disco_relay: Counter::new("disco_send_relay"), - sent_disco_udp: Counter::new("disco_sent_udp"), - sent_disco_relay: Counter::new("disco_sent_relay"), - sent_disco_ping: Counter::new("disco_sent_ping"), - sent_disco_pong: Counter::new("disco_sent_pong"), - sent_disco_call_me_maybe: Counter::new("disco_sent_callmemaybe"), - recv_disco_bad_key: Counter::new("disco_recv_bad_key"), - recv_disco_bad_parse: Counter::new("disco_recv_bad_parse"), - - recv_disco_udp: Counter::new("disco_recv_udp"), - recv_disco_relay: Counter::new("disco_recv_relay"), - recv_disco_ping: Counter::new("disco_recv_ping"), - recv_disco_pong: Counter::new("disco_recv_pong"), - recv_disco_call_me_maybe: Counter::new("disco_recv_callmemaybe"), - recv_disco_call_me_maybe_bad_disco: Counter::new("disco_recv_callmemaybe_bad_disco"), - - // How many times our relay home node DI has changed from non-zero to a different non-zero. - relay_home_change: Counter::new("relay_home_change"), - - num_direct_conns_added: Counter::new( - "number of direct connections to a peer we have added", - ), - num_direct_conns_removed: Counter::new( - "number of direct connections to a peer we have removed", - ), - - actor_tick_main: Counter::new("actor_tick_main"), - actor_tick_msg: Counter::new("actor_tick_msg"), - actor_tick_re_stun: Counter::new("actor_tick_re_stun"), - actor_tick_portmap_changed: Counter::new("actor_tick_portmap_changed"), - actor_tick_direct_addr_heartbeat: Counter::new("actor_tick_direct_addr_heartbeat"), - actor_tick_direct_addr_update_receiver: Counter::new( - "actor_tick_direct_addr_update_receiver", - ), - actor_link_change: Counter::new("actor_link_change"), - actor_tick_other: Counter::new("actor_tick_other"), - - nodes_contacted: Counter::new("nodes_contacted"), - nodes_contacted_directly: Counter::new("nodes_contacted_directly"), - - connection_handshake_success: Counter::new("connection_handshake_success"), - connection_became_direct: Counter::new("connection_became_direct"), - } - } -} - -impl Metric for Metrics { - fn name() -> &'static str { - "magicsock" - } -} diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index db01a71f111..af0634a13bb 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -6,7 +6,6 @@ use std::{ }; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; -use iroh_metrics::inc; use n0_future::time::Instant; use serde::{Deserialize, Serialize}; use stun_rs::TransactionId; @@ -16,9 +15,7 @@ use self::{ best_addr::ClearReason, node_state::{NodeState, Options, PingHandled}, }; -use super::{ - metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, NodeIdMappedAddr, -}; +use super::{metrics::Metrics, ActorMessage, DiscoMessageSource, NodeIdMappedAddr}; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; use crate::{ @@ -55,7 +52,7 @@ const MAX_INACTIVE_NODES: usize = 30; /// These come and go as the node moves around on the internet /// /// An index of nodeInfos by node key, NodeIdMappedAddr, and discovered ip:port endpoints. -#[derive(Default, Debug)] +#[derive(Debug, Default)] pub(super) struct NodeMap { inner: Mutex, } @@ -129,14 +126,18 @@ pub enum Source { impl NodeMap { #[cfg(not(any(test, feature = "test-utils")))] /// Create a new [`NodeMap`] from a list of [`NodeAddr`]s. - pub(super) fn load_from_vec(nodes: Vec) -> Self { - Self::from_inner(NodeMapInner::load_from_vec(nodes)) + pub(super) fn load_from_vec(nodes: Vec, metrics: &Metrics) -> Self { + Self::from_inner(NodeMapInner::load_from_vec(nodes, metrics)) } #[cfg(any(test, feature = "test-utils"))] /// Create a new [`NodeMap`] from a list of [`NodeAddr`]s. - pub(super) fn load_from_vec(nodes: Vec, path_selection: PathSelection) -> Self { - Self::from_inner(NodeMapInner::load_from_vec(nodes, path_selection)) + pub(super) fn load_from_vec( + nodes: Vec, + path_selection: PathSelection, + metrics: &Metrics, + ) -> Self { + Self::from_inner(NodeMapInner::load_from_vec(nodes, path_selection, metrics)) } fn from_inner(inner: NodeMapInner) -> Self { @@ -146,11 +147,11 @@ impl NodeMap { } /// Add the contact information for a node. - pub(super) fn add_node_addr(&self, node_addr: NodeAddr, source: Source) { + pub(super) fn add_node_addr(&self, node_addr: NodeAddr, source: Source, metrics: &Metrics) { self.inner .lock() .expect("poisoned") - .add_node_addr(node_addr, source) + .add_node_addr(node_addr, source, metrics) } /// Number of nodes currently listed. @@ -239,11 +240,12 @@ impl NodeMap { &self, sender: PublicKey, cm: CallMeMaybe, + metrics: &Metrics, ) -> Vec { self.inner .lock() .expect("poisoned") - .handle_call_me_maybe(sender, cm) + .handle_call_me_maybe(sender, cm, metrics) } #[allow(clippy::type_complexity)] @@ -251,6 +253,7 @@ impl NodeMap { &self, addr: NodeIdMappedAddr, have_ipv6: bool, + metrics: &Metrics, ) -> Option<( PublicKey, Option, @@ -261,7 +264,7 @@ impl NodeMap { let ep = inner.get_mut(NodeStateKey::NodeIdMappedAddr(addr))?; let public_key = *ep.public_key(); trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId"); - let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); + let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6, metrics); Some((public_key, udp_addr, relay_url, msgs)) } @@ -331,30 +334,34 @@ impl NodeMap { impl NodeMapInner { #[cfg(not(any(test, feature = "test-utils")))] /// Create a new [`NodeMap`] from a list of [`NodeAddr`]s. - fn load_from_vec(nodes: Vec) -> Self { + fn load_from_vec(nodes: Vec, metrics: &Metrics) -> Self { let mut me = Self::default(); for node_addr in nodes { - me.add_node_addr(node_addr, Source::Saved); + me.add_node_addr(node_addr, Source::Saved, metrics); } me } #[cfg(any(test, feature = "test-utils"))] /// Create a new [`NodeMap`] from a list of [`NodeAddr`]s. - fn load_from_vec(nodes: Vec, path_selection: PathSelection) -> Self { + fn load_from_vec( + nodes: Vec, + path_selection: PathSelection, + metrics: &Metrics, + ) -> Self { let mut me = Self { path_selection, ..Default::default() }; for node_addr in nodes { - me.add_node_addr(node_addr, Source::Saved); + me.add_node_addr(node_addr, Source::Saved, metrics); } me } /// Add the contact information for a node. #[instrument(skip_all, fields(node = %node_addr.node_id.fmt_short()))] - fn add_node_addr(&mut self, node_addr: NodeAddr, source: Source) { + fn add_node_addr(&mut self, node_addr: NodeAddr, source: Source, metrics: &Metrics) { let source0 = source.clone(); let node_id = node_addr.node_id; let relay_url = node_addr.relay_url.clone(); @@ -372,6 +379,7 @@ impl NodeMapInner { node_addr.relay_url.as_ref(), &node_addr.direct_addresses, source0, + metrics, ); let id = node_state.id(); for addr in node_addr.direct_addresses() { @@ -517,7 +525,12 @@ impl NodeMapInner { } #[must_use = "actions must be handled"] - fn handle_call_me_maybe(&mut self, sender: NodeId, cm: CallMeMaybe) -> Vec { + fn handle_call_me_maybe( + &mut self, + sender: NodeId, + cm: CallMeMaybe, + metrics: &Metrics, + ) -> Vec { let ns_id = NodeStateKey::NodeId(sender); if let Some(id) = self.get_id(ns_id.clone()) { for number in &cm.my_numbers { @@ -527,8 +540,8 @@ impl NodeMapInner { } match self.get_mut(ns_id) { None => { - inc!(MagicsockMetrics, recv_disco_call_me_maybe_bad_disco); debug!("received call-me-maybe: ignore, node is unknown"); + metrics.recv_disco_call_me_maybe_bad_disco.inc(); vec![] } Some(ns) => { @@ -712,6 +725,7 @@ mod tests { Source::NamedApp { name: "test".into(), }, + &Default::default(), ) } } @@ -757,7 +771,8 @@ mod tests { Some(addr) }) .collect(); - let loaded_node_map = NodeMap::load_from_vec(addrs.clone(), PathSelection::default()); + let loaded_node_map = + NodeMap::load_from_vec(addrs.clone(), PathSelection::default(), &Default::default()); let mut loaded: Vec = loaded_node_map .list_remote_infos(Instant::now()) diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 1c3d5ff7e63..adbd155a791 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -6,7 +6,6 @@ use std::{ use data_encoding::HEXLOWER; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; -use iroh_metrics::inc; use iroh_relay::protos::stun; use n0_future::{ task::{self, AbortOnDropHandle}, @@ -160,10 +159,12 @@ impl NodeState { pub(super) fn new(id: usize, options: Options) -> Self { let quic_mapped_addr = NodeIdMappedAddr::generate(); - if options.relay_url.is_some() { - // we potentially have a relay connection to the node - inc!(MagicsockMetrics, num_relay_conns_added); - } + // TODO(frando): I don't think we need to track the `num_relay_conns_added` + // metric here. We do so in `Self::addr_for_send`. + // if options.relay_url.is_some() { + // // we potentially have a relay connection to the node + // inc!(MagicsockMetrics, num_relay_conns_added); + // } let now = Instant::now(); @@ -280,6 +281,7 @@ impl NodeState { &mut self, now: &Instant, have_ipv6: bool, + metrics: &MagicsockMetrics, ) -> (Option, Option) { #[cfg(any(test, feature = "test-utils"))] if self.path_selection == PathSelection::RelayOnly { @@ -316,7 +318,7 @@ impl NodeState { }; if !self.has_been_direct && matches!(&typ, ConnectionType::Direct(_)) { self.has_been_direct = true; - inc!(MagicsockMetrics, nodes_contacted_directly); + metrics.nodes_contacted_directly.inc(); } if let Ok(prev_typ) = self.conn_type.set(typ.clone()) { // The connection type has changed. @@ -332,27 +334,27 @@ impl NodeState { match (prev_typ, typ) { (ConnectionType::Relay(_), ConnectionType::Direct(_)) | (ConnectionType::Mixed(_, _), ConnectionType::Direct(_)) => { - inc!(MagicsockMetrics, num_direct_conns_added); - inc!(MagicsockMetrics, num_relay_conns_removed); + metrics.num_direct_conns_added.inc(); + metrics.num_relay_conns_removed.inc(); } (ConnectionType::Direct(_), ConnectionType::Relay(_)) | (ConnectionType::Direct(_), ConnectionType::Mixed(_, _)) => { - inc!(MagicsockMetrics, num_direct_conns_removed); - inc!(MagicsockMetrics, num_relay_conns_added); + metrics.num_direct_conns_removed.inc(); + metrics.num_relay_conns_added.inc(); } (ConnectionType::None, ConnectionType::Direct(_)) => { - inc!(MagicsockMetrics, num_direct_conns_added) + metrics.num_direct_conns_added.inc(); } (ConnectionType::Direct(_), ConnectionType::None) => { - inc!(MagicsockMetrics, num_direct_conns_removed) + metrics.num_direct_conns_removed.inc(); } (ConnectionType::None, ConnectionType::Relay(_)) | (ConnectionType::None, ConnectionType::Mixed(_, _)) => { - inc!(MagicsockMetrics, num_relay_conns_added) + metrics.num_relay_conns_added.inc(); } (ConnectionType::Relay(_), ConnectionType::None) | (ConnectionType::Mixed(_, _), ConnectionType::None) => { - inc!(MagicsockMetrics, num_relay_conns_removed) + metrics.num_relay_conns_removed.inc(); } _ => (), } @@ -663,16 +665,17 @@ impl NodeState { new_relay_url: Option<&RelayUrl>, new_addrs: &BTreeSet, source: super::Source, + metrics: &MagicsockMetrics, ) { if self.udp_paths.best_addr.is_empty() { // we do not have a direct connection, so changing the relay information may // have an effect on our connection status if self.relay_url.is_none() && new_relay_url.is_some() { // we did not have a relay connection before, but now we do - inc!(MagicsockMetrics, num_relay_conns_added) + metrics.num_relay_conns_added.inc(); } else if self.relay_url.is_some() && new_relay_url.is_none() { // we had a relay connection before but do not have one now - inc!(MagicsockMetrics, num_relay_conns_removed) + metrics.num_relay_conns_removed.inc(); } } @@ -1165,14 +1168,15 @@ impl NodeState { pub(crate) fn get_send_addrs( &mut self, have_ipv6: bool, + metrics: &MagicsockMetrics, ) -> (Option, Option, Vec) { let now = Instant::now(); let prev = self.last_used.replace(now); if prev.is_none() { // this is the first time we are trying to connect to this node - inc!(MagicsockMetrics, nodes_contacted); + metrics.nodes_contacted.inc(); } - let (udp_addr, relay_url) = self.addr_for_send(&now, have_ipv6); + let (udp_addr, relay_url) = self.addr_for_send(&now, have_ipv6, metrics); let mut ping_msgs = Vec::new(); if self.want_call_me_maybe(&now) { diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index f12b6442314..d603724fe19 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -41,7 +41,6 @@ use anyhow::{anyhow, Context, Result}; use backon::{Backoff, BackoffBuilder, ExponentialBuilder}; use bytes::{Bytes, BytesMut}; use iroh_base::{NodeId, PublicKey, RelayUrl, SecretKey}; -use iroh_metrics::{inc, inc_by}; use iroh_relay::{ self as relay, client::{Client, ReceivedMessage, SendMessage}, @@ -157,6 +156,7 @@ struct ActiveRelayActor { inactive_timeout: Pin>, /// Token indicating the [`ActiveRelayActor`] should stop. stop_token: CancellationToken, + metrics: Arc, } #[derive(Debug)] @@ -199,6 +199,7 @@ struct ActiveRelayActorOptions { relay_datagrams_recv: Arc, connection_opts: RelayConnectionOptions, stop_token: CancellationToken, + metrics: Arc, } /// Configuration needed to create a connection to a relay server. @@ -235,6 +236,7 @@ impl ActiveRelayActor { relay_datagrams_recv, connection_opts, stop_token, + metrics, } = opts; let relay_client_builder = Self::create_relay_builder(url.clone(), connection_opts); ActiveRelayActor { @@ -247,6 +249,7 @@ impl ActiveRelayActor { is_home_relay: false, inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)), stop_token, + metrics, } } @@ -285,7 +288,9 @@ impl ActiveRelayActor { /// /// Primarily switches between the dialing and connected states. async fn run(mut self) -> Result<()> { - inc!(MagicsockMetrics, num_relay_conns_added); + // TODO(frando): decide what this metric means, it's either wrong here or in node_state.rs. + // From the existing description, it is wrong here. + // self.metrics.num_relay_conns_added.inc(); let mut backoff = Self::build_backoff(); @@ -307,7 +312,9 @@ impl ActiveRelayActor { } } debug!("exiting"); - inc!(MagicsockMetrics, num_relay_conns_removed); + // TODO(frando): decide what this metric means, it's either wrong here or in node_state.rs. + // From the existing description, it is wrong here. + // self.metrics.num_relay_conns_removed.inc(); Ok(()) } @@ -573,18 +580,22 @@ impl ActiveRelayActor { &mut send_datagrams_buf, Vec::with_capacity(SEND_DATAGRAM_BATCH_SIZE), ); + // TODO(frando): can we avoid the clone here? + let metrics = self.metrics.clone(); let packet_iter = dgrams.into_iter().flat_map(|datagrams| { PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new( datagrams.remote_node, datagrams.datagrams.clone(), ) .map(|p| { - inc_by!(MagicsockMetrics, send_relay, p.payload.len() as _); - SendMessage::SendPacket(p.node_id, p.payload) + Ok(SendMessage::SendPacket(p.node_id, p.payload)) }) - .map(Ok) }); - let mut packet_stream = n0_future::stream::iter(packet_iter); + let mut packet_stream = n0_future::stream::iter(packet_iter).inspect(|m| { + if let Ok(SendMessage::SendPacket(_node_id, payload)) = m { + metrics.send_relay.inc_by(payload.len() as _); + } + }); let fut = client_sink.send_all(&mut packet_stream); self.run_sending(fut, &mut state, &mut client_stream).await?; } @@ -1047,6 +1058,7 @@ impl RelayActor { relay_datagrams_recv: self.relay_datagram_recv_queue.clone(), connection_opts, stop_token: self.cancel_token.child_token(), + metrics: self.msock.metrics.magicsock.clone(), }; let actor = ActiveRelayActor::new(opts); self.active_relay_tasks.spawn( @@ -1336,6 +1348,7 @@ mod tests { protocol: iroh_relay::http::Protocol::default(), }, stop_token, + metrics: Default::default(), }; let task = tokio::spawn(ActiveRelayActor::new(opts).run().instrument(span)); AbortOnDropHandle::new(task) diff --git a/iroh/src/metrics.rs b/iroh/src/metrics.rs index 477df0eeef0..f62f85a214c 100644 --- a/iroh/src/metrics.rs +++ b/iroh/src/metrics.rs @@ -1,7 +1,42 @@ //! Co-locating all of the iroh metrics structs +use std::sync::Arc; + +use iroh_metrics::MetricsGroupSet; #[cfg(feature = "test-utils")] pub use iroh_relay::server::Metrics as RelayMetrics; #[cfg(not(wasm_browser))] pub use portmapper::Metrics as PortmapMetrics; +use serde::{Deserialize, Serialize}; pub use crate::{magicsock::Metrics as MagicsockMetrics, net_report::Metrics as NetReportMetrics}; + +/// Metrics collected by an [`crate::endpoint::Endpoint`]. +/// +/// See [`crate::endpoint::Endpoint::metrics`] for details. +#[derive(Default, Debug, Clone, Serialize, Deserialize, MetricsGroupSet)] +#[metrics(name = "endpoint")] +#[non_exhaustive] +pub struct EndpointMetrics { + /// Metrics collected by the endpoint's socket. + pub magicsock: Arc, + /// Metrics collected by net reports. + pub net_report: Arc, + /// Metrics collected by the portmapper service. + #[cfg(not(wasm_browser))] + pub portmapper: Arc, +} + +#[cfg(test)] +mod tests { + use super::EndpointMetrics; + #[test] + fn test_serde() { + let metrics = EndpointMetrics::default(); + metrics.magicsock.actor_link_change.inc(); + metrics.net_report.reports.inc_by(10); + let encoded = postcard::to_stdvec(&metrics).unwrap(); + let decoded: EndpointMetrics = postcard::from_bytes(&encoded).unwrap(); + assert_eq!(decoded.magicsock.actor_link_change.get(), 1); + assert_eq!(decoded.net_report.reports.get(), 10); + } +} diff --git a/iroh/src/net_report.rs b/iroh/src/net_report.rs index 18a80928046..a2e1453a222 100644 --- a/iroh/src/net_report.rs +++ b/iroh/src/net_report.rs @@ -20,8 +20,6 @@ use std::{ use anyhow::{anyhow, Result}; use bytes::Bytes; use iroh_base::RelayUrl; -#[cfg(feature = "metrics")] -use iroh_metrics::inc; #[cfg(not(wasm_browser))] use iroh_relay::dns::DnsResolver; use iroh_relay::{protos::stun, RelayMap}; @@ -249,6 +247,7 @@ impl Client { #[cfg(not(wasm_browser))] port_mapper: Option, #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, #[cfg(not(wasm_browser))] ip_mapped_addrs: Option, + metrics: Arc, ) -> Result { let mut actor = Actor::new( #[cfg(not(wasm_browser))] @@ -257,6 +256,7 @@ impl Client { dns_resolver, #[cfg(not(wasm_browser))] ip_mapped_addrs, + metrics, )?; let addr = actor.addr(); let task = task::spawn( @@ -404,6 +404,7 @@ pub(crate) enum Message { #[derive(Debug, Clone)] pub struct Addr { sender: mpsc::Sender, + metrics: Arc, } impl Addr { @@ -425,8 +426,7 @@ impl Addr { payload, from_addr: src, }) { - #[cfg(feature = "metrics")] - inc!(Metrics, stun_packets_dropped); + self.metrics.stun_packets_dropped.inc(); warn!("dropping stun packet from {}", src); } } @@ -480,6 +480,7 @@ struct Actor { /// The [`IpMappedAddresses`] that allows you to do QAD in iroh #[cfg(not(wasm_browser))] ip_mapped_addrs: Option, + metrics: Arc, } impl Actor { @@ -491,6 +492,7 @@ impl Actor { #[cfg(not(wasm_browser))] port_mapper: Option, #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, #[cfg(not(wasm_browser))] ip_mapped_addrs: Option, + metrics: Arc, ) -> Result { // TODO: consider an instrumented flume channel so we have metrics. let (sender, receiver) = mpsc::channel(32); @@ -506,6 +508,7 @@ impl Actor { dns_resolver, #[cfg(not(wasm_browser))] ip_mapped_addrs, + metrics, }) } @@ -513,6 +516,7 @@ impl Actor { fn addr(&self) -> Addr { Addr { sender: self.sender.clone(), + metrics: self.metrics.clone(), } } @@ -596,17 +600,16 @@ impl Actor { self.reports.last = None; // causes ProbePlan::new below to do a full (initial) plan self.reports.next_full = false; self.reports.last_full = now; - #[cfg(feature = "metrics")] - inc!(Metrics, reports_full); + self.metrics.reports_full.inc(); } - #[cfg(feature = "metrics")] - inc!(Metrics, reports); + self.metrics.reports.inc(); let actor = reportgen::Client::new( self.addr(), self.reports.last.clone(), relay_map, protocols, + self.metrics.clone(), #[cfg(not(wasm_browser))] socket_state, ); @@ -645,10 +648,10 @@ impl Actor { #[cfg(feature = "metrics")] match &src { SocketAddr::V4(_) => { - inc!(Metrics, stun_packets_recv_ipv4); + self.metrics.stun_packets_recv_ipv4.inc(); } SocketAddr::V6(_) => { - inc!(Metrics, stun_packets_recv_ipv6); + self.metrics.stun_packets_recv_ipv6.inc(); } } @@ -1064,7 +1067,7 @@ mod tests { stun_utils::serve("127.0.0.1".parse().unwrap()).await?; let resolver = dns::tests::resolver(); - let mut client = Client::new(None, resolver.clone(), None)?; + let mut client = Client::new(None, resolver.clone(), None, Default::default())?; let dm = stun_utils::relay_map_of([stun_addr].into_iter()); // Note that the ProbePlan will change with each iteration. @@ -1111,7 +1114,7 @@ mod tests { // Now create a client and generate a report. let resolver = dns::tests::resolver(); - let mut client = Client::new(None, resolver.clone(), None)?; + let mut client = Client::new(None, resolver.clone(), None, Default::default())?; let r = client.get_report_all(dm, None, None, None).await?; let mut r: Report = (*r).clone(); @@ -1314,7 +1317,7 @@ mod tests { let resolver = dns::tests::resolver(); for mut tt in tests { println!("test: {}", tt.name); - let mut actor = Actor::new(None, resolver.clone(), None).unwrap(); + let mut actor = Actor::new(None, resolver.clone(), None, Default::default()).unwrap(); for s in &mut tt.steps { // trigger the timer tokio::time::advance(Duration::from_secs(s.after)).await; @@ -1349,7 +1352,7 @@ mod tests { dbg!(&dm); let resolver = dns::tests::resolver().clone(); - let mut client = Client::new(None, resolver, None)?; + let mut client = Client::new(None, resolver, None, Default::default())?; // Set up an external socket to send STUN requests from, this will be discovered as // our public socket address by STUN. We send back any packets received on this diff --git a/iroh/src/net_report/metrics.rs b/iroh/src/net_report/metrics.rs index e68daff12e9..1698c1d8ae7 100644 --- a/iroh/src/net_report/metrics.rs +++ b/iroh/src/net_report/metrics.rs @@ -1,41 +1,23 @@ -use iroh_metrics::{ - core::{Counter, Metric}, - struct_iterable::Iterable, -}; +use iroh_metrics::{Counter, MetricsGroup}; +use serde::{Deserialize, Serialize}; /// Enum of metrics for the module -#[allow(missing_docs)] -#[derive(Debug, Clone, Iterable)] +#[derive(Debug, Default, MetricsGroup, Serialize, Deserialize)] +#[metrics(name = "net_report")] +#[non_exhaustive] pub struct Metrics { + /// Incoming STUN packets dropped due to a full receiving queue. pub stun_packets_dropped: Counter, + /// Number of IPv4 STUN packets sent. pub stun_packets_sent_ipv4: Counter, + /// Number of IPv6 STUN packets sent. pub stun_packets_sent_ipv6: Counter, + /// Number of IPv4 STUN packets received. pub stun_packets_recv_ipv4: Counter, + /// Number of IPv6 STUN packets received. pub stun_packets_recv_ipv6: Counter, + /// Number of reports executed by net_report, including full reports. pub reports: Counter, + /// Number of full reports executed by net_report pub reports_full: Counter, } - -impl Default for Metrics { - fn default() -> Self { - Self { - stun_packets_dropped: Counter::new( - "Incoming STUN packets dropped due to a full receiving queue.", - ), - stun_packets_sent_ipv4: Counter::new("Number of IPv4 STUN packets sent"), - stun_packets_sent_ipv6: Counter::new("Number of IPv6 STUN packets sent"), - stun_packets_recv_ipv4: Counter::new("Number of IPv4 STUN packets received"), - stun_packets_recv_ipv6: Counter::new("Number of IPv6 STUN packets received"), - reports: Counter::new( - "Number of reports executed by net_report, including full reports", - ), - reports_full: Counter::new("Number of full reports executed by net_report"), - } - } -} - -impl Metric for Metrics { - fn name() -> &'static str { - "net_report" - } -} diff --git a/iroh/src/net_report/reportgen.rs b/iroh/src/net_report/reportgen.rs index 796f5694d49..54f5ad1c26e 100644 --- a/iroh/src/net_report/reportgen.rs +++ b/iroh/src/net_report/reportgen.rs @@ -27,8 +27,6 @@ use std::{ use anyhow::{anyhow, bail, Context as _, Result}; use iroh_base::RelayUrl; -#[cfg(feature = "metrics")] -use iroh_metrics::inc; #[cfg(not(wasm_browser))] use iroh_relay::dns::DnsResolver; use iroh_relay::{ @@ -53,9 +51,7 @@ use url::Host; #[cfg(wasm_browser)] use crate::net_report::portmapper; // We stub the library -#[cfg(feature = "metrics")] -use crate::net_report::Metrics; -use crate::net_report::{self, Report}; +use crate::net_report::{self, Metrics, Report}; #[cfg(not(wasm_browser))] use crate::net_report::{ defaults::timeouts::DNS_TIMEOUT, @@ -116,6 +112,7 @@ impl Client { last_report: Option>, relay_map: RelayMap, protocols: BTreeSet, + metrics: Arc, #[cfg(not(wasm_browser))] socket_state: SocketState, ) -> Self { let (msg_tx, msg_rx) = mpsc::channel(32); @@ -135,6 +132,7 @@ impl Client { socket_state, #[cfg(not(wasm_browser))] hairpin_actor: hairpin::Client::new(net_report, addr), + metrics, }; let task = task::spawn(async move { actor.run().await }.instrument(info_span!("reportgen.actor"))); @@ -215,6 +213,7 @@ struct Actor { /// The hairpin actor. #[cfg(not(wasm_browser))] hairpin_actor: hairpin::Client, + metrics: Arc, } impl Actor { @@ -615,12 +614,14 @@ impl Actor { #[cfg(not(wasm_browser))] let socket_state = self.socket_state.clone(); + let metrics = self.metrics.clone(); set.spawn( run_probe( reportstate, relay_node, probe.clone(), net_report, + metrics, #[cfg(not(wasm_browser))] pinger, #[cfg(not(wasm_browser))] @@ -754,6 +755,7 @@ async fn run_probe( relay_node: Arc, probe: Probe, net_report: net_report::Addr, + metrics: Arc, #[cfg(not(wasm_browser))] pinger: Pinger, #[cfg(not(wasm_browser))] socket_state: SocketState, ) -> Result { @@ -806,7 +808,7 @@ async fn run_probe( }; match maybe_sock { Some(sock) => { - result = run_stun_probe(sock, relay_addr, net_report, probe).await?; + result = run_stun_probe(sock, relay_addr, net_report, probe, &metrics).await?; } None => { return Err(ProbeError::AbortSet( @@ -885,6 +887,7 @@ async fn run_stun_probe( relay_addr: SocketAddr, net_report: net_report::Addr, probe: Probe, + metrics: &Metrics, ) -> Result { match probe.proto() { ProbeProto::StunIpv4 => debug_assert!(relay_addr.is_ipv4()), @@ -920,12 +923,10 @@ async fn run_stun_probe( if matches!(probe, Probe::StunIpv4 { .. }) { result.ipv4_can_send = true; - #[cfg(feature = "metrics")] - inc!(Metrics, stun_packets_sent_ipv4); + metrics.stun_packets_sent_ipv4.inc(); } else { result.ipv6_can_send = true; - #[cfg(feature = "metrics")] - inc!(Metrics, stun_packets_sent_ipv6); + metrics.stun_packets_sent_ipv6.inc(); } let (delay, addr) = stun_rx .await diff --git a/iroh/src/net_report/reportgen/hairpin.rs b/iroh/src/net_report/reportgen/hairpin.rs index 6729614baa9..7e62e3dd726 100644 --- a/iroh/src/net_report/reportgen/hairpin.rs +++ b/iroh/src/net_report/reportgen/hairpin.rs @@ -206,6 +206,7 @@ mod tests { let (net_report_tx, mut net_report_rx) = mpsc::channel(32); let net_report_addr = net_report::Addr { sender: net_report_tx, + metrics: Default::default(), }; let (reportstate_tx, mut reportstate_rx) = mpsc::channel(32); let reportstate_addr = reportgen::Addr { @@ -282,6 +283,7 @@ mod tests { let (net_report_tx, _net_report_rx) = mpsc::channel(32); let net_report_addr = net_report::Addr { sender: net_report_tx, + metrics: Default::default(), }; let (reportstate_tx, _reportstate_rx) = mpsc::channel(32); let reportstate_addr = reportgen::Addr {