
[Post-1.14] Replace boost::spsc_queue by a fast SPMC queue #91


Closed - wants to merge 5 commits
28 changes: 28 additions & 0 deletions src/common.h
@@ -31,6 +31,34 @@ extern "C" {
"Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions."
#endif

// size of a cache line
Collaborator

This block should be in consumer_queue.h

Contributor Author

Sounds good.

Contributor Author

Ah well, there's one thing -- we'll need that definition in sample.h too for the same reason (the 2 atomics in the factory there are prone to false sharing).

#if defined(__s390__) || defined(__s390x__)
#define CACHELINE_BYTES 256
#elif defined(powerpc) || defined(__powerpc__) || defined(__ppc__)
#define CACHELINE_BYTES 128
#else
#define CACHELINE_BYTES 64
#endif

// force-inline the given function, if possible
Collaborator

IIRC Boost has macros for both forceinline and likely/unlikely, but with more supported compilers.

Contributor Author

Fair point - will use those. For likely/unlikely, I'd expect us to sprinkle that in along the "hot" path here and there over time. There's a bit of an issue with the boost definitions, which differ from how e.g. the Linux kernel or UE4 define them, in that they don't include the double negation. That makes them prone to accidental misuse: if I were to replace if (myptr) by if (BOOST_LIKELY(myptr)), that would expand to if (__builtin_expect(myptr, 1)) -- i.e., it hints that the pointer equals 1, rather than that the expression evaluates to true in a boolean context, which causes the hint to fail to optimize and can in some cases even pessimize the code path (see the sketch right after this comment). We could add that to our mental checklist when reviewing code that uses it, but it feels more prudent to write it into the macro, since otherwise it puts extra burden on the devs/reviewers. One might even argue that the boost versions are misleading names for something that would be better described as BOOST_LIKELYONE and BOOST_LIKELYZERO.

But to gain the extra compilers supported by the boost versions while keeping the ease of use, we could define it here as

#define LIKELY(x) BOOST_LIKELY(!!(x))
#define UNLIKELY(x) BOOST_UNLIKELY(!!(x))

For the forceinline, do you prefer using the boost version in the code, or should we alias it here to a shorter macro name, as in

#define FORCEINLINE BOOST_FORCEINLINE
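
For illustration (not part of the PR), a minimal sketch of the pitfall described above; the wrapper names and the bytes_available() helper are made up for the example:

#include <boost/config.hpp>

// Hypothetical wrappers that add the double negation discussed above.
#define LIKELY(x) BOOST_LIKELY(!!(x))
#define UNLIKELY(x) BOOST_UNLIKELY(!!(x))

int bytes_available() { return 512; } // stand-in for some usually-nonzero count

int drain() {
    // BOOST_LIKELY(bytes_available()) expands (on GCC/Clang) to
    // __builtin_expect(bytes_available(), 1): the compiler is told the call is
    // expected to return exactly 1, even though the condition only cares
    // whether the result is nonzero.
    // LIKELY(bytes_available()) expands to __builtin_expect(!!(bytes_available()), 1):
    // the !! collapses the value to 0 or 1 first, so the hint matches the
    // boolean sense of the condition.
    if (LIKELY(bytes_available()))
        return 1;
    return 0;
}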

Collaborator

I checked the boost source code; BOOST_LIKELY is defined mostly for GCC/Clang and the Intel and IBM compilers. MSVC waits for C++20's [[likely]] attribute and assumes the condition in if() to be true.
So I'm for #define LIKELY(x) BOOST_LIKELY(!!(x)) and using that everywhere. There's just one UNLIKELY that'd need to be changed in the code below. As for BOOST_FORCEINLINE, I'd just use that.

Contributor Author

Great, will do that.

#if defined(__clang__) || defined(__GNUC__)
#define FORCEINLINE __attribute__((always_inline))
#elif defined _MSC_VER
#define FORCEINLINE __forceinline
#else
#define FORCEINLINE inline
#endif

// compiler hint that the given expression is likely or unlikely
// (e.g., in conditional statements)
#if defined(__clang__) || defined(__GNUC__)
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
#else
#define LIKELY(x) (x)
#define UNLIKELY(x) (x)
#endif

// the highest supported protocol version
// * 100 is the original version, supported by library versions 1.00+
// * 110 is an alternative protocol that improves throughput, supported by library versions 1.10+
65 changes: 29 additions & 36 deletions src/consumer_queue.cpp
@@ -1,13 +1,23 @@
#include "consumer_queue.h"
#include "common.h"
#include "sample.h"
#include "send_buffer.h"
#include <chrono>

using namespace lsl;

consumer_queue::consumer_queue(std::size_t max_capacity, send_buffer_p registry)
: registry_(registry), buffer_(max_capacity) {
consumer_queue::consumer_queue(std::size_t size, send_buffer_p registry)
Collaborator

std::uint_fast32_t would let the compiler choose the best int size that's at least 4 bytes wide.

Contributor Author

Oh interesting, didn't know about that one! On that note, I've actually been wondering whether we want to keep limiting our buffers to 32-bit sizes in the long run. For example, if I declare a stream with a 6 MHz sampling rate and someone connects to it with a default 360 s max_buflen (e.g. LabRecorder), liblsl currently crashes with no error message, because max_buflen (being an int) wraps around to a negative value that then blows up later (see the arithmetic sketch after this thread). That happened to me while testing jfrey's use case. It made me think that, in the very long run, we might want to replace some things that are currently int by size_t, and one could slowly but surely build up to that without breaking anything by starting with some low-level containers. The external API wouldn't be affected, since the size value there is specified before it's multiplied by the potentially huge sampling rate.

Just something to think about, especially now that 6 MHz is actually achievable with a sufficiently speedy PC, and also considering that some specialized/professional use cases may find the ability to have such large buffers practical.

Collaborator (@tstenner, Nov 9, 2020)

A sample is at least 48 bytes on x64, so that'd require at least 192 GB of RAM per queue. Not totally impossible, but I guess not the typical LSL user.
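
For reference, the arithmetic behind the overflow scenario above, as a small stand-alone sketch (the 6 MHz rate and 360 s buffer are the example values from the thread):

#include <climits>
#include <cstdint>
#include <cstdio>

int main() {
    const std::int64_t srate = 6000000;               // samples per second
    const std::int64_t buf_seconds = 360;             // LabRecorder's default max_buflen
    const std::int64_t samples = srate * buf_seconds; // 2,160,000,000 samples
    std::printf("requested capacity: %lld samples\n", (long long)samples);
    std::printf("INT_MAX:            %d\n", INT_MAX);  // 2,147,483,647
    // The requested capacity exceeds INT_MAX, so storing it in a plain int
    // wraps to a negative value; a 64-bit size_t holds it comfortably.
    // At the >= 48 bytes per sample quoted above, the sample headers alone
    // would already take roughly 48 * 2.16e9 bytes, i.e. about 100 GB,
    // before counting the actual channel data.
    return 0;
}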

: registry_(registry),
buffer_(new item_t[size]),
size_(size),
// largest integer at which we can wrap correctly
wrap_at_(std::numeric_limits<std::size_t>::max() - size - std::numeric_limits<std::size_t>::max() % size)
{
assert(size_ > 1);
for (std::size_t i=0; i<size_; ++i)
Collaborator

Do we need to store() everything in the constructor already, or could we use standard initialization (: write_idx_(0), read_idx_(0), ...)?

Contributor Author

Before I looked it up, I was actually under the impression that write_idx_(0) would use the default memory order of std::atomic when it's used like a plain value, which is memory_order_seq_cst -- quite a bit stronger than what this data structure needs (so I'd worry a bit about compiler pessimizations, e.g. additional memory fences). However, it turns out that that form of initialization is non-atomic, so the explicit store is actually necessary (since another thread can later acquire the value).

Collaborator

What about an std::atomic_thread_fence(std::memory_order_release); at the end of the constructor?
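
A minimal sketch of the distinction being discussed (illustrative names, not from the PR):

#include <atomic>
#include <cstddef>

struct indices {
    std::atomic<std::size_t> write_idx_;
    std::atomic<std::size_t> read_idx_;

    // write_idx_(0) in a member-initializer list value-initializes the atomic,
    // but that initialization is itself not an atomic operation and carries no
    // memory ordering. Explicit stores (as in the PR's constructor) are atomic
    // and can pair with acquire loads performed by other threads:
    indices() {
        write_idx_.store(0, std::memory_order_release);
        read_idx_.store(0, std::memory_order_release);
    }
};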

buffer_[i].seq_state.store(i, std::memory_order_release);
write_idx_.store(0, std::memory_order_release);
read_idx_.store(0, std::memory_order_release);
done_sync_.store(false, std::memory_order_release);
if (registry_) registry_->register_consumer(this);
}

@@ -16,44 +26,27 @@ consumer_queue::~consumer_queue() {
if (registry_) registry_->unregister_consumer(this);
} catch (std::exception &e) {
LOG_F(ERROR,
"Unexpected error while trying to unregister a consumer queue from its registry: %s",
e.what());
"Unexpected error while trying to unregister a consumer queue from its registry: %s",
e.what());
}
}

void consumer_queue::push_sample(const sample_p &sample) {
// push a sample, dropping the oldest sample if the queue ist already full.
// During this operation the producer becomes a second consumer, i.e., a case
// where the underlying spsc queue isn't thread-safe) so the mutex is locked.
std::lock_guard<std::mutex> lk(mut_);
while (!buffer_.push(sample)) {
buffer_.pop();
}
cv_.notify_one();
}

sample_p consumer_queue::pop_sample(double timeout) {
sample_p result;
if (timeout <= 0.0) {
std::lock_guard<std::mutex> lk(mut_);
buffer_.pop(result);
} else {
std::unique_lock<std::mutex> lk(mut_);
if (!buffer_.pop(result)) {
// wait for a new sample until the thread calling push_sample delivers one and sends a
// notification, or until timeout
std::chrono::duration<double> sec(timeout);
cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); });
}
}
return result;
delete[] buffer_;
}

uint32_t consumer_queue::flush() noexcept {
std::lock_guard<std::mutex> lk(mut_);
uint32_t n = 0;
while (buffer_.pop()) n++;
while (try_pop()) n++;
Collaborator

Quick idea without having thought it through: can we make something like auto diff=write_idx_-read_idx_; read_idx_+=diff; return diff; work in a multithreaded context?

Contributor Author

That would be pretty nice, I'll look into that. The implementation would probably come down to something similar to the range/iterator versions of push/pop like those that boost.lockfree has.

return n;
}

bool consumer_queue::empty() { return buffer_.empty(); }
std::size_t consumer_queue::read_available() const {
std::size_t write_index = write_idx_.load(std::memory_order_acquire);
std::size_t read_index = read_idx_.load(std::memory_order_relaxed);
if (write_index >= read_index)
return write_index - read_index;
const std::size_t ret = write_index + size_ - read_index;
return ret;
}

bool consumer_queue::empty() const {
return write_idx_.load(std::memory_order_acquire) == read_idx_.load(std::memory_order_relaxed);
}
157 changes: 132 additions & 25 deletions src/consumer_queue.h
@@ -2,60 +2,167 @@
#define CONSUMER_QUEUE_H

#include "common.h"
#include "forward.h"
#include <boost/lockfree/spsc_queue.hpp>
#include <condition_variable>
#include "sample.h"
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>

namespace lsl {
/**
* A thread-safe producer-consumer queue of unread samples.
* A thread-safe producer/consumer queue of unread samples.
*
* Erases the oldest samples if max capacity is exceeded. Implemented as a circular buffer.
* Erases the oldest samples if max capacity is exceeded. Implemented as a ring buffer (wait-free
* unless the buffer is full or empty).
*/
class consumer_queue {
using buffer_type = lslboost::lockfree::spsc_queue<sample_p>;

public:
/**
* Create a new queue with a given capacity.
* @param max_capacity The maximum number of samples that can be held by the queue. Beyond that,
* @param size The maximum number of samples that can be held by the queue. Beyond that,
* the oldest samples are dropped.
* @param registry Optionally a pointer to a registration facility, for multiple-reader
* arrangements.
*/
consumer_queue(std::size_t max_capacity, send_buffer_p registry = send_buffer_p());
explicit consumer_queue(std::size_t size, send_buffer_p registry = send_buffer_p());

/// Destructor. Unregisters from the send buffer, if any.
~consumer_queue();

/// Push a new sample onto the queue.
void push_sample(const sample_p &sample);
/**
* Push a new sample onto the queue. Can only be called by one thread (single-producer).
* This deletes the oldest sample if the max capacity is exceeded.
*/
template<class T>
void push_sample(T&& sample) {
while (!try_push(std::forward<T>(sample))) {
// buffer full, drop oldest sample
if (!done_sync_.load(std::memory_order_acquire)) {
// synchronizes-with store to done_sync_ in ctor
std::atomic_thread_fence(std::memory_order_acquire);
done_sync_.store(true, std::memory_order_release);
}
try_pop();
}
{
// ensure that notify_one doesn't happen in between try_pop and wait_for
std::lock_guard<std::mutex> lk(mut_);
cv_.notify_one();
}
}

/**
* Pop a sample from the queue.
* Blocks if empty.
* @param timeout Timeout for the blocking, in seconds. If expired, an empty sample is returned.
* Pop a sample from the queue. Can be called by multiple threads (multi-consumer).
* Blocks if empty and if a nonzero timeout is used.
* @param timeout Timeout for the blocking, in seconds. If expired, an empty sample is returned.
*/
sample_p pop_sample(double timeout = FOREVER);
sample_p pop_sample(double timeout = FOREVER) {
sample_p result;
bool success = try_pop(result);
if (!success && timeout > 0.0) {
// only acquire mutex if we have to do a blocking wait with timeout
std::chrono::duration<double> sec(timeout);
std::unique_lock<std::mutex> lk(mut_);
if (!try_pop(result))
cv_.wait_for(lk, sec, [&]{ return this->try_pop(result); });
}
return result;
}

/// Number of available samples
std::size_t read_available() const { return buffer_.read_available(); }
/// Number of available samples. This is approximate unless called by the thread calling the
/// pop_sample().
std::size_t read_available() const;

/// Flush the queue, return the number of dropped samples
/// Flush the queue, return the number of dropped samples.
uint32_t flush() noexcept;

/// Check whether the buffer is empty.
bool empty();
/// Check whether the buffer is empty. This is approximate unless called by the thread calling
/// the pop_sample().
bool empty() const;

consumer_queue(const consumer_queue&) = delete;
consumer_queue(consumer_queue&&) = delete;
Collaborator

Moving a queue could come in useful, as long as we make sure that 1) nobody is using the moved-from queue and 2) no other thread accesses the queue during the move.

Contributor Author

Right, we could add those ops (I think setting the read/write indices would need the same kinds of store ops as the constructor has). Need to make sure that the condition variable is movable though (might not be).
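
Purely as a sketch of what such an operation might look like, and not part of the PR (which deletes the move constructor): it assumes the preconditions above and that send_buffer_p is a shared_ptr-like handle, as the _p suffix suggests. std::mutex and std::condition_variable are indeed not movable, so the new object would simply get fresh ones.

// Hypothetical move constructor; the atomics are transferred with explicit
// load/store pairs (no concurrent access is allowed during the move anyway).
consumer_queue::consumer_queue(consumer_queue &&other)
    : buffer_(other.buffer_), size_(other.size_), wrap_at_(other.wrap_at_),
      registry_(other.registry_) {
    done_sync_.store(other.done_sync_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    write_idx_.store(other.write_idx_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    read_idx_.store(other.read_idx_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    other.buffer_ = nullptr; // the moved-from queue must not delete[] the slots
    if (registry_) {
        // the send buffer still holds a pointer to &other; re-point it at this object
        registry_->unregister_consumer(&other);
        registry_->register_consumer(this);
    }
    other.registry_.reset(); // so the moved-from destructor doesn't unregister again
}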

consumer_queue& operator=(const consumer_queue&) = delete;

consumer_queue& operator= (consumer_queue&&) = delete;
private:
send_buffer_p registry_; // optional consumer registry
buffer_type buffer_; // the sample buffer
std::mutex mut_; // mutex for cond var (also to protect queue at buffer overflow)
std::condition_variable cv_; // to allow for blocking wait by consumer
// an item stored in the queue
struct item_t {
std::atomic<std::size_t> seq_state;
Collaborator

I haven't looked this through in detail, but I'll hopefully get to it after the 1.14 release.

Contributor Author (@chkothe, Nov 6, 2020)

Sounds good. I also hope to find some time to contribute some unit tests that stress concurrency a bit more.

Collaborator

I already have some benchmarks in the unit tests that stress the queue a fair bit, but more are always welcome.

sample_p value;
};

// Push a new element to the queue.
// Returns true if successful or false if queue full.
template <class T>
bool try_push(T&& sample) {
std::size_t write_index = write_idx_.load(std::memory_order_acquire);
std::size_t next_idx = add_wrap(write_index, 1);
item_t &item = buffer_[write_index % size_];
if (UNLIKELY(write_index != item.seq_state.load(std::memory_order_acquire)))
return false; // item currently occupied, queue full
write_idx_.store(next_idx, std::memory_order_release);
copy_or_move(item.value, std::forward<T>(sample));
item.seq_state.store(next_idx, std::memory_order_release);
return true;
}

// Pop an element from the queue (can be called with zero or one argument). Returns true if
// successful or false if queue is empty. Uses the same method as Vyukov's bounded MPMC queue.
template <class ... T>
bool try_pop(T&... result) {
item_t* item;
std::size_t read_index = read_idx_.load(std::memory_order_relaxed);
for (;;) {
item = &buffer_[read_index % size_];
const std::size_t seq_state = item->seq_state.load(std::memory_order_acquire);
const std::size_t next_idx = add_wrap(read_index, 1);
// check if the item is ok to pop
if (LIKELY(seq_state == next_idx)) {
// yes, try to claim slot using CAS
if (LIKELY(read_idx_.compare_exchange_weak(read_index, next_idx, std::memory_order_relaxed)))
break;
} else if (LIKELY(seq_state == read_index))
return false; // queue empty
else
// we're behind or ahead of another pop, try again
read_index = read_idx_.load(std::memory_order_relaxed);
}
move_or_drop(item->value, result...);
// mark item as free for next pass
item->seq_state.store(add_wrap(read_index, size_), std::memory_order_release);
return true;
}

// helper to either copy or move a value, depending on whether it's an rvalue ref
inline static void copy_or_move(sample_p& dst, const sample_p& src) { dst = src; }
inline static void copy_or_move(sample_p& dst, sample_p&& src) { dst = std::move(src); }
// helper to either move or drop a value, depending on whether a dst argument is given
inline static void move_or_drop(sample_p &src) { src.~sample_p(); }
inline static void move_or_drop(sample_p &src, sample_p &dst) { dst = std::move(src); }
// helper to add a delta to the given index and wrap correctly
FORCEINLINE std::size_t add_wrap(std::size_t x, std::size_t delta) const {
const std::size_t xp = x + delta;
return xp >= wrap_at_ ? xp - wrap_at_ : xp;
}

/// the sample buffer
item_t *buffer_;
/// max number of elements in the queue
const std::size_t size_;
/// threshold at which to wrap read/write indices
const std::size_t wrap_at_;
// whether we have performed a sync on the data stored by the constructor
alignas(CACHELINE_BYTES) std::atomic<bool> done_sync_;
Collaborator

Should the aligned members be consecutive? IIUC these three members will have at least 56 empty padding bytes between them, so could we reorder the members to put that space to use?

/// current write position
alignas(CACHELINE_BYTES) std::atomic<std::size_t> write_idx_;
/// current read position
alignas(CACHELINE_BYTES) std::atomic<std::size_t> read_idx_;
/// for use with the condition variable
std::mutex mut_;
/// condition for waiting with timeout
std::condition_variable cv_;
/// optional consumer registry
send_buffer_p registry_;
};

} // namespace lsl
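
As a side note on the wrap logic used above (wrap_at_ and add_wrap): a toy re-derivation with a deliberately tiny index space, purely illustrative, showing that indices wrap at a multiple of the capacity, so the slot position (index % size) stays continuous across the wrap:

#include <cstddef>
#include <cstdio>

// Pretend the index space were 8-bit so the numbers are easy to follow
// (the real code uses std::numeric_limits<std::size_t>::max()).
constexpr std::size_t toy_max = 255;
constexpr std::size_t size = 16; // queue capacity

// same formula as the constructor: the largest multiple of `size` that still
// leaves at least `size` of headroom below toy_max
constexpr std::size_t wrap_at = toy_max - size - toy_max % size; // 255 - 16 - 15 = 224

constexpr std::size_t add_wrap(std::size_t x, std::size_t delta) {
    return x + delta >= wrap_at ? x + delta - wrap_at : x + delta;
}

static_assert(wrap_at % size == 0, "indices wrap at a multiple of the capacity");
static_assert(223 % size == 15, "the last index before the wrap maps to slot 15");
static_assert(add_wrap(223, 1) == 0, "the wrapped index maps to slot 0, so no slot is skipped");

int main() { std::printf("wrap_at = %zu\n", wrap_at); }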