Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

* Implement the `Drain::flush` method added in [slog-rs/slog#349]
* Define minimum supported rust version.
* Setup Github Actions.
* Fixup some minor typos in CHANGELOG.md, including an incorrect release date for 2.8.0.

[slog-rs/slog#349]: https://github.com/slog-rs/slog/pull/349

## 2.8.0 - 2023-08-26

* Call of deprecated `err.description()` replaced with `err.to_string()`.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ default = []
path = "lib.rs"

[dependencies]
slog = "2.1"
slog = { git = "https://github.com/Techcable/slog.git", branch = "feature/simple-flush-method" }
thread_local = "1"
take_mut = "0.2.0"
crossbeam-channel = "0.5"
Expand Down
193 changes: 170 additions & 23 deletions lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use std::{io, thread};

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use std::sync::{Arc, Condvar, Mutex};
use take_mut::take;

use std::panic::{catch_unwind, AssertUnwindSafe};
Expand Down Expand Up @@ -245,6 +245,12 @@ where
thread_name: Option<String>,
}

struct SpawnedThreadInfo {
sender: Sender<AsyncMsg>,
flush_lock: Arc<Mutex<FlushStatus>>,
flush_cond: Arc<Condvar>,
}

impl<D> AsyncCoreBuilder<D>
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
Expand Down Expand Up @@ -287,25 +293,71 @@ where
self
}

