Skip to content

Expose the ability to have zero allocation sends. #4802

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,8 @@ test_apps += tests/test_poller \
tests/test_hiccup_msg \
tests/test_zmq_ppoll_fd \
tests/test_xsub_verbose \
tests/test_pubsub_topics_count
tests/test_pubsub_topics_count \
tests/test_msg_ffn_external_storage

tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
Expand Down Expand Up @@ -1149,6 +1150,10 @@ tests_test_pubsub_topics_count_SOURCES = tests/test_pubsub_topics_count.cpp
tests_test_pubsub_topics_count_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_pubsub_topics_count_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

tests_test_msg_ffn_external_storage_SOURCES = tests/test_msg_ffn_external_storage.cpp
tests_test_msg_ffn_external_storage_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_msg_ffn_external_storage_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

if HAVE_FORK
test_apps += tests/test_zmq_ppoll_signals

Expand Down
11 changes: 11 additions & 0 deletions builds/gyp/project-tests.gypi
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@
'libzmq'
],
},
{
'target_name': 'test_msg_ffn_external_storage',
'type': 'executable',
'sources': [
'../../tests/test_msg_ffn_external_storage.cpp',
'../../tests/testutil.hpp'
],
'dependencies': [
'libzmq'
],
},
{
'target_name': 'test_msg_init',
'type': 'executable',
Expand Down
1 change: 1 addition & 0 deletions builds/gyp/project-tests.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<test name = "test_invalid_rep" />
<test name = "test_msg_flags" />
<test name = "test_msg_ffn" />
<test name = "test_msg_ffn_external_storage" />
<test name = "test_msg_init" />
<test name = "test_connect_resolve" />
<test name = "test_immediate" />
Expand Down
24 changes: 24 additions & 0 deletions include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,30 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
ZMQ_EXPORT int
zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_);

/* Draft Msg control block type for init_external_storage. */
typedef struct zmq_msg_content_t
{
#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_ARM64))
__declspec (align (8)) unsigned char _[64];
#elif defined(_MSC_VER) \
&& (defined(_M_IX86) || defined(_M_ARM_ARMV7VE) || defined(_M_ARM))
__declspec (align (4)) unsigned char _[64];
#elif defined(__GNUC__) || defined(__INTEL_COMPILER) \
|| (defined(__SUNPRO_C) && __SUNPRO_C >= 0x590) \
|| (defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x590)
unsigned char _[64] __attribute__ ((aligned (sizeof (void *))));
#else
unsigned char _[64];
#endif
} zmq_msg_content_t;

ZMQ_EXPORT int zmq_msg_init_external_storage (zmq_msg_t *msg_,
zmq_msg_content_t *content_,
void *data_,
size_t size_,
zmq_free_fn *ffn_,
void *hint_);

/* DRAFT Msg property names. */
#define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
Expand Down
18 changes: 18 additions & 0 deletions src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ struct iovec
typedef char
check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];

// Compile time check whether msg_t::content_t fits into zmq_msg_content_t.
// It is expected to be larger.
typedef char check_msg_content_size
[sizeof (zmq::msg_t::content_t) <= sizeof (zmq_msg_content_t) ? 1 : -1];

void zmq_version (int *major_, int *minor_, int *patch_)
{
Expand Down Expand Up @@ -606,6 +610,20 @@ int zmq_msg_init_data (
->init_data (data_, size_, ffn_, hint_);
}

int zmq_msg_init_external_storage (zmq_msg_t *msg_,
zmq_msg_content_t *content_,
void *data_,
size_t size_,
zmq_free_fn *ffn_,
void *hint_)
{
return (reinterpret_cast<zmq::msg_t *> (msg_))
->init_external_storage (
reinterpret_cast<zmq::msg_t::content_t *> (content_), data_, size_,
ffn_, hint_);
}


int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
Expand Down
24 changes: 24 additions & 0 deletions src/zmq_draft.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_);
const char *zmq_msg_group (zmq_msg_t *msg_);
int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_);

/* Draft Msg control block type for init_external_storage. */
typedef struct zmq_msg_content_t
{
#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_ARM64))
__declspec (align (8)) unsigned char _[64];
#elif defined(_MSC_VER) \
&& (defined(_M_IX86) || defined(_M_ARM_ARMV7VE) || defined(_M_ARM))
__declspec (align (4)) unsigned char _[64];
#elif defined(__GNUC__) || defined(__INTEL_COMPILER) \
|| (defined(__SUNPRO_C) && __SUNPRO_C >= 0x590) \
|| (defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x590)
unsigned char _[64] __attribute__ ((aligned (sizeof (void *))));
#else
unsigned char _[64];
#endif
} zmq_msg_content_t;

int zmq_msg_init_external_storage (zmq_msg_t *msg_,
zmq_msg_content_t *content_,
void *data_,
size_t size_,
zmq_free_fn *ffn_,
void *hint_);

/* DRAFT Msg property names. */
#define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ if(ENABLE_DRAFTS)
test_zmq_ppoll_fd
test_xsub_verbose
test_pubsub_topics_count
test_msg_ffn_external_storage
)

if(HAVE_FORK)
Expand Down
102 changes: 102 additions & 0 deletions tests/test_msg_ffn_external_storage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* SPDX-License-Identifier: MPL-2.0 */

#include "testutil.hpp"
#include "testutil_unity.hpp"

#include <string.h>

SETUP_TEARDOWN_TESTCONTEXT

void ffn (void *data_, void *hint_)
{
// Signal that ffn has been called by writing "freed" to hint
(void) data_; // Suppress 'unused' warnings at compile time
memcpy (hint_, (void *) "freed", 5);
}

void test_msg_init_ffn_external_storage ()
{
// Create the infrastructure
char my_endpoint[MAX_SOCKET_STRING];

void *router = test_context_socket (ZMQ_ROUTER);
bind_loopback_ipv4 (router, my_endpoint, sizeof my_endpoint);

void *dealer = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));

// Test that creating and closing a message triggers ffn
zmq_msg_content_t *content = new zmq_msg_content_t;
zmq_msg_t msg;
char hint[5];
char data[255];
memset (data, 0, 255);
memcpy (data, (void *) "data", 4);
memcpy (hint, (void *) "hint", 4);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);
memcpy (hint, (void *) "hint", 4);

// Making and closing a copy triggers ffn
zmq_msg_t msg2;
zmq_msg_init (&msg2);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_copy (&msg2, &msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);
memcpy (hint, (void *) "hint", 4);

// Test that sending a message triggers ffn
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));

zmq_msg_send (&msg, dealer, 0);
char buf[255];
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_INT (255, zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_STRING_LEN (data, buf, 4);

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);
memcpy (hint, (void *) "hint", 4);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

// Sending a copy of a message triggers ffn
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg2));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_copy (&msg2, &msg));

zmq_msg_send (&msg, dealer, 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_INT (255, zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_STRING_LEN (data, buf, 4);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);

// Deallocate the infrastructure.
test_context_socket_close (router);
test_context_socket_close (dealer);

delete content;
}

int main (void)
{
setup_test_environment ();

UNITY_BEGIN ();
RUN_TEST (test_msg_init_ffn_external_storage);
return UNITY_END ();
}
Loading