diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 44579922b..389322fda 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -3,7 +3,6 @@ #include "sample.h" #include "send_buffer.h" #include -#include using namespace lsl; @@ -23,30 +22,36 @@ consumer_queue::~consumer_queue() { } void consumer_queue::push_sample(const sample_p &sample) { - while (!buffer_.push(sample)) { - sample_p dummy; - buffer_.pop(dummy); - } + { + std::lock_guard lk(mut_); + while (!buffer_.push(sample)) { + // buffer full, drop oldest sample + // (during this operation the producer becomes a second consumer, i.e., a case + // where the underlying spsc queue isn't thread-safe) + buffer_.pop(); + } + } + cv_.notify_one(); } sample_p consumer_queue::pop_sample(double timeout) { - sample_p result; - if (timeout <= 0.0) { - buffer_.pop(result); - } else { - if (!buffer_.pop(result)) { - // turn timeout into the point in time at which we give up - timeout += lsl::lsl_clock(); - do { - if (lsl::lsl_clock() >= timeout) break; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } while (!buffer_.pop(result)); - } - } - return result; + sample_p result; + if (timeout <= 0.0) { + std::lock_guard lk(mut_); + buffer_.pop(result); + } else { + std::unique_lock lk(mut_); + if (!buffer_.pop(result)) { + // release lock, wait for a new sample until the thread calling push_sample delivers one, or until timeout + std::chrono::duration sec(timeout); + cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); + } + } + return result; } uint32_t consumer_queue::flush() noexcept { + std::lock_guard lk(mut_); uint32_t n = 0; while (buffer_.pop()) n++; return n; diff --git a/src/consumer_queue.h b/src/consumer_queue.h index 8a47f21bd..8bb623ede 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -4,6 +4,8 @@ #include "common.h" #include "forward.h" #include +#include +#include namespace lsl { /** @@ -52,6 +54,8 @@ class consumer_queue { private: send_buffer_p registry_; // optional consumer registry buffer_type buffer_; // the sample buffer + std::mutex mut_; // mutex for cond var (also to protect queue at buffer overflow) + std::condition_variable cv_; // to allow for blocking wait by consumer }; } // namespace lsl