Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions examples/ipmpsc-receive.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,36 @@
#![deny(warnings)]

use clap::{App, Arg};
use ipmpsc::{Receiver, SharedRingBuffer};
use ipmpsc::{Receiver, SharedRingBuffer, ShmDeserializer, ShmZeroCopyDeserializer};
use serde::Deserialize;

#[derive(Debug)]
pub struct BincodeZeroCopyDeserializer<T>(pub T);

impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer<T>
where
T: Deserialize<'de>,
{
type Error = bincode::Error;

fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result<Self, Self::Error> {
Ok(Self(bincode::deserialize::<T>(bytes)?))
}
}

#[derive(Debug)]
pub struct BincodeDeserializer<T>(pub T);

impl<T> ShmDeserializer for BincodeDeserializer<T>
where
T: for<'de> Deserialize<'de>,
{
type Error = bincode::Error;

fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result<Self, Self::Error> {
Ok(Self(bincode::deserialize::<T>(bytes)?))
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let matches = App::new("ipmpsc-send")
Expand Down Expand Up @@ -34,9 +63,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

loop {
if zero_copy {
println!("received {:?}", rx.zero_copy_context().recv::<&str>()?);
println!(
"received {:?}",
rx.zero_copy_context()
.recv::<BincodeZeroCopyDeserializer<&str>>()?
);
} else {
println!("received {:?}", rx.recv::<String>()?);
println!("received {:?}", rx.recv::<BincodeDeserializer<String>>()?);
}
}
}
16 changes: 14 additions & 2 deletions examples/ipmpsc-send.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
#![deny(warnings)]

use clap::{App, Arg};
use ipmpsc::{Sender, SharedRingBuffer};
use ipmpsc::{Sender, SharedRingBuffer, ShmSerializer};
use serde::Serialize;
use std::io::{self, BufRead};

#[derive(Debug)]
pub struct BincodeSerializer<T: Serialize>(pub T);

impl<T: Serialize> ShmSerializer for BincodeSerializer<T> {
type Error = bincode::Error;

fn serialize(&self) -> std::result::Result<Vec<u8>, Self::Error> {
Ok(bincode::serialize(&self.0)?)
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let matches = App::new("ipmpsc-send")
.about("ipmpsc sender example")
Expand All @@ -29,7 +41,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Ready! Enter some lines of text to send them to the receiver.");

while handle.read_line(&mut buffer)? > 0 {
tx.send(&buffer)?;
tx.send::<BincodeSerializer<&String>>(&BincodeSerializer(&buffer))?;
buffer.clear();
}

Expand Down
69 changes: 55 additions & 14 deletions ipc-benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

extern crate test;

use serde_derive::{Deserialize, Serialize};

use std::time::Duration;
use ipmpsc::{ShmDeserializer, ShmSerializer, ShmZeroCopyDeserializer};
use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub struct YuvFrameInfo {
Expand Down Expand Up @@ -37,8 +36,50 @@ pub struct OwnedYuvFrame {
pub v_pixels: Vec<u8>,
}

#[derive(Debug)]
pub struct BincodeZeroCopyDeserializer<T>(pub T);

impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer<T>
where
T: Deserialize<'de>,
{
type Error = bincode::Error;

#[inline(always)]
fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result<Self, Self::Error> {
Ok(Self(bincode::deserialize::<T>(bytes)?))
}
}

#[derive(Debug)]
pub struct BincodeDeserializer<T>(pub T);

impl<T> ShmDeserializer for BincodeDeserializer<T>
where T: for<'de> Deserialize<'de>
{
type Error = bincode::Error;

#[inline(always)]
fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result<Self, Self::Error> {
Ok(Self(bincode::deserialize::<T>(bytes)?))
}
}

#[derive(Debug)]
pub struct BincodeSerializer<T: Serialize>(pub T);

impl<T: Serialize> ShmSerializer for BincodeSerializer<T> {
type Error = bincode::Error;

#[inline(always)]
fn serialize(&self) -> std::result::Result<Vec<u8>, Self::Error> {
Ok(bincode::serialize(&self.0)?)
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use anyhow::{anyhow, Error, Result};
use ipc_channel::ipc;
Expand Down Expand Up @@ -97,8 +138,8 @@ mod tests {
v_pixels: &v_pixels,
};

while exit_rx.try_recv::<u8>()?.is_none() {
tx.send_timeout(&frame, Duration::from_millis(100))?;
while exit_rx.try_recv::<BincodeDeserializer<u8>>()?.is_none() {
tx.send_timeout(&BincodeSerializer(&frame), Duration::from_millis(100))?;
}

Ok(())
Expand All @@ -107,20 +148,20 @@ mod tests {
// wait for first frame to arrive
{
let mut context = rx.zero_copy_context();
if let Err(e) = context.recv::<YuvFrame>() {
if let Err(e) = context.recv::<BincodeZeroCopyDeserializer<YuvFrame>>() {
panic!("error receiving: {:?}", e);
};
}

bencher.iter(|| {
let mut context = rx.zero_copy_context();
match context.recv::<YuvFrame>() {
match context.recv::<BincodeZeroCopyDeserializer<YuvFrame>>() {
Err(e) => panic!("error receiving: {:?}", e),
Ok(frame) => test::black_box(&frame),
};
});

exit_tx.send(&1_u8)?;
exit_tx.send(&BincodeSerializer(1_u8))?;

sender.join().map_err(|e| anyhow!("{:?}", e))??;

Expand All @@ -147,7 +188,7 @@ mod tests {
let exit_buffer = SharedRingBuffer::open(&exit_name)?;
let exit_rx = Receiver::new(exit_buffer);

while exit_rx.try_recv::<u8>()?.is_none() {
while exit_rx.try_recv::<BincodeDeserializer<u8>>()?.is_none() {
let y_pixels = vec![128_u8; y_stride(width) * height];
let u_pixels = vec![192_u8; uv_stride(width) * height / 2];
let v_pixels = vec![255_u8; uv_stride(width) * height / 2];
Expand All @@ -166,7 +207,7 @@ mod tests {
};

if let Err(e) = tx.send(frame) {
if exit_rx.try_recv::<u8>()?.is_none() {
if exit_rx.try_recv::<BincodeDeserializer<u8>>()?.is_none() {
return Err(Error::from(e));
} else {
break;
Expand All @@ -187,7 +228,7 @@ mod tests {
};
});

exit_tx.send(&1_u8)?;
exit_tx.send(&BincodeSerializer(1_u8))?;

while rx.recv().is_ok() {}

Expand Down Expand Up @@ -239,10 +280,10 @@ mod tests {
let size = bincode::serialized_size(&frame).unwrap() as usize;
let mut buffer = vec![0_u8; size];

while exit_rx.try_recv::<u8>()?.is_none() {
while exit_rx.try_recv::<BincodeDeserializer<u8>>()?.is_none() {
bincode::serialize_into(&mut buffer as &mut [u8], &frame).unwrap();
if let Err(e) = tx.send(&buffer) {
if exit_rx.try_recv::<u8>()?.is_none() {
if exit_rx.try_recv::<BincodeDeserializer<u8>>()?.is_none() {
return Err(Error::from(e));
} else {
break;
Expand All @@ -263,7 +304,7 @@ mod tests {
};
});

exit_tx.send(&1_u8)?;
exit_tx.send(&BincodeSerializer(1_u8))?;

while rx.recv().is_ok() {}

Expand Down
Loading