diff --git a/.github/workflows/trivy-ci.yml b/.github/workflows/trivy-ci.yml index ee561b663..1af46f4bd 100644 --- a/.github/workflows/trivy-ci.yml +++ b/.github/workflows/trivy-ci.yml @@ -45,7 +45,7 @@ jobs: format: sarif output: chart/trivy-helm.sarif severity: HIGH,CRITICAL - + - name: Upload Helm SARIF uses: github/codeql-action/upload-sarif@v4 with: @@ -100,4 +100,4 @@ jobs: uses: github/codeql-action/upload-sarif@v4 with: sarif_file: trivy-image.sarif - category: trivy-image-${{ matrix.image }} + category: trivy-image-${{ matrix.image }} diff --git a/Cargo.lock b/Cargo.lock index 7ae0dc39d..3903cd126 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2300,15 +2300,19 @@ name = "metrics-exporter" version = "0.1.0" dependencies = [ "actix-web", + "anyhow", "clap", + "humantime", "mime", "once_cell", + "openapi", "prometheus", "rpc", "serde", "tokio", "tonic", "tracing", + "url", "utils", ] diff --git a/chart/templates/mayastor/io/io-engine-daemonset.yaml b/chart/templates/mayastor/io/io-engine-daemonset.yaml index 2e848ebf0..5746a3bd1 100644 --- a/chart/templates/mayastor/io/io-engine-daemonset.yaml +++ b/chart/templates/mayastor/io/io-engine-daemonset.yaml @@ -163,6 +163,7 @@ spec: - "--grpc-port={{ default 10124 .Values.io_engine.port }}" - "--fmt-style={{ include "logFormat" . 
}}" - "--ansi-colors={{ .Values.base.logging.color }}" + - "--rest-endpoint=http://{{ .Release.Name }}-api-rest:8081" {{- end }} volumes: - name: device diff --git a/metrics-exporter/Cargo.toml b/metrics-exporter/Cargo.toml index 0b3c30084..22e2e4f35 100644 --- a/metrics-exporter/Cargo.toml +++ b/metrics-exporter/Cargo.toml @@ -23,3 +23,9 @@ tonic = { workspace = true } serde = { workspace = true } mime = { workspace = true } tracing = { workspace = true } + +# OEP-4111: Node status metrics dependencies +openapi = { path = "../dependencies/control-plane/openapi" } +anyhow = { workspace = true } +humantime = { workspace = true } +url = { workspace = true } diff --git a/metrics-exporter/src/bin/io_engine/collector/mod.rs b/metrics-exporter/src/bin/io_engine/collector/mod.rs index 101ffa0de..a9a7fe56f 100644 --- a/metrics-exporter/src/bin/io_engine/collector/mod.rs +++ b/metrics-exporter/src/bin/io_engine/collector/mod.rs @@ -4,6 +4,7 @@ use prometheus::{ }; pub(crate) mod nexus_stat; +pub(crate) mod node_status; pub(crate) mod pool; pub(crate) mod pool_stat; pub(crate) mod replica_stat; diff --git a/metrics-exporter/src/bin/io_engine/collector/node_status.rs b/metrics-exporter/src/bin/io_engine/collector/node_status.rs new file mode 100644 index 000000000..55fc10f40 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/collector/node_status.rs @@ -0,0 +1,365 @@ +use openapi::models::{CordonDrainState, Node, NodeStatus}; +use prometheus::{ + core::{Collector, Desc}, + proto, GaugeVec, Opts, +}; + +/// Returns true if the node is online. +fn is_online(node: &Node) -> bool { + node.state + .as_ref() + .map(|s| matches!(s.status, NodeStatus::Online)) + .unwrap_or(false) +} + +/// Returns true if the node is draining or drained. 
+fn is_draining(node: &Node) -> bool { + node.spec + .as_ref() + .and_then(|s| s.cordondrainstate.as_ref()) + .map(|cds| { + matches!( + cds, + CordonDrainState::drainingstate(_) | CordonDrainState::drainedstate(_) + ) + }) + .unwrap_or(false) +} + +/// Returns true if the node is cordoned (including implicit cordon from drain). +fn is_cordoned(node: &Node) -> bool { + is_draining(node) + || node + .spec + .as_ref() + .and_then(|s| s.cordondrainstate.as_ref()) + .map(|cds| matches!(cds, CordonDrainState::cordonedstate(_))) + .unwrap_or(false) +} + +/// Collector for node status metrics. +pub(crate) struct NodeStatusCollector { + /// Node data fetched at scrape time (None if unavailable). + node: Option<Node>, + /// Gauge for node online status (0/1). + node_online: GaugeVec, + /// Gauge for node cordoned status (0/1). + node_cordoned: GaugeVec, + /// Gauge for node draining status (0/1). + node_draining: GaugeVec, + /// Descriptors for Prometheus. + descs: Vec<Desc>, +} + +impl NodeStatusCollector { + /// Create a new NodeStatusCollector from node data fetched at scrape time.
+ pub(crate) fn new(node: Option<Node>) -> Self { + let mut descs = Vec::new(); + + let node_online = GaugeVec::new( + Opts::new("node_online", "Indicates if Mayastor node is online") + .subsystem("mayastor") + .variable_labels(vec!["node_id".to_string()]), + &["node_id"], + ) + .expect("Unable to create gauge metric for node_online"); + descs.extend(node_online.desc().into_iter().cloned()); + + let node_cordoned = GaugeVec::new( + Opts::new("node_cordoned", "Indicates if Mayastor node is cordoned") + .subsystem("mayastor") + .variable_labels(vec!["node_id".to_string()]), + &["node_id"], + ) + .expect("Unable to create gauge metric for node_cordoned"); + descs.extend(node_cordoned.desc().into_iter().cloned()); + + let node_draining = GaugeVec::new( + Opts::new("node_draining", "Indicates if Mayastor node is draining") + .subsystem("mayastor") + .variable_labels(vec!["node_id".to_string()]), + &["node_id"], + ) + .expect("Unable to create gauge metric for node_draining"); + descs.extend(node_draining.desc().into_iter().cloned()); + + Self { + node, + node_online, + node_cordoned, + node_draining, + descs, + } + } + + /// Update metrics from node data fetched at scrape time.
+ fn update_metrics(&self) { + let node = match &self.node { + Some(n) => n, + None => return, + }; + + let node_id = node.id.as_str(); + + self.node_online + .with_label_values(&[node_id]) + .set(if is_online(node) { 1.0 } else { 0.0 }); + + self.node_cordoned + .with_label_values(&[node_id]) + .set(if is_cordoned(node) { 1.0 } else { 0.0 }); + + self.node_draining + .with_label_values(&[node_id]) + .set(if is_draining(node) { 1.0 } else { 0.0 }); + } +} + +impl Collector for NodeStatusCollector { + fn desc(&self) -> Vec<&Desc> { + self.descs.iter().collect() + } + + fn collect(&self) -> Vec<proto::MetricFamily> { + self.update_metrics(); + + let mut metrics = Vec::new(); + metrics.extend(self.node_online.collect()); + metrics.extend(self.node_cordoned.collect()); + metrics.extend(self.node_draining.collect()); + metrics + } +} + +#[cfg(test)] +mod tests { + use super::*; + use openapi::models::{CordonedState, DrainState, Node, NodeSpec, NodeState}; + use prometheus::core::Collector; + + /// Create a test node with the specified status and cordon/drain state. + fn create_test_node(id: &str, online: bool, cordon_drain: Option<CordonDrainState>) -> Node { + let status = if online { + NodeStatus::Online + } else { + NodeStatus::Offline + }; + Node { + id: id.to_string(), + spec: Some(NodeSpec::new_all( + format!("http://{id}:10124"), + id, + None::>, + cordon_drain, + None::, + None::, + )), + state: Some(NodeState::new_all( + format!("http://{id}:10124"), + id, + status, + None::, + None::, + )), + } + } + + /// Helper to extract metric value from collected metrics.
+ fn get_metric_value(metrics: &[proto::MetricFamily], name: &str, node_id: &str) -> Option<f64> { + for mf in metrics { + if mf.get_name() == name { + for metric in mf.get_metric() { + for label in metric.get_label() { + if label.get_name() == "node_id" && label.get_value() == node_id { + return Some(metric.get_gauge().get_value()); + } + } + } + } + } + None + } + + #[test] + fn test_collector_creation() { + let collector = NodeStatusCollector::new(None); + // 3 metrics: online, cordoned, draining. + assert_eq!(collector.desc().len(), 3); + } + + #[test] + fn test_collector_no_node() { + let collector = NodeStatusCollector::new(None); + let metrics = collector.collect(); + // 3 metric families returned, but with no data points. + assert_eq!(metrics.len(), 3); + for mf in &metrics { + assert_eq!(mf.get_metric().len(), 0); + } + } + + #[test] + fn test_collector_online_node() { + let node = create_test_node("io-engine-1", true, None); + let collector = NodeStatusCollector::new(Some(node)); + + let metrics = collector.collect(); + + assert_eq!( + get_metric_value(&metrics, "mayastor_node_online", "io-engine-1"), + Some(1.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(0.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_draining", "io-engine-1"), + Some(0.0) + ); + } + + #[test] + fn test_collector_offline_node() { + let node = create_test_node("io-engine-1", false, None); + let collector = NodeStatusCollector::new(Some(node)); + + let metrics = collector.collect(); + + assert_eq!( + get_metric_value(&metrics, "mayastor_node_online", "io-engine-1"), + Some(0.0) + ); + } + + #[test] + fn test_collector_cordoned_node() { + let cordon = + CordonDrainState::cordonedstate(CordonedState::new(vec!["maintenance".to_string()])); + let node = create_test_node("io-engine-1", true, Some(cordon)); + let collector = NodeStatusCollector::new(Some(node)); + + let metrics = collector.collect(); + + assert_eq!( + 
get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(1.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_online", "io-engine-1"), + Some(1.0) + ); + } + + #[test] + fn test_collector_draining_node() { + let draining = CordonDrainState::drainingstate(DrainState::new( + vec!["maintenance".to_string()], + vec!["drain-volumes".to_string()], + )); + let node = create_test_node("io-engine-1", true, Some(draining)); + let collector = NodeStatusCollector::new(Some(node)); + + let metrics = collector.collect(); + + assert_eq!( + get_metric_value(&metrics, "mayastor_node_draining", "io-engine-1"), + Some(1.0) + ); + // Draining implies cordoned. + assert_eq!( + get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(1.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_online", "io-engine-1"), + Some(1.0) + ); + } + + #[test] + fn test_collector_state_transitions() { + // Online -> Cordoned -> Draining -> Drained -> Online. + let node = create_test_node("io-engine-1", true, None); + let metrics = NodeStatusCollector::new(Some(node)).collect(); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_online", "io-engine-1"), + Some(1.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(0.0) + ); + + // Cordoned. + let cordon = + CordonDrainState::cordonedstate(CordonedState::new(vec!["maintenance".to_string()])); + let node = create_test_node("io-engine-1", true, Some(cordon)); + let metrics = NodeStatusCollector::new(Some(node)).collect(); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(1.0) + ); + + // Draining. 
+ let draining = CordonDrainState::drainingstate(DrainState::new( + vec!["maintenance".to_string()], + vec!["drain-volumes".to_string()], + )); + let node = create_test_node("io-engine-1", true, Some(draining)); + let metrics = NodeStatusCollector::new(Some(node)).collect(); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_draining", "io-engine-1"), + Some(1.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(1.0) + ); + + // Drained. + let drained = CordonDrainState::drainedstate(DrainState::new( + vec!["maintenance".to_string()], + vec!["drain-volumes".to_string()], + )); + let node = create_test_node("io-engine-1", true, Some(drained)); + let metrics = NodeStatusCollector::new(Some(node)).collect(); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_draining", "io-engine-1"), + Some(1.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(1.0) + ); + + // Back to online. + let node = create_test_node("io-engine-1", true, None); + let metrics = NodeStatusCollector::new(Some(node)).collect(); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_online", "io-engine-1"), + Some(1.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_cordoned", "io-engine-1"), + Some(0.0) + ); + assert_eq!( + get_metric_value(&metrics, "mayastor_node_draining", "io-engine-1"), + Some(0.0) + ); + } + + #[test] + fn test_collector_metric_names() { + let node = create_test_node("test-node", true, None); + let collector = NodeStatusCollector::new(Some(node)); + + let metrics = collector.collect(); + let metric_names: Vec<&str> = metrics.iter().map(|m| m.get_name()).collect(); + + assert!(metric_names.contains(&"mayastor_node_online")); + assert!(metric_names.contains(&"mayastor_node_cordoned")); + assert!(metric_names.contains(&"mayastor_node_draining")); + } +} diff --git a/metrics-exporter/src/bin/io_engine/main.rs b/metrics-exporter/src/bin/io_engine/main.rs 
index 0cd99332e..b8cd99664 100644 --- a/metrics-exporter/src/bin/io_engine/main.rs +++ b/metrics-exporter/src/bin/io_engine/main.rs @@ -1,6 +1,7 @@ use crate::{ client::grpc_client::{init_client, GrpcClient}, error::ExporterError, + node_status::NodeStatusClient, serve::metric_route, }; use actix_web::{middleware, HttpServer}; @@ -10,16 +11,19 @@ use std::{ env, net::{IpAddr, SocketAddr}, }; +use tracing::{info, warn}; use utils::tracing_telemetry::{FmtLayer, FmtStyle}; /// Cache module for exporter. pub(crate) mod cache; -/// Grpc client module. +/// gRPC client module. pub(crate) mod client; /// Collector module. pub(crate) mod collector; /// Error module. pub(crate) mod error; +/// Node status module for REST-based metrics. +pub(crate) mod node_status; /// Prometheus metrics handler module. pub(crate) mod serve; @@ -60,15 +64,19 @@ pub(crate) struct Cli { /// Use ANSI colors for the logs. #[clap(long, default_value_t = true, action = clap::ArgAction::Set)] ansi_colors: bool, -} -impl Cli { - fn args() -> Self { - Cli::parse() - } + /// REST endpoint for control-plane API (for node status metrics). + #[clap(long, env = "MAYASTOR_REST_ENDPOINT")] + rest_endpoint: Option<url::Url>, + + /// Timeout for node status REST requests. + #[clap(long, default_value = "10s", env = "MAYASTOR_SCRAPE_TIMEOUT")] + scrape_timeout: humantime::Duration, } static GRPC_CLIENT: OnceCell<GrpcClient> = OnceCell::new(); +static NODE_STATUS_CLIENT: OnceCell<NodeStatusClient> = OnceCell::new(); +static NODE_NAME: OnceCell<String> = OnceCell::new(); /// Get IO engine gRPC Client. pub(crate) fn grpc_client<'a>() -> &'a GrpcClient { @@ -77,9 +85,21 @@ pub(crate) fn grpc_client<'a>() -> &'a GrpcClient { .expect("gRPC Client should have been initialised") } +/// Get node status REST client, if configured. +pub(crate) fn node_status_client() -> Option<&'static NodeStatusClient> { + NODE_STATUS_CLIENT.get() +} + +/// Get the name of the node this exporter is running on.
+pub(crate) fn node_name() -> &'static str { + NODE_NAME + .get() + .expect("Node name should have been initialised") +} + #[tokio::main] async fn main() -> Result<(), ExporterError> { - let args = Cli::args(); + let args = Cli::parse(); utils::print_package_info!(); utils::tracing_telemetry::TracingTelemetry::builder() @@ -94,6 +114,26 @@ async fn main() -> Result<(), ExporterError> { GRPC_CLIENT .set(client) .expect("Expect to be initialised only once"); + + // Store node name for use by the metrics handler on every scrape. + NODE_NAME + .set(get_node_name()?) + .expect("Node name should be initialised only once"); + + // Initialize node status REST client if endpoint is configured. + if let Some(ref endpoint) = args.rest_endpoint { + info!("Initializing node status REST client with endpoint: {endpoint}"); + let node_client = NodeStatusClient::new(&endpoint.to_string(), *args.scrape_timeout) + .map_err(|e| { + ExporterError::HttpServerError(format!("Failed to create node status client: {e}")) + })?; + NODE_STATUS_CLIENT + .set(node_client) + .expect("Node status client should be initialised only once"); + } else { + warn!("REST endpoint not configured, node status metrics will not be available"); + } + let app = move || { actix_web::App::new() .wrap(middleware::Logger::default()) diff --git a/metrics-exporter/src/bin/io_engine/node_status/client.rs b/metrics-exporter/src/bin/io_engine/node_status/client.rs new file mode 100644 index 000000000..69cad0ed8 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/node_status/client.rs @@ -0,0 +1,36 @@ +use openapi::{ + clients::tower::Error, + models::{Node, RestJsonError}, + tower::client::{ApiClient, Configuration}, +}; +use std::time::Duration; +use tracing::trace; + +/// REST client for fetching node status from the control-plane. +#[derive(Clone, Debug)] +pub(crate) struct NodeStatusClient { + client: ApiClient, +} + +impl NodeStatusClient { + /// Create a new NodeStatusClient. 
+ pub(crate) fn new(endpoint: &str, timeout: Duration) -> anyhow::Result<Self> { + let url = url::Url::parse(endpoint) + .map_err(|e| anyhow::anyhow!("Invalid REST endpoint URL '{endpoint}': {e}"))?; + let config = Configuration::builder() + .with_timeout(timeout) + .with_tracing(false) + .build_url(url) + .map_err(|e| anyhow::anyhow!("Failed to create openapi configuration: {e:?}"))?; + let client = ApiClient::new(config); + Ok(Self { client }) + } + + /// Fetch a single node from the control-plane REST API. + pub(crate) async fn fetch_node(&self, node_id: &str) -> Result<Node, Error<RestJsonError>> { + trace!("Fetching node {node_id} from control-plane"); + let node = self.client.nodes_api().get_node(node_id).await?.into_body(); + trace!("Successfully fetched node {node_id}"); + Ok(node) + } +} diff --git a/metrics-exporter/src/bin/io_engine/node_status/mod.rs b/metrics-exporter/src/bin/io_engine/node_status/mod.rs new file mode 100644 index 000000000..89b850fd3 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/node_status/mod.rs @@ -0,0 +1,4 @@ +/// Node status REST client.
+pub(crate) mod client; + +pub(crate) use client::NodeStatusClient; diff --git a/metrics-exporter/src/bin/io_engine/serve/handler.rs b/metrics-exporter/src/bin/io_engine/serve/handler.rs index 32ccdeb2b..dad83961d 100644 --- a/metrics-exporter/src/bin/io_engine/serve/handler.rs +++ b/metrics-exporter/src/bin/io_engine/serve/handler.rs @@ -2,11 +2,12 @@ use crate::{ cache::store_resource_data, collector::{ nexus_stat::NexusIoStatsCollector, + node_status::NodeStatusCollector, pool::{PoolCapacityCollector, PoolStatusCollector}, pool_stat::PoolIoStatsCollector, replica_stat::ReplicaIoStatsCollector, }, - grpc_client, + grpc_client, node_name, node_status_client, }; use actix_web::{http::header, HttpResponse, Responder}; use prometheus::{Encoder, Registry}; @@ -16,12 +17,26 @@ use tracing::{error, warn}; pub(crate) async fn metrics_handler() -> impl Responder { // Fetches stats for all resource from io engine, Populates the cache. store_resource_data(grpc_client()).await; + + // Fetch node status inline at scrape time so Prometheus timestamps reflect actual state. + let node = match node_status_client() { + Some(client) => match client.fetch_node(node_name()).await { + Ok(node) => Some(node), + Err(e) => { + warn!("Failed to fetch node status from control-plane: {e}"); + None + } + }, + None => None, + }; + // Create collectors for all resources. let pools_collector = PoolCapacityCollector::default(); let pool_status_collector = PoolStatusCollector::default(); let pool_iostat_collector = PoolIoStatsCollector::default(); let nexus_iostat_collector = NexusIoStatsCollector::default(); let replica_iostat_collector = ReplicaIoStatsCollector::default(); + let node_status_collector = NodeStatusCollector::new(node); // Create a new registry for prometheus. let registry = Registry::default(); // Register all collectors to the registry. 
@@ -40,6 +55,9 @@ pub(crate) async fn metrics_handler() -> impl Responder { if let Err(error) = Registry::register(&registry, Box::new(replica_iostat_collector)) { warn!(%error, "Replica IoStat collector already registered"); } + if let Err(error) = Registry::register(&registry, Box::new(node_status_collector)) { + warn!(%error, "Node status collector already registered"); + } let mut buffer = Vec::new();