diff --git a/Makefile.am b/Makefile.am index 19aa168096..fd45bf0764 100755 --- a/Makefile.am +++ b/Makefile.am @@ -1067,7 +1067,8 @@ test_apps += tests/test_poller \ tests/test_hiccup_msg \ tests/test_zmq_ppoll_fd \ tests/test_xsub_verbose \ - tests/test_pubsub_topics_count + tests/test_pubsub_topics_count \ + tests/test_msg_ffn_external_storage tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1149,6 +1150,10 @@ tests_test_pubsub_topics_count_SOURCES = tests/test_pubsub_topics_count.cpp tests_test_pubsub_topics_count_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_pubsub_topics_count_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +tests_test_msg_ffn_external_storage_SOURCES = tests/test_msg_ffn_external_storage.cpp +tests_test_msg_ffn_external_storage_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_msg_ffn_external_storage_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + if HAVE_FORK test_apps += tests/test_zmq_ppoll_signals diff --git a/builds/gyp/project-tests.gypi b/builds/gyp/project-tests.gypi index cd7f93e2fa..e5e5e8b76f 100644 --- a/builds/gyp/project-tests.gypi +++ b/builds/gyp/project-tests.gypi @@ -132,6 +132,17 @@ 'libzmq' ], }, + { + 'target_name': 'test_msg_ffn_external_storage', + 'type': 'executable', + 'sources': [ + '../../tests/test_msg_ffn_external_storage.cpp', + '../../tests/testutil.hpp' + ], + 'dependencies': [ + 'libzmq' + ], + }, { 'target_name': 'test_msg_init', 'type': 'executable', diff --git a/builds/gyp/project-tests.xml b/builds/gyp/project-tests.xml index 9a86391956..8aca763b33 100644 --- a/builds/gyp/project-tests.xml +++ b/builds/gyp/project-tests.xml @@ -11,6 +11,7 @@ + diff --git a/include/zmq.h b/include/zmq.h index 4b66d42f5f..227995c0ab 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -701,6 +701,30 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_); +/* Draft Msg control block type for init_external_storage. */ +typedef struct zmq_msg_content_t +{ +#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_ARM64)) + __declspec (align (8)) unsigned char _[64]; +#elif defined(_MSC_VER) \ + && (defined(_M_IX86) || defined(_M_ARM_ARMV7VE) || defined(_M_ARM)) + __declspec (align (4)) unsigned char _[64]; +#elif defined(__GNUC__) || defined(__INTEL_COMPILER) \ + || (defined(__SUNPRO_C) && __SUNPRO_C >= 0x590) \ + || (defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x590) + unsigned char _[64] __attribute__ ((aligned (sizeof (void *)))); +#else + unsigned char _[64]; +#endif +} zmq_msg_content_t; + +ZMQ_EXPORT int zmq_msg_init_external_storage (zmq_msg_t *msg_, + zmq_msg_content_t *content_, + void *data_, + size_t size_, + zmq_free_fn *ffn_, + void *hint_); + /* DRAFT Msg property names. */ #define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id" #define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type" diff --git a/src/zmq.cpp b/src/zmq.cpp index e0090dcde8..4c88d09fd8 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -84,6 +84,10 @@ struct iovec typedef char check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1]; +// Compile time check whether msg_t::content_t fits into zmq_msg_content_t. +// It is expected to be larger. +typedef char check_msg_content_size + [sizeof (zmq::msg_t::content_t) <= sizeof (zmq_msg_content_t) ? 1 : -1]; void zmq_version (int *major_, int *minor_, int *patch_) { @@ -606,6 +610,20 @@ int zmq_msg_init_data ( ->init_data (data_, size_, ffn_, hint_); } +int zmq_msg_init_external_storage (zmq_msg_t *msg_, + zmq_msg_content_t *content_, + void *data_, + size_t size_, + zmq_free_fn *ffn_, + void *hint_) +{ + return (reinterpret_cast (msg_)) + ->init_external_storage ( + reinterpret_cast (content_), data_, size_, + ffn_, hint_); +} + + int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_) { zmq::socket_base_t *s = as_socket_base_t (s_); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index ce5a04dd17..fe0d361d1a 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -91,6 +91,30 @@ int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_); const char *zmq_msg_group (zmq_msg_t *msg_); int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_); +/* Draft Msg control block type for init_external_storage. */ +typedef struct zmq_msg_content_t +{ +#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_ARM64)) + __declspec (align (8)) unsigned char _[64]; +#elif defined(_MSC_VER) \ + && (defined(_M_IX86) || defined(_M_ARM_ARMV7VE) || defined(_M_ARM)) + __declspec (align (4)) unsigned char _[64]; +#elif defined(__GNUC__) || defined(__INTEL_COMPILER) \ + || (defined(__SUNPRO_C) && __SUNPRO_C >= 0x590) \ + || (defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x590) + unsigned char _[64] __attribute__ ((aligned (sizeof (void *)))); +#else + unsigned char _[64]; +#endif +} zmq_msg_content_t; + +int zmq_msg_init_external_storage (zmq_msg_t *msg_, + zmq_msg_content_t *content_, + void *data_, + size_t size_, + zmq_free_fn *ffn_, + void *hint_); + /* DRAFT Msg property names. */ #define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id" #define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type" diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eb59de29f9..0eef7d5d34 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -175,6 +175,7 @@ if(ENABLE_DRAFTS) test_zmq_ppoll_fd test_xsub_verbose test_pubsub_topics_count + test_msg_ffn_external_storage ) if(HAVE_FORK) diff --git a/tests/test_msg_ffn_external_storage.cpp b/tests/test_msg_ffn_external_storage.cpp new file mode 100644 index 0000000000..abbe2f5b15 --- /dev/null +++ b/tests/test_msg_ffn_external_storage.cpp @@ -0,0 +1,102 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include + +SETUP_TEARDOWN_TESTCONTEXT + +void ffn (void *data_, void *hint_) +{ + // Signal that ffn has been called by writing "freed" to hint + (void) data_; // Suppress 'unused' warnings at compile time + memcpy (hint_, (void *) "freed", 5); +} + +void test_msg_init_ffn_external_storage () +{ + // Create the infrastructure + char my_endpoint[MAX_SOCKET_STRING]; + + void *router = test_context_socket (ZMQ_ROUTER); + bind_loopback_ipv4 (router, my_endpoint, sizeof my_endpoint); + + void *dealer = test_context_socket (ZMQ_DEALER); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint)); + + // Test that creating and closing a message triggers ffn + zmq_msg_content_t *content = new zmq_msg_content_t; + zmq_msg_t msg; + char hint[5]; + char data[255]; + memset (data, 0, 255); + memcpy (data, (void *) "data", 4); + memcpy (hint, (void *) "hint", 4); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage ( + &msg, content, (void *) data, 255, ffn, (void *) hint)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + + msleep (SETTLE_TIME); + TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5); + memcpy (hint, (void *) "hint", 4); + + // Making and closing a copy triggers ffn + zmq_msg_t msg2; + zmq_msg_init (&msg2); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage ( + &msg, content, (void *) data, 255, ffn, (void *) hint)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_copy (&msg2, &msg)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + + msleep (SETTLE_TIME); + TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5); + memcpy (hint, (void *) "hint", 4); + + // Test that sending a message triggers ffn + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage ( + &msg, content, (void *) data, 255, ffn, (void *) hint)); + + zmq_msg_send (&msg, dealer, 0); + char buf[255]; + TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (router, buf, 255, 0)); + TEST_ASSERT_EQUAL_INT (255, zmq_recv (router, buf, 255, 0)); + TEST_ASSERT_EQUAL_STRING_LEN (data, buf, 4); + + msleep (SETTLE_TIME); + TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5); + memcpy (hint, (void *) "hint", 4); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + + // Sending a copy of a message triggers ffn + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg2)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage ( + &msg, content, (void *) data, 255, ffn, (void *) hint)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_copy (&msg2, &msg)); + + zmq_msg_send (&msg, dealer, 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (router, buf, 255, 0)); + TEST_ASSERT_EQUAL_INT (255, zmq_recv (router, buf, 255, 0)); + TEST_ASSERT_EQUAL_STRING_LEN (data, buf, 4); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + + msleep (SETTLE_TIME); + TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5); + + // Deallocate the infrastructure. + test_context_socket_close (router); + test_context_socket_close (dealer); + + delete content; +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_msg_init_ffn_external_storage); + return UNITY_END (); +}