Skip to content

Add InMemoryReader to expose otel counter information on demand #782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion hyperactor_telemetry/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry
# @generated by autocargo from //monarch/hyperactor_telemetry:[hyperactor_telemetry,hyperactor_telemetry_test]

[package]
name = "hyperactor_telemetry"
Expand All @@ -7,6 +7,10 @@ authors = ["Meta"]
edition = "2021"
license = "BSD-3-Clause"

[[test]]
name = "hyperactor_telemetry_test"
path = "src/lib.rs"

[dependencies]
anyhow = "1.0.98"
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
Expand All @@ -27,6 +31,10 @@ tracing-glog = { version = "0.4.1", features = ["ansi", "tracing-log"] }
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
whoami = "1.5"

[dev-dependencies]
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
scuba = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }

[features]
default = []
fbcode_build = ["fbinit", "scuba"]
Expand Down
157 changes: 157 additions & 0 deletions hyperactor_telemetry/src/in_memory_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

use std::collections::HashMap;
use std::sync::Weak;

use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::metrics::InstrumentKind;
use opentelemetry_sdk::metrics::ManualReader;
use opentelemetry_sdk::metrics::MetricResult;
use opentelemetry_sdk::metrics::Pipeline;
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::data::Sum;
use opentelemetry_sdk::metrics::reader::MetricReader;

/// Process-wide `ManualReader` shared by every `InMemoryReader`.
///
/// It is built lazily on first access and configured with cumulative
/// temporality.
static IN_MEMORY_MANUAL_READER: std::sync::LazyLock<ManualReader> =
    std::sync::LazyLock::new(|| {
        let builder = ManualReader::builder().with_temporality(Temporality::Cumulative);
        builder.build()
    });

/// Zero-sized handle that wraps the global `ManualReader` and implements
/// `MetricReader`.
///
/// The type carries no state of its own, so it is cheap to copy and
/// `InMemoryReader::default()` is equivalent to `InMemoryReader::new()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct InMemoryReader;

impl InMemoryReader {
pub fn new() -> Self {
Self
}
}

impl MetricReader for InMemoryReader {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
IN_MEMORY_MANUAL_READER.register_pipeline(pipeline);
}

fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
IN_MEMORY_MANUAL_READER.collect(rm)
}

fn force_flush(&self) -> OTelSdkResult {
IN_MEMORY_MANUAL_READER.force_flush()
}

fn shutdown(&self) -> OTelSdkResult {
IN_MEMORY_MANUAL_READER.shutdown()
}

fn temporality(&self, kind: InstrumentKind) -> Temporality {
IN_MEMORY_MANUAL_READER.temporality(kind)
}
}

// Public API for in-memory metrics
impl InMemoryReader {
    /// Collects the current metrics from the global `ManualReader` and returns
    /// every integer sum, keyed by metric name.
    ///
    /// Only `Sum<u64>` and `Sum<i64>` aggregations are reported; any other
    /// aggregation is skipped, as is a sum without data points.
    ///
    /// Values are accumulated per metric name: when a name carries several
    /// data points, or appears in more than one scope, the values are added
    /// together so the result does not depend on iteration order. All
    /// arithmetic saturates, and a `u64` value that does not fit in an `i64`
    /// is reported as `i64::MAX` instead of wrapping to a negative number.
    ///
    /// Collection is best-effort: if `collect` returns an error, the map is
    /// built from whatever was written into the metrics buffer.
    pub fn get_all_counters(&self) -> HashMap<String, i64> {
        let mut rm = ResourceMetrics {
            resource: Resource::builder_empty().build(),
            scope_metrics: Vec::new(),
        };
        // Deliberately ignored: this accessor has no error channel, and a
        // failed collection should yield the (possibly empty) buffer contents
        // rather than a panic.
        let _ = IN_MEMORY_MANUAL_READER.collect(&mut rm);

        let mut counters: HashMap<String, i64> = HashMap::new();
        let metrics = rm
            .scope_metrics
            .iter()
            .flat_map(|scope| scope.metrics.iter());

        for metric in metrics {
            let data = metric.data.as_any();

            let total = if let Some(sum_u64) = data.downcast_ref::<Sum<u64>>() {
                Self::saturating_total(
                    sum_u64
                        .data_points
                        .iter()
                        .map(|data_point| i64::try_from(data_point.value).unwrap_or(i64::MAX)),
                )
            } else if let Some(sum_i64) = data.downcast_ref::<Sum<i64>>() {
                Self::saturating_total(sum_i64.data_points.iter().map(|data_point| data_point.value))
            } else {
                None
            };

            if let Some(total) = total {
                let slot = counters.entry(metric.name.to_string()).or_insert(0);
                *slot = slot.saturating_add(total);
            }
        }
        counters
    }

