Skip to content

Commit 8a113ed

Browse files
committed
Various small fixes to align with LEWG feedback.
1 parent 7e6f9dc commit 8a113ed

File tree

6 files changed

+57
-48
lines changed

6 files changed

+57
-48
lines changed

include/exec/__detail/__system_context_default_impl.hpp

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ namespace exec::__system_context_default_impl {
2929
using namespace stdexec::tags;
3030
using system_context_replaceability::receiver;
3131
using system_context_replaceability::bulk_item_receiver;
32-
using system_context_replaceability::storage;
3332
using system_context_replaceability::parallel_scheduler_backend;
3433
using system_context_replaceability::__parallel_scheduler_backend_factory;
3534

@@ -104,17 +103,18 @@ namespace exec::__system_context_default_impl {
104103
};
105104

106105
/// Ensure that `__storage` is aligned to `__alignment`. Shrinks the storage, if needed, to match desired alignment.
107-
inline storage __ensure_alignment(storage __storage, size_t __alignment) noexcept {
108-
auto __pn = reinterpret_cast<uintptr_t>(__storage.__data);
106+
inline std::span<std::byte>
107+
__ensure_alignment(std::span<std::byte> __storage, size_t __alignment) noexcept {
108+
auto __pn = reinterpret_cast<uintptr_t>(__storage.data());
109109
if (__pn % __alignment == 0) {
110110
return __storage;
111-
} else if (__storage.__size < __alignment) {
112-
return {nullptr, 0};
111+
} else if (__storage.size() < __alignment) {
112+
return {};
113113
} else {
114114
auto __new_pn = (__pn + __alignment - 1) & ~(__alignment - 1);
115115
return {
116-
reinterpret_cast<void*>(__new_pn),
117-
static_cast<uint32_t>(__storage.__size - (__new_pn - __pn))};
116+
reinterpret_cast<std::byte*>(__new_pn),
117+
static_cast<size_t>(__storage.size() - (__new_pn - __pn))};
118118
}
119119
}
120120

@@ -126,13 +126,15 @@ namespace exec::__system_context_default_impl {
126126
bool __on_heap_;
127127

128128
/// Try to construct the operation in the preallocated memory if it fits, otherwise allocate a new operation.
129-
static __operation*
130-
__construct_maybe_alloc(storage __storage, receiver* __completion, _Sender __sndr) {
129+
static __operation* __construct_maybe_alloc(
130+
std::span<std::byte> __storage,
131+
receiver* __completion,
132+
_Sender __sndr) {
131133
__storage = __ensure_alignment(__storage, alignof(__operation));
132-
if (__storage.__data == nullptr || __storage.__size < sizeof(__operation)) {
134+
if (__storage.data() == nullptr || __storage.size() < sizeof(__operation)) {
133135
return new __operation(std::move(__sndr), __completion, true);
134136
} else {
135-
return new (__storage.__data) __operation(std::move(__sndr), __completion, false);
137+
return new (__storage.data()) __operation(std::move(__sndr), __completion, false);
136138
}
137139
}
138140

@@ -174,7 +176,7 @@ namespace exec::__system_context_default_impl {
174176
bulk_item_receiver* __r_;
175177

176178
void operator()(unsigned long __idx) const noexcept {
177-
__r_->start(static_cast<uint32_t>(__idx));
179+
__r_->execute(static_cast<uint32_t>(__idx));
178180
}
179181
};
180182

@@ -187,27 +189,29 @@ namespace exec::__system_context_default_impl {
187189
std::declval<__bulk_functor>()))>;
188190

189191
public:
190-
void schedule(storage __storage, receiver* __r) noexcept override {
192+
void schedule(std::span<std::byte> __storage, receiver& __r) noexcept override {
191193
try {
192194
auto __sndr = stdexec::schedule(__pool_scheduler_);
193195
auto __os =
194-
__schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
196+
__schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr));
195197
__os->start();
196198
} catch (std::exception& __e) {
197-
__r->set_error(std::current_exception());
199+
__r.set_error(std::current_exception());
198200
}
199201
}
200202

201-
void
202-
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept override {
203+
void bulk_schedule(
204+
uint32_t __size,
205+
std::span<std::byte> __storage,
206+
bulk_item_receiver& __r) noexcept override {
203207
try {
204208
auto __sndr =
205-
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__r});
209+
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{&__r});
206210
auto __os =
207-
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
211+
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr));
208212
__os->start();
209213
} catch (std::exception& __e) {
210-
__r->set_error(std::current_exception());
214+
__r.set_error(std::current_exception());
211215
}
212216
}
213217
};

