Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions temporalio/ext/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{any::Any, sync::Arc, time::Duration};

use magnus::{
DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol,
Symbol, TryConvert, TypedData, Value, function,
DataTypeFunctions, Error, Float, Integer, RArray, RClass, RHash, RModule, RString, Ruby,
StaticSymbol, Symbol, TryConvert, TypedData, Value, function,
gc::register_mark_object,
method,
prelude::*,
Expand All @@ -13,7 +13,7 @@ use temporalio_common::telemetry::metrics::{
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
};

use crate::{ROOT_MOD, error, id, runtime::Runtime, util::SendSyncBoxValue};
use crate::{ROOT_MOD, error, id, runtime::Runtime, util::ThreadSafeBoxValue};

pub fn init(ruby: &Ruby) -> Result<(), Error> {
let root_mod = ruby.get_inner(&ROOT_MOD);
Expand Down Expand Up @@ -269,14 +269,14 @@ fn metric_key_value(k: Value, v: Value) -> Result<metrics::MetricKeyValue, Error

#[derive(Clone, Debug)]
pub struct BufferedMetricRef {
value: Arc<SendSyncBoxValue<Value>>,
value: Arc<ThreadSafeBoxValue<Value>>,
}

// Opts `BufferedMetricRef` into `BufferInstrumentRef`; the impl body is empty,
// so no trait items are defined or overridden here.
impl BufferInstrumentRef for BufferedMetricRef {}

#[derive(Debug)]
struct BufferedMetricAttributes {
value: SendSyncBoxValue<RHash>,
value: ThreadSafeBoxValue<RHash>,
}

impl CustomMetricAttributes for BufferedMetricAttributes {
Expand Down Expand Up @@ -325,12 +325,21 @@ pub fn convert_metric_events(
ruby: &Ruby,
events: Vec<MetricEvent<BufferedMetricRef>>,
durations_as_seconds: bool,
) -> Result<Vec<Value>, Error> {
let temp: Result<Vec<Option<Value>>, Error> = events
.into_iter()
.map(|e| convert_metric_event(ruby, e, durations_as_seconds))
.collect();
Ok(temp?.into_iter().flatten().collect())
) -> Result<RArray, Error> {
// We must use an RArray (not Vec<Value>) to hold intermediate results.
// Ruby's GC scans the native stack but not the Rust heap. A Vec<Value>'s
// backing buffer lives on the Rust heap, so GC cannot see the VALUEs stored
// there. If a subsequent funcall triggers GC while we're still iterating,
// previously created Update objects in the Vec could be collected, leaving
// dangling VALUE pointers that cause a segfault. Pushing into an RArray
// immediately makes each object reachable from a GC-visible root.
let result = ruby.ary_new_capa(events.len());
for event in events {
if let Some(val) = convert_metric_event(ruby, event, durations_as_seconds)? {
result.push(val)?;
}
}
Ok(result)
}

fn convert_metric_event(
Expand Down Expand Up @@ -386,7 +395,7 @@ fn convert_metric_event(
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricRef {
value: Arc::new(SendSyncBoxValue::new(val)),
value: Arc::new(ThreadSafeBoxValue::new(val)),
}))
.map_err(|_| error!("Failed setting metric ref"))?;
Ok(None)
Expand Down Expand Up @@ -427,7 +436,7 @@ fn convert_metric_event(
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricAttributes {
value: SendSyncBoxValue::new(hash),
value: ThreadSafeBoxValue::new(hash),
}))
.map_err(|_| error!("Failed setting metric attrs"))?;
Ok(None)
Expand Down
3 changes: 1 addition & 2 deletions temporalio/ext/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ impl Runtime {
.metrics_call_buffer
.clone()
.expect("Attempting to retrieve buffered metrics without buffer");
let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?;
Ok(ruby.ary_new_from_values(&updates))
convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)
}
}

Expand Down
44 changes: 44 additions & 0 deletions temporalio/ext/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ffi::c_void;
use std::mem::ManuallyDrop;

use magnus::symbol::IntoSymbol;
use magnus::value::{BoxValue, OpaqueId, ReprValue};
Expand Down Expand Up @@ -157,3 +158,46 @@ impl<T: ReprValue> SendSyncBoxValue<T> {
*self.0
}
}

/// A GC-registered Ruby value wrapper, like `SendSyncBoxValue`, that is safe
/// to drop from any thread.
///
/// The inner `BoxValue` is held in a `ManuallyDrop` so the `Drop` impl can
/// decide whether to release it. When dropped on a non-Ruby thread (e.g. a
/// Tokio worker), the `BoxValue` and its GC registration are intentionally
/// leaked instead of calling `rb_gc_unregister_address` (which would corrupt
/// Ruby's GC data structures). On a Ruby thread, cleanup proceeds normally.
///
/// Use this for metric buffer objects (instruments, attribute sets) where:
/// - The Rust Core SDK may drop `Arc` references on Tokio threads
/// - The number of unique objects is bounded (small, acceptable leak)
pub(crate) struct ThreadSafeBoxValue<T: ReprValue>(ManuallyDrop<BoxValue<T>>);

// SAFETY: NOTE(review) — these impls appear to rely on the wrapped value only
// being touched where Ruby is available: `value` requires a `&Ruby` witness,
// and `Drop` releases the `BoxValue` only when `Ruby::get()` succeeds (leaking
// it otherwise). Confirm no other access path exists before extending this type.
unsafe impl<T: ReprValue> Send for ThreadSafeBoxValue<T> {}
unsafe impl<T: ReprValue> Sync for ThreadSafeBoxValue<T> {}

impl<T: ReprValue> ThreadSafeBoxValue<T> {
pub fn new(val: T) -> Self {
Self(ManuallyDrop::new(BoxValue::new(val)))
}

pub fn value(&self, _: &Ruby) -> T {
**self.0
}
}

impl<T: ReprValue> Drop for ThreadSafeBoxValue<T> {
    /// Releases the inner `BoxValue` only when a Ruby handle can be obtained on
    /// the current thread; otherwise the value is deliberately leaked.
    fn drop(&mut self) {
        if Ruby::get().is_err() {
            // Not on a Ruby thread: intentionally leak the BoxValue to avoid
            // calling rb_gc_unregister_address from a thread unknown to
            // Ruby's VM.
            return;
        }

        // On a Ruby thread: safe to call rb_gc_unregister_address.
        // SAFETY: this is the only place the inner value is dropped, and
        // `self.0` is never accessed again after this point because `self` is
        // itself being destroyed.
        unsafe { ManuallyDrop::drop(&mut self.0) };
    }
}

impl<T: ReprValue> std::fmt::Debug for ThreadSafeBoxValue<T> {
    /// Formats as the bare type name; the wrapped Ruby value is not included
    /// in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ThreadSafeBoxValue");
        // No fields are added, so only the struct name is written.
        builder.finish()
    }
}
Loading