Skip to content
37 changes: 25 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use std::os::raw::c_void;
use std::os::unix::io::{AsRawFd, RawFd as UnixRawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::ptr;
use std::result;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::{mem, ptr, str};

use zmq_sys::{errno, RawFd};

Expand Down Expand Up @@ -149,10 +149,10 @@ impl SocketEvent {
}

/// Flag for socket `send` methods that specifies non-blocking mode.
pub static DONTWAIT: i32 = zmq_sys::ZMQ_DONTWAIT as i32;
pub const DONTWAIT: i32 = zmq_sys::ZMQ_DONTWAIT as i32;
/// Flag for socket `send` methods that specifies that more frames of a
/// multipart message will follow.
pub static SNDMORE: i32 = zmq_sys::ZMQ_SNDMORE as i32;
pub const SNDMORE: i32 = zmq_sys::ZMQ_SNDMORE as i32;

/// Security Mechanism
#[allow(non_camel_case_types)]
Expand Down Expand Up @@ -298,7 +298,7 @@ impl Error {
panic!(
"unknown error [{}]: {}",
x,
str::from_utf8(ffi::CStr::from_ptr(s).to_bytes()).unwrap()
ffi::CStr::from_ptr(s).to_str().unwrap()
)
},
}
Expand All @@ -308,8 +308,7 @@ impl Error {
pub fn message(self) -> &'static str {
unsafe {
let s = zmq_sys::zmq_strerror(self.to_raw());
let v: &'static [u8] = mem::transmute(ffi::CStr::from_ptr(s).to_bytes());
str::from_utf8(v).unwrap()
ffi::CStr::from_ptr(s).to_str().unwrap()
}
}
}
Expand Down Expand Up @@ -371,7 +370,7 @@ pub fn version() -> (i32, i32, i32) {
zmq_sys::zmq_version(&mut major, &mut minor, &mut patch);
}

(major as i32, minor as i32, patch as i32)
(major, minor, patch)
}

struct RawContext {
Expand Down Expand Up @@ -442,7 +441,7 @@ impl Context {
/// Set the size of the ØMQ thread pool to handle I/O operations.
pub fn set_io_threads(&self, value: i32) -> Result<()> {
zmq_try!(unsafe {
zmq_sys::zmq_ctx_set(self.raw.ctx, zmq_sys::ZMQ_IO_THREADS as _, value as i32)
zmq_sys::zmq_ctx_set(self.raw.ctx, zmq_sys::ZMQ_IO_THREADS as _, value)
});
Ok(())
}
Expand Down Expand Up @@ -1272,7 +1271,14 @@ impl fmt::Display for EncodeError {
}
}

impl std::error::Error for EncodeError {}
impl std::error::Error for EncodeError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::BadLength => None,
Self::FromUtf8Error(err) => Some(err),
}
}
}

/// Encode a binary key as Z85 printable text.
///
Expand All @@ -1285,7 +1291,7 @@ pub fn z85_encode(data: &[u8]) -> result::Result<String, EncodeError> {
return Err(EncodeError::BadLength);
}

let len = data.len() * 5 / 4 + 1;
let len = data.len() / 4 * 5 + 1;
let mut dest = vec![0u8; len];

unsafe {
Expand Down Expand Up @@ -1324,7 +1330,14 @@ impl fmt::Display for DecodeError {
}
}

impl std::error::Error for DecodeError {}
impl std::error::Error for DecodeError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::BadLength => None,
Self::NulError(err) => Some(err),
}
}
}

/// Decode a binary key from Z85-encoded text.
///
Expand All @@ -1337,7 +1350,7 @@ pub fn z85_decode(data: &str) -> result::Result<Vec<u8>, DecodeError> {
return Err(DecodeError::BadLength);
}

let len = data.len() * 4 / 5;
let len = data.len() / 5 * 4;
let mut dest = vec![0u8; len];

let c_str = ffi::CString::new(data)?;
Expand Down
100 changes: 79 additions & 21 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use libc::size_t;

use std::cmp::Ordering;
use std::ffi;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::{Deref, DerefMut};
use std::os::raw::c_void;
use std::{ptr, slice, str};
Expand Down Expand Up @@ -38,7 +40,10 @@ impl fmt::Debug for Message {
}

unsafe extern "C" fn drop_msg_data_box(data: *mut c_void, hint: *mut c_void) {
let _ = Box::from_raw(slice::from_raw_parts_mut(data as *mut u8, hint as usize));
drop(Box::from_raw(ptr::slice_from_raw_parts_mut(
data as *mut u8,
hint as usize,
)));
}