fn spawn_thread(self) -> (thread::JoinHandle<()>, Sender<AsyncMsg>) {
fn spawn_thread(self) -> (thread::JoinHandle<()>, SpawnedThreadInfo) {
let (tx, rx) = crossbeam_channel::bounded(self.chan_size);
let mut builder = thread::Builder::new();
if let Some(thread_name) = self.thread_name {
builder = builder.name(thread_name);
}
let flush_lock = Arc::new(Mutex::new(FlushStatus::NotRequested));
let flush_cond = Arc::new(Condvar::new());
let state = SpawnedThreadInfo {
sender: tx,
flush_lock: Arc::clone(&flush_lock),
flush_cond: Arc::clone(&flush_cond),
};
let drain = self.drain;
let join = builder
.spawn(move || {
let drain = AssertUnwindSafe(&drain);
let mut gave_flush_warning = false;
// catching possible unwinding panics which can occur in used inner Drain implementation
if let Err(panic_cause) = catch_unwind(move || loop {
let mut give_flush_warning = |x: &str| {
if !gave_flush_warning {
eprintln!("slog-async: {}", x);
}
gave_flush_warning = true;
};
match rx.recv() {
Ok(AsyncMsg::Record(r)) => {
if r.log_to(&*drain).is_err() {
eprintln!("slog-async failed while writing");
return;
}
}
},
Ok(AsyncMsg::Flush) => {
let status_lock = match flush_lock.lock() {
Err(_e) => {
give_flush_warning("fush lock poisoned");
continue;
},
Ok(s) => s,
};
if !matches!(*status_lock, FlushStatus::Pending) {
flush_cond.notify_all();
drop(status_lock);
continue;
}
drop(status_lock);
let res_status = match drain.flush() {
Ok(()) => FlushStatus::Success,
Err(e) => FlushStatus::Failure(e),
};
let mut status_lock = match flush_lock.lock() {
Err(_e) => {
give_flush_warning("fush lock poisoned");
continue;
},
Ok(s) => s,
};
if !matches!(*status_lock, FlushStatus::Pending) {
give_flush_warning("fush status corrupted");
}
*status_lock = res_status;
flush_cond.notify_all();
drop(status_lock);
},
Ok(AsyncMsg::Finish) => return,
Err(recv_error) => {
eprintln!("slog-async failed while receiving: {recv_error}");
Expand All @@ -318,7 +370,7 @@ where
})
.unwrap();

(join, tx)
(join, state)
}

/// Build `AsyncCore`
Expand All @@ -329,12 +381,14 @@ where
/// Build `AsyncCore`
pub fn build_no_guard(self) -> AsyncCore {
let blocking = self.blocking;
let (join, tx) = self.spawn_thread();
let (join, info) = self.spawn_thread();

AsyncCore {
ref_sender: tx,
ref_sender: info.sender,
tl_sender: thread_local::ThreadLocal::new(),
join: Mutex::new(Some(join)),
flush_lock: info.flush_lock,
flush_cond: info.flush_cond,
blocking,
}
}
Expand All @@ -344,18 +398,20 @@ where
/// See `AsyncGuard` for more information.
pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) {
let blocking = self.blocking;
let (join, tx) = self.spawn_thread();
let (join, info) = self.spawn_thread();

(
AsyncCore {
ref_sender: tx.clone(),
ref_sender: info.sender.clone(),
tl_sender: thread_local::ThreadLocal::new(),
join: Mutex::new(None),
flush_lock: info.flush_lock,
flush_cond: info.flush_cond,
blocking,
},
AsyncGuard {
join: Some(join),
tx,
tx: info.sender,
},
)
}
Expand Down Expand Up @@ -403,6 +459,14 @@ impl Drop for AsyncGuard {
}
}

#[derive(Debug)]
enum FlushStatus {
NotRequested,
Pending,
Failure(slog::FlushError),
Success,
}

/// Core of `Async` drain
///
/// See `Async` for documentation.
Expand All @@ -418,6 +482,8 @@ pub struct AsyncCore {
tl_sender: thread_local::ThreadLocal<Sender<AsyncMsg>>,
join: Mutex<Option<thread::JoinHandle<()>>>,
blocking: bool,
flush_lock: Arc<Mutex<FlushStatus>>,
flush_cond: Arc<Condvar>,
}

impl AsyncCore {
Expand Down Expand Up @@ -474,8 +540,52 @@ impl Drain for AsyncCore {
) -> AsyncResult<()> {
self.send(AsyncRecord::from(record, logger_values))
}
}

fn flush(&self) -> Result<(), slog::FlushError> {
fn handle_poison(
mut e: std::sync::PoisonError<
std::sync::MutexGuard<'_, FlushStatus>,
>,
) -> std::io::Error {
**e.get_mut() = FlushStatus::NotRequested; // cancel request, allowing retry
std::io::Error::new(std::io::ErrorKind::Other, "mutex poisoned")
}
let sender = self.get_sender().map_err(|_e| {
std::io::Error::new(std::io::ErrorKind::Other, "mutex poisoned")
})?;
let mut status_lock = self.flush_lock.lock().map_err(handle_poison)?;
while !matches!(*status_lock, FlushStatus::NotRequested) {
// another flush is in progress, block until that one succeeds
status_lock =
self.flush_cond.wait(status_lock).map_err(handle_poison)?;
}
assert!(
matches!(*status_lock, FlushStatus::NotRequested),
"{:?}",
*status_lock
);
match sender.send(AsyncMsg::Flush) {
Ok(()) => {}
Err(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"channel disconnected",
)
.into());
}
}
*status_lock = FlushStatus::Pending;
while matches!(*status_lock, FlushStatus::Pending) {
status_lock =
self.flush_cond.wait(status_lock).map_err(handle_poison)?;
}
match std::mem::replace(&mut *status_lock, FlushStatus::NotRequested) {
FlushStatus::NotRequested | FlushStatus::Pending => unreachable!(),
FlushStatus::Failure(cause) => Err(cause),
FlushStatus::Success => Ok(()),
}
}
}
/// Serialized record.
pub struct AsyncRecord {
msg: String,
Expand Down Expand Up @@ -545,6 +655,7 @@ impl AsyncRecord {
enum AsyncMsg {
Record(AsyncRecord),
Finish,
Flush,
}

impl Drop for AsyncCore {
Expand Down Expand Up @@ -796,6 +907,10 @@ impl Drain for Async {

Ok(())
}

fn flush(&self) -> Result<(), slog::FlushError> {
self.core.flush()
}
}

impl Drop for Async {
Expand All @@ -806,7 +921,6 @@ impl Drop for Async {

// }}}


#[cfg(test)]
mod test {
use super::*;
Expand All @@ -815,25 +929,45 @@ mod test {
#[test]
fn integration_test() {
let (mock_drain, mock_drain_rx) = MockDrain::new();
let async_drain = AsyncBuilder::new(mock_drain)
.build();
let slog = slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));
let async_drain = AsyncBuilder::new(mock_drain).build();
let slog =
slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));

info!(slog, "Message 1"; "field2" => "value2");
warn!(slog, "Message 2"; "field3" => "value3");
assert_eq!(mock_drain_rx.recv().unwrap(), r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#);
assert_eq!(mock_drain_rx.recv().unwrap(), r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#);
assert_eq!(
mock_drain_rx.recv().unwrap(),
*r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#
);
assert_eq!(
mock_drain_rx.recv().unwrap(),
*r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#
);
slog.flush().unwrap();
assert_eq!(mock_drain_rx.recv().unwrap(), MockMsg::Flush);
}

#[derive(Debug, PartialEq)]
enum MockMsg {
Log(String),
Flush,
}
impl PartialEq<str> for MockMsg {
fn eq(&self, other: &str) -> bool {
match self {
MockMsg::Log(ref msg) => msg == other,
_ => false,
}
}
}


/// Test-helper drain
#[derive(Debug)]
struct MockDrain {
tx: mpsc::Sender<String>,
tx: mpsc::Sender<MockMsg>,
}

impl MockDrain {
fn new() -> (Self, mpsc::Receiver<String>) {
fn new() -> (Self, mpsc::Receiver<MockMsg>) {
let (tx, rx) = mpsc::channel();
(Self { tx }, rx)
}
Expand All @@ -843,14 +977,23 @@ mod test {
type Ok = ();
type Err = slog::Never;

fn log(&self, record: &Record, logger_kv: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
fn log(
&self,
record: &Record,
logger_kv: &OwnedKVList,
) -> Result<Self::Ok, Self::Err> {
let mut serializer = MockSerializer::default();
logger_kv.serialize(record, &mut serializer).unwrap();
record.kv().serialize(record, &mut serializer).unwrap();
let level = record.level().as_short_str();
let msg = record.msg().to_string();
let entry = format!("{} {}: {:?}", level, msg, serializer.kvs);
self.tx.send(entry).unwrap();
self.tx.send(MockMsg::Log(entry)).unwrap();
Ok(())
}

fn flush(&self) -> Result<(), slog::FlushError> {
self.tx.send(MockMsg::Flush).unwrap();
Ok(())
}
}
Expand All @@ -861,7 +1004,11 @@ mod test {
}

impl slog::Serializer for MockSerializer {
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> Result<(), slog::Error> {
fn emit_arguments(
&mut self,
key: Key,
val: &fmt::Arguments,
) -> Result<(), slog::Error> {
self.kvs.push((key.to_string(), val.to_string()));
Ok(())
}
Expand Down
Loading