diff --git a/Cargo.lock b/Cargo.lock index 6b9682b24..a43828e54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1221,6 +1221,7 @@ dependencies = [ "job-queue", "job-queue-libsql", "multibase", + "network-monitor", "network-scanner", "ngrok", "nonempty", @@ -1241,6 +1242,7 @@ dependencies = [ "smol_str", "sysinfo", "tap", + "tempdir", "terminal-streamer", "thiserror 2.0.12", "time", @@ -1775,6 +1777,12 @@ dependencies = [ "libc", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "funty" version = "2.0.0" @@ -3635,6 +3643,25 @@ dependencies = [ "winapi", ] +[[package]] +name = "network-monitor" +version = "0.0.0" +dependencies = [ + "anyhow", + "camino", + "network-scanner", + "network-scanner-net", + "serde", + "serde_json", + "tempdir", + "thiserror 1.0.69", + "time", + "tokio 1.46.1", + "tokio-test", + "tokio-util", + "tracing", +] + [[package]] name = "network-scanner" version = "0.0.0" @@ -4807,6 +4834,19 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.8.5" @@ -4848,6 +4888,21 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.6.4" @@ -4884,6 +4939,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.5.13" @@ -4966,6 +5030,15 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + [[package]] name = "reqwest" version = "0.12.22" @@ -5946,6 +6019,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.20.0" diff --git a/crates/network-monitor/Cargo.toml b/crates/network-monitor/Cargo.toml new file mode 100644 index 000000000..d0fcfebaf --- /dev/null +++ b/crates/network-monitor/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "network-monitor" +version = "0.0.0" +edition = "2024" +authors = ["Devolutions Inc. <infos@devolutions.net>
"] +publish = false + +[dependencies] +network-scanner = { path = "../network-scanner" } +network-scanner-net = { path = "../network-scanner-net" } +serde = { version = "1", features = ["derive"] } +camino = { version = "1", features = ["serde1"] } +serde_json = "1" +thiserror = "1" +time = { version = "0.3", features = ["serde"] } +tokio = { version = "1.45", features= ["macros", "sync"] } +tokio-util = { version = "0.7" } +anyhow = "1" +tracing = "0.1" + +[dev-dependencies] +tempdir = "0.3" +tokio-test = "0.4" + +[lints] +workspace = true diff --git a/crates/network-monitor/src/lib.rs b/crates/network-monitor/src/lib.rs new file mode 100644 index 000000000..305bd10d3 --- /dev/null +++ b/crates/network-monitor/src/lib.rs @@ -0,0 +1,220 @@ +use std::collections::{HashMap, HashSet, VecDeque}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Duration; +use std::{fs, io, mem}; + +use anyhow::anyhow; +use camino::Utf8PathBuf; +use network_scanner_net::runtime::Socket2Runtime; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use time::UtcDateTime; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tracing::warn; + +use network_scanner::ping; + +mod log_queue; +mod state; + +pub use crate::state::State; + +#[derive(Error, Debug)] +#[error(transparent)] +pub enum SetConfigError { + Io(#[from] io::Error), + Serde(#[from] serde_json::Error), + Other(#[from] anyhow::Error), +} + +pub async fn set_config(config: MonitorsConfig, state: Arc) -> Result<(), SetConfigError> { + let file = fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&state.cache_path)?; + + let mut config_write = state.config.write().map_err(|_| anyhow!("config lock poisoned"))?; + + serde_json::to_writer_pretty(&file, &config)?; + + let old_config = mem::replace(&mut *config_write, config); + + let new_config_set: HashSet = config_write.monitors.clone().into_iter().collect(); + let old_config_set: HashSet = 
old_config.monitors.into_iter().collect(); + + drop(config_write); + + let added = new_config_set.difference(&old_config_set).cloned(); + let deleted = old_config_set.difference(&new_config_set); + + let (new_cancellation_tokens, new_monitors): ( + Vec<(String, CancellationToken)>, + Vec + Send>>>, + ) = added + .map(|definition| { + let cancellation_token = CancellationToken::new(); + let cancellation_monitor = cancellation_token.clone(); + + let state = Arc::clone(&state); + let definition_id = definition.id.clone(); + + let monitor = async move { + loop { + let start_time = UtcDateTime::now(); + + let monitor_result = match &definition.probe { + ProbeType::Ping => { + let scanner_runtime = match &*state.scanner_runtime { + Ok(scanner_runtime) => Arc::clone(scanner_runtime), + Err(error) => { + warn!(error = %error, monitor_id = definition.id, "scanning runtime failed to start, aborting monitor"); + break; + }, + }; + do_ping_monitor(&definition, scanner_runtime).await + }, + ProbeType::TcpOpen => do_tcpopen_monitor(&definition).await, + }; + + state.log.write(monitor_result); + + let elapsed = UtcDateTime::now() - start_time; + let next_run_in = + (definition.interval as f64 - elapsed.as_seconds_f64()).clamp(1.0, f64::INFINITY); + select! 
{ + _ = cancellation_monitor.cancelled() => { return } + _ = tokio::time::sleep(Duration::from_secs_f64(next_run_in)) => { } + }; + } + }; + + ( + (definition_id, cancellation_token), + Box::pin(monitor) as Pin + Send>>, + ) + }) + .unzip(); + + let mut cancellation_tokens_write = state + .cancellation_tokens + .lock() + .map_err(|_| anyhow!("cancellation token lock poisoned"))?; + + for definition in deleted { + cancellation_tokens_write[&definition.id].cancel(); + cancellation_tokens_write.remove(&definition.id); + } + + for (monitor_id, cancellation_token) in new_cancellation_tokens { + cancellation_tokens_write.insert(monitor_id, cancellation_token); + } + + for monitoring_task in new_monitors { + tokio::spawn(monitoring_task); + } + + Ok(()) +} + +async fn do_ping_monitor(definition: &MonitorDefinition, scanner_runtime: Arc) -> MonitorResult { + let start_time = UtcDateTime::now(); + + let ping_result = async || -> anyhow::Result { + ping::ping_addr( + scanner_runtime, + format!("{hostname}:0", hostname = definition.address), + Duration::from_secs(definition.timeout), + ) + .await?; + // TODO: send more than 1 ping packet + + Ok(UtcDateTime::now() - start_time) + }() + .await; + + match ping_result { + Ok(time) => MonitorResult { + monitor_id: definition.id.clone(), + request_start_time: start_time, + response_success: true, + response_messages: None, + response_time: time.as_seconds_f64(), + }, + Err(error) => MonitorResult { + monitor_id: definition.id.clone(), + request_start_time: start_time, + response_success: false, + response_messages: Some(format!("{error:#}")), + response_time: f64::INFINITY, + }, + } +} + +async fn do_tcpopen_monitor(definition: &MonitorDefinition) -> MonitorResult { + MonitorResult { + monitor_id: definition.id.clone(), + request_start_time: UtcDateTime::now(), + response_success: false, + response_messages: Some("not implemented".into()), + response_time: f64::INFINITY, + } +} + +pub fn drain_log(state: Arc) -> VecDeque { + 
state.log.drain() +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MonitorsConfig { + pub monitors: Vec, +} + +impl MonitorsConfig { + fn empty() -> MonitorsConfig { + MonitorsConfig { monitors: Vec::new() } + } + + #[doc(hidden)] + fn mock() -> MonitorsConfig { + MonitorsConfig { + monitors: vec![MonitorDefinition { + id: "a".to_owned(), + probe: ProbeType::Ping, + address: "c".to_owned(), + interval: 1, + timeout: 2, + port: Some(3), + }], + } + } +} + +#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub enum ProbeType { + Ping, + TcpOpen, +} + +#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)] +pub struct MonitorDefinition { + pub id: String, + pub probe: ProbeType, + pub address: String, + pub interval: u64, + pub timeout: u64, + pub port: Option, +} + +#[derive(PartialEq, Clone, Debug)] +pub struct MonitorResult { + pub monitor_id: String, + pub request_start_time: UtcDateTime, + pub response_success: bool, + pub response_messages: Option, + pub response_time: f64, +} diff --git a/crates/network-monitor/src/log_queue.rs b/crates/network-monitor/src/log_queue.rs new file mode 100644 index 000000000..313c2d0f2 --- /dev/null +++ b/crates/network-monitor/src/log_queue.rs @@ -0,0 +1,60 @@ +use std::collections::VecDeque; +use std::mem; +use std::sync::Mutex; + +/// Thread-safe VecDeque which can be drained (consumed until empty) without blocking writers. 
+pub(crate) struct LogQueue { + entries: Mutex>, +} + +impl LogQueue { + pub(crate) fn new() -> Self { + LogQueue { + entries: Mutex::new(VecDeque::new()), + } + } + + pub(crate) fn write(&self, data: T) { + let mut entries = self.entries.lock().expect("poisoned"); + + entries.push_back(data); + } + + pub(crate) fn drain(&self) -> VecDeque { + let mut entries = self.entries.lock().expect("poisoned"); + + return mem::replace(&mut entries, VecDeque::new()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn write_and_drain_returns_logged_value() { + // Given + let log: LogQueue = LogQueue::new(); + log.write(String::from("hey")); + + // When + let result = log.drain(); + + // Then + assert!(result.len() == 1 && result.front().unwrap() == "hey"); + } + + #[test] + fn write_and_drain_clears_log() { + // Given + let log: LogQueue = LogQueue::new(); + log.write(String::from("hey")); + + // When + _ = log.drain(); + + // Then + let result = log.drain(); + assert!(result.len() == 0); + } +} diff --git a/crates/network-monitor/src/state.rs b/crates/network-monitor/src/state.rs new file mode 100644 index 000000000..d17d59c0b --- /dev/null +++ b/crates/network-monitor/src/state.rs @@ -0,0 +1,39 @@ +use std::sync::LazyLock; + +use std::sync::{Mutex, RwLock}; + +use network_scanner_net::runtime::Socket2Runtime; + +use crate::log_queue::*; +use crate::*; + +pub struct State { + pub(crate) cache_path: Utf8PathBuf, + pub(crate) log: LogQueue, + pub(crate) config: RwLock, + pub(crate) cancellation_tokens: Mutex>, + pub(crate) scanner_runtime: LazyLock>>, +} + +impl State { + pub fn new(cache_path: Utf8PathBuf) -> State { + State { + cache_path: cache_path, + log: LogQueue::new(), + config: RwLock::new(MonitorsConfig::empty()), + cancellation_tokens: Mutex::new(HashMap::new()), + scanner_runtime: LazyLock::new(|| Socket2Runtime::new(None)), + } + } + + #[doc(hidden)] + pub fn mock(cache_path: Utf8PathBuf) -> State { + State { + cache_path: cache_path, + log: 
LogQueue::new(), + config: RwLock::new(MonitorsConfig::mock()), + cancellation_tokens: Mutex::new(HashMap::new()), + scanner_runtime: LazyLock::new(|| Socket2Runtime::new(None)), + } + } +} diff --git a/crates/network-scanner-proto/src/icmp_v6.rs b/crates/network-scanner-proto/src/icmp_v6.rs new file mode 100644 index 000000000..8f9f0ec16 --- /dev/null +++ b/crates/network-scanner-proto/src/icmp_v6.rs @@ -0,0 +1,139 @@ +#[repr(u8)] +pub enum Icmpv6MessageType { + Unreachable = 1, + PacketTooBig = 2, + TimeExceeded = 3, + ParameterProblem = 4, + EchoRequest = 128, + EchoReply = 129, +} + +#[repr(u8)] +pub enum Icmpv6UnreacheableCode { + NoRoute = 0, + AdmProhibited = 1, + NotNeighbour = 2, + AddrUnreach = 3, + PortUnreach = 4, + PolicyFail = 5, + RejectRoute = 6, +} + +#[repr(u8)] +pub enum Icmpv6TimeExceededCode { + HopLimitExceeded = 0, + FragmentReassemblyTimeout = 1, +} + +#[repr(u8)] +pub enum Icmpv6ParameterProblemCode { + ErroneousHeaderField = 0, + UnrecognizedNextHeaderType = 1, + UnrecognizedIpv6HeaderOption = 2, +} + +pub enum Icmpv6Message { + Unreachable { + code: Icmpv6UnreacheableCode, + original_packet: Vec, + }, + PacketTooBig { + mtu: u32, + original_packet: Vec, + }, + TimeExceeded { + code: Icmpv6TimeExceededCode, + original_packet: Vec, + }, + ParameterProblem { + code: Icmpv6ParameterProblemCode, + pointer: u32, + original_packet: Vec, + }, + EchoRequest { + identifier: u16, + sequence_number: u16, + payload: Vec, + }, + EchoReply { + identifier: u16, + sequence_number: u16, + payload: Vec, + }, +} + +impl Icmpv6Message { + pub fn encode(self) -> Vec { + let mut bytes = Vec::new(); + + bytes.push(self.get_type() as u8); + + match self { + Icmpv6Message::Unreachable { code, original_packet } => { + bytes.push(code as u8); + bytes.extend(vec![0; 2]); // checksum placeholder + bytes.extend(vec![0; 4]); // unused + bytes.extend(original_packet); + } + Icmpv6Message::PacketTooBig { mtu, original_packet } => { + bytes.push(0); // code + 
bytes.extend(vec![0; 2]); // checksum placeholder + bytes.extend(mtu.to_be_bytes()); + bytes.extend(original_packet); + } + Icmpv6Message::TimeExceeded { code, original_packet } => { + bytes.push(code as u8); + bytes.extend(vec![0; 2]); // checksum placeholder + bytes.extend(vec![0; 4]); // unused + bytes.extend(original_packet); + } + Icmpv6Message::ParameterProblem { + code, + pointer, + original_packet, + } => { + bytes.push(code as u8); + bytes.extend(vec![0; 2]); // checksum placeholder + bytes.extend(pointer.to_be_bytes()); + bytes.extend(original_packet); + } + Icmpv6Message::EchoRequest { + identifier, + sequence_number, + payload, + } => { + bytes.push(0); // code + bytes.extend(vec![0; 2]); // checksum placeholder + bytes.extend(identifier.to_be_bytes()); + bytes.extend(sequence_number.to_be_bytes()); + bytes.extend(payload); + } + Icmpv6Message::EchoReply { + identifier, + sequence_number, + payload, + } => { + bytes.push(0); // code + bytes.extend(vec![0; 2]); // checksum placeholder + bytes.extend(identifier.to_be_bytes()); + bytes.extend(sequence_number.to_be_bytes()); + bytes.extend(payload); + } + }; + + bytes + } +} + +impl Icmpv6Message { + fn get_type(&self) -> Icmpv6MessageType { + match self { + Self::Unreachable { .. } => Icmpv6MessageType::Unreachable, + Self::PacketTooBig { .. } => Icmpv6MessageType::PacketTooBig, + Self::TimeExceeded { .. } => Icmpv6MessageType::TimeExceeded, + Self::ParameterProblem { .. } => Icmpv6MessageType::ParameterProblem, + Self::EchoRequest { .. } => Icmpv6MessageType::EchoRequest, + Self::EchoReply { .. 
} => Icmpv6MessageType::EchoReply, + } + } +} diff --git a/crates/network-scanner-proto/src/lib.rs b/crates/network-scanner-proto/src/lib.rs index 6ddd3fe51..d8a2be75d 100644 --- a/crates/network-scanner-proto/src/lib.rs +++ b/crates/network-scanner-proto/src/lib.rs @@ -1,2 +1,3 @@ pub mod icmp_v4; +pub mod icmp_v6; pub mod netbios; diff --git a/crates/network-scanner/src/broadcast/asynchronous.rs b/crates/network-scanner/src/broadcast/asynchronous.rs index 2f9ffc4d7..6abf7b71f 100644 --- a/crates/network-scanner/src/broadcast/asynchronous.rs +++ b/crates/network-scanner/src/broadcast/asynchronous.rs @@ -8,7 +8,7 @@ use network_scanner_net::runtime; use network_scanner_proto::icmp_v4; use socket2::SockAddr; -use crate::create_echo_request; +use crate::create_v4_echo_request; use super::BroadcastEvent; @@ -30,7 +30,7 @@ pub async fn broadcast( socket.set_broadcast(true)?; // skip verification, we are not interested in the response - let (packet, _) = create_echo_request()?; + let (packet, _) = create_v4_echo_request()?; let (sender, receiver) = tokio::sync::mpsc::channel(255); task_manager.spawn_no_sub_task(async move { diff --git a/crates/network-scanner/src/broadcast/blocking.rs b/crates/network-scanner/src/broadcast/blocking.rs index 432b928ed..ffe4bb3d6 100644 --- a/crates/network-scanner/src/broadcast/blocking.rs +++ b/crates/network-scanner/src/broadcast/blocking.rs @@ -6,7 +6,7 @@ use anyhow::Context; use network_scanner_proto::icmp_v4; -use crate::create_echo_request; +use crate::create_v4_echo_request; use super::BroadcastResponseEntry; @@ -70,7 +70,7 @@ pub fn block_broadcast(ip: Ipv4Addr, read_time_out: Option) -> anyhow: let addr = SocketAddr::new(ip.into(), 0); - let (packet, verifier) = create_echo_request()?; + let (packet, verifier) = create_v4_echo_request()?; trace!(?packet, "Sending packet"); socket diff --git a/crates/network-scanner/src/lib.rs b/crates/network-scanner/src/lib.rs index 083fd8b0f..e38fddebe 100644 --- 
a/crates/network-scanner/src/lib.rs +++ b/crates/network-scanner/src/lib.rs @@ -53,7 +53,7 @@ pub(crate) unsafe fn assume_init(buf: &[MaybeUninit]) -> &[u8] { unsafe { &*(buf as *const [MaybeUninit] as *const [u8]) } } -pub(crate) fn create_echo_request() -> anyhow::Result<(icmp_v4::Icmpv4Packet, Vec)> { +pub(crate) fn create_v4_echo_request() -> anyhow::Result<(icmp_v4::Icmpv4Packet, Vec)> { let time = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .with_context(|| "failed to get current time")? diff --git a/crates/network-scanner/src/ping.rs b/crates/network-scanner/src/ping.rs index 4f9a0e701..67de77895 100644 --- a/crates/network-scanner/src/ping.rs +++ b/crates/network-scanner/src/ping.rs @@ -1,5 +1,6 @@ +use std::default; use std::mem::MaybeUninit; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}; use std::sync::Arc; use std::time::Duration; @@ -7,9 +8,10 @@ use anyhow::Context; use network_scanner_net::runtime::Socket2Runtime; use network_scanner_net::socket::AsyncRawSocket; use network_scanner_proto::icmp_v4; +use network_scanner_proto::icmp_v6::Icmpv6Message; use tokio::time::timeout; -use crate::create_echo_request; +use crate::create_v4_echo_request; use crate::ip_utils::IpAddrRange; #[derive(Debug, Clone)] @@ -109,23 +111,60 @@ pub fn ping_range( Ok(receiver) } +pub async fn ping_addr( + runtime: Arc, + addr: impl ToSocketAddrs, + duration: Duration, +) -> anyhow::Result<()> { + let socket_addr = addr.to_socket_addrs()?.next().context("Hostname not found")?; //TODO return proper error + let socket2_sockaddr: socket2::SockAddr = socket_addr.into(); + + let socket = runtime.new_socket( + socket2_sockaddr.domain(), + socket2::Type::RAW, + match socket_addr { + SocketAddr::V4(_) => Some(socket2::Protocol::ICMPV4), + SocketAddr::V6(_) => Some(socket2::Protocol::ICMPV6), + }, + )?; + + timeout(duration, try_ping(socket2_sockaddr, socket)).await? 
+} + pub async fn ping(runtime: Arc, ip: impl Into, duration: Duration) -> anyhow::Result<()> { + let socket_addr = SocketAddr::new(ip.into(), 0); + let socket2_sockaddr: socket2::SockAddr = socket_addr.into(); + let socket = runtime.new_socket( - socket2::Domain::IPV4, + socket2_sockaddr.domain(), socket2::Type::RAW, - Some(socket2::Protocol::ICMPV4), + match socket_addr { + SocketAddr::V4(_) => Some(socket2::Protocol::ICMPV4), + SocketAddr::V6(_) => Some(socket2::Protocol::ICMPV6), + }, )?; - let addr = SocketAddr::new(ip.into(), 0); - timeout(duration, try_ping(addr.into(), socket)).await? + + timeout(duration, try_ping(socket2_sockaddr, socket)).await? } async fn try_ping(addr: socket2::SockAddr, mut socket: AsyncRawSocket) -> anyhow::Result<()> { // skip verification, we are not interested in the response - let (packet, _) = create_echo_request()?; - let packet_bytes = packet.to_bytes(true); + let (packet, _) = create_v4_echo_request()?; + + let packet_bytes = match addr.domain() { + socket2::Domain::IPV4 => create_v4_echo_request()?.0.to_bytes(true), + socket2::Domain::IPV6 => Icmpv6Message::EchoRequest { + identifier: 42, + sequence_number: 0, + payload: vec![42; 32], + } + .encode(), + _ => return Err(anyhow::anyhow!("Can't ping a unix socket")), + }; socket.send_to(&packet_bytes, &addr).await?; + // TODO: because this is a raw socket, packets indicating failure will reach us. 
we need to check the response code let mut buffer = [MaybeUninit::uninit(); icmp_v4::ICMPV4_MTU]; socket.recv_from(&mut buffer).await?; Ok(()) @@ -140,7 +179,7 @@ pub fn blocking_ping(ip: Ipv4Addr) -> anyhow::Result<()> { let addr = SocketAddr::new(ip.into(), 0); - let (packet, _) = create_echo_request()?; + let (packet, _) = create_v4_echo_request()?; socket .send_to(&packet.to_bytes(true), &addr.into()) diff --git a/devolutions-gateway/Cargo.toml b/devolutions-gateway/Cargo.toml index 3a9c9e357..1953bb073 100644 --- a/devolutions-gateway/Cargo.toml +++ b/devolutions-gateway/Cargo.toml @@ -37,6 +37,7 @@ picky-krb = "0.11" network-scanner = { version = "0.0.0", path = "../crates/network-scanner" } video-streamer = { path = "../crates/video-streamer" } terminal-streamer = { path = "../crates/terminal-streamer" } +network-monitor = { version = "0.0.0", path = "../crates/network-monitor" } # Serialization serde = { version = "1", features = ["derive"] } @@ -63,6 +64,7 @@ backoff = "0.4" sysinfo = { version = "0.35", default-features = false, features = ["disk"] } dunce = "1.0" bitflags = "2.9" +tempdir = "0.3" # Security, crypto… picky = { version = "7.0.0-rc.15", default-features = false, features = ["jose", "x509", "pkcs12", "time_conversion"] } diff --git a/devolutions-gateway/openapi/gateway-api.yaml b/devolutions-gateway/openapi/gateway-api.yaml index 9ea84fdd4..2df93e88f 100644 --- a/devolutions-gateway/openapi/gateway-api.yaml +++ b/devolutions-gateway/openapi/gateway-api.yaml @@ -7,7 +7,7 @@ info: email: infos@devolutions.net license: name: MIT/Apache-2.0 - version: 2025.2.2 + version: 2025.2.3 paths: /jet/config: patch: @@ -366,6 +366,57 @@ paths: description: Unexpected server error security: - netscan_token: [] + /jet/net/monitor/config: + post: + tags: + - NetworkMonitoring + summary: Replaces the existing monitoring config with the one provided in the body. 
+ description: |- + This request will immediately start any new monitors, and will stop + currently active monitors that are no longer in the config. + + The configuration is not persisted across restarts. + operationId: SetMonitoringConfig + requestBody: + description: JSON object containing a list of monitors + content: + application/json: + schema: + $ref: '#/components/schemas/MonitorsConfig' + required: true + responses: + '200': + description: New configuration was accepted + '400': + description: Bad request + '401': + description: Invalid or missing authorization token + '403': + description: Insufficient permissions + '500': + description: Unexpected server error while starting monitors + /jet/net/monitor/log/drain: + post: + tags: + - NetworkMonitoring + summary: Monitors store their results in a temporary log, which is returned here. + description: Once the log is downloaded, gateway purges it from memory. + operationId: DrainMonitoringLog + responses: + '200': + description: Log was flushed and returned in the response body + content: + application/json: + schema: + $ref: '#/components/schemas/MonitoringLogResponse' + '400': + description: Bad request + '401': + description: Invalid or missing authorization token + '403': + description: Insufficient permissions + '500': + description: Unexpected server error /jet/preflight: post: tags: @@ -818,6 +869,98 @@ components: internal_url: type: string description: URL to use on local network + MonitorDefinition: + type: object + required: + - id + - probe + - address + - interval + - timeout + properties: + address: + type: string + id: + type: string + interval: + type: integer + format: int64 + minimum: 0 + port: + type: integer + format: int32 + nullable: true + probe: + $ref: '#/components/schemas/MonitoringProbeType' + timeout: + type: integer + format: int64 + minimum: 0 + MonitorDefinitionProbeTypeError: + type: object + required: + - id + - probe + properties: + id: + type: string + description: The ID 
of the monitor definition in the client-provided config + probe: + type: string + description: The monitor type that was not supported + MonitorResult: + type: object + required: + - monitor_id + - request_start_time + - response_success + - response_time + properties: + monitor_id: + type: string + request_start_time: + type: string + format: date-time + response_messages: + type: string + nullable: true + response_success: + type: boolean + response_time: + type: number + format: double + MonitoringLogResponse: + type: object + required: + - entries + properties: + entries: + type: array + items: + $ref: '#/components/schemas/MonitorResult' + MonitoringProbeType: + oneOf: + - type: string + enum: + - ping + - type: string + enum: + - tcpOpen + - type: object + required: + - unknown + properties: + unknown: + type: string + MonitorsConfig: + type: object + required: + - monitors + properties: + monitors: + type: array + items: + $ref: '#/components/schemas/MonitorDefinition' PreflightAlertStatus: type: string enum: diff --git a/devolutions-gateway/src/api/mod.rs b/devolutions-gateway/src/api/mod.rs index 63545c4e7..1e34d6b90 100644 --- a/devolutions-gateway/src/api/mod.rs +++ b/devolutions-gateway/src/api/mod.rs @@ -7,6 +7,7 @@ pub mod jmux; pub mod jrec; pub mod jrl; pub mod kdc_proxy; +pub mod monitoring; pub mod net; pub mod preflight; pub mod rdp; @@ -40,5 +41,9 @@ pub fn make_router(state: crate::DgwState) -> axum::Router { ); } + if state.conf_handle.get_conf().debug.enable_unstable { + router = router.nest("/jet/net/monitor", monitoring::make_router(state.clone())); + } + router.with_state(state) } diff --git a/devolutions-gateway/src/api/monitoring.rs b/devolutions-gateway/src/api/monitoring.rs new file mode 100644 index 000000000..bb6f7aaec --- /dev/null +++ b/devolutions-gateway/src/api/monitoring.rs @@ -0,0 +1,226 @@ +use crate::DgwState; +use crate::http::HttpError; +use axum::{Json, Router, extract, routing}; +use network_monitor; +use 
time::OffsetDateTime; + +pub fn make_router(state: DgwState) -> Router { + let router = Router::new() + .route("/config", routing::post(handle_set_monitoring_config)) + .route("/log/drain", routing::post(handle_drain_log)); + + router.with_state(state) +} + +/// Replace the current monitoring configuration with the configuration in the request body. +/// +/// Changes take effect immediately: +/// - Starts any monitors newly defined in the payload. +/// - Stops any currently running monitors that are omitted from the payload. +/// +/// Note: The configuration is not persisted across process restarts. +#[cfg_attr(feature = "openapi", utoipa::path( + post, + operation_id = "SetMonitoringConfig", + tag = "NetworkMonitoring", + path = "/jet/net/monitor/config", + request_body(content = MonitorsConfig, description = "JSON object containing a list of monitors", content_type = "application/json"), + responses( + (status = 200, description = "New configuration was accepted"), + (status = 400, description = "Bad request"), + (status = 401, description = "Invalid or missing authorization token"), + (status = 403, description = "Insufficient permissions"), + (status = 500, description = "Unexpected server error while starting monitors"), + ), +))] +async fn handle_set_monitoring_config( + extract::State(DgwState { monitoring_state, .. }): extract::State, + Json(config): Json, +) -> Result, HttpError> { + let (processed_config, probe_type_errors) = config.lossy_into(); + + network_monitor::set_config(processed_config, monitoring_state) + .await + .map(|_| Json(SetConfigResponse::new(probe_type_errors))) + .map_err( + HttpError::internal() + .with_msg("failed to set up network monitoring") + .err(), + ) +} + +/// Monitors store their results in a temporary log, which is returned here. +/// Once the log is downloaded, gateway purges it from memory. 
+#[cfg_attr(feature = "openapi", utoipa::path( + post, + operation_id = "DrainMonitoringLog", + tag = "NetworkMonitoring", + path = "/jet/net/monitor/log/drain", + responses( + (status = 200, description = "Log was flushed and returned in the response body", body = MonitoringLogResponse), + (status = 400, description = "Bad request"), + (status = 401, description = "Invalid or missing authorization token"), + (status = 403, description = "Insufficient permissions"), + (status = 500, description = "Unexpected server error"), + ), +))] +async fn handle_drain_log( + extract::State(DgwState { monitoring_state, .. }): extract::State, +) -> Json { + Json(MonitoringLogResponse { + entries: network_monitor::drain_log(monitoring_state) + .into_iter() + .map(MonitorResult::from) + .collect(), + }) +} + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct MonitorsConfig { + monitors: Vec, +} + +impl MonitorsConfig { + fn lossy_into(self) -> (network_monitor::MonitorsConfig, Vec) { + let (monitors, errors): ( + Vec, + Vec, + ) = self.monitors.into_iter().map(MonitorDefinition::try_into).fold( + ( + Vec::::new(), + Vec::::new(), + ), + |mut partitions, conversion_result| { + match conversion_result { + Ok(value) => partitions.0.push(value), + Err(error) => partitions.1.push(error), + }; + + return partitions; + }, + ); + + let config = network_monitor::MonitorsConfig { monitors }; + + return (config, errors); + } +} + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub(crate) enum MonitoringProbeType { + Ping, + TcpOpen, + #[serde(untagged)] + Unknown(String), +} +pub(crate) struct MonitoringProbeTypeError { + probe: String, +} + +impl TryFrom for network_monitor::ProbeType { + type Error = MonitoringProbeTypeError; + + fn try_from(value: MonitoringProbeType) -> Result { + match value { + 
MonitoringProbeType::Ping => Ok(network_monitor::ProbeType::Ping), + MonitoringProbeType::TcpOpen => Ok(network_monitor::ProbeType::TcpOpen), + MonitoringProbeType::Unknown(unknown_type) => Err(MonitoringProbeTypeError { probe: unknown_type }), + } + } +} + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)] +pub(crate) struct MonitorDefinition { + id: String, + probe: MonitoringProbeType, + address: String, + interval: u64, + timeout: u64, + port: Option, +} + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, Serialize)] +pub(crate) struct MonitorDefinitionProbeTypeError { + /// The ID of the monitor definition in the client-provided config + id: String, + /// The monitor type that was not supported + probe: String, +} + +impl TryFrom for network_monitor::MonitorDefinition { + type Error = MonitorDefinitionProbeTypeError; + + fn try_from(value: MonitorDefinition) -> Result { + Ok(network_monitor::MonitorDefinition { + id: value.id.clone(), + probe: value.probe.try_into().map_err(|type_error: MonitoringProbeTypeError| { + MonitorDefinitionProbeTypeError { + id: value.id.clone(), + probe: type_error.probe, + } + })?, + address: value.address, + interval: value.interval, + timeout: value.timeout, + port: value.port, + }) + } +} + +/// This body is returned when the config is successfully set, even if one or all probes were not understood. +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[serde(rename_all = "camelCase")] +#[derive(Debug, Clone, Serialize)] +pub(crate) struct SetConfigResponse { + /// An optional list of probes that this server could not parse. 
+ #[serde(skip_serializing_if = "Option::is_none")] + probe_type_errors: Option>, +} + +impl SetConfigResponse { + fn new(probe_type_errors: Vec) -> SetConfigResponse { + match probe_type_errors.is_empty() { + false => SetConfigResponse { + probe_type_errors: Some(probe_type_errors), + }, + true => SetConfigResponse { + probe_type_errors: None, + }, + } + } +} + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, Serialize)] +pub(crate) struct MonitoringLogResponse { + entries: Vec, +} + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub(crate) struct MonitorResult { + monitor_id: String, + #[serde(with = "time::serde::rfc3339")] + request_start_time: OffsetDateTime, + response_success: bool, + #[serde(skip_serializing_if = "Option::is_none")] + response_messages: Option, + response_time: f64, +} + +impl From for MonitorResult { + fn from(value: network_monitor::MonitorResult) -> Self { + MonitorResult { + monitor_id: value.monitor_id, + request_start_time: value.request_start_time.into(), + response_success: value.response_success, + response_messages: value.response_messages, + response_time: value.response_time, + } + } +} diff --git a/devolutions-gateway/src/lib.rs b/devolutions-gateway/src/lib.rs index 812886bae..cc3015e69 100644 --- a/devolutions-gateway/src/lib.rs +++ b/devolutions-gateway/src/lib.rs @@ -1,5 +1,6 @@ // Used by devolutions-gateway binary. use ceviche as _; +use tempdir::TempDir; // Used by tests. 
#[cfg(test)] @@ -43,6 +44,8 @@ pub mod ws; use std::sync::Arc; +use camino::{Utf8Path, Utf8PathBuf}; + #[derive(Clone)] pub struct DgwState { pub conf_handle: config::ConfHandle, @@ -54,6 +57,7 @@ pub struct DgwState { pub recordings: recording::RecordingMessageSender, pub job_queue_handle: job_queue::JobQueueHandle, pub credential_store: credential::CredentialStoreHandle, + pub monitoring_state: Arc, } #[doc(hidden)] @@ -78,6 +82,14 @@ impl DgwState { let (job_queue_handle, job_queue_rx) = job_queue::JobQueueHandle::new(); let credential_store = credential::CredentialStoreHandle::new(); + let temp_dir = TempDir::new("dgw-network-monitor-test").expect("Could not create temp dir"); + let temp_path: Utf8PathBuf = Utf8Path::from_path(temp_dir.path()) + .expect("TempDir gave us a garbage path") + .to_path_buf() + .join("monitors_cache.json"); + + let monitoring_state = Arc::new(network_monitor::State::mock(temp_path)); + let state = Self { conf_handle, token_cache, @@ -88,6 +100,7 @@ impl DgwState { recordings: recording_manager_handle, job_queue_handle, credential_store, + monitoring_state, }; let handles = MockHandles { diff --git a/devolutions-gateway/src/openapi.rs b/devolutions-gateway/src/openapi.rs index 0234d9f88..e0557c4e9 100644 --- a/devolutions-gateway/src/openapi.rs +++ b/devolutions-gateway/src/openapi.rs @@ -27,6 +27,8 @@ use crate::api::preflight::PreflightAlertStatus; crate::api::update::trigger_update_check, crate::api::preflight::post_preflight, crate::api::net::get_net_config, + crate::api::monitoring::handle_set_monitoring_config, + crate::api::monitoring::handle_drain_log, ), components(schemas( crate::api::health::Identity, @@ -58,6 +60,12 @@ use crate::api::preflight::PreflightAlertStatus; SessionTokenSignRequest, InterfaceInfo, AddressFamily, + crate::api::monitoring::MonitoringLogResponse, + crate::api::monitoring::MonitorResult, + crate::api::monitoring::MonitorsConfig, + crate::api::monitoring::MonitorDefinition, + 
crate::api::monitoring::MonitoringProbeType, + crate::api::monitoring::MonitorDefinitionProbeTypeError, )), modifiers(&SecurityAddon), )] diff --git a/devolutions-gateway/src/service.rs b/devolutions-gateway/src/service.rs index 686b0be67..c3695b468 100644 --- a/devolutions-gateway/src/service.rs +++ b/devolutions-gateway/src/service.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context as _; -use devolutions_gateway::DgwState; use devolutions_gateway::config::{Conf, ConfHandle}; use devolutions_gateway::credential::CredentialStoreHandle; use devolutions_gateway::listener::GatewayListener; @@ -11,6 +10,7 @@ use devolutions_gateway::recording::recording_message_channel; use devolutions_gateway::session::session_manager_channel; use devolutions_gateway::subscriber::subscriber_channel; use devolutions_gateway::token::{CurrentJrl, JrlTokenClaims}; +use devolutions_gateway::{DgwState, config}; use devolutions_gateway_task::{ChildTask, ShutdownHandle, ShutdownSignal}; use devolutions_log::{self, LoggerGuard}; use parking_lot::Mutex; @@ -242,6 +242,9 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { .await .context("failed to initialize job queue context")?; let credential_store = CredentialStoreHandle::new(); + let monitoring_state = Arc::new(network_monitor::State::new( + config::get_data_dir().join("monitors_cache.json"), + )); let state = DgwState { conf_handle: conf_handle.clone(), @@ -253,6 +256,7 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { recordings: recording_manager_handle.clone(), job_queue_handle: job_queue_ctx.job_queue_handle.clone(), credential_store: credential_store.clone(), + monitoring_state: monitoring_state, }; conf.listeners diff --git a/docs/COOKBOOK.md b/docs/COOKBOOK.md index 309602592..0285a8051 100644 --- a/docs/COOKBOOK.md +++ b/docs/COOKBOOK.md @@ -270,6 +270,36 @@ And here is how the response may look like: ] ``` +## Network monitoring API + +Basic monitors can be set 
up to scan servers on an interval. Currently only ping is supported. Managing the +configuration and storing the logs are expected to be done by an external server. + +Set up a ping monitor for example.com which fires every 10 seconds and times out after 5 seconds. + +```shell +curl -v http://127.0.0.1:7171/jet/net/monitor/config --json '{"monitors":[{"id":"monitor1","probe":"ping","address":"example.com","interval":10,"timeout":5}]}' -H "Authorization: Bearer $dgwkey" +``` + +The monitor will start immediately. Calling this API again will overwrite the configuration, stopping any +monitors no longer present. A body is returned which may contain a list of monitors that could not be started +due to their type (set in the field `probe`) being unsupported. + +Retrieve the logs: + +```shell +curl -v http://127.0.0.1:7171/jet/net/monitor/log/drain -X POST -H "Authorization: Bearer $dgwkey" +``` + +The response will look similar to this: + +```json +{"entries":[{"monitor_id":"monitor1","request_start_time":"2025-08-22T17:07:34.3370521Z","response_success":true,"response_time":0.0585181}]} +``` + +Each log entry is only returned once. After you make this request, the existing log is deleted from memory. + + ## Proxy-based credentials injection for RDP ### How it works diff --git a/tools/generate-openapi/Cargo.toml b/tools/generate-openapi/Cargo.toml index 6d93e046d..5fe9eee47 100644 --- a/tools/generate-openapi/Cargo.toml +++ b/tools/generate-openapi/Cargo.toml @@ -9,4 +9,4 @@ publish = false devolutions-gateway = { path = "../../devolutions-gateway", features = ["openapi"] } devolutions-pedm = { path = "../../crates/devolutions-pedm" } serde_yaml = "0.9" -utoipa = { version = "4.2", default-features = false, features = ["uuid", "chrono", "yaml"] } +utoipa = { version = "4.2", default-features = false, features = ["uuid", "chrono", "yaml", "time"] }