impl Message {
Expand Down Expand Up @@ -120,6 +125,38 @@ impl Message {
Self::from(data)
}

/// Returns a raw pointer to the message to be used with ffi calls.
pub fn as_message_ptr(&self) -> *const zmq_sys::zmq_msg_t {
&self.msg
}

/// Returns a raw mutable pointer to the message to be used with ffi calls.
pub fn as_mut_message_ptr(&mut self) -> *mut zmq_sys::zmq_msg_t {
&mut self.msg
}

/// Returns the amount of bytes that are stored in this `Message`.
pub fn len(&self) -> usize {
unsafe { zmq_sys::zmq_msg_size(self.as_message_ptr()) }
}

/// Return `true` is there is at least one byte stored in this `Message`.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Returns a raw pointer to the buffer of this message.
pub fn as_ptr(&self) -> *const u8 {
let ptr = unsafe { zmq_sys::zmq_msg_data(self.as_message_ptr().cast_mut()) };
ptr as *const u8
}

/// Returns a raw mutable pointer to the buffer of this message.
pub fn as_mut_ptr(&mut self) -> *mut u8 {
let ptr = unsafe { zmq_sys::zmq_msg_data(self.as_mut_message_ptr()) };
ptr as *mut u8
}

/// Return the message content as a string slice if it is valid UTF-8.
pub fn as_str(&self) -> Option<&str> {
str::from_utf8(self).ok()
Expand Down Expand Up @@ -152,43 +189,64 @@ impl Message {
if value.is_null() {
None
} else {
str::from_utf8(unsafe { ffi::CStr::from_ptr(value) }.to_bytes()).ok()
unsafe { ffi::CStr::from_ptr(value) }.to_str().ok()
}
}
}

impl PartialEq for Message {
fn eq(&self, other: &Message) -> bool {
self[..] == other[..]
}
}

impl Eq for Message {}

impl PartialOrd for Message {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for Message {
fn cmp(&self, other: &Self) -> Ordering {
self[..].cmp(&other[..])
}
}

impl Clone for Message {
fn clone(&self) -> Self {
self[..].into()
}
}

impl Default for Message {
fn default() -> Self {
Self::new()
}
}

impl Hash for Message {
fn hash<H: Hasher>(&self, state: &mut H) {
self[..].hash(state);
}
}

impl Deref for Message {
type Target = [u8];

fn deref(&self) -> &[u8] {
// This is safe because we're constraining the slice to the lifetime of
// this message.
unsafe {
let ptr = &self.msg as *const _ as *mut _;
let data = zmq_sys::zmq_msg_data(ptr);
let len = zmq_sys::zmq_msg_size(ptr) as usize;
slice::from_raw_parts(data as *mut u8, len)
}
unsafe { slice::from_raw_parts(self.as_ptr(), self.len()) }
}
}

impl PartialEq for Message {
fn eq(&self, other: &Message) -> bool {
self[..] == other[..]
}
}

impl Eq for Message {}

impl DerefMut for Message {
fn deref_mut(&mut self) -> &mut [u8] {
// This is safe because we're constraining the slice to the lifetime of
// this message.
unsafe {
let data = zmq_sys::zmq_msg_data(&mut self.msg);
let len = zmq_sys::zmq_msg_size(&self.msg) as usize;
slice::from_raw_parts_mut(data as *mut u8, len)
}
unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len()) }
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/sockopt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use libc::{c_int, c_uint, size_t};
use std::os::raw::c_void;
use std::result;
use std::string::FromUtf8Error;
use std::{mem, ptr, str};
use std::{mem, ptr};

use super::{PollEvents, Result};

Expand Down Expand Up @@ -47,8 +47,7 @@ getsockopt_num!(c_uint, u32);
getsockopt_num!(i64, i64);
getsockopt_num!(u64, u64);

pub fn get_bytes(sock: *mut c_void, opt: c_int, size: size_t) -> Result<Vec<u8>> {
let mut size = size;
pub fn get_bytes(sock: *mut c_void, opt: c_int, mut size: size_t) -> Result<Vec<u8>> {
let mut value = vec![0u8; size];

zmq_try!(unsafe {
Expand Down
2 changes: 1 addition & 1 deletion tests/message_from_boxed_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static A: Allocator = Allocator;
#[test]
fn message_from_boxed_slice() {
let mut b: Box<[u8]> = Box::new([0u8; 42]);
CHECK_PTR.store(b.as_mut_ptr() as *mut u8, Ordering::SeqCst);
CHECK_PTR.store(b.as_mut_ptr(), Ordering::SeqCst);
let _ = zmq::Message::from(b);
assert_eq!(CHECK_PTR.load(Ordering::SeqCst), ptr::null_mut());
}
1 change: 0 additions & 1 deletion tests/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
mod common;

use std::str;
use std::u16;

fn version_ge_4_3() -> bool {
let (major, minor, _) = zmq::version();
Expand Down
2 changes: 1 addition & 1 deletion tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ test!(test_exchanging_multipart, {
let (sender, receiver) = create_socketpair();

// convenience API
sender.send_multipart(&["foo", "bar"], 0).unwrap();
sender.send_multipart(["foo", "bar"], 0).unwrap();
assert_eq!(receiver.recv_multipart(0).unwrap(), vec![b"foo", b"bar"]);

// manually
Expand Down
2 changes: 1 addition & 1 deletion tests/unix/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn poll_worker(_ctx: &zmq::Context, socket: &zmq::Socket) {
}
Some(msg) => {
state.wait(zmq::POLLOUT);
let done = msg.len() == 0;
let done = msg.is_empty();
socket.send(msg, zmq::DONTWAIT).unwrap();
if done {
break;
Expand Down
6 changes: 3 additions & 3 deletions tests/z85.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ fn test_decode_errors() {
}
}

/*
// Disabled because quickcheck doesn't expose gen_range and gen anymore

// Valid input for z85 encoding (i.e. a slice of bytes with its length
// being a multiple of 4)
#[derive(Clone, Debug)]
struct Input(Vec<u8>);

/*
// Disabled because quickcheck doesn't expose gen_range and gen anymore

impl Arbitrary for Input {
fn arbitrary(g: &mut Gen) -> Self {
let len = g.gen_range(0..256) * 4;
Expand Down