    /// Adds up `values` with saturating arithmetic; `None` when there are no values.
    fn saturating_total(values: impl Iterator<Item = i64>) -> Option<i64> {
        values.reduce(i64::saturating_add)
    }
}

#[cfg(test)]
mod tests {
    use opentelemetry_sdk::metrics::SdkMeterProvider;

    use super::*;

    /// Exercises `get_all_counters` before and after counters exist.
    ///
    /// Both phases live in one test on purpose: the reader and the meter
    /// provider are process-global, so running the "empty" check as a separate
    /// test would make its outcome depend on test ordering and parallelism.
    #[test]
    fn test_get_all_counters() {
        // Phase 1: nothing has been registered or recorded yet. This also
        // ensures the function doesn't panic when no counters exist.
        let counters = InMemoryReader::new().get_all_counters();
        assert!(
            counters.is_empty(),
            "Should be empty when no counters created, got: {:?}",
            counters
        );

        // Phase 2: install a provider backed by the in-memory reader.
        let provider = SdkMeterProvider::builder()
            .with_reader(InMemoryReader::new())
            .build();

        opentelemetry::global::set_meter_provider(provider);

        // Create static counters using the macro
        crate::declare_static_counter!(TEST_COUNTER_1, "test_counter_1");
        crate::declare_static_counter!(TEST_COUNTER_2, "test_counter_2");

        // Bump the counters with different values
        TEST_COUNTER_1.add(10, &[]);
        TEST_COUNTER_2.add(25, &[]);
        TEST_COUNTER_1.add(5, &[]); // Add more to the first counter (total should be 15)

        // Get all counters and verify values
        let counters = InMemoryReader::new().get_all_counters();

        // Assert that we have counters
        assert!(
            !counters.is_empty(),
            "Should have some counters, got: {:?}",
            counters
        );

        // Assert specific counter values
        // TEST_COUNTER_1 should have 15 (10 + 5)
        // TEST_COUNTER_2 should have 25
        assert_eq!(
            counters.get("test_counter_1"),
            Some(&15),
            "TEST_COUNTER_1 should be 15"
        );
        assert_eq!(
            counters.get("test_counter_2"),
            Some(&25),
            "TEST_COUNTER_2 should be 25"
        );
    }
}
17 changes: 5 additions & 12 deletions hyperactor_telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";
/// Set to "1" to disable the recorder output.
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";

pub mod in_memory_reader;
#[cfg(fbcode_build)]
mod meta;
mod otel;
Expand Down Expand Up @@ -298,11 +299,7 @@ macro_rules! declare_static_counter {
#[doc = "a global counter named: "]
#[doc = $key]
pub static $name: std::sync::LazyLock<opentelemetry::metrics::Counter<u64>> =
std::sync::LazyLock::new(|| {
hyperactor_telemetry::meter(module_path!())
.u64_counter($key)
.build()
});
std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
};
}

Expand Down Expand Up @@ -334,7 +331,7 @@ macro_rules! declare_static_up_down_counter {
#[doc = $key]
pub static $name: std::sync::LazyLock<opentelemetry::metrics::UpDownCounter<i64>> =
std::sync::LazyLock::new(|| {
hyperactor_telemetry::meter(module_path!())
$crate::meter(module_path!())
.i64_up_down_counter($key)
.build()
});
Expand Down Expand Up @@ -368,11 +365,7 @@ macro_rules! declare_static_gauge {
#[doc = "a global gauge named: "]
#[doc = $key]
pub static $name: std::sync::LazyLock<opentelemetry::metrics::Gauge<f64>> =
std::sync::LazyLock::new(|| {
hyperactor_telemetry::meter(module_path!())
.f64_gauge($key)
.build()
});
std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
};
}
/// Create a thread safe static observable gauge that can be set to a specific value based on the provided callback.
Expand All @@ -399,7 +392,7 @@ macro_rules! declare_observable_gauge {
#[doc = $key]
pub static $name: std::sync::LazyLock<opentelemetry::metrics::ObservableGauge<f64>> =
std::sync::LazyLock::new(|| {
hyperactor_telemetry::meter(module_path!())
$crate::meter(module_path!())
.f64_observable_gauge($key)
.with_callback($cb)
.build()
Expand Down