-
-
Notifications
You must be signed in to change notification settings - Fork 804
Description
Is your feature request related to a problem? Please describe.
Currently Effects in reactive_graph use any_spawner to provide a runtime. This is not suitable for environments with no globally set runtime, such as when trying to integrate reactive_graph with bevy through bevy-tokio-tasks.
Describe the solution you'd like
Versions of the Effect::new and Effect::new_sync API that allow for a callback to which you can provide custom spawn logic, for example
Effect::new_sync_with_runtime(
move || {
/* do something with effect */
},
|task| runtime.spawn(task),
);Describe alternatives you've considered
I am not aware of an alternative way to achieve this result at present, as custom executors in any_spawner are required to be 'static, and this would enable using a runtime bound on a local lifetime (such as one provided by a bevy resource) for the execution context.
Additional context
I implemented a hacky version by forking and rewriting the new_sync function to accept a reference to a Tokio runtime directly, and it fixed the problems I was having. The callback API would be a more flexible version of this.
pub fn new_sync_2<T, M>(
mut fun: impl EffectFunction<T, M> + Send + Sync + 'static,
rt: &Runtime,
) -> Self
where
T: Send + Sync + 'static,
{
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));
rt.spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if subscriber
.with_observer(|| subscriber.update_if_necessary())
|| first_run
{
first_run = false;
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| {
run_in_effect_scope(|| fun.run(old_value))
})
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
ArenaItem::new_with_storage(Some(inner))
});
Self { inner }
}