diff --git a/CMakeLists.txt b/CMakeLists.txt index 45dfa4b5..8dbcc504 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,7 @@ if (NOT BOOST_COBALT_IS_ROOT) src/detail/exception.cpp src/detail/util.cpp src/channel.cpp + src/ring_buffer.cpp src/error.cpp src/main.cpp src/this_thread.cpp @@ -186,6 +187,7 @@ else() src/detail/util.cpp src/error.cpp src/channel.cpp + src/ring_buffer.cpp src/main.cpp src/this_thread.cpp src/thread.cpp) diff --git a/build/Jamfile b/build/Jamfile index 1f539aee..ec169b77 100644 --- a/build/Jamfile +++ b/build/Jamfile @@ -40,6 +40,7 @@ alias cobalt_sources channel.cpp error.cpp main.cpp + ring_buffer.cpp this_thread.cpp thread.cpp ; diff --git a/doc/index.adoc b/doc/index.adoc index 2a9dbb1c..a4f5d1e9 100644 --- a/doc/index.adoc +++ b/doc/index.adoc @@ -66,6 +66,7 @@ include::reference/concepts.adoc[] include::reference/this_coro.adoc[] include::reference/this_thread.adoc[] include::reference/channel.adoc[] +include::reference/ring_buffer.adoc[] include::reference/with.adoc[] include::reference/race.adoc[] include::reference/gather.adoc[] diff --git a/doc/reference/channel.adoc b/doc/reference/channel.adoc index 58d7d015..f5baf580 100644 --- a/doc/reference/channel.adoc +++ b/doc/reference/channel.adoc @@ -54,7 +54,7 @@ generator> merge( channel & c2) { while (c1 && c2) - co_yield co_await race(c1, c2); + co_yield co_await race(c1.read(), c2.read()); } ---- diff --git a/doc/reference/ring_buffer.adoc b/doc/reference/ring_buffer.adoc new file mode 100644 index 00000000..5e0fbba0 --- /dev/null +++ b/doc/reference/ring_buffer.adoc @@ -0,0 +1,53 @@ +[#ring_buffer] +== cobalt/ring_buffer.hpp + +Channels can be used to exchange data between different coroutines +on a single thread. + +=== Outline + +.ring_buffer outline +[example] +[source,cpp,subs=+quotes] +---- +include::../../include/boost/cobalt/ring_buffer.hpp[tag=outline] +---- + +=== Description + +Ring Buffers are a tool for two coroutines to communicate and synchronize, +which does not block the writing coroutine, but overwrites the values in tbue ffer. + +[source,cpp] +---- +const std::size_t buffer_size = 2; +ring_buffer ch{exec, buffer_size}; + +// in coroutine <1> +co_await ch.write(42); + +// in coroutine <2> +auto val = co_await ch.read(); +---- +<1> Send a value to the ring_buffer - will not block, but may prioritize coroutine +<2> Read a value from the ring_buffer - will block until a value is awaitable. + +The read operation will block if no value is available. +The write operation might suspend and resume itself later if a read is waiting. + +NOTE: A ring_buffer type can be `void`, in which case `write` takes no parameter. + +The ring_buffer read operation can be cancelled without losing data. +This makes it usable with <>. + +[source,cpp] +---- +generator> merge( + ring_buffer & c1, + ring_buffer & c2) +{ + while (c1 && c2) + co_yield co_await race(c1.read(), c2.read()); +} +---- + diff --git a/include/boost/cobalt/channel.hpp b/include/boost/cobalt/channel.hpp index 26c7f2c3..338d3f31 100644 --- a/include/boost/cobalt/channel.hpp +++ b/include/boost/cobalt/channel.hpp @@ -33,6 +33,7 @@ struct channel { // end::outline[] #if defined(BOOST_COBALT_NO_PMR) + explicit channel(std::size_t limit = 0u, executor executor = this_thread::get_executor()); #else @@ -96,7 +97,7 @@ struct channel } struct cancel_impl; - bool await_ready() { return !chn->buffer_.empty() || chn->is_closed_; } + bool await_ready() const noexcept{ return !chn->buffer_.empty() || chn->is_closed_; } template BOOST_COBALT_MSVC_NOINLINE std::coroutine_handle await_suspend(std::coroutine_handle h); @@ -140,7 +141,7 @@ struct channel struct cancel_impl; - bool await_ready() { return !chn->buffer_.full() || chn->is_closed_; } + bool await_ready() const noexcept { return !chn->buffer_.full() || chn->is_closed_; } template BOOST_COBALT_MSVC_NOINLINE std::coroutine_handle await_suspend(std::coroutine_handle h); @@ -153,6 +154,7 @@ struct channel boost::intrusive::list > read_queue_; boost::intrusive::list > write_queue_; public: + BOOST_COBALT_MSVC_NOINLINE read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; } BOOST_COBALT_MSVC_NOINLINE @@ -179,7 +181,7 @@ struct channel BOOST_COBALT_MSVC_NOINLINE write_op write( T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION) { - return write_op{{}, this, &value, loc}; + return write_op{{}, this, &static_cast(value), loc}; } /* // tag::outline[] @@ -257,7 +259,7 @@ struct channel } struct cancel_impl; - bool await_ready() + bool await_ready() const noexcept { return (chn->n_ > 0) || chn->is_closed_; } @@ -298,7 +300,7 @@ struct channel } struct cancel_impl; - bool await_ready() + bool await_ready() const noexcept { return chn->n_ < chn->limit_ || chn->is_closed_; } diff --git a/include/boost/cobalt/detail/fork.hpp b/include/boost/cobalt/detail/fork.hpp index 58fef621..000c825c 100644 --- a/include/boost/cobalt/detail/fork.hpp +++ b/include/boost/cobalt/detail/fork.hpp @@ -139,7 +139,7 @@ struct fork struct final_awaitable { promise_type * self; - bool await_ready() noexcept + bool await_ready() const noexcept { return self->state->use_count != 1u; } diff --git a/include/boost/cobalt/detail/with.hpp b/include/boost/cobalt/detail/with.hpp index 433f34f0..70875559 100644 --- a/include/boost/cobalt/detail/with.hpp +++ b/include/boost/cobalt/detail/with.hpp @@ -18,7 +18,7 @@ struct [[nodiscard]] with_impl { struct promise_type; - bool await_ready() { return false;} + bool await_ready() const { return false;} template BOOST_COBALT_MSVC_NOINLINE diff --git a/include/boost/cobalt/impl/ring_buffer.hpp b/include/boost/cobalt/impl/ring_buffer.hpp new file mode 100644 index 00000000..df42d17b --- /dev/null +++ b/include/boost/cobalt/impl/ring_buffer.hpp @@ -0,0 +1,295 @@ +// +// Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_COBALT_IMPL_RING_BUFFER_HPP +#define BOOST_COBALT_IMPL_RING_BUFFER_HPP + +#include +#include + +#include + +namespace boost::cobalt +{ +#if !defined(BOOST_COBALT_NO_PMR) +template +inline ring_buffer::ring_buffer( + std::size_t limit, + executor executor, + pmr::memory_resource * resource) + : buffer_(limit, pmr::polymorphic_allocator(resource)), executor_(executor) {} +#else +template +inline ring_buffer::ring_buffer( + std::size_t limit, + executor executor) + : buffer_(limit), executor_(executor) {} +#endif + +template +auto ring_buffer::get_executor() -> const executor_type & {return executor_;} + +template +bool ring_buffer::is_open() const {return !is_closed_;} + +template +ring_buffer::~ring_buffer() +{ + while (!read_queue_.empty()) + read_queue_.front().awaited_from.reset(); +} + +template +void ring_buffer::close() +{ + is_closed_ = true; + while (!read_queue_.empty()) + { + auto & op = read_queue_.front(); + op.unlink(); + op.cancelled = true; + op.cancel_slot.clear(); + + if (op.awaited_from) + asio::post(executor_, std::move(op.awaited_from)); + } +} + +template +std::size_t ring_buffer::available() const {return buffer_.size();} + +template +struct ring_buffer::read_op::cancel_impl +{ + read_op * op; + cancel_impl(read_op * op) : op(op) {} + void operator()(asio::cancellation_type) + { + op->cancelled = true; + op->unlink(); + if (op->awaited_from) + asio::post( + op->buf->executor_, + std::move(op->awaited_from)); + op->cancel_slot.clear(); + } +}; + +template +template +std::coroutine_handle ring_buffer::read_op::await_suspend(std::coroutine_handle h) +{ + if (cancelled) + return h; // already interrupted. + + if constexpr (requires {h.promise().get_cancellation_slot();}) + if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected()) + cancel_slot.emplace(this); + + if (awaited_from) + boost::throw_exception(std::runtime_error("already-awaited"), loc); + awaited_from.reset(h.address()); + // currently nothing to read + if constexpr (requires {h.promise().begin_transaction();}) + begin_transaction = +[](void * p){std::coroutine_handle::from_address(p).promise().begin_transaction();}; + + buf->read_queue_.push_back(*this); + return std::noop_coroutine(); +} + + +template +T ring_buffer::read_op::await_resume() +{ + return await_resume(as_result_tag{}).value(loc); +} + +template +std::tuple ring_buffer::read_op::await_resume(const struct as_tuple_tag &) +{ + auto res = await_resume(as_result_tag{}); + + if (res.has_error()) + return {res.error(), T{}}; + else + return {system::error_code{}, std::move(*res)}; + +} + +template +system::result ring_buffer::read_op::await_resume(const struct as_result_tag &) +{ + if (cancel_slot.is_connected()) + cancel_slot.clear(); + + if (direct) + return std::move(*direct); + + if (buf->is_closed_ && buf->buffer_.empty()) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::broken_pipe, &loc}; + } + + if (cancelled) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::operation_aborted, &loc}; + } + + auto value = std::move(buf->buffer_.front()); + buf->buffer_.pop_front(); + return {system::in_place_value, std::move(value)}; +} + +template +template +std::coroutine_handle ring_buffer::write_op::await_suspend(std::coroutine_handle h) +{ + if (cancelled) + return h; + + if (direct = !buf->read_queue_.empty()) + { + auto & op = buf->read_queue_.front(); + + if (buf->buffer_.empty()) + { + op.direct = std::move(value); + } + else + { + op.direct = std::move(buf->buffer_.front()); + buf->buffer_.pop_front(); + buf->buffer_.push_back(std::move(value)); + } + + op.transactional_unlink(); + asio::post(buf->executor_, unique_handle::from_promise(h.promise())); + return op.awaited_from.release(); + } + + return h; +} + +template +void ring_buffer::post(T && value) +{ + if (!read_queue_.empty()) + { + auto & op = read_queue_.front(); + + if (buffer_.empty()) + { + op.direct = std::move(value); + } + else + { + op.direct = std::move(buffer_.front()); + buffer_.pop_front(); + buffer_.push_back(std::move(value)); + } + + op.transactional_unlink(); + asio::post(executor_, std::move(op.awaited_from)); + } + else if (buffer_.capacity() > 0u) + buffer_.push_back(std::move(value)); +} + + +template +std::tuple ring_buffer::write_op::await_resume(const struct as_tuple_tag &) +{ + return await_resume(as_result_tag{}).error(); +} + +template +void ring_buffer::write_op::await_resume() +{ + await_resume(as_result_tag{}).value(loc); +} + +template +system::result ring_buffer::write_op::await_resume(const struct as_result_tag &) +{ + if (buf->is_closed_) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::broken_pipe, &loc}; + } + + if (cancelled) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::operation_aborted, &loc}; + } + if (!direct && buf->buffer_.capacity() > 0u) + buf->buffer_.push_back(std::move(value)); + + return system::in_place_value; +} + +struct ring_buffer::read_op::cancel_impl +{ + read_op * op; + cancel_impl(read_op * op) : op(op) {} + void operator()(asio::cancellation_type) + { + op->cancelled = true; + op->unlink(); + if (op->awaited_from) + asio::post(op->buf->executor_, std::move(op->awaited_from)); + op->cancel_slot.clear(); + } +}; + +template +std::coroutine_handle ring_buffer::read_op::await_suspend(std::coroutine_handle h) +{ + if (cancelled) + return h; // already interrupted. + + if constexpr (requires {h.promise().get_cancellation_slot();}) + if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected()) + cancel_slot.emplace(this); + + if (awaited_from) + boost::throw_exception(std::runtime_error("already-awaited"), loc); + awaited_from.reset(h.address()); + + if constexpr (requires {h.promise().begin_transaction();}) + begin_transaction = +[](void * p){std::coroutine_handle::from_address(p).promise().begin_transaction();}; + + buf->read_queue_.push_back(*this); + return std::noop_coroutine(); +} + + +template +std::coroutine_handle ring_buffer::write_op::await_suspend(std::coroutine_handle h) +{ + if (cancelled) + return h; // already interrupted. + + if (direct = !buf->read_queue_.empty()) + { + auto & op = buf->read_queue_.front(); + + op.direct = true; + + op.transactional_unlink(); + asio::post(buf->executor_, unique_handle::from_promise(h.promise())); + return op.awaited_from.release(); + } + + return h; +} + +} + +#endif //BOOST_COBALT_IMPL_RING_BUFFER_HPP diff --git a/include/boost/cobalt/ring_buffer.hpp b/include/boost/cobalt/ring_buffer.hpp new file mode 100644 index 00000000..1b769592 --- /dev/null +++ b/include/boost/cobalt/ring_buffer.hpp @@ -0,0 +1,268 @@ +// +// Copyright (c) 2025 Klemens Morgenstern (klemens.morgenstern@gmx.net) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_COBALT_RING_BUFFER_HPP +#define BOOST_COBALT_RING_BUFFER_HPP + +#include +#include +#include + +#include +#include +#include +#include + +#include + +namespace boost::cobalt +{ + +// tag::outline[] +template +struct ring_buffer +{ + // end::outline[] +#if defined(BOOST_COBALT_NO_PMR) + explicit + ring_buffer(std::size_t limit = 0u, + executor executor = this_thread::get_executor()); +#else + // tag::outline[] + // create a ring_buffer with a buffer limit, executor & resource. + explicit + ring_buffer(std::size_t limit = 0u, + executor executor = this_thread::get_executor(), + pmr::memory_resource * resource = this_thread::get_default_resource()); + // end::outline[] +#endif + // tag::outline[] + // not movable. + ring_buffer(ring_buffer && rhs) noexcept = delete; + ring_buffer & operator=(ring_buffer && lhs) noexcept = delete; + + using executor_type = executor; + const executor_type & get_executor(); + + // Closes the ring_buffer + ~ring_buffer(); + bool is_open() const; + // close the operation, will cancel all pending ops, too + void close(); + // Check how many values are available + std::size_t available() const; + + // end::outline[] + private: +#if !defined(BOOST_COBALT_NO_PMR) + boost::circular_buffer> buffer_; +#else + boost::circular_buffer buffer_; +#endif + executor_type executor_; + bool is_closed_{false}; + + struct read_op : intrusive::list_base_hook > + { + ring_buffer * buf; + boost::source_location loc; + bool cancelled = false; + std::optional direct; + asio::cancellation_slot cancel_slot{}; + unique_handle awaited_from{nullptr}; + + void (*begin_transaction)(void*) = nullptr; + + void transactional_unlink() + { + if (begin_transaction) + begin_transaction(awaited_from.get()); + this->unlink(); + } + + void interrupt_await() + { + this->cancelled = true; + if (this->awaited_from) + this->awaited_from.release().resume(); + } + + struct cancel_impl; + bool await_ready() const { return !buf->buffer_.empty() || buf->is_closed_; } + template + BOOST_COBALT_MSVC_NOINLINE + std::coroutine_handle await_suspend(std::coroutine_handle h); + T await_resume(); + std::tuple await_resume(const struct as_tuple_tag & ); + system::result await_resume(const struct as_result_tag &); + explicit operator bool() const {return buf && buf->is_open();} + }; + + struct write_op + { + ring_buffer * buf; + T value; + boost::source_location loc; + bool cancelled = false, direct = false; + + + void interrupt_await() // if interrupted between ready & suspend + { + this->cancelled = true; + } + + bool await_ready() const { return buf->read_queue_.empty() || buf->is_closed_; } + template + BOOST_COBALT_MSVC_NOINLINE + std::coroutine_handle await_suspend(std::coroutine_handle h); + + void await_resume(); + std::tuple await_resume(const struct as_tuple_tag & ); + system::result await_resume(const struct as_result_tag &); + }; + + boost::intrusive::list > read_queue_; + public: + + BOOST_COBALT_MSVC_NOINLINE + read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; } + + BOOST_COBALT_MSVC_NOINLINE + write_op write(T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{this, std::move(value), loc}; } + + BOOST_COBALT_MSVC_NOINLINE + write_op write(const T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{this, value, loc}; } + + + void post(T && value); + + /* + // tag::outline[] + // an awaitable that yields T + using __read_op__ = __unspecified__; + + // write a value to the ring_buffer + __write_op__ write( T && value); + + // write a value to the channel if T is void + __write_op__ write(); + + // Post a value to the ring_buffer + void post(T && value); + + // Post a value if the ring_buffer is void. That is, never block. + void post(); + + // end::outline[] + */ + // tag::outline[] + +}; +// end::outline[] + + +template<> +struct ring_buffer +{ + explicit + ring_buffer(std::size_t limit = 0u, + executor executor = this_thread::get_executor()) + : limit_(limit), executor_(executor) {} + + ring_buffer(ring_buffer && rhs) noexcept = delete; + ring_buffer & operator=(ring_buffer && lhs) noexcept = delete; + + using executor_type = executor; + const executor_type & get_executor() { return executor_;} + + ~ring_buffer(); + bool is_open() const; + void close(); + std::size_t available() const noexcept { return n_;} + private: + std::size_t limit_; + std::size_t n_{0u}; + executor_type executor_; + bool is_closed_{false}; + + struct read_op : intrusive::list_base_hook > + { + ring_buffer * buf; + boost::source_location loc; + bool cancelled = false, direct=true; + asio::cancellation_slot cancel_slot{}; + unique_handle awaited_from{nullptr}; + + void (*begin_transaction)(void*) = nullptr; + + void transactional_unlink() + { + if (begin_transaction) + begin_transaction(awaited_from.get()); + this->unlink(); + } + + void interrupt_await() + { + this->cancelled = true; + if (this->awaited_from) + this->awaited_from.release().resume(); + } + + struct cancel_impl; + bool await_ready() const { return buf->n_ > 0 || buf->is_closed_; } + template + BOOST_COBALT_MSVC_NOINLINE + std::coroutine_handle await_suspend(std::coroutine_handle h); + BOOST_COBALT_DECL void await_resume(); + BOOST_COBALT_DECL std::tuple await_resume(const struct as_tuple_tag & ); + BOOST_COBALT_DECL system::result await_resume(const struct as_result_tag &); + explicit operator bool() const {return buf && buf->is_open();} + }; + + struct write_op + { + ring_buffer * buf; + boost::source_location loc; + bool cancelled = false, direct = false; + + + void interrupt_await() // if interrupted between ready & suspend + { + this->cancelled = true; + } + + bool await_ready() const { return buf->read_queue_.empty() || buf->is_closed_; } + + template + BOOST_COBALT_MSVC_NOINLINE + std::coroutine_handle await_suspend(std::coroutine_handle h); + + void await_resume(); + std::tuple await_resume(const struct as_tuple_tag & ); + system::result await_resume(const struct as_result_tag &); + }; + + boost::intrusive::list > read_queue_; + public: + + BOOST_COBALT_MSVC_NOINLINE + read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; } + + BOOST_COBALT_MSVC_NOINLINE + write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{this, loc}; } + + void post(); + +}; + +} + +#include + + +#endif //BOOST_COBALT_RING_BUFFER_HPP diff --git a/src/ring_buffer.cpp b/src/ring_buffer.cpp new file mode 100644 index 00000000..8ce4c2a5 --- /dev/null +++ b/src/ring_buffer.cpp @@ -0,0 +1,111 @@ +// +// Copyright (c) 2025 Klemens Morgenstern (klemens.morgenstern@gmx.net) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +namespace boost::cobalt +{ + +ring_buffer::~ring_buffer() +{ + while (!read_queue_.empty()) + read_queue_.front().awaited_from.reset(); +} + +void ring_buffer::close() +{ + is_closed_ = true; + while (!read_queue_.empty()) { + auto &op = read_queue_.front(); + op.unlink(); + op.cancelled = true; + op.cancel_slot.clear(); + if (op.awaited_from) + asio::defer(executor_, std::move(op.awaited_from)); + } +} + +system::result ring_buffer::read_op::await_resume(const struct as_result_tag &) +{ + if (cancel_slot.is_connected()) + cancel_slot.clear(); + if (buf->is_closed_) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::broken_pipe, &loc}; + } + + + if (cancelled) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::operation_aborted, &loc}; + } + + if (!direct) + buf->n_--; + return {system::in_place_value}; +} + + +void ring_buffer::read_op::await_resume() +{ + await_resume(as_result_tag{}).value(loc); +} + +std::tuple ring_buffer::read_op::await_resume(const struct as_tuple_tag & ) +{ + return await_resume(as_result_tag{}).error(); +} + +void ring_buffer::write_op::await_resume() +{ + await_resume(as_result_tag{}).value(loc); +} + +std::tuple ring_buffer::write_op::await_resume(const struct as_tuple_tag &) +{ + return await_resume(as_result_tag{}).error(); +} + +system::result ring_buffer::write_op::await_resume(const as_result_tag &) +{ + if (buf->is_closed_) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::broken_pipe, &loc}; + } + + if (cancelled) + { + constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION}; + return {system::in_place_error, asio::error::operation_aborted, &loc}; + } + + if (!direct && buf->n_ < buf->limit_) + buf->n_++; + + return system::in_place_value; +} + +void ring_buffer::post() +{ + if (!read_queue_.empty()) + { + auto & op = read_queue_.front(); + if (n_ == limit_) + op.direct = true; + op.transactional_unlink(); + asio::defer(executor_, std::move(op.awaited_from)); + } + else if (n_ < limit_) + n_++; +} + + +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index cdb7eccc..16542cef 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -12,7 +12,7 @@ add_executable(boost_cobalt_basic_tests EXCLUDE_FROM_ALL async_for.cpp test_main.cpp promise.cpp with.cpp op.cpp handler.cpp join.cpp race.cpp this_coro.cpp channel.cpp generator.cpp run.cpp task.cpp gather.cpp wait_group.cpp wrappers.cpp left_race.cpp strand.cpp fork.cpp thread.cpp any_completion_handler.cpp detached.cpp monotonic_resource.cpp sbo_resource.cpp - composition.cpp) + composition.cpp ring_buffer.cpp) target_link_libraries(boost_cobalt_main Boost::cobalt) target_link_libraries(boost_cobalt_main_compile Boost::cobalt) diff --git a/test/ring_buffer.cpp b/test/ring_buffer.cpp new file mode 100644 index 00000000..04ebb313 --- /dev/null +++ b/test/ring_buffer.cpp @@ -0,0 +1,120 @@ +// +// Copyright (c) 2025 Klemens Morgenstern (klemens.morgenstern@gmx.net) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + + +#include "test.hpp" + +namespace cobalt = boost::cobalt; + +cobalt::promise do_write(cobalt::ring_buffer &chn, std::vector & seq) +{ + seq.push_back(0); + co_await chn.write(); seq.push_back(1); + co_await chn.write(); seq.push_back(2); + (co_await cobalt::as_result(chn.write())).value(); seq.push_back(3); + co_await cobalt::as_tuple(chn.write()); seq.push_back(4); + co_await chn.write(); seq.push_back(5); + co_await boost::asio::post(cobalt::use_op); + co_await chn.write(); seq.push_back(6); + co_await chn.write(); seq.push_back(7); +} + +cobalt::promise do_read(cobalt::ring_buffer &chn, std::vector & seq) +{ + seq.push_back(10); + co_await chn.read(); seq.push_back(11); + co_await chn.read(); seq.push_back(12); + (co_await cobalt::as_result(chn.read())).value(); seq.push_back(13); + co_await cobalt::as_tuple(chn.read()); seq.push_back(14); + co_await chn.read(); seq.push_back(15); + co_await boost::asio::post(cobalt::use_op); + co_await chn.read(); seq.push_back(16); + co_await chn.read(); seq.push_back(17); +} + +BOOST_AUTO_TEST_SUITE(ring_buffer); + +CO_TEST_CASE(void_) +{ + cobalt::ring_buffer chn{2u, co_await cobalt::this_coro::executor}; + + std::vector seq; + auto r = do_read(chn, seq); + auto w = do_write(chn, seq); + + co_await r; + co_await w; + BOOST_REQUIRE(seq.size() == 16); + BOOST_CHECK_EQUAL(seq[0], 10); + BOOST_CHECK_EQUAL(seq[1], 0); + BOOST_CHECK_EQUAL(seq[2], 11); + BOOST_CHECK_EQUAL(seq[3], 1); + BOOST_CHECK_EQUAL(seq[4], 12); + BOOST_CHECK_EQUAL(seq[5], 2); + BOOST_CHECK_EQUAL(seq[6], 13); + BOOST_CHECK_EQUAL(seq[7], 3); + BOOST_CHECK_EQUAL(seq[8], 14); + BOOST_CHECK_EQUAL(seq[9], 4); + BOOST_CHECK_EQUAL(seq[10], 15); + BOOST_CHECK_EQUAL(seq[11], 5); + BOOST_CHECK_EQUAL(seq[12], 16); + BOOST_CHECK_EQUAL(seq[13], 6); + BOOST_CHECK_EQUAL(seq[14], 17); + BOOST_CHECK_EQUAL(seq[15], 7); +} + +CO_TEST_CASE(void_0) +{ + cobalt::ring_buffer chn{0u, co_await cobalt::this_coro::executor}; + + co_await chn.write(); + co_await chn.write(); + co_await chn.write(); + auto r = [](cobalt::ring_buffer & rb) -> cobalt::promise { co_await rb.read();}(chn); + + BOOST_CHECK(!r.ready()); + co_await chn.write(); + BOOST_CHECK(r.ready()); +} + +CO_TEST_CASE(int_0) +{ + cobalt::ring_buffer chn{0u, co_await cobalt::this_coro::executor}; + co_await chn.write(0); + co_await chn.write(1); + co_await chn.write(2); + auto r = [](cobalt::ring_buffer & rb) -> cobalt::promise { co_return co_await rb.read();}(chn); + + BOOST_CHECK(!r.ready()); + co_await chn.write(3); + BOOST_CHECK(r.ready()); + BOOST_CHECK(3 == co_await r); +} + + +CO_TEST_CASE(int_2) +{ + cobalt::ring_buffer chn{2u, co_await cobalt::this_coro::executor}; + co_await chn.write(0); + co_await chn.write(1); + co_await chn.write(2); + auto r = [](cobalt::ring_buffer & rb) -> cobalt::promise { co_return co_await rb.read();}(chn); + + BOOST_CHECK(r.ready()); + co_await chn.write(3); + BOOST_CHECK(r.ready()); + BOOST_CHECK_EQUAL(co_await r, 1); + BOOST_CHECK_EQUAL(co_await chn.read(), 2); + BOOST_CHECK_EQUAL(co_await chn.read(), 3); + +} + + +}