Skip to content

Commit dca7cc1

Browse files
added ring_buffer.
1 parent fce91b5 commit dca7cc1

File tree

8 files changed

+841
-1
lines changed

8 files changed

+841
-1
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ if (NOT BOOST_COBALT_IS_ROOT)
3434
src/detail/exception.cpp
3535
src/detail/util.cpp
3636
src/channel.cpp
37+
src/ring_buffer.cpp
3738
src/error.cpp
3839
src/main.cpp
3940
src/this_thread.cpp
@@ -186,6 +187,7 @@ else()
186187
src/detail/util.cpp
187188
src/error.cpp
188189
src/channel.cpp
190+
src/ring_buffer.cpp
189191
src/main.cpp
190192
src/this_thread.cpp
191193
src/thread.cpp)

doc/index.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ include::reference/concepts.adoc[]
6666
include::reference/this_coro.adoc[]
6767
include::reference/this_thread.adoc[]
6868
include::reference/channel.adoc[]
69+
include::reference/ring_buffer.adoc[]
6970
include::reference/with.adoc[]
7071
include::reference/race.adoc[]
7172
include::reference/gather.adoc[]

doc/reference/ring_buffer.adoc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
[#ring_buffer]
2+
== cobalt/ring_buffer.hpp
3+
4+
Channels can be used to exchange data between different coroutines
5+
on a single thread.
6+
7+
=== Outline
8+
9+
.ring_buffer outline
10+
[example]
11+
[source,cpp,subs=+quotes]
12+
----
13+
include::../../include/boost/cobalt/ring_buffer.hpp[tag=outline]
14+
----
15+
16+
=== Description
17+
18+
Ring Buffers are a tool for two coroutines to communicate and synchronize,
19+
which does not block the writing coroutine, but overwrites the values in tbue ffer.
20+
21+
[source,cpp]
22+
----
23+
const std::size_t buffer_size = 2;
24+
ring_buffer<int> ch{exec, buffer_size};
25+
26+
// in coroutine <1>
27+
co_await ch.write(42);
28+
29+
// in coroutine <2>
30+
auto val = co_await ch.read();
31+
----
32+
<1> Send a value to the ring_buffer - will not block, but may prioritize coroutine
33+
<2> Read a value from the ring_buffer - will block until a value is awaitable.
34+
35+
The read operation will block if no value is available.
36+
The write operation might suspend and resume itself later if a read is waiting.
37+
38+
NOTE: A ring_buffer type can be `void`, in which case `write` takes no parameter.
39+
40+
The ring_buffer read operation can be cancelled without losing data.
41+
This makes it usable with <<race, race>>.
42+
43+
[source,cpp]
44+
----
45+
generator<variant2::variant<int, double>> merge(
46+
ring_buffer<int> & c1,
47+
ring_buffer<double> & c2)
48+
{
49+
while (c1 && c2)
50+
co_yield co_await race(c1.read(), c2.read());
51+
}
52+
----
53+
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
//
2+
// Copyright (c) 2022 Klemens Morgenstern ([email protected])
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
8+
#ifndef BOOST_COBALT_IMPL_RING_BUFFER_HPP
9+
#define BOOST_COBALT_IMPL_RING_BUFFER_HPP
10+
11+
#include <boost/cobalt/ring_buffer.hpp>
12+
#include <boost/cobalt/result.hpp>
13+
14+
#include <boost/asio/post.hpp>
15+
16+
namespace boost::cobalt
17+
{
18+
#if !defined(BOOST_COBALT_NO_PMR)
19+
template<typename T>
20+
inline ring_buffer<T>::ring_buffer(
21+
std::size_t limit,
22+
executor executor,
23+
pmr::memory_resource * resource)
24+
: buffer_(limit, pmr::polymorphic_allocator<T>(resource)), executor_(executor) {}
25+
#else
26+
template<typename T>
27+
inline ring_buffer<T>::ring_buffer(
28+
std::size_t limit,
29+
executor executor)
30+
: buffer_(limit), executor_(executor) {}
31+
#endif
32+
33+
template<typename T>
34+
auto ring_buffer<T>::get_executor() -> const executor_type & {return executor_;}
35+
36+
template<typename T>
37+
bool ring_buffer<T>::is_open() const {return !is_closed_;}
38+
39+
template<typename T>
40+
ring_buffer<T>::~ring_buffer()
41+
{
42+
while (!read_queue_.empty())
43+
read_queue_.front().awaited_from.reset();
44+
}
45+
46+
template<typename T>
47+
void ring_buffer<T>::close()
48+
{
49+
is_closed_ = true;
50+
while (!read_queue_.empty())
51+
{
52+
auto & op = read_queue_.front();
53+
op.unlink();
54+
op.cancelled = true;
55+
op.cancel_slot.clear();
56+
57+
if (op.awaited_from)
58+
asio::post(executor_, std::move(op.awaited_from));
59+
}
60+
}
61+
62+
template<typename T>
63+
struct ring_buffer<T>::read_op::cancel_impl
64+
{
65+
read_op * op;
66+
cancel_impl(read_op * op) : op(op) {}
67+
void operator()(asio::cancellation_type)
68+
{
69+
op->cancelled = true;
70+
op->unlink();
71+
if (op->awaited_from)
72+
asio::post(
73+
op->buf->executor_,
74+
std::move(op->awaited_from));
75+
op->cancel_slot.clear();
76+
}
77+
};
78+
79+
template<typename T>
80+
template<typename Promise>
81+
std::coroutine_handle<void> ring_buffer<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
82+
{
83+
if (cancelled)
84+
return h; // already interrupted.
85+
86+
if constexpr (requires {h.promise().get_cancellation_slot();})
87+
if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
88+
cancel_slot.emplace<cancel_impl>(this);
89+
90+
if (awaited_from)
91+
boost::throw_exception(std::runtime_error("already-awaited"), loc);
92+
awaited_from.reset(h.address());
93+
// currently nothing to read
94+
if constexpr (requires {h.promise().begin_transaction();})
95+
begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
96+
97+
buf->read_queue_.push_back(*this);
98+
return std::noop_coroutine();
99+
}
100+
101+
102+
template<typename T>
103+
T ring_buffer<T>::read_op::await_resume()
104+
{
105+
return await_resume(as_result_tag{}).value(loc);
106+
}
107+
108+
template<typename T>
109+
std::tuple<system::error_code, T> ring_buffer<T>::read_op::await_resume(const struct as_tuple_tag &)
110+
{
111+
auto res = await_resume(as_result_tag{});
112+
113+
if (res.has_error())
114+
return {res.error(), T{}};
115+
else
116+
return {system::error_code{}, std::move(*res)};
117+
118+
}
119+
120+
template<typename T>
121+
system::result<T> ring_buffer<T>::read_op::await_resume(const struct as_result_tag &)
122+
{
123+
if (cancel_slot.is_connected())
124+
cancel_slot.clear();
125+
126+
if (direct)
127+
return std::move(*direct);
128+
129+
if (buf->is_closed_ && buf->buffer_.empty())
130+
{
131+
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
132+
return {system::in_place_error, asio::error::broken_pipe, &loc};
133+
}
134+
135+
if (cancelled)
136+
{
137+
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
138+
return {system::in_place_error, asio::error::operation_aborted, &loc};
139+
}
140+
141+
auto value = std::move(buf->buffer_.front());
142+
buf->buffer_.pop_front();
143+
return {system::in_place_value, std::move(value)};
144+
}
145+
146+
template<typename T>
147+
template<typename Promise>
148+
std::coroutine_handle<void> ring_buffer<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
149+
{
150+
if (cancelled)
151+
return h;
152+
153+
154+
if (!buf->read_queue_.empty())
155+
{
156+
auto & op = buf->read_queue_.front();
157+
158+
if (buf->buffer_.empty())
159+
{
160+
op.direct = std::move(value);
161+
}
162+
else
163+
{
164+
op.direct = std::move(buf->buffer_.front());
165+
buf->buffer_.pop_front();
166+
buf->buffer_.push_back(std::move(value));
167+
}
168+
169+
op.transactional_unlink();
170+
asio::post(buf->executor_, unique_handle<Promise>::from_promise(h.promise()));
171+
return op.awaited_from.release();
172+
}
173+
else if (buf->buffer_.capacity() > 0u)
174+
buf->buffer_.push_back(std::move(value));
175+
176+
return h;
177+
}
178+
179+
template<typename T>
180+
void ring_buffer<T>::post(T && value)
181+
{
182+
if (!read_queue_.empty())
183+
{
184+
auto & op = read_queue_.front();
185+
186+
if (buffer_.empty())
187+
{
188+
op.direct = std::move(value);
189+
}
190+
else
191+
{
192+
op.direct = std::move(buffer_.front());
193+
buffer_.pop_front();
194+
buffer_.push_back(std::move(value));
195+
}
196+
197+
op.transactional_unlink();
198+
asio::post(executor_, std::move(op.awaited_from));
199+
}
200+
else if (buffer_.capacity() > 0u)
201+
buffer_.push_back(std::move(value));
202+
}
203+
204+
205+
template<typename T>
206+
std::tuple<system::error_code> ring_buffer<T>::write_op::await_resume(const struct as_tuple_tag &)
207+
{
208+
return await_resume(as_result_tag{}).error();
209+
}
210+
211+
template<typename T>
212+
void ring_buffer<T>::write_op::await_resume()
213+
{
214+
await_resume(as_result_tag{}).value(loc);
215+
}
216+
217+
template<typename T>
218+
system::result<void> ring_buffer<T>::write_op::await_resume(const struct as_result_tag &)
219+
{
220+
if (buf->is_closed_)
221+
{
222+
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
223+
return {system::in_place_error, asio::error::broken_pipe, &loc};
224+
}
225+
226+
if (cancelled)
227+
{
228+
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
229+
return {system::in_place_error, asio::error::operation_aborted, &loc};
230+
}
231+
return system::in_place_value;
232+
}
233+
234+
struct ring_buffer<void>::read_op::cancel_impl
235+
{
236+
read_op * op;
237+
cancel_impl(read_op * op) : op(op) {}
238+
void operator()(asio::cancellation_type)
239+
{
240+
op->cancelled = true;
241+
op->unlink();
242+
if (op->awaited_from)
243+
asio::post(op->buf->executor_, std::move(op->awaited_from));
244+
op->cancel_slot.clear();
245+
}
246+
};
247+
248+
template<typename Promise>
249+
std::coroutine_handle<void> ring_buffer<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
250+
{
251+
if (cancelled)
252+
return h; // already interrupted.
253+
254+
if constexpr (requires {h.promise().get_cancellation_slot();})
255+
if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
256+
cancel_slot.emplace<cancel_impl>(this);
257+
258+
if (awaited_from)
259+
boost::throw_exception(std::runtime_error("already-awaited"), loc);
260+
awaited_from.reset(h.address());
261+
262+
if constexpr (requires {h.promise().begin_transaction();})
263+
begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
264+
265+
buf->read_queue_.push_back(*this);
266+
return std::noop_coroutine();
267+
}
268+
269+
270+
template<typename Promise>
271+
std::coroutine_handle<void> ring_buffer<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
272+
{
273+
if (cancelled)
274+
return h; // already interrupted.
275+
276+
if (!buf->read_queue_.empty())
277+
{
278+
auto & op = buf->read_queue_.front();
279+
280+
op.direct = true;
281+
282+
op.transactional_unlink();
283+
asio::post(buf->executor_, unique_handle<Promise>::from_promise(h.promise()));
284+
return op.awaited_from.release();
285+
}
286+
else
287+
buf->n_++;
288+
return h;
289+
}
290+
291+
}
292+
293+
#endif //BOOST_COBALT_IMPL_RING_BUFFER_HPP

0 commit comments

Comments
 (0)