From 7fc5a76e09717988f6f73d16fe650e40b9b76d62 Mon Sep 17 00:00:00 2001 From: Benji Pelletier Date: Fri, 8 Aug 2025 08:31:11 -0700 Subject: [PATCH] Add InMemoryReader to expose otel counter information on demand (#782) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Add a new otel reader type `InMemoryReader` * impls `ManualReader` and stores global ManualReader instance so we can get metrics during hyperactor program runs. * provides getters to cumulative counters. Will add more metrics later * ‼️ Uses cumulative temporality so metrics are stored in the SDK rather than in hyperactor. This might be something we iterate on if collection and transforming to a hashmap each time is too costly * also changes macros in lib.rs to use $crate eliothedeman Reviewed By: eliothedeman Differential Revision: D79658641 --- hyperactor_telemetry/Cargo.toml | 10 +- hyperactor_telemetry/src/in_memory_reader.rs | 157 +++++++++++++++++++ hyperactor_telemetry/src/lib.rs | 17 +- 3 files changed, 171 insertions(+), 13 deletions(-) create mode 100644 hyperactor_telemetry/src/in_memory_reader.rs diff --git a/hyperactor_telemetry/Cargo.toml b/hyperactor_telemetry/Cargo.toml index 42744afa7..8ffdfe426 100644 --- a/hyperactor_telemetry/Cargo.toml +++ b/hyperactor_telemetry/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry +# @generated by autocargo from //monarch/hyperactor_telemetry:[hyperactor_telemetry,hyperactor_telemetry_test] [package] name = "hyperactor_telemetry" @@ -7,6 +7,10 @@ authors = ["Meta"] edition = "2021" license = "BSD-3-Clause" +[[test]] +name = "hyperactor_telemetry_test" +path = "src/lib.rs" + [dependencies] anyhow = "1.0.98" dashmap = { version = "5.5.3", features = ["rayon", "serde"] } @@ -27,6 +31,10 @@ tracing-glog = { version = "0.4.1", features = ["ansi", "tracing-log"] } tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] } whoami = "1.5" +[dev-dependencies] +fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } +scuba = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } + [features] default = [] fbcode_build = ["fbinit", "scuba"] diff --git a/hyperactor_telemetry/src/in_memory_reader.rs b/hyperactor_telemetry/src/in_memory_reader.rs new file mode 100644 index 000000000..0005bfcb5 --- /dev/null +++ b/hyperactor_telemetry/src/in_memory_reader.rs @@ -0,0 +1,157 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +use std::collections::HashMap; +use std::sync::Weak; + +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::metrics::InstrumentKind; +use opentelemetry_sdk::metrics::ManualReader; +use opentelemetry_sdk::metrics::MetricResult; +use opentelemetry_sdk::metrics::Pipeline; +use opentelemetry_sdk::metrics::Temporality; +use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::data::Sum; +use opentelemetry_sdk::metrics::reader::MetricReader; + +// Global ManualReader instance for easy access with cumulative temporality +static IN_MEMORY_MANUAL_READER: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + ManualReader::builder() + .with_temporality(Temporality::Cumulative) + .build() + }); + +/// InMemoryReader that wraps the global ManualReader and implements MetricReader +#[derive(Debug)] +pub struct InMemoryReader; + +impl InMemoryReader { + pub fn new() -> Self { + Self + } +} + +impl MetricReader for InMemoryReader { + fn register_pipeline(&self, pipeline: Weak) { + IN_MEMORY_MANUAL_READER.register_pipeline(pipeline); + } + + fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> { + IN_MEMORY_MANUAL_READER.collect(rm) + } + + fn force_flush(&self) -> OTelSdkResult { + IN_MEMORY_MANUAL_READER.force_flush() + } + + fn shutdown(&self) -> OTelSdkResult { + IN_MEMORY_MANUAL_READER.shutdown() + } + + fn temporality(&self, kind: InstrumentKind) -> Temporality { + IN_MEMORY_MANUAL_READER.temporality(kind) + } +} + +// Public API for In Memory Metrics +impl InMemoryReader { + /// Get all counters from the global ManualReader + pub fn get_all_counters(&self) -> HashMap { + let mut rm = ResourceMetrics { + resource: Resource::builder_empty().build(), + scope_metrics: Vec::new(), + }; + let _ = IN_MEMORY_MANUAL_READER.collect(&mut rm); + + // Extract counters directly from the collected metrics + let mut counters = HashMap::new(); + for scope in &rm.scope_metrics { + for metric in &scope.metrics { + let data = metric.data.as_any(); + + if let Some(sum_u64) = data.downcast_ref::>() { + for data_point in &sum_u64.data_points { + let metric_name = metric.name.to_string(); + counters.insert(metric_name, data_point.value as i64); + } + } else if let Some(sum_i64) = data.downcast_ref::>() { + for data_point in &sum_i64.data_points { + let metric_name = metric.name.to_string(); + counters.insert(metric_name, data_point.value); + } + } + } + } + counters + } +} + +#[cfg(test)] +mod tests { + use opentelemetry_sdk::metrics::SdkMeterProvider; + + use super::*; + + #[test] + fn test_get_all_counters() { + let provider = SdkMeterProvider::builder() + .with_reader(InMemoryReader::new()) + .build(); + + opentelemetry::global::set_meter_provider(provider); + + // Create static counters using the macro + crate::declare_static_counter!(TEST_COUNTER_1, "test_counter_1"); + crate::declare_static_counter!(TEST_COUNTER_2, "test_counter_2"); + + // Bump the counters with different values + TEST_COUNTER_1.add(10, &[]); + TEST_COUNTER_2.add(25, &[]); + TEST_COUNTER_1.add(5, &[]); // Add more to the first counter (total should be 15) + + // Get all counters and verify values + let counters = InMemoryReader::new().get_all_counters(); + + // The counters should contain our test counters + println!("All counters: {:?}", counters); + + // Assert that we have counters + assert!(!counters.is_empty(), "Should have some counters"); + + // Assert specific counter values + // TEST_COUNTER_1 should have 15 (10 + 5) + // TEST_COUNTER_2 should have 25 + assert_eq!( + counters.get("test_counter_1"), + Some(&15), + "TEST_COUNTER_1 should be 15" + ); + assert_eq!( + counters.get("test_counter_2"), + Some(&25), + "TEST_COUNTER_2 should be 25" + ); + } + + #[test] + fn test_get_all_counters_empty() { + // Get counters when none have been created + let counters = InMemoryReader::new().get_all_counters(); + + // Should be empty + println!("Empty counters: {:?}", counters); + + // This test ensures the function doesn't panic when no counters exist + assert!( + counters.is_empty(), + "Should be empty when no counters created" + ); + } +} diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 8d12d86ab..4e0fc5780 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -26,6 +26,7 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS"; /// Set to "1" to disable the recorder output. pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING"; +pub mod in_memory_reader; #[cfg(fbcode_build)] mod meta; mod otel; @@ -298,11 +299,7 @@ macro_rules! declare_static_counter { #[doc = "a global counter named: "] #[doc = $key] pub static $name: std::sync::LazyLock> = - std::sync::LazyLock::new(|| { - hyperactor_telemetry::meter(module_path!()) - .u64_counter($key) - .build() - }); + std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build()); }; } @@ -334,7 +331,7 @@ macro_rules! declare_static_up_down_counter { #[doc = $key] pub static $name: std::sync::LazyLock> = std::sync::LazyLock::new(|| { - hyperactor_telemetry::meter(module_path!()) + $crate::meter(module_path!()) .i64_up_down_counter($key) .build() }); @@ -368,11 +365,7 @@ macro_rules! declare_static_gauge { #[doc = "a global gauge named: "] #[doc = $key] pub static $name: std::sync::LazyLock> = - std::sync::LazyLock::new(|| { - hyperactor_telemetry::meter(module_path!()) - .f64_gauge($key) - .build() - }); + std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build()); }; } /// Create a thread safe static observable gauge that can be set to a specific value based on the provided callback. @@ -399,7 +392,7 @@ macro_rules! declare_observable_gauge { #[doc = $key] pub static $name: std::sync::LazyLock> = std::sync::LazyLock::new(|| { - hyperactor_telemetry::meter(module_path!()) + $crate::meter(module_path!()) .f64_observable_gauge($key) .with_callback($cb) .build()