include/exec/__detail/__system_context_replaceability_api.hpp

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
#include "stdexec/__detail/__execution_fwd.hpp"
2121

2222
#include <cstdint>
23+
#include <cstddef>
2324
#include <exception>
2425
#include <optional>
2526
#include <memory>
27+
#include <span>
2628

2729
struct __uuid {
2830
std::uint64_t __parts1;
@@ -111,14 +113,7 @@ namespace exec::system_context_replaceability {
111113
/// Receiver for bulk sheduling operations.
112114
struct bulk_item_receiver : receiver {
113115
/// Called for each item of a bulk operation, possible on different threads.
114-
virtual void start(std::uint32_t) noexcept = 0;
115-
};
116-
117-
/// Describes a storage space.
118-
/// Used to pass preallocated storage from the frontend to the backend.
119-
struct storage {
120-
void* __data;
121-
std::uint32_t __size;
116+
virtual void execute(std::uint32_t) noexcept = 0;
122117
};
123118

124119
/// Interface for the parallel scheduler backend.
@@ -129,11 +124,13 @@ namespace exec::system_context_replaceability {
129124

130125
/// Schedule work on parallel scheduler, calling `__r` when done and using `__s` for preallocated
131126
/// memory.
132-
virtual void schedule(storage __s, receiver* __r) noexcept = 0;
127+
virtual void schedule(std::span<std::byte> __s, receiver& __r) noexcept = 0;
133128
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for each item and then
134129
/// when done, and using `__s` for preallocated memory.
135-
virtual void
136-
bulk_schedule(std::uint32_t __n, storage __s, bulk_item_receiver* __r) noexcept = 0;
130+
virtual void bulk_schedule(
131+
std::uint32_t __n,
132+
std::span<std::byte> __s,
133+
bulk_item_receiver& __r) noexcept = 0;
137134
};
138135

139136
} // namespace exec::system_context_replaceability

include/exec/system_context.hpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ namespace exec {
121121
struct __aligned_storage {
122122
alignas(_Align) unsigned char __data_[_Size];
123123

124-
system_context_replaceability::storage __as_storage() noexcept {
125-
return {__data_, _Size};
124+
std::span<std::byte> __as_storage() noexcept {
125+
return {reinterpret_cast<std::byte*>(__data_), _Size};
126126
}
127127

128128
template <class _T>
@@ -192,7 +192,7 @@ namespace exec {
192192
auto& __scheduler_impl = __preallocated_.__as<__backend_ptr>();
193193
auto __impl = std::move(__scheduler_impl);
194194
std::destroy_at(&__scheduler_impl);
195-
__impl->schedule(__preallocated_.__as_storage(), &__rcvr_);
195+
__impl->schedule(__preallocated_.__as_storage(), __rcvr_);
196196
}
197197

198198
/// Object that receives completion from the work described by the sender.
@@ -228,6 +228,10 @@ namespace exec {
228228
return {__scheduler_};
229229
}
230230

231+
/// Value completion happens on the parallel scheduler.
232+
parallel_scheduler
233+
query(stdexec::get_completion_scheduler_t<stdexec::set_value_t>) const noexcept;
234+
231235
/// Connects `__self` to `__rcvr`, returning the operation state containing the work to be done.
232236
template <stdexec::receiver _Rcvr>
233237
auto connect(_Rcvr __rcvr) && noexcept(stdexec::__nothrow_move_constructible<_Rcvr>) //
@@ -354,7 +358,7 @@ namespace exec {
354358
}
355359

356360
/// Calls the bulk functor passing `__index` and the values from the previous sender.
357-
void start(uint32_t __index) noexcept override {
361+
void execute(uint32_t __index) noexcept override {
358362
auto __state = reinterpret_cast<_BulkState*>(this);
359363
std::apply(
360364
[&](auto&&... __args) { __state->__fun_(__index, __args...); },
@@ -382,8 +386,7 @@ namespace exec {
382386
_Rcvr __rcvr_;
383387

384388
/// Function that prepares the preallocated storage for calling the backend.
385-
system_context_replaceability::storage (*__prepare_storage_for_backend)(__bulk_state_base*){
386-
nullptr};
389+
std::span<std::byte> (*__prepare_storage_for_backend)(__bulk_state_base*){nullptr};
387390

388391
__bulk_state_base(_Fn&& __fun, _Rcvr&& __rcvr)
389392
: __fun_{std::move(__fun)}
@@ -426,7 +429,7 @@ namespace exec {
426429

427430
// Schedule the bulk work on the system scheduler.
428431
// This will invoke `start` on our receiver multiple times, and then a completion signal (e.g., `set_value`).
429-
__scheduler->bulk_schedule(__size, __storage, __r);
432+
__scheduler->bulk_schedule(__size, __storage, *__r);
430433
}
431434

432435
/// Invoked when the previous sender completes with "stopped" to stop the entire work.
@@ -469,8 +472,7 @@ namespace exec {
469472
__aligned_storage<_PreallocatedSize, _PreallocatedAlign> __preallocated_;
470473

471474
/// Destroys the inner operation state object, and returns the preallocated storage for it to be used by the backend.
472-
static system_context_replaceability::storage
473-
__prepare_storage_for_backend_impl(__bulk_state_base_t* __base) {
475+
static std::span<std::byte> __prepare_storage_for_backend_impl(__bulk_state_base_t* __base) {
474476
auto* __self = static_cast<__system_bulk_op*>(__base);
475477
// We don't need anymore the storage for the previous operation state.
476478
__self->__preallocated_.template __as<__inner_op_state>().~__inner_op_state();
@@ -583,6 +585,11 @@ namespace exec {
583585
return parallel_scheduler{std::move(__impl)};
584586
}
585587

588+
inline parallel_scheduler __parallel_sender::query(
589+
stdexec::get_completion_scheduler_t<stdexec::set_value_t>) const noexcept {
590+
return __detail::__make_parallel_scheduler_from(stdexec::set_value_t{}, __scheduler_);
591+
}
592+
586593
inline auto parallel_scheduler::query(stdexec::get_forward_progress_guarantee_t) const noexcept
587594
-> stdexec::forward_progress_guarantee {
588595
return stdexec::forward_progress_guarantee::parallel;

test/exec/test_system_context.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ struct my_parallel_scheduler_backend_impl
208208
return count_schedules_;
209209
}
210210

211-
void schedule(scr::storage __s, scr::receiver* __r) noexcept override {
211+
void schedule(std::span<std::byte> __s, scr::receiver& __r) noexcept override {
212212
count_schedules_++;
213213
base_t::schedule(__s, __r);
214214
}
@@ -219,14 +219,15 @@ struct my_parallel_scheduler_backend_impl
219219
};
220220

221221
struct my_inline_scheduler_backend_impl : scr::parallel_scheduler_backend {
222-
void schedule(scr::storage s, scr::receiver* r) noexcept override {
223-
r->set_value();
222+
void schedule(std::span<std::byte> s, scr::receiver& r) noexcept override {
223+
r.set_value();
224224
}
225225

226-
void bulk_schedule(uint32_t count, scr::storage s, scr::bulk_item_receiver* r) noexcept override {
226+
void bulk_schedule(uint32_t count, std::span<std::byte> s, scr::bulk_item_receiver& r) noexcept
227+
override {
227228
for (uint32_t i = 0; i < count; ++i)
228-
r->start(i);
229-
r->set_value();
229+
r.execute(i);
230+
r.set_value();
230231
}
231232
};
232233

test/exec/test_system_context_replaceability.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace {
3232

3333
my_parallel_scheduler_backend_impl() = default;
3434

35-
void schedule(scr::storage __s, scr::receiver* __r) noexcept override {
35+
void schedule(std::span<std::byte> __s, scr::receiver& __r) noexcept override {
3636
count_schedules++;
3737
base_t::schedule(__s, __r);
3838
}

test_package/test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include <cstdlib>
55

66
int main() {
7-
auto x = stdexec::starts_on(exec::system_context().get_scheduler(), stdexec::just(42));
7+
auto x = stdexec::starts_on(exec::get_parallel_scheduler(), stdexec::just(42));
88
auto [a] = stdexec::sync_wait(std::move(x)).value();
99
return a == 42 ? EXIT_SUCCESS : EXIT_FAILURE;
1010
}

0 commit comments

Comments
 (0)