diff --git a/src/common.h b/src/common.h
index 9cf595ee4..2dd46452d 100644
--- a/src/common.h
+++ b/src/common.h
@@ -31,6 +31,34 @@ extern "C" {
 	"Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions."
 #endif
 
+// size of a cache line
+#if defined(__s390__) || defined(__s390x__)
+#define CACHELINE_BYTES 256
+#elif defined(powerpc) || defined(__powerpc__) || defined(__ppc__)
+#define CACHELINE_BYTES 128
+#else
+#define CACHELINE_BYTES 64
+#endif
+
+// force-inline the given function, if possible
+#if defined(__clang__) || defined(__GNUC__)
+#define FORCEINLINE __attribute__((always_inline))
+#elif defined _MSC_VER
+#define FORCEINLINE __forceinline
+#else
+#define FORCEINLINE inline
+#endif
+
+// compiler hint that the given expression is likely or unlikely
+// (e.g., in conditional statements)
+#if defined(__clang__) || defined(__GNUC__)
+#define LIKELY(x) __builtin_expect(!!(x), 1)
+#define UNLIKELY(x) __builtin_expect(!!(x), 0)
+#else
+#define LIKELY(x) (x)
+#define UNLIKELY(x) (x)
+#endif
+
 // the highest supported protocol version
 // * 100 is the original version, supported by library versions 1.00+
 // * 110 is an alternative protocol that improves throughput, supported by library versions 1.10+
diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp
index 7a7dc6a21..1c158448f 100644
--- a/src/consumer_queue.cpp
+++ b/src/consumer_queue.cpp
@@ -1,13 +1,23 @@
 #include "consumer_queue.h"
 #include "common.h"
-#include "sample.h"
 #include "send_buffer.h"
 #include <loguru.hpp>
 
 using namespace lsl;
 
-consumer_queue::consumer_queue(std::size_t max_capacity, send_buffer_p registry)
-	: registry_(registry), buffer_(max_capacity) {
+consumer_queue::consumer_queue(std::size_t size, send_buffer_p registry)
+	: registry_(registry),
+	  buffer_(new item_t[size]),
+	  size_(size),
+	  // largest integer at which we can wrap correctly
+	  wrap_at_(std::numeric_limits<std::size_t>::max() - size - std::numeric_limits<std::size_t>::max() % size)
+{
+	assert(size_ > 1);
+	for (std::size_t i=0; i<size_; ++i)
+		buffer_[i].seq_state.store(i, std::memory_order_relaxed);
+	write_idx_.store(0, std::memory_order_relaxed);
+	read_idx_.store(0, std::memory_order_relaxed);
+	done_sync_.store(false, std::memory_order_release);
 	if (registry_) registry_->register_consumer(this);
 }
 
@@ -16,44 +26,27 @@ consumer_queue::~consumer_queue() {
 		if (registry_) registry_->unregister_consumer(this);
 	} catch (std::exception &e) {
 		LOG_F(ERROR,
-		"Unexpected error while trying to unregister a consumer queue from its registry: %s",
-		e.what());
+			"Unexpected error while trying to unregister a consumer queue from its registry: %s",
+			e.what());
 	}
-}
-
-void consumer_queue::push_sample(const sample_p &sample) {
-	// push a sample, dropping the oldest sample if the queue ist already full.
-	// During this operation the producer becomes a second consumer, i.e., a case
-	// where the underlying spsc queue isn't thread-safe) so the mutex is locked.
-	std::lock_guard<std::mutex> lk(mut_);
-	while (!buffer_.push(sample)) {
-		buffer_.pop();
-	}
-	cv_.notify_one();
-}
-
-sample_p consumer_queue::pop_sample(double timeout) {
-	sample_p result;
-	if (timeout <= 0.0) {
-		std::lock_guard<std::mutex> lk(mut_);
-		buffer_.pop(result);
-	} else {
-		std::unique_lock<std::mutex> lk(mut_);
-		if (!buffer_.pop(result)) {
-			// wait for a new sample until the thread calling push_sample delivers one and sends a
-			// notification, or until timeout
-			std::chrono::duration<double> sec(timeout);
-			cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); });
-		}
-	}
-	return result;
+	delete[] buffer_;
 }
 
 uint32_t consumer_queue::flush() noexcept {
-	std::lock_guard<std::mutex> lk(mut_);
 	uint32_t n = 0;
-	while (buffer_.pop()) n++;
+	while (try_pop()) n++;
 	return n;
 }
 
-bool consumer_queue::empty() { return buffer_.empty(); }
+std::size_t consumer_queue::read_available() const {
+	std::size_t write_index = write_idx_.load(std::memory_order_acquire);
+	std::size_t read_index = read_idx_.load(std::memory_order_relaxed);
+	if (write_index >= read_index)
+		return write_index - read_index;
+	const std::size_t ret = write_index + size_ - read_index;
+	return ret;
+}
+
+bool consumer_queue::empty() const {
+	return write_idx_.load(std::memory_order_acquire) == read_idx_.load(std::memory_order_relaxed);
+}
diff --git a/src/consumer_queue.h b/src/consumer_queue.h
index a84b26b2f..d7b7a121c 100644
--- a/src/consumer_queue.h
+++ b/src/consumer_queue.h
@@ -2,60 +2,167 @@
 #define CONSUMER_QUEUE_H
 
 #include "common.h"
-#include "forward.h"
-#include <boost/lockfree/spsc_queue.hpp>
-#include <condition_variable>
+#include "sample.h"
+#include <atomic>
 #include <mutex>
+#include <chrono>
+#include <condition_variable>
 
 namespace lsl {
 /**
- * A thread-safe producer-consumer queue of unread samples.
+ * A thread-safe producer/consumer queue of unread samples.
  *
- * Erases the oldest samples if max capacity is exceeded. Implemented as a circular buffer.
+ * Erases the oldest samples if max capacity is exceeded. Implemented as a ring buffer (wait-free
+ * unless the buffer is full or empty).
  */
 class consumer_queue {
-	using buffer_type = lslboost::lockfree::spsc_queue<sample_p>;
-
 public:
 	/**
 	 * Create a new queue with a given capacity.
-	 * @param max_capacity The maximum number of samples that can be held by the queue. Beyond that,
+	 * @param size The maximum number of samples that can be held by the queue. Beyond that,
 	 * the oldest samples are dropped.
 	 * @param registry Optionally a pointer to a registration facility, for multiple-reader
 	 * arrangements.
 	 */
-	consumer_queue(std::size_t max_capacity, send_buffer_p registry = send_buffer_p());
+	explicit consumer_queue(std::size_t size, send_buffer_p registry = send_buffer_p());
 
 	/// Destructor. Unregisters from the send buffer, if any.
 	~consumer_queue();
 
-	/// Push a new sample onto the queue.
-	void push_sample(const sample_p &sample);
+	/**
+	 * Push a new sample onto the queue. Can only be called by one thread (single-producer).
+	 * This deletes the oldest sample if the max capacity is exceeded.
+	 */
+	template <class T>
+	void push_sample(T&& sample) {
+		while (!try_push(std::forward<T>(sample))) {
+			// buffer full, drop the oldest sample
+			if (!done_sync_.load(std::memory_order_acquire)) {
+				// synchronizes-with store to done_sync_ in ctor
+				std::atomic_thread_fence(std::memory_order_acquire);
+				done_sync_.store(true, std::memory_order_release);
+			}
+			try_pop();
+		}
+		{
+			// take the lock so the notification cannot fire between a consumer's failed try_pop and wait_for
+			std::lock_guard<std::mutex> lk(mut_);
+			cv_.notify_one();
+		}
+	}
 
 	/**
-	 * Pop a sample from the queue.
-	 * Blocks if empty.
-	 * @param timeout Timeout for the blocking, in seconds. If expired, an empty sample is returned.
+	 * Pop a sample from the queue. Can be called by multiple threads (multi-consumer).
+	 * Blocks if empty and if a nonzero timeout is used.
+	 * @param timeout Timeout for the blocking, in seconds. If expired, an empty sample is returned.
 	 */
-	sample_p pop_sample(double timeout = FOREVER);
+	sample_p pop_sample(double timeout = FOREVER) {
+		sample_p result;
+		bool success = try_pop(result);
+		if (!success && timeout > 0.0) {
+			// only acquire the mutex if we have to do a blocking wait with timeout
+			std::chrono::duration<double> sec(timeout);
+			std::unique_lock<std::mutex> lk(mut_);
+			if (!try_pop(result))
+				cv_.wait_for(lk, sec, [&]{ return this->try_pop(result); });
+		}
+		return result;
+	}
 
-	/// Number of available samples
-	std::size_t read_available() const { return buffer_.read_available(); }
+	/// Number of available samples. This is approximate unless called by the thread that calls
+	/// pop_sample().
+	std::size_t read_available() const;
 
-	/// Flush the queue, return the number of dropped samples
+	/// Flush the queue, return the number of dropped samples.
 	uint32_t flush() noexcept;
 
-	/// Check whether the buffer is empty.
-	bool empty();
+	/// Check whether the buffer is empty. This is approximate unless called by the thread that
+	/// calls pop_sample().
+	bool empty() const;
 
 	consumer_queue(const consumer_queue&) = delete;
+	consumer_queue(consumer_queue&&) = delete;
 	consumer_queue& operator=(const consumer_queue&) = delete;
-
+	consumer_queue& operator=(consumer_queue&&) = delete;
 private:
-	send_buffer_p registry_;	 // optional consumer registry
-	buffer_type buffer_;		 // the sample buffer
-	std::mutex mut_;			 // mutex for cond var (also to protect queue at buffer overflow)
-	std::condition_variable cv_; // to allow for blocking wait by consumer
+	// an item stored in the queue
+	struct item_t {
+		std::atomic<std::size_t> seq_state;
+		sample_p value;
+	};
+
+	// Push a new element to the queue.
+	// Returns true if successful or false if the queue is full.
+	template <class T>
+	bool try_push(T&& sample) {
+		std::size_t write_index = write_idx_.load(std::memory_order_acquire);
+		std::size_t next_idx = add_wrap(write_index, 1);
+		item_t &item = buffer_[write_index % size_];
+		if (UNLIKELY(write_index != item.seq_state.load(std::memory_order_acquire)))
+			return false; // item currently occupied, queue full
+		write_idx_.store(next_idx, std::memory_order_release);
+		copy_or_move(item.value, std::forward<T>(sample));
+		item.seq_state.store(next_idx, std::memory_order_release);
+		return true;
+	}
+
+	// Pop an element from the queue (can be called with zero or one argument). Returns true if
+	// successful or false if the queue is empty. Uses the same method as Vyukov's bounded MPMC queue.
+	template <class... T>
+	bool try_pop(T&... result) {
+		item_t* item;
+		std::size_t read_index = read_idx_.load(std::memory_order_relaxed);
+		for (;;) {
+			item = &buffer_[read_index % size_];
+			const std::size_t seq_state = item->seq_state.load(std::memory_order_acquire);
+			const std::size_t next_idx = add_wrap(read_index, 1);
+			// check if the item is ok to pop
+			if (LIKELY(seq_state == next_idx)) {
+				// yes, try to claim the slot using CAS
+				if (LIKELY(read_idx_.compare_exchange_weak(read_index, next_idx, std::memory_order_relaxed)))
+					break;
+			} else if (LIKELY(seq_state == read_index))
+				return false; // queue empty
+			else
+				// we're behind or ahead of another pop, try again
+				read_index = read_idx_.load(std::memory_order_relaxed);
+		}
+		move_or_drop(item->value, result...);
+		// mark the item as free for the next pass
+		item->seq_state.store(add_wrap(read_index, size_), std::memory_order_release);
+		return true;
+	}
+
+	// helper to either copy or move a value, depending on whether it's an rvalue ref
+	inline static void copy_or_move(sample_p& dst, const sample_p& src) { dst = src; }
+	inline static void copy_or_move(sample_p& dst, sample_p&& src) { dst = std::move(src); }
+	// helper to either move or drop a value, depending on whether a dst argument is given
+	inline static void move_or_drop(sample_p &src) { src.~sample_p(); }
+	inline static void move_or_drop(sample_p &src, sample_p &dst) { dst = std::move(src); }
+	// helper to add a delta to the given index and wrap correctly
+	FORCEINLINE std::size_t add_wrap(std::size_t x, std::size_t delta) const {
+		const std::size_t xp = x + delta;
+		return xp >= wrap_at_ ? xp - wrap_at_ : xp;
+	}
+
+	/// the sample buffer
+	item_t *buffer_;
+	/// max number of elements in the queue
+	const std::size_t size_;
+	/// threshold at which to wrap read/write indices
+	const std::size_t wrap_at_;
+	/// whether we have performed a sync on the data stored by the constructor
+	alignas(CACHELINE_BYTES) std::atomic<bool> done_sync_;
+	/// current write position
+	alignas(CACHELINE_BYTES) std::atomic<std::size_t> write_idx_;
+	/// current read position
+	alignas(CACHELINE_BYTES) std::atomic<std::size_t> read_idx_;
+	/// for use with the condition variable
+	std::mutex mut_;
+	/// condition for waiting with timeout
+	std::condition_variable cv_;
+	/// optional consumer registry
+	send_buffer_p registry_;
 };
 
 } // namespace lsl
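Note on the algorithm referenced in try_pop above: the patch's comment points to Dmitry Vyukov's bounded MPMC queue, in which every slot carries a sequence number ("ticket") telling producers and consumers whether that slot is free for the current lap or still holds an unconsumed value. The sketch below is illustrative only and is not code from this patch; the class name bounded_mpmc, the slot/head_/tail_ names, the int payload, and the power-of-two capacity (which lets "pos & mask_" stand in for the modulo and wrap_at_ arithmetic used in the patch) are all assumptions made for the example.

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Minimal bounded MPMC queue in the style of Vyukov's algorithm (illustrative sketch only).
template <typename T> class bounded_mpmc {
public:
	// capacity must be a power of two so that "pos & mask_" indexes the ring correctly
	explicit bounded_mpmc(std::size_t capacity) : slots_(capacity), mask_(capacity - 1) {
		// slot i initially advertises "free for ticket i"
		for (std::size_t i = 0; i < capacity; ++i) slots_[i].seq.store(i, std::memory_order_relaxed);
	}

	// returns false if the queue is full
	bool try_push(const T &v) {
		std::size_t pos = head_.load(std::memory_order_relaxed);
		for (;;) {
			slot &s = slots_[pos & mask_];
			const std::size_t seq = s.seq.load(std::memory_order_acquire);
			const std::intptr_t dif = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos);
			if (dif == 0) { // slot is free for this ticket: try to claim it
				if (head_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
					s.value = v;
					s.seq.store(pos + 1, std::memory_order_release); // publish to consumers
					return true;
				}
			} else if (dif < 0) {
				return false; // slot still holds an unconsumed value: queue full
			} else {
				pos = head_.load(std::memory_order_relaxed); // another producer won the race
			}
		}
	}

	// returns false if the queue is empty
	bool try_pop(T &out) {
		std::size_t pos = tail_.load(std::memory_order_relaxed);
		for (;;) {
			slot &s = slots_[pos & mask_];
			const std::size_t seq = s.seq.load(std::memory_order_acquire);
			const std::intptr_t dif = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos + 1);
			if (dif == 0) { // slot holds a value published for this ticket: try to claim it
				if (tail_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
					out = std::move(s.value);
					s.seq.store(pos + mask_ + 1, std::memory_order_release); // free slot for the next lap
					return true;
				}
			} else if (dif < 0) {
				return false; // the producer hasn't published this slot yet: queue empty
			} else {
				pos = tail_.load(std::memory_order_relaxed); // another consumer won the race
			}
		}
	}

private:
	struct slot {
		std::atomic<std::size_t> seq;
		T value;
	};
	std::vector<slot> slots_;
	const std::size_t mask_;
	std::atomic<std::size_t> head_{0}, tail_{0};
};

int main() {
	bounded_mpmc<int> q(4);
	for (int i = 0; i < 5; ++i) std::cout << "push " << i << ": " << q.try_push(i) << "\n"; // 5th push fails
	int v;
	while (q.try_pop(v)) std::cout << "pop " << v << "\n"; // pops 0..3 in FIFO order
}

Relative to this textbook form, the consumer_queue in the patch is single-producer on the push side (try_push publishes write_idx_ with a plain store instead of a CAS), drops the oldest sample rather than failing when the buffer is full, supports arbitrary capacities via add_wrap/wrap_at_, and adds a mutex/condition-variable pair solely so pop_sample can block with a timeout.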