From 9392bcfff103993e902cdecb751ba9274bf11e58 Mon Sep 17 00:00:00 2001 From: Techcable Date: Sat, 9 Aug 2025 14:10:52 -0700 Subject: [PATCH] Implement the Drain::flush method Sends a message to the logging thread, then blocks until the logger acknowledges it. Contrast this to the busy-loop approach in #36 Pinned to the branch used in PR #349, so cannot be released until that PR is accepted. Fixes issue #35 --- CHANGELOG.md | 3 + Cargo.toml | 2 +- lib.rs | 193 +++++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 174 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6144ed5..09b18f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +* Implement the `Drain::flush` method added in [slog-rs/slog#349] * Define minimum supported rust version. * Setup Github Actions. * Fixup some minor typos in CHANGELOG.md, including an incorrect release date for 2.8.0. +[slog-rs/slog#349]: https://github.com/slog-rs/slog/pull/349 + ## 2.8.0 - 2023-08-26 * Call of deprecated `err.description()` replaced with `err.to_string()`. diff --git a/Cargo.toml b/Cargo.toml index c7600c8..96afed9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ default = [] path = "lib.rs" [dependencies] -slog = "2.1" +slog = { git = "https://github.com/Techcable/slog.git", branch = "feature/simple-flush-method" } thread_local = "1" take_mut = "0.2.0" crossbeam-channel = "0.5" diff --git a/lib.rs b/lib.rs index 2f7fb28..bc3347b 100644 --- a/lib.rs +++ b/lib.rs @@ -68,7 +68,7 @@ use std::{io, thread}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::Mutex; +use std::sync::{Arc, Condvar, Mutex}; use take_mut::take; use std::panic::{catch_unwind, AssertUnwindSafe}; @@ -245,6 +245,12 @@ where thread_name: Option, } +struct SpawnedThreadInfo { + sender: Sender, + flush_lock: Arc>, + flush_cond: Arc, +} + impl AsyncCoreBuilder where D: slog::Drain + Send + 'static, @@ -287,25 +293,71 @@ where self } - fn spawn_thread(self) -> (thread::JoinHandle<()>, Sender) { + fn spawn_thread(self) -> (thread::JoinHandle<()>, SpawnedThreadInfo) { let (tx, rx) = crossbeam_channel::bounded(self.chan_size); let mut builder = thread::Builder::new(); if let Some(thread_name) = self.thread_name { builder = builder.name(thread_name); } + let flush_lock = Arc::new(Mutex::new(FlushStatus::NotRequested)); + let flush_cond = Arc::new(Condvar::new()); + let state = SpawnedThreadInfo { + sender: tx, + flush_lock: Arc::clone(&flush_lock), + flush_cond: Arc::clone(&flush_cond), + }; let drain = self.drain; let join = builder .spawn(move || { let drain = AssertUnwindSafe(&drain); + let mut gave_flush_warning = false; // catching possible unwinding panics which can occur in used inner Drain implementation if let Err(panic_cause) = catch_unwind(move || loop { + let mut give_flush_warning = |x: &str| { + if !gave_flush_warning { + eprintln!("slog-async: {}", x); + } + gave_flush_warning = true; + }; match rx.recv() { Ok(AsyncMsg::Record(r)) => { if r.log_to(&*drain).is_err() { eprintln!("slog-async failed while writing"); return; } - } + }, + Ok(AsyncMsg::Flush) => { + let status_lock = match flush_lock.lock() { + Err(_e) => { + give_flush_warning("fush lock poisoned"); + continue; + }, + Ok(s) => s, + }; + if !matches!(*status_lock, FlushStatus::Pending) { + flush_cond.notify_all(); + drop(status_lock); + continue; + } + drop(status_lock); + let res_status = match drain.flush() { + Ok(()) => FlushStatus::Success, + Err(e) => FlushStatus::Failure(e), + }; + let mut status_lock = match flush_lock.lock() { + Err(_e) => { + give_flush_warning("fush lock poisoned"); + continue; + }, + Ok(s) => s, + }; + if !matches!(*status_lock, FlushStatus::Pending) { + give_flush_warning("fush status corrupted"); + } + *status_lock = res_status; + flush_cond.notify_all(); + drop(status_lock); + }, Ok(AsyncMsg::Finish) => return, Err(recv_error) => { eprintln!("slog-async failed while receiving: {recv_error}"); @@ -318,7 +370,7 @@ where }) .unwrap(); - (join, tx) + (join, state) } /// Build `AsyncCore` @@ -329,12 +381,14 @@ where /// Build `AsyncCore` pub fn build_no_guard(self) -> AsyncCore { let blocking = self.blocking; - let (join, tx) = self.spawn_thread(); + let (join, info) = self.spawn_thread(); AsyncCore { - ref_sender: tx, + ref_sender: info.sender, tl_sender: thread_local::ThreadLocal::new(), join: Mutex::new(Some(join)), + flush_lock: info.flush_lock, + flush_cond: info.flush_cond, blocking, } } @@ -344,18 +398,20 @@ where /// See `AsyncGuard` for more information. pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) { let blocking = self.blocking; - let (join, tx) = self.spawn_thread(); + let (join, info) = self.spawn_thread(); ( AsyncCore { - ref_sender: tx.clone(), + ref_sender: info.sender.clone(), tl_sender: thread_local::ThreadLocal::new(), join: Mutex::new(None), + flush_lock: info.flush_lock, + flush_cond: info.flush_cond, blocking, }, AsyncGuard { join: Some(join), - tx, + tx: info.sender, }, ) } @@ -403,6 +459,14 @@ impl Drop for AsyncGuard { } } +#[derive(Debug)] +enum FlushStatus { + NotRequested, + Pending, + Failure(slog::FlushError), + Success, +} + /// Core of `Async` drain /// /// See `Async` for documentation. @@ -418,6 +482,8 @@ pub struct AsyncCore { tl_sender: thread_local::ThreadLocal>, join: Mutex>>, blocking: bool, + flush_lock: Arc>, + flush_cond: Arc, } impl AsyncCore { @@ -474,8 +540,52 @@ impl Drain for AsyncCore { ) -> AsyncResult<()> { self.send(AsyncRecord::from(record, logger_values)) } -} + fn flush(&self) -> Result<(), slog::FlushError> { + fn handle_poison( + mut e: std::sync::PoisonError< + std::sync::MutexGuard<'_, FlushStatus>, + >, + ) -> std::io::Error { + **e.get_mut() = FlushStatus::NotRequested; // cancel request, allowing retry + std::io::Error::new(std::io::ErrorKind::Other, "mutex poisoned") + } + let sender = self.get_sender().map_err(|_e| { + std::io::Error::new(std::io::ErrorKind::Other, "mutex poisoned") + })?; + let mut status_lock = self.flush_lock.lock().map_err(handle_poison)?; + while !matches!(*status_lock, FlushStatus::NotRequested) { + // another flush is in progress, block until that one succeeds + status_lock = + self.flush_cond.wait(status_lock).map_err(handle_poison)?; + } + assert!( + matches!(*status_lock, FlushStatus::NotRequested), + "{:?}", + *status_lock + ); + match sender.send(AsyncMsg::Flush) { + Ok(()) => {} + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "channel disconnected", + ) + .into()); + } + } + *status_lock = FlushStatus::Pending; + while matches!(*status_lock, FlushStatus::Pending) { + status_lock = + self.flush_cond.wait(status_lock).map_err(handle_poison)?; + } + match std::mem::replace(&mut *status_lock, FlushStatus::NotRequested) { + FlushStatus::NotRequested | FlushStatus::Pending => unreachable!(), + FlushStatus::Failure(cause) => Err(cause), + FlushStatus::Success => Ok(()), + } + } +} /// Serialized record. pub struct AsyncRecord { msg: String, @@ -545,6 +655,7 @@ impl AsyncRecord { enum AsyncMsg { Record(AsyncRecord), Finish, + Flush, } impl Drop for AsyncCore { @@ -796,6 +907,10 @@ impl Drain for Async { Ok(()) } + + fn flush(&self) -> Result<(), slog::FlushError> { + self.core.flush() + } } impl Drop for Async { @@ -806,7 +921,6 @@ impl Drop for Async { // }}} - #[cfg(test)] mod test { use super::*; @@ -815,25 +929,45 @@ mod test { #[test] fn integration_test() { let (mock_drain, mock_drain_rx) = MockDrain::new(); - let async_drain = AsyncBuilder::new(mock_drain) - .build(); - let slog = slog::Logger::root(async_drain.fuse(), o!("field1" => "value1")); + let async_drain = AsyncBuilder::new(mock_drain).build(); + let slog = + slog::Logger::root(async_drain.fuse(), o!("field1" => "value1")); info!(slog, "Message 1"; "field2" => "value2"); warn!(slog, "Message 2"; "field3" => "value3"); - assert_eq!(mock_drain_rx.recv().unwrap(), r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#); - assert_eq!(mock_drain_rx.recv().unwrap(), r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#); + assert_eq!( + mock_drain_rx.recv().unwrap(), + *r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"# + ); + assert_eq!( + mock_drain_rx.recv().unwrap(), + *r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"# + ); + slog.flush().unwrap(); + assert_eq!(mock_drain_rx.recv().unwrap(), MockMsg::Flush); + } + + #[derive(Debug, PartialEq)] + enum MockMsg { + Log(String), + Flush, + } + impl PartialEq for MockMsg { + fn eq(&self, other: &str) -> bool { + match self { + MockMsg::Log(ref msg) => msg == other, + _ => false, + } + } } - - /// Test-helper drain #[derive(Debug)] struct MockDrain { - tx: mpsc::Sender, + tx: mpsc::Sender, } impl MockDrain { - fn new() -> (Self, mpsc::Receiver) { + fn new() -> (Self, mpsc::Receiver) { let (tx, rx) = mpsc::channel(); (Self { tx }, rx) } @@ -843,14 +977,23 @@ mod test { type Ok = (); type Err = slog::Never; - fn log(&self, record: &Record, logger_kv: &OwnedKVList) -> Result { + fn log( + &self, + record: &Record, + logger_kv: &OwnedKVList, + ) -> Result { let mut serializer = MockSerializer::default(); logger_kv.serialize(record, &mut serializer).unwrap(); record.kv().serialize(record, &mut serializer).unwrap(); let level = record.level().as_short_str(); let msg = record.msg().to_string(); let entry = format!("{} {}: {:?}", level, msg, serializer.kvs); - self.tx.send(entry).unwrap(); + self.tx.send(MockMsg::Log(entry)).unwrap(); + Ok(()) + } + + fn flush(&self) -> Result<(), slog::FlushError> { + self.tx.send(MockMsg::Flush).unwrap(); Ok(()) } } @@ -861,7 +1004,11 @@ mod test { } impl slog::Serializer for MockSerializer { - fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> Result<(), slog::Error> { + fn emit_arguments( + &mut self, + key: Key, + val: &fmt::Arguments, + ) -> Result<(), slog::Error> { self.kvs.push((key.to_string(), val.to_string())); Ok(()) }