Skip to content

Commit 03d4c99

Browse files
benjipelletierfacebook-github-bot
authored andcommitted
Add InMemoryReader to expose otel counter information on demand (#782)
Summary: Add a new otel reader type `InMemoryReader` * impls `ManualReader` and stores global ManualReader instance so we can get metrics during hyperactor program runs. * provides getters to cumulative counters. Will add more metrics later * ‼️ Uses cumulative temporality so metrics are stored in the SDK rather than in hyperactor. This might be something we iterate on if collection and transforming to a hashmap each time is too costly * also changes macros in lib.rs to use $crate eliothedeman Reviewed By: eliothedeman Differential Revision: D79658641
1 parent ddfb022 commit 03d4c99

File tree

3 files changed

+171
-13
lines changed

3 files changed

+171
-13
lines changed

hyperactor_telemetry/Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry
1+
# @generated by autocargo from //monarch/hyperactor_telemetry:[hyperactor_telemetry,hyperactor_telemetry_test]
22

33
[package]
44
name = "hyperactor_telemetry"
@@ -7,6 +7,10 @@ authors = ["Meta"]
77
edition = "2021"
88
license = "BSD-3-Clause"
99

10+
[[test]]
11+
name = "hyperactor_telemetry_test"
12+
path = "src/lib.rs"
13+
1014
[dependencies]
1115
anyhow = "1.0.98"
1216
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
@@ -27,6 +31,10 @@ tracing-glog = { version = "0.4.1", features = ["ansi", "tracing-log"] }
2731
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
2832
whoami = "1.5"
2933

34+
[dev-dependencies]
35+
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
36+
scuba = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
37+
3038
[features]
3139
default = []
3240
fbcode_build = ["fbinit", "scuba"]
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::collections::HashMap;
10+
use std::sync::Weak;
11+
12+
use opentelemetry_sdk::Resource;
13+
use opentelemetry_sdk::error::OTelSdkResult;
14+
use opentelemetry_sdk::metrics::InstrumentKind;
15+
use opentelemetry_sdk::metrics::ManualReader;
16+
use opentelemetry_sdk::metrics::MetricResult;
17+
use opentelemetry_sdk::metrics::Pipeline;
18+
use opentelemetry_sdk::metrics::Temporality;
19+
use opentelemetry_sdk::metrics::data::ResourceMetrics;
20+
use opentelemetry_sdk::metrics::data::Sum;
21+
use opentelemetry_sdk::metrics::reader::MetricReader;
22+
23+
// Global ManualReader instance for easy access with cumulative temporality
24+
static IN_MEMORY_MANUAL_READER: std::sync::LazyLock<ManualReader> =
25+
std::sync::LazyLock::new(|| {
26+
ManualReader::builder()
27+
.with_temporality(Temporality::Cumulative)
28+
.build()
29+
});
30+
31+
/// InMemoryReader that wraps the global ManualReader and implements MetricReader
32+
#[derive(Debug)]
33+
pub struct InMemoryReader;
34+
35+
impl InMemoryReader {
36+
pub fn new() -> Self {
37+
Self
38+
}
39+
}
40+
41+
impl MetricReader for InMemoryReader {
42+
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
43+
IN_MEMORY_MANUAL_READER.register_pipeline(pipeline);
44+
}
45+
46+
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
47+
IN_MEMORY_MANUAL_READER.collect(rm)
48+
}
49+
50+
fn force_flush(&self) -> OTelSdkResult {
51+
IN_MEMORY_MANUAL_READER.force_flush()
52+
}
53+
54+
fn shutdown(&self) -> OTelSdkResult {
55+
IN_MEMORY_MANUAL_READER.shutdown()
56+
}
57+
58+
fn temporality(&self, kind: InstrumentKind) -> Temporality {
59+
IN_MEMORY_MANUAL_READER.temporality(kind)
60+
}
61+
}
62+
63+
// Public API for In Memory Metrics
64+
impl InMemoryReader {
65+
/// Get all counters from the global ManualReader
66+
pub fn get_all_counters(&self) -> HashMap<String, i64> {
67+
let mut rm = ResourceMetrics {
68+
resource: Resource::builder_empty().build(),
69+
scope_metrics: Vec::new(),
70+
};
71+
let _ = IN_MEMORY_MANUAL_READER.collect(&mut rm);
72+
73+
// Extract counters directly from the collected metrics
74+
let mut counters = HashMap::new();
75+
for scope in &rm.scope_metrics {
76+
for metric in &scope.metrics {
77+
let data = metric.data.as_any();
78+
79+
if let Some(sum_u64) = data.downcast_ref::<Sum<u64>>() {
80+
for data_point in &sum_u64.data_points {
81+
let metric_name = metric.name.to_string();
82+
counters.insert(metric_name, data_point.value as i64);
83+
}
84+
} else if let Some(sum_i64) = data.downcast_ref::<Sum<i64>>() {
85+
for data_point in &sum_i64.data_points {
86+
let metric_name = metric.name.to_string();
87+
counters.insert(metric_name, data_point.value);
88+
}
89+
}
90+
}
91+
}
92+
counters
93+
}
94+
}
95+
96+
#[cfg(test)]
97+
mod tests {
98+
use opentelemetry_sdk::metrics::SdkMeterProvider;
99+
100+
use super::*;
101+
102+
#[test]
103+
fn test_get_all_counters() {
104+
let provider = SdkMeterProvider::builder()
105+
.with_reader(InMemoryReader::new())
106+
.build();
107+
108+
opentelemetry::global::set_meter_provider(provider);
109+
110+
// Create static counters using the macro
111+
crate::declare_static_counter!(TEST_COUNTER_1, "test_counter_1");
112+
crate::declare_static_counter!(TEST_COUNTER_2, "test_counter_2");
113+
114+
// Bump the counters with different values
115+
TEST_COUNTER_1.add(10, &[]);
116+
TEST_COUNTER_2.add(25, &[]);
117+
TEST_COUNTER_1.add(5, &[]); // Add more to the first counter (total should be 15)
118+
119+
// Get all counters and verify values
120+
let counters = InMemoryReader::new().get_all_counters();
121+
122+
// The counters should contain our test counters
123+
println!("All counters: {:?}", counters);
124+
125+
// Assert that we have counters
126+
assert!(!counters.is_empty(), "Should have some counters");
127+
128+
// Assert specific counter values
129+
// TEST_COUNTER_1 should have 15 (10 + 5)
130+
// TEST_COUNTER_2 should have 25
131+
assert_eq!(
132+
counters.get("test_counter_1"),
133+
Some(&15),
134+
"TEST_COUNTER_1 should be 15"
135+
);
136+
assert_eq!(
137+
counters.get("test_counter_2"),
138+
Some(&25),
139+
"TEST_COUNTER_2 should be 25"
140+
);
141+
}
142+
143+
#[test]
144+
fn test_get_all_counters_empty() {
145+
// Get counters when none have been created
146+
let counters = InMemoryReader::new().get_all_counters();
147+
148+
// Should be empty
149+
println!("Empty counters: {:?}", counters);
150+
151+
// This test ensures the function doesn't panic when no counters exist
152+
assert!(
153+
counters.is_empty(),
154+
"Should be empty when no counters created"
155+
);
156+
}
157+
}

