diff --git a/examples/ipmpsc-receive.rs b/examples/ipmpsc-receive.rs index 89b6a3d..c4e3e84 100644 --- a/examples/ipmpsc-receive.rs +++ b/examples/ipmpsc-receive.rs @@ -1,7 +1,36 @@ #![deny(warnings)] use clap::{App, Arg}; -use ipmpsc::{Receiver, SharedRingBuffer}; +use ipmpsc::{Receiver, SharedRingBuffer, ShmDeserializer, ShmZeroCopyDeserializer}; +use serde::Deserialize; + +#[derive(Debug)] +pub struct BincodeZeroCopyDeserializer(pub T); + +impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer +where + T: Deserialize<'de>, +{ + type Error = bincode::Error; + + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} + +#[derive(Debug)] +pub struct BincodeDeserializer(pub T); + +impl ShmDeserializer for BincodeDeserializer +where + T: for<'de> Deserialize<'de>, +{ + type Error = bincode::Error; + + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} fn main() -> Result<(), Box> { let matches = App::new("ipmpsc-send") @@ -34,9 +63,13 @@ fn main() -> Result<(), Box> { loop { if zero_copy { - println!("received {:?}", rx.zero_copy_context().recv::<&str>()?); + println!( + "received {:?}", + rx.zero_copy_context() + .recv::>()? + ); } else { - println!("received {:?}", rx.recv::()?); + println!("received {:?}", rx.recv::>()?); } } } diff --git a/examples/ipmpsc-send.rs b/examples/ipmpsc-send.rs index 2c85398..4449271 100644 --- a/examples/ipmpsc-send.rs +++ b/examples/ipmpsc-send.rs @@ -1,9 +1,21 @@ #![deny(warnings)] use clap::{App, Arg}; -use ipmpsc::{Sender, SharedRingBuffer}; +use ipmpsc::{Sender, SharedRingBuffer, ShmSerializer}; +use serde::Serialize; use std::io::{self, BufRead}; +#[derive(Debug)] +pub struct BincodeSerializer(pub T); + +impl ShmSerializer for BincodeSerializer { + type Error = bincode::Error; + + fn serialize(&self) -> std::result::Result, Self::Error> { + Ok(bincode::serialize(&self.0)?) + } +} + fn main() -> Result<(), Box> { let matches = App::new("ipmpsc-send") .about("ipmpsc sender example") @@ -29,7 +41,7 @@ fn main() -> Result<(), Box> { println!("Ready! Enter some lines of text to send them to the receiver."); while handle.read_line(&mut buffer)? > 0 { - tx.send(&buffer)?; + tx.send::>(&BincodeSerializer(&buffer))?; buffer.clear(); } diff --git a/ipc-benchmarks/src/lib.rs b/ipc-benchmarks/src/lib.rs index dcb70da..23cb27e 100644 --- a/ipc-benchmarks/src/lib.rs +++ b/ipc-benchmarks/src/lib.rs @@ -2,9 +2,8 @@ extern crate test; -use serde_derive::{Deserialize, Serialize}; - -use std::time::Duration; +use ipmpsc::{ShmDeserializer, ShmSerializer, ShmZeroCopyDeserializer}; +use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] pub struct YuvFrameInfo { @@ -37,8 +36,50 @@ pub struct OwnedYuvFrame { pub v_pixels: Vec, } +#[derive(Debug)] +pub struct BincodeZeroCopyDeserializer(pub T); + +impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer +where + T: Deserialize<'de>, +{ + type Error = bincode::Error; + + #[inline(always)] + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} + +#[derive(Debug)] +pub struct BincodeDeserializer(pub T); + +impl ShmDeserializer for BincodeDeserializer +where T: for<'de> Deserialize<'de> +{ + type Error = bincode::Error; + + #[inline(always)] + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} + +#[derive(Debug)] +pub struct BincodeSerializer(pub T); + +impl ShmSerializer for BincodeSerializer { + type Error = bincode::Error; + + #[inline(always)] + fn serialize(&self) -> std::result::Result, Self::Error> { + Ok(bincode::serialize(&self.0)?) + } +} + #[cfg(test)] mod tests { + use std::time::Duration; use super::*; use anyhow::{anyhow, Error, Result}; use ipc_channel::ipc; @@ -97,8 +138,8 @@ mod tests { v_pixels: &v_pixels, }; - while exit_rx.try_recv::()?.is_none() { - tx.send_timeout(&frame, Duration::from_millis(100))?; + while exit_rx.try_recv::>()?.is_none() { + tx.send_timeout(&BincodeSerializer(&frame), Duration::from_millis(100))?; } Ok(()) @@ -107,20 +148,20 @@ mod tests { // wait for first frame to arrive { let mut context = rx.zero_copy_context(); - if let Err(e) = context.recv::() { + if let Err(e) = context.recv::>() { panic!("error receiving: {:?}", e); }; } bencher.iter(|| { let mut context = rx.zero_copy_context(); - match context.recv::() { + match context.recv::>() { Err(e) => panic!("error receiving: {:?}", e), Ok(frame) => test::black_box(&frame), }; }); - exit_tx.send(&1_u8)?; + exit_tx.send(&BincodeSerializer(1_u8))?; sender.join().map_err(|e| anyhow!("{:?}", e))??; @@ -147,7 +188,7 @@ mod tests { let exit_buffer = SharedRingBuffer::open(&exit_name)?; let exit_rx = Receiver::new(exit_buffer); - while exit_rx.try_recv::()?.is_none() { + while exit_rx.try_recv::>()?.is_none() { let y_pixels = vec![128_u8; y_stride(width) * height]; let u_pixels = vec![192_u8; uv_stride(width) * height / 2]; let v_pixels = vec![255_u8; uv_stride(width) * height / 2]; @@ -166,7 +207,7 @@ mod tests { }; if let Err(e) = tx.send(frame) { - if exit_rx.try_recv::()?.is_none() { + if exit_rx.try_recv::>()?.is_none() { return Err(Error::from(e)); } else { break; @@ -187,7 +228,7 @@ mod tests { }; }); - exit_tx.send(&1_u8)?; + exit_tx.send(&BincodeSerializer(1_u8))?; while rx.recv().is_ok() {} @@ -239,10 +280,10 @@ mod tests { let size = bincode::serialized_size(&frame).unwrap() as usize; let mut buffer = vec![0_u8; size]; - while exit_rx.try_recv::()?.is_none() { + while exit_rx.try_recv::>()?.is_none() { bincode::serialize_into(&mut buffer as &mut [u8], &frame).unwrap(); if let Err(e) = tx.send(&buffer) { - if exit_rx.try_recv::()?.is_none() { + if exit_rx.try_recv::>()?.is_none() { return Err(Error::from(e)); } else { break; @@ -263,7 +304,7 @@ mod tests { }; }); - exit_tx.send(&1_u8)?; + exit_tx.send(&BincodeSerializer(1_u8))?; while rx.recv().is_ok() {} diff --git a/src/lib.rs b/src/lib.rs index c486847..557418a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ //! Inter-Process Multiple Producer, Single Consumer Channels for Rust //! //! This library provides a type-safe, high-performance inter-process channel implementation based on a shared -//! memory ring buffer. It uses [bincode](https://github.com/TyOverby/bincode) for (de)serialization, including +//! memory ring buffer. It is agnostic to (de)serialization backends and includes the capability for //! zero-copy deserialization, making it ideal for messages with large `&str` or `&[u8]` fields. And it has a name //! that rolls right off the tongue. @@ -9,10 +9,12 @@ use memmap2::MmapMut; use os::{Buffer, Header, View}; -use serde::{Deserialize, Serialize}; use std::{ + array::TryFromSliceError, cell::UnsafeCell, + convert::TryInto, ffi::c_void, + fmt::{Debug, Display}, fs::{File, OpenOptions}, mem, sync::{ @@ -89,9 +91,13 @@ pub enum Error { #[error(transparent)] Io(#[from] std::io::Error), - /// Wrapped bincode error encountered during (de)serialization. + /// Errors from ser/deser operations. #[error(transparent)] - Bincode(#[from] bincode::Error), + Serialize(#[from] SerializeError), + + /// Errors from converting little endian bytes to u32 will be caught here. + #[error(transparent)] + TryFrom(#[from] TryFromSliceError), } /// `ipmpsc`-specific Result type alias @@ -116,6 +122,41 @@ fn map(file: &File) -> Result { } } +/// `ipmpsc`-specific error type +#[derive(ThisError, Debug)] +pub enum SerializeError { + #[error("failed to serialize: {0:?} => {1}@{2}")] + FailedSerialize(String, u32, &'static str), + + #[error("failed to deserialize: {0:?} => {1}@{2}")] + FailedDeserialize(String, u32, &'static str), +} + +pub type SerializeResult = std::result::Result; + +/// Trait apis to decouple the serialization backend from the mechanical send/recv +/// For a writer to work the payload must implement this trait +pub trait ShmSerializer { + type Error: Display; + + fn serialize(&self) -> std::result::Result, Self::Error>; +} + +/// For a reader to work they payload must implement this trait +pub trait ShmDeserializer: Sized { + type Error: Display; + + fn deserialize_from_bytes(bytes: &[u8]) -> std::result::Result; +} + +/// To use the zero_copy_context the payload must implement this trait allowing for more +/// explict lifetimes +pub trait ShmZeroCopyDeserializer<'de>: Sized { + type Error: Display; + + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result; +} + /// Represents a file-backed shared memory ring buffer, suitable for constructing a /// [`Receiver`](struct.Receiver.html) or [`Sender`](struct.Sender.html). /// @@ -222,11 +263,8 @@ impl Receiver { /// Attempt to read a message without blocking. /// /// This will return `Ok(None)` if there are no messages immediately available. - pub fn try_recv(&self) -> Result> - where - T: for<'de> Deserialize<'de>, - { - Ok(if let Some((value, position)) = self.try_recv_0()? { + pub fn try_recv(&self) -> Result> { + Ok(if let Some((value, position)) = self.try_recv_0::()? { self.seek(position)?; Some(value) @@ -235,7 +273,48 @@ impl Receiver { }) } - fn try_recv_0<'a, T: Deserialize<'a>>(&'a self) -> Result> { + fn try_recv_0<'a, T: ShmDeserializer>(&'a self) -> Result> { + let buffer = self.0 .0.buffer(); + let map = buffer.map(); + + let mut read = buffer.header().read.load(Relaxed); + let write = buffer.header().write.load(Acquire); + + Ok(loop { + if write != read { + let slice = map.as_ref(); + let start = read + 4; + let size = u32::from_le_bytes(slice[read as usize..start as usize].try_into()?); + + if size > 0 { + let end = start + size; + break Some(( + T::deserialize_from_bytes(&slice[start as usize..end as usize]).map_err( + |e| { + Error::Serialize(SerializeError::FailedDeserialize( + e.to_string(), + line!(), + file!(), + )) + }, + )?, + end, + )); + } else if write < read { + read = BEGINNING; + let mut lock = buffer.lock()?; + buffer.header().read.store(read, Relaxed); + lock.notify_all()?; + } else { + return Err(Error::Runtime("corrupt ring buffer".into())); + } + } else { + break None; + } + }) + } + + fn try_zc_recv_0<'a, T: ShmZeroCopyDeserializer<'a>>(&'a self) -> Result> { let buffer = self.0 .0.buffer(); let map = buffer.map(); @@ -246,11 +325,20 @@ impl Receiver { if write != read { let slice = map.as_ref(); let start = read + 4; - let size = bincode::deserialize::(&slice[read as usize..start as usize])?; + let size = u32::from_le_bytes(slice[read as usize..start as usize].try_into()?); + if size > 0 { let end = start + size; break Some(( - bincode::deserialize(&slice[start as usize..end as usize])?, + T::deserialize_from_bytes(&slice[start as usize..end as usize]).map_err( + |e| { + Error::Serialize(SerializeError::FailedDeserialize( + e.to_string(), + line!(), + file!(), + )) + }, + )?, end, )); } else if write < read { @@ -268,11 +356,8 @@ impl Receiver { } /// Attempt to read a message, blocking if necessary until one becomes available. - pub fn recv(&self) -> Result - where - T: for<'de> Deserialize<'de>, - { - let (value, position) = self.recv_timeout_0(None)?.unwrap(); + pub fn recv(&self) -> Result { + let (value, position) = self.recv_timeout_0::(None)?.unwrap(); self.seek(position)?; @@ -281,12 +366,9 @@ impl Receiver { /// Attempt to read a message, blocking for up to the specified duration if necessary until one becomes /// available. - pub fn recv_timeout(&self, timeout: Duration) -> Result> - where - T: for<'de> Deserialize<'de>, - { + pub fn recv_timeout(&self, timeout: Duration) -> Result> { Ok( - if let Some((value, position)) = self.recv_timeout_0(Some(timeout))? { + if let Some((value, position)) = self.recv_timeout_0::(Some(timeout))? { self.seek(position)?; Some(value) @@ -313,20 +395,50 @@ impl Receiver { /// 3. A given [`ZeroCopyContext`](struct.ZeroCopyContext.html) can only be used to deserialize a single /// message before it must be discarded since the read pointer is advanced only when the instance is dropped /// (enforced at run time). - pub fn zero_copy_context(&mut self) -> ZeroCopyContext { + pub fn zero_copy_context(&mut self) -> ZeroCopyContext<'_> { ZeroCopyContext { receiver: self, position: None, } } - fn recv_timeout_0<'a, T: Deserialize<'a>>( + fn recv_timeout_0<'a, T: ShmDeserializer>( &'a self, timeout: Option, ) -> Result> { let mut deadline = None; loop { - if let Some(value_and_position) = self.try_recv_0()? { + if let Some(value_and_position) = self.try_recv_0::()? { + return Ok(Some(value_and_position)); + } + + let buffer = self.0 .0.buffer(); + + let mut now = Instant::now(); + deadline = deadline.or_else(|| timeout.map(|timeout| now + timeout)); + + let read = buffer.header().read.load(Relaxed); + + let mut lock = buffer.lock()?; + while read == buffer.header().write.load(Acquire) { + if deadline.map(|deadline| deadline > now).unwrap_or(true) { + lock.timed_wait(&self.0 .0, deadline.map(|deadline| deadline - now))?; + + now = Instant::now(); + } else { + return Ok(None); + } + } + } + } + + fn recv_zc_timeout_0<'a, T: ShmZeroCopyDeserializer<'a>>( + &'a self, + timeout: Option, + ) -> Result> { + let mut deadline = None; + loop { + if let Some(value_and_position) = self.try_zc_recv_0::()? { return Ok(Some(value_and_position)); } @@ -371,12 +483,12 @@ impl<'a> ZeroCopyContext<'a> { /// This will return `Ok(None)` if there are no messages immediately available. It will return /// `Err(`[`Error::AlreadyReceived`](enum.Error.html#variant.AlreadyReceived)`))` if this instance has already /// been used to read a message. - pub fn try_recv<'b, T: Deserialize<'b>>(&'b mut self) -> Result> { + pub fn try_recv<'b, T: ShmZeroCopyDeserializer<'b>>(&'b mut self) -> Result> { if self.position.is_some() { Err(Error::AlreadyReceived) } else { Ok( - if let Some((value, position)) = self.receiver.try_recv_0()? { + if let Some((value, position)) = self.receiver.try_zc_recv_0::()? { self.position = Some(position); Some(value) } else { @@ -390,8 +502,8 @@ impl<'a> ZeroCopyContext<'a> { /// /// This will return `Err(`[`Error::AlreadyReceived`](enum.Error.html#variant.AlreadyReceived)`))` if this /// instance has already been used to read a message. - pub fn recv<'b, T: Deserialize<'b>>(&'b mut self) -> Result { - let (value, position) = self.receiver.recv_timeout_0(None)?.unwrap(); + pub fn recv<'b, T: ShmZeroCopyDeserializer<'b>>(&'b mut self) -> Result { + let (value, position) = self.receiver.recv_zc_timeout_0::(None)?.unwrap(); self.position = Some(position); @@ -403,7 +515,7 @@ impl<'a> ZeroCopyContext<'a> { /// /// This will return `Err(`[`Error::AlreadyReceived`](enum.Error.html#variant.AlreadyReceived)`))` if this /// instance has already been used to read a message. - pub fn recv_timeout<'b, T: Deserialize<'b>>( + pub fn recv_timeout<'b, T: ShmZeroCopyDeserializer<'b>>( &'b mut self, timeout: Duration, ) -> Result> { @@ -411,7 +523,9 @@ impl<'a> ZeroCopyContext<'a> { Err(Error::AlreadyReceived) } else { Ok( - if let Some((value, position)) = self.receiver.recv_timeout_0(Some(timeout))? { + if let Some((value, position)) = + self.receiver.recv_zc_timeout_0::(Some(timeout))? + { self.position = Some(position); Some(value) } else { @@ -448,8 +562,8 @@ impl Sender { /// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the serialized size is /// greater than the ring buffer capacity, this method will return /// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`. - pub fn send(&self, value: &impl Serialize) -> Result<()> { - self.send_timeout_0(value, false, None).map(drop) + pub fn send(&self, value: &T) -> Result<()> { + self.send_timeout_0::(value, false, None).map(drop) } /// Send the specified message, waiting for sufficient contiguous space to become available in the ring buffer @@ -461,8 +575,8 @@ impl Sender { /// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the serialized size is /// greater than the ring buffer capacity, this method will return /// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`. - pub fn send_timeout(&self, value: &impl Serialize, timeout: Duration) -> Result { - self.send_timeout_0(value, false, Some(timeout)) + pub fn send_timeout(&self, value: &T, timeout: Duration) -> Result { + self.send_timeout_0::(value, false, Some(timeout)) } /// Send the specified message, waiting for the ring buffer to become completely empty first. @@ -474,20 +588,28 @@ impl Sender { /// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the serialized size /// is greater than the ring buffer capacity, this method will return /// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`. - pub fn send_when_empty(&self, value: &impl Serialize) -> Result<()> { - self.send_timeout_0(value, true, None).map(drop) + pub fn send_when_empty(&self, value: &T) -> Result<()> { + self.send_timeout_0::(value, true, None).map(drop) } - fn send_timeout_0( + fn send_timeout_0( &self, - value: &impl Serialize, + value: &impl ShmSerializer, wait_until_empty: bool, timeout: Option, ) -> Result { let buffer = self.0 .0.buffer(); let map = self.0 .0.map_mut(); - let size = bincode::serialized_size(value)? as u32; + let bytes = value.serialize().map_err(|e| { + Error::Serialize(SerializeError::FailedSerialize( + e.to_string(), + line!(), + file!(), + )) + })?; + + let size = bytes.len() as u32; if size == 0 { return Err(Error::ZeroSizedMessage); @@ -512,10 +634,8 @@ impl Sender { } else if read != BEGINNING { assert!(write > BEGINNING); - bincode::serialize_into( - &mut map[write as usize..(write + 4) as usize], - &0_u32, - )?; + map[write as usize..(write + 4) as usize].copy_from_slice(&0_u32.to_le_bytes()); + write = BEGINNING; buffer.header().write.store(write, Release); lock.notify_all()?; @@ -536,10 +656,10 @@ impl Sender { } let start = write + 4; - bincode::serialize_into(&mut map[write as usize..start as usize], &size)?; + map[write as usize..start as usize].copy_from_slice(&size.to_le_bytes()); let end = start + size; - bincode::serialize_into(&mut map[start as usize..end as usize], value)?; + map[start as usize..end as usize].copy_from_slice(&bytes); buffer.header().write.store(end, Release); @@ -554,8 +674,48 @@ mod tests { use super::*; use anyhow::{anyhow, Result}; use proptest::{arbitrary::any, collection::vec, prop_assume, proptest, strategy::Strategy}; + use serde::{Deserialize, Serialize}; use std::thread; + #[derive(Debug)] + pub struct BincodeZeroCopyDeserializer(pub T); + + impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer + where + T: Deserialize<'de>, + { + type Error = bincode::Error; + + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } + } + + #[derive(Debug)] + pub struct BincodeDeserializer(pub T); + + impl ShmDeserializer for BincodeDeserializer + where + T: for<'de> Deserialize<'de>, + { + type Error = bincode::Error; + + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } + } + + #[derive(Debug)] + pub struct BincodeSerializer(pub T); + + impl ShmSerializer for BincodeSerializer { + type Error = bincode::Error; + + fn serialize(&self) -> std::result::Result, Self::Error> { + Ok(bincode::serialize(&self.0)?) + } + } + #[derive(Debug)] struct Case { channel_size: u32, @@ -573,8 +733,8 @@ mod tests { let expected = self.data.clone(); thread::spawn(move || -> Result<()> { for item in &expected { - let received = rx.recv::>()?; - assert_eq!(item, &received); + let received = rx.recv::>>()?; + assert_eq!(item, &received.0); } Ok(()) @@ -585,7 +745,7 @@ mod tests { let expected = self.data.len() * self.sender_count as usize; thread::spawn(move || -> Result<()> { for _ in 0..expected { - rx.recv::>()?; + rx.recv::>>()?; } Ok(()) }) @@ -601,7 +761,7 @@ mod tests { let tx = Sender::new(SharedRingBuffer::open(&name)?); for item in data.as_ref() { - tx.send(item)?; + tx.send(&BincodeSerializer(item))?; } Ok(()) @@ -662,17 +822,17 @@ mod tests { let mut rx = Receiver::new(buffer); let tx = Sender::new(SharedRingBuffer::open(&name)?); - tx.send(&sent)?; - tx.send(&42_u32)?; + tx.send(&BincodeSerializer(&sent))?; + tx.send(&BincodeSerializer(42_u32))?; { let mut rx = rx.zero_copy_context(); - let received = rx.recv()?; + let received = rx.recv::>()?; - assert_eq!(sent, received); + assert_eq!(sent, received.0); } - assert_eq!(42_u32, rx.recv()?); + assert_eq!(42_u32, rx.recv::>()?.0); Ok(()) } @@ -685,12 +845,15 @@ mod tests { let sender = os::test::fork(move || { thread::sleep(Duration::from_secs(1)); - tx.send(&42_u32).map_err(anyhow::Error::from) + tx.send(&BincodeSerializer(42_u32)) + .map_err(anyhow::Error::from) })?; loop { - if let Some(value) = rx.recv_timeout(Duration::from_millis(1))? { - assert_eq!(42_u32, value); + if let Some(value) = + rx.recv_timeout::>(Duration::from_millis(1))? + { + assert_eq!(42_u32, value.0); break; } } @@ -707,13 +870,13 @@ mod tests { let tx = Sender::new(SharedRingBuffer::open(&name)?); let sender = os::test::fork(move || loop { - if tx.send_timeout(&42_u32, Duration::from_millis(1))? { + if tx.send_timeout(&BincodeSerializer(42_u32), Duration::from_millis(1))? { break Ok(()); } })?; thread::sleep(Duration::from_secs(1)); - assert_eq!(42_u32, rx.recv()?); + assert_eq!(42_u32, rx.recv::>()?.0); sender.join().map_err(|e| anyhow!("{:?}", e))??; diff --git a/src/posix.rs b/src/posix.rs index 163b4d3..c94054a 100644 --- a/src/posix.rs +++ b/src/posix.rs @@ -106,7 +106,7 @@ impl Buffer { } } - pub fn lock(&self) -> Result { + pub fn lock(&self) -> Result> { Lock::try_new(self) } @@ -122,7 +122,7 @@ impl Buffer { pub struct Lock<'a>(&'a Buffer); impl<'a> Lock<'a> { - pub fn try_new(buffer: &Buffer) -> Result { + pub fn try_new(buffer: &Buffer) -> Result> { unsafe { nonzero!(libc::pthread_mutex_lock(buffer.header().mutex.get()))?; }