From 4bfec09587a756d5f3dbffaa986defdae75a3263 Mon Sep 17 00:00:00 2001 From: jfrey Date: Sat, 28 Mar 2020 15:54:15 +0100 Subject: [PATCH 1/6] improve efficiency while waiting for new samples --- src/consumer_queue.cpp | 13 ++++++------- src/consumer_queue.h | 5 +++++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 6789828ee..66613c49b 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -2,7 +2,7 @@ #include "common.h" #include "sample.h" #include "send_buffer.h" -#include +#include using namespace lsl; @@ -26,6 +26,7 @@ void consumer_queue::push_sample(const sample_p &sample) { sample_p dummy; buffer_.pop(dummy); } + cv_.notify_one(); } sample_p consumer_queue::pop_sample(double timeout) { @@ -34,12 +35,10 @@ sample_p consumer_queue::pop_sample(double timeout) { buffer_.pop(result); } else { if (!buffer_.pop(result)) { - // turn timeout into the point in time at which we give up - timeout += lsl::lsl_clock(); - do { - if (lsl::lsl_clock() >= timeout) break; - lslboost::this_thread::sleep_for(lslboost::chrono::milliseconds(1)); - } while (!buffer_.pop(result)); + // wait untill for a new sample until the thread calling push_sample delivers one, or until timeout + std::unique_lock lk(lock_); + std::chrono::duration sec(timeout); + cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); } } return result; diff --git a/src/consumer_queue.h b/src/consumer_queue.h index ac0f24cd1..49b364877 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -4,6 +4,8 @@ #include "common.h" #include "forward.h" #include +#include +#include namespace lsl { /** @@ -43,6 +45,9 @@ class consumer_queue : private lslboost::noncopyable { private: send_buffer_p registry_; // optional consumer registry buffer_type buffer_; // the sample buffer + // used to wait for new samples + std::mutex lock_; + std::condition_variable cv_; }; } // namespace lsl From 1ed03886fe87840ae8b36dd30ee44f89d61678b7 Mon Sep 17 00:00:00 2001 From: jfrey Date: Sat, 28 Mar 2020 18:48:20 +0100 Subject: [PATCH 2/6] delete expired samples which memory was allocated separately --- src/sample.h | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/sample.h b/src/sample.h index df31e7808..ed6cacdf2 100644 --- a/src/sample.h +++ b/src/sample.h @@ -125,7 +125,7 @@ class sample { factory *factory_; /// the data payload begins here BOOST_ALIGNMENT(8) char data_; - + public: // === Construction === @@ -141,12 +141,10 @@ class sample { void operator delete(void *x) { // delete the underlying memory only if it wasn't allocated in the factory's storage area sample *s = (sample *)x; - if (s && !(s->factory_ && - (((char *)s) >= s->factory_->storage_ && - ((char *)s) <= s->factory_->storage_ + s->factory_->storage_size_))) + if (s && !s->is_from_factory()) delete[](char *) x; } - + /// Test for equality with another sample. bool operator==(const sample &rhs) const noexcept; @@ -361,16 +359,27 @@ class sample { ; } + /// Test if the sample wasn't allocated in the factory's storage area + bool is_from_factory(void) { + return (factory_ && (((char *)this) >= factory_->storage_ && + ((char *)this) <= factory_->storage_ + factory_->storage_size_)); + } + /// Increment ref count. friend void intrusive_ptr_add_ref(sample *s) { s->refcount_.fetch_add(1, std::memory_order_relaxed); } - /// Decrement ref count and reclaim if unreferenced. + /// Decrement ref count, reclaim if unreferenced and belong to factory's storage, delete otherwise to avoid memory leaks friend void intrusive_ptr_release(sample *s) { if (s->refcount_.fetch_sub(1, std::memory_order_release) == 1) { std::atomic_thread_fence(std::memory_order_acquire); - s->factory_->reclaim_sample(s); + if (s->is_from_factory()) { + s->factory_->reclaim_sample(s); + } + else { + delete s; + } } } }; From 7a4b710ae96d34495119ab34d66af998138a7f45 Mon Sep 17 00:00:00 2001 From: jfrey Date: Sat, 28 Mar 2020 19:01:01 +0100 Subject: [PATCH 3/6] prevent thread race condition with consumer's buffer --- src/consumer_queue.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 66613c49b..499ea2c9b 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -22,7 +22,10 @@ consumer_queue::~consumer_queue() { } void consumer_queue::push_sample(const sample_p &sample) { + // if the buffer is full, drop oldest samples while (!buffer_.push(sample)) { + // lock before freeing buffer ta avoid a thread race in pop_sample + std::unique_lock lk(lock_); sample_p dummy; buffer_.pop(dummy); } @@ -32,11 +35,12 @@ void consumer_queue::push_sample(const sample_p &sample) { sample_p consumer_queue::pop_sample(double timeout) { sample_p result; if (timeout <= 0.0) { + std::unique_lock lk(lock_); buffer_.pop(result); } else { + std::unique_lock lk(lock_); if (!buffer_.pop(result)) { - // wait untill for a new sample until the thread calling push_sample delivers one, or until timeout - std::unique_lock lk(lock_); + // release lock, wait untill for a new sample until the thread calling push_sample delivers one, or until timeout std::chrono::duration sec(timeout); cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); } From 4ad3896baa6f430629cd764dfd079479d47193b0 Mon Sep 17 00:00:00 2001 From: jfrey Date: Mon, 6 Apr 2020 21:02:14 +0200 Subject: [PATCH 4/6] consumer_queue: acquire lock in push_sample() for more predictable behavior --- src/consumer_queue.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index d70cffa02..712a405f8 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -22,6 +22,8 @@ consumer_queue::~consumer_queue() { } void consumer_queue::push_sample(const sample_p &sample) { + // acquire lock for more predictable behavior in regards to pop_sample() + std::unique_lock lk(lock_); while (!buffer_.push(sample)) { sample_p dummy; buffer_.pop(dummy); From 0102bd08ae8890cbadae7f7f46757029857a0ad3 Mon Sep 17 00:00:00 2001 From: jfrey Date: Mon, 6 Apr 2020 21:08:15 +0200 Subject: [PATCH 5/6] consumer_queue: avoid another possible race condition --- src/consumer_queue.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 4223ff75d..65fa9d7c1 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -49,6 +49,7 @@ sample_p consumer_queue::pop_sample(double timeout) { } uint32_t consumer_queue::flush() noexcept { + std::unique_lock lk(lock_); uint32_t n = 0; while (buffer_.pop()) n++; return n; From 926d2bab96a8d997a3822327721857ddffe926fa Mon Sep 17 00:00:00 2001 From: jfrey Date: Fri, 30 Oct 2020 09:46:49 +0100 Subject: [PATCH 6/6] fix variable name --- src/consumer_queue.cpp | 6 +++--- src/consumer_queue.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 712a405f8..0201be79b 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -23,7 +23,7 @@ consumer_queue::~consumer_queue() { void consumer_queue::push_sample(const sample_p &sample) { // acquire lock for more predictable behavior in regards to pop_sample() - std::unique_lock lk(lock_); + std::unique_lock lk(mut_); while (!buffer_.push(sample)) { sample_p dummy; buffer_.pop(dummy); @@ -37,8 +37,8 @@ sample_p consumer_queue::pop_sample(double timeout) { buffer_.pop(result); } else { if (!buffer_.pop(result)) { - // wait untill for a new sample until the thread calling push_sample delivers one, or until timeout - std::unique_lock lk(lock_); + // wait for a new sample until the thread calling push_sample delivers one, or until timeout + std::unique_lock lk(mut_); std::chrono::duration sec(timeout); cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); } diff --git a/src/consumer_queue.h b/src/consumer_queue.h index 545a6e870..6c5c2ea21 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -55,7 +55,7 @@ class consumer_queue { send_buffer_p registry_; // optional consumer registry buffer_type buffer_; // the sample buffer // used to wait for new samples - std::mutex lock_; + std::mutex mut_; std::condition_variable cv_; };