hyperactor_telemetry/src/lib.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";
2626
/// Set to "1" to disable the recorder output.
2727
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";
2828

29+
pub mod in_memory_reader;
2930
#[cfg(fbcode_build)]
3031
mod meta;
3132
mod otel;
@@ -298,11 +299,7 @@ macro_rules! declare_static_counter {
298299
#[doc = "a global counter named: "]
299300
#[doc = $key]
300301
pub static $name: std::sync::LazyLock<opentelemetry::metrics::Counter<u64>> =
301-
std::sync::LazyLock::new(|| {
302-
hyperactor_telemetry::meter(module_path!())
303-
.u64_counter($key)
304-
.build()
305-
});
302+
std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
306303
};
307304
}
308305

@@ -334,7 +331,7 @@ macro_rules! declare_static_up_down_counter {
334331
#[doc = $key]
335332
pub static $name: std::sync::LazyLock<opentelemetry::metrics::UpDownCounter<i64>> =
336333
std::sync::LazyLock::new(|| {
337-
hyperactor_telemetry::meter(module_path!())
334+
$crate::meter(module_path!())
338335
.i64_up_down_counter($key)
339336
.build()
340337
});
@@ -368,11 +365,7 @@ macro_rules! declare_static_gauge {
368365
#[doc = "a global gauge named: "]
369366
#[doc = $key]
370367
pub static $name: std::sync::LazyLock<opentelemetry::metrics::Gauge<f64>> =
371-
std::sync::LazyLock::new(|| {
372-
hyperactor_telemetry::meter(module_path!())
373-
.f64_gauge($key)
374-
.build()
375-
});
368+
std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
376369
};
377370
}
378371
/// Create a thread safe static observable gauge that can be set to a specific value based on the provided callback.
@@ -399,7 +392,7 @@ macro_rules! declare_observable_gauge {
399392
#[doc = $key]
400393
pub static $name: std::sync::LazyLock<opentelemetry::metrics::ObservableGauge<f64>> =
401394
std::sync::LazyLock::new(|| {
402-
hyperactor_telemetry::meter(module_path!())
395+
$crate::meter(module_path!())
403396
.f64_observable_gauge($key)
404397
.with_callback($cb)
405398
.build()

0 commit comments

Comments
 (0)