Skip to content

Commit 44ecc8a

Browse files
committed
Passed down the ignored timeout from MeterProvider to reader in pipeline
1 parent 4a3aa77 commit 44ecc8a

File tree

2 files changed

+28
-4
lines changed

2 files changed

+28
-4
lines changed

opentelemetry-sdk/src/metrics/meter_provider.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,12 @@ impl SdkMeterProvider {
110110
///
111111
/// There is no guaranteed that all telemetry be flushed or all resources have
112112
/// been released on error.
113-
pub fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
113+
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
114114
otel_debug!(
115115
name: "MeterProvider.Shutdown",
116116
message = "User initiated shutdown of MeterProvider."
117117
);
118-
self.inner.shutdown()
118+
self.inner.shutdown_with_timeout(timeout)
119119
}
120120

121121
/// shutdown with default timeout
@@ -136,15 +136,15 @@ impl SdkMeterProviderInner {
136136
}
137137
}
138138

139-
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
139+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
140140
if self
141141
.shutdown_invoked
142142
.swap(true, std::sync::atomic::Ordering::SeqCst)
143143
{
144144
// If the previous value was true, shutdown was already invoked.
145145
Err(crate::error::OTelSdkError::AlreadyShutdown)
146146
} else {
147-
self.pipes.shutdown()
147+
self.pipes.shutdown_with_timeout(timeout)
148148
}
149149
}
150150

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
borrow::Cow,
44
collections::{HashMap, HashSet},
55
sync::{Arc, Mutex},
6+
time::Duration,
67
};
78

89
use opentelemetry::{otel_debug, otel_warn, InstrumentationScope, KeyValue};
@@ -96,6 +97,11 @@ impl Pipeline {
9697
fn shutdown(&self) -> OTelSdkResult {
9798
self.reader.shutdown()
9899
}
100+
101+
/// Shut down pipeline with timeout
102+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
103+
self.reader.shutdown_with_timeout(timeout)
104+
}
99105
}
100106

101107
impl SdkProducer for Pipeline {
@@ -703,6 +709,24 @@ impl Pipelines {
703709
}
704710
}
705711

712+
/// Shut down all pipelines
713+
pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
714+
let mut errs = vec![];
715+
for pipeline in &self.0 {
716+
if let Err(err) = pipeline.shutdown_with_timeout(timeout) {
717+
errs.push(err);
718+
}
719+
}
720+
721+
if errs.is_empty() {
722+
Ok(())
723+
} else {
724+
Err(crate::error::OTelSdkError::InternalFailure(format!(
725+
"{errs:?}"
726+
)))
727+
}
728+
}
729+
706730
/// Shut down all pipelines
707731
pub(crate) fn shutdown(&self) -> OTelSdkResult {
708732
let mut errs = vec![];

0 commit comments

Comments
 (0)