diff --git a/Cargo.lock b/Cargo.lock index a3900e7..e4a67a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -326,6 +326,60 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "axum" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "az" version = "1.2.1" @@ -400,6 +454,7 @@ name = "bestool" version = "0.26.6" dependencies = [ "algae-cli", + "axum", "binstalk-downloader", "bitflags 2.8.0", "bitvec", @@ -436,6 +491,11 @@ dependencies = [ "mimalloc", "node-semver", "openssl-sys", + "opentelemetry", + "opentelemetry-prometheus", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "prometheus", "pulldown-cmark", "regex", "reqwest", @@ -2122,6 +2182,7 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2801,6 +2862,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "maybe-owned" version = "0.3.4" @@ -3260,6 +3327,62 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.11", + "tracing", +] + +[[package]] +name = "opentelemetry-prometheus" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765a76ba13ec77043903322f85dc5434d7d01a37e75536d0f871ed7b9b5bbf0d" +dependencies = [ + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "prometheus", + "protobuf", + "tracing", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fb3a2f78c2d55362cd6c313b8abedfbc0142ab3c2676822068fd2ab7d51f9b7" + +[[package]] +name = "opentelemetry_sdk" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "http", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 2.0.11", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -3771,6 +3894,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "pulldown-cmark" version = "0.13.0" @@ -4517,6 +4661,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.8" @@ -5285,6 +5439,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/crates/bestool/Cargo.toml b/crates/bestool/Cargo.toml index 7ce780f..14c1951 100644 --- a/crates/bestool/Cargo.toml +++ b/crates/bestool/Cargo.toml @@ -16,6 +16,7 @@ repository = "https://github.com/beyondessential/bestool" [dependencies] algae-cli = { version = "1.0.6", path = "../algae-cli", optional = true } +axum = { version = "0.8.1", optional = true } binstalk-downloader = { version = "0.13.12", optional = true, features = [ "hickory-dns", ] } @@ -61,6 +62,11 @@ miette = { version = "7.5.0", features = ["fancy"] } mimalloc = "0.1.41" node-semver = { version = "2.2.0", optional = true } pulldown-cmark = { version = "0.13.0", optional = true } +opentelemetry = { version = "0.28.0", optional = true } +opentelemetry-prometheus = { version = "0.28.0", optional = true } +opentelemetry-semantic-conventions = { version = "0.28.0", optional = true, features = ["semconv_experimental"] } +opentelemetry_sdk = { version = "0.28.0", features = ["http", "metrics", "rt-tokio"], optional = true } +prometheus = { version = "0.13.4", optional = true } regex = { version = "1.10.6", optional = true } reqwest = { version = "0.12.11", features = [ "default-tls", @@ -111,6 +117,7 @@ default = [ "caddy", "completions", "crypto", + "observability", "self-update", "ssh", "tamanu", @@ -128,6 +135,14 @@ download = [ caddy = ["download", "dep:tera"] completions = ["dep:clap_complete", "dep:clap_complete_nushell"] crypto = ["dep:algae-cli", "dep:blake3", "dep:merkle_hash"] +observability = [ + "dep:axum", + "dep:opentelemetry", + "dep:opentelemetry-prometheus", + "dep:opentelemetry-semantic-conventions", + "dep:opentelemetry_sdk", + "dep:prometheus" +] self-update = ["download", "dep:upgrade", "dep:windows-env"] ssh = [ "dep:dirs", diff --git a/crates/bestool/src/actions.rs b/crates/bestool/src/actions.rs index ab8e90c..e85fcb5 100644 --- a/crates/bestool/src/actions.rs +++ b/crates/bestool/src/actions.rs @@ -58,6 +58,8 @@ subcommands! { crypto => Crypto(CryptoArgs), #[cfg(feature = "__iti")] iti => Iti(ItiArgs), + #[cfg(feature = "observability")] + observe => Observe(ObserveArgs), #[cfg(feature = "self-update")] self_update => SelfUpdate(SelfUpdateArgs), #[cfg(feature = "ssh")] diff --git a/crates/bestool/src/actions/observe.rs b/crates/bestool/src/actions/observe.rs new file mode 100644 index 0000000..42874c6 --- /dev/null +++ b/crates/bestool/src/actions/observe.rs @@ -0,0 +1,201 @@ +use std::net::SocketAddr; + +use axum::{body::Body, extract::State, response::Response, routing::get, Router}; +use clap::Parser; +use miette::{IntoDiagnostic, Result}; +use opentelemetry::{ + global, + metrics::{Counter, Gauge, UpDownCounter}, + KeyValue, +}; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_semantic_conventions as semantics; +use prometheus::{Encoder, TextEncoder}; +use reqwest::StatusCode; +use sysinfo::{Disks, Networks}; +use tokio::net::TcpListener; +use tracing::{debug, info}; + +use super::Context; + +/// Collect metrics as a daemon. +#[derive(Debug, Clone, Parser)] +pub struct ObserveArgs {} + +pub async fn run(ctx: Context) -> Result<()> { + let app = Router::new() + .route("/metrics", get(metrics)) + .with_state(AppState::new()?); + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + info!(?addr, "Listening on"); + + let listener = TcpListener::bind(addr).await.into_diagnostic()?; + + axum::serve(listener, app.into_make_service()) + .await + .into_diagnostic()?; + + Ok(()) +} + +#[derive(Clone)] +struct AppState { + registry: prometheus::Registry, + memory_usage: UpDownCounter, + memory_utilization: Gauge, + disk_io: Counter, + network_packets: Counter, + network_errors: Counter, + network_io: Counter, +} + +impl AppState { + fn new() -> Result { + let registry = prometheus::Registry::new(); + let metric_reader = opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build() + .into_diagnostic()?; + + let metrics = SdkMeterProvider::builder() + .with_reader(metric_reader) + .build(); + opentelemetry::global::set_meter_provider(metrics.clone()); + + let meter = global::meter("bestool"); + + let memory_usage = meter + .i64_up_down_counter(semantics::metric::SYSTEM_MEMORY_USAGE) + .with_unit("By") + .build(); + + let memory_utilization = meter + .f64_gauge(semantics::metric::SYSTEM_MEMORY_UTILIZATION) + .with_unit("1") + .build(); + + let disk_io = meter + .u64_counter(semantics::metric::SYSTEM_DISK_IO) + .with_unit("By") + .build(); + + let network_packets = meter + .u64_counter(semantics::metric::SYSTEM_NETWORK_PACKETS) + .with_unit("{packet}") + .build(); + + let network_errors = meter + .u64_counter(semantics::metric::SYSTEM_NETWORK_ERRORS) + .with_unit("{error}") + .build(); + + let network_io = meter + .u64_counter(semantics::metric::SYSTEM_NETWORK_IO) + .with_unit("By") + .build(); + + Ok(AppState { + registry, + memory_usage, + memory_utilization, + disk_io, + network_packets, + network_errors, + network_io, + }) + } +} + +async fn metrics( + State(AppState { + registry, + memory_usage, + memory_utilization, + disk_io, + network_packets, + network_errors, + network_io, + }): State, +) -> Result { + debug!("get metrics"); + let sysinfo = sysinfo::System::new_all(); + + let used = sysinfo.used_memory(); + memory_usage.add( + used as _, + &[KeyValue::new( + semantics::attribute::SYSTEM_MEMORY_STATE, + "used", + )], + ); + let utilization = used as f64 / sysinfo.total_memory() as f64 * 100.0; + memory_utilization.record(utilization, &[]); + + for disk in Disks::new_with_refreshed_list().list() { + let usage = disk.usage(); + let direction_read = KeyValue::new(semantics::attribute::DISK_IO_DIRECTION, "read"); + let direction_write = KeyValue::new(semantics::attribute::DISK_IO_DIRECTION, "write"); + disk_io.add( + usage.read_bytes, + &[ + direction_read, + KeyValue::new( + semantics::attribute::DEVICE_ID, + disk.name().to_string_lossy().into_owned(), + ), + ], + ); + disk_io.add( + usage.written_bytes, + &[ + direction_write, + KeyValue::new( + semantics::attribute::DEVICE_ID, + disk.name().to_string_lossy().into_owned(), + ), + ], + ); + } + + for (name, network) in Networks::new_with_refreshed_list().list() { + let direction_receive = + KeyValue::new(semantics::attribute::NETWORK_IO_DIRECTION, "receive"); + let direction_transmit = + KeyValue::new(semantics::attribute::NETWORK_IO_DIRECTION, "transmit"); + let device = KeyValue::new(semantics::attribute::SYSTEM_DEVICE, name.clone()); + let interface_name = + KeyValue::new(semantics::attribute::NETWORK_INTERFACE_NAME, name.clone()); + network_packets.add( + network.packets_received(), + &[direction_receive.clone(), device.clone()], + ); + network_packets.add( + network.packets_transmitted(), + &[direction_transmit.clone(), device], + ); + network_errors.add( + network.errors_on_received(), + &[direction_receive.clone(), interface_name.clone()], + ); + network_errors.add( + network.errors_on_transmitted(), + &[direction_transmit.clone(), interface_name.clone()], + ); + network_io.add( + network.received(), + &[direction_receive, interface_name.clone()], + ); + network_io.add(network.transmitted(), &[direction_transmit, interface_name]); + } + + let encoder = TextEncoder::new(); + let metric_families = registry.gather(); + + let mut buf = Vec::new(); + let res = encoder.encode(&metric_families, &mut buf); + if let Err(e) = res { + return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())); + } + + Ok(Response::new(Body::from(buf))) +}