Skip to content

Commit 665b518

Browse files
authored
Event engine implementation (#139)
Provide generic event engine implementation which can operate over different states. feat(ee): provide subscribe states Provide list of states which correspond to subscribe state machine. feat(ee): provide list of subscribe state machine events Provide list of events which trigger transition between states in subscribe state machine. feat(ee): define transitions Define transition directions in response of event depending from current state machine state. feat(ee): provide list of effects and invocations Provide list of effects invocation and effects which can be called as part of transitions.
1 parent d90a8f2 commit 665b518

File tree

20 files changed

+2867
-28
lines changed

20 files changed

+2867
-28
lines changed

Cargo.toml

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,7 @@ build = "build.rs"
1414
[features]
1515

1616
# Enables all non-conflicting features
17-
full = [
18-
"publish",
19-
"access",
20-
"serde",
21-
"reqwest",
22-
"aescbc",
23-
"parse_token",
24-
"blocking",
25-
"std"
26-
]
17+
full = ["publish", "access", "serde", "reqwest", "aescbc", "parse_token", "blocking", "std"]
2718

2819
# Enables all default features
2920
default = ["publish", "serde", "reqwest", "aescbc", "std", "blocking"]
@@ -54,23 +45,7 @@ reqwest = ["dep:reqwest", "dep:bytes"]
5445
blocking = ["reqwest?/blocking"]
5546

5647
## Enables std library
57-
std = [
58-
"derive_builder/std",
59-
"log/std",
60-
"uuid/std",
61-
"base64/std",
62-
"spin/std",
63-
"snafu/std",
64-
"hmac/std",
65-
"sha2/std",
66-
"time/std",
67-
"bytes?/std",
68-
"getrandom/std",
69-
"rand/default",
70-
"serde?/std",
71-
"serde_json?/std",
72-
"ciborium?/std"
73-
]
48+
std = ["derive_builder/std", "log/std", "uuid/std", "base64/std", "spin/std", "snafu/std", "hmac/std", "sha2/std", "time/std", "bytes?/std", "getrandom/std", "rand/default", "serde?/std", "serde_json?/std", "ciborium?/std"]
7449

7550
## Enables very specific implementations for different platforms.
7651
##
@@ -96,6 +71,7 @@ async-trait = "0.1"
9671
log = "0.4"
9772
hashbrown = "0.13"
9873
spin = "0.9"
74+
phantom-type = { vestion = "0.4.2", default-features = false }
9975
percent-encoding = { version = "2.1", default-features = false }
10076
base64 = { version = "0.21", features = ["alloc"], default-features = false }
10177
derive_builder = {version = "0.12", default-features = false }

src/core/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use snafu::Snafu;
2828
/// ```
2929
///
3030
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
31-
#[derive(Snafu, Debug, Clone)]
31+
#[derive(Snafu, Debug, Clone, PartialEq)]
3232
pub enum PubNubError {
3333
/// this error is returned when the transport layer fails
3434
#[snafu(display("Transport error: {details}"))]

src/core/event_engine/effect.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use crate::{
2+
core::event_engine::EffectInvocation,
3+
lib::alloc::{string::String, vec::Vec},
4+
};
5+
6+
pub(crate) trait Effect {
7+
type Invocation: EffectInvocation;
8+
9+
/// Unique effect identifier.
10+
fn id(&self) -> String;
11+
12+
/// Run work associated with effect.
13+
fn run<F>(&self, f: F)
14+
where
15+
F: FnMut(Option<Vec<<Self::Invocation as EffectInvocation>::Event>>);
16+
17+
/// Cancel any ongoing effect's work.
18+
fn cancel(&self);
19+
}
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
use crate::{
2+
core::event_engine::{Effect, EffectHandler, EffectInvocation},
3+
lib::alloc::{rc::Rc, vec, vec::Vec},
4+
};
5+
use phantom_type::PhantomType;
6+
use spin::rwlock::RwLock;
7+
8+
/// State machine effects dispatcher.
9+
#[allow(dead_code)]
10+
pub(crate) struct EffectDispatcher<EH, EF, EI>
11+
where
12+
EI: EffectInvocation<Effect = EF>,
13+
EH: EffectHandler<EI, EF>,
14+
EF: Effect<Invocation = EI>,
15+
{
16+
/// Effect invocation handler.
17+
///
18+
/// Handler responsible for providing actual implementation of
19+
handler: EH,
20+
21+
/// Dispatched effects managed by dispatcher.
22+
///
23+
/// There are effects whose lifetime should be managed by the dispatcher.
24+
/// State machines may have some effects that are exclusive and can only run
25+
/// one type of them at once. The dispatcher handles such effects
26+
/// and cancels them when required.
27+
managed: RwLock<Vec<Rc<EF>>>,
28+
29+
_invocation: PhantomType<EI>,
30+
}
31+
32+
impl<EH, EF, EI> EffectDispatcher<EH, EF, EI>
33+
where
34+
EI: EffectInvocation<Effect = EF>,
35+
EH: EffectHandler<EI, EF>,
36+
EF: Effect<Invocation = EI>,
37+
{
38+
/// Create new effects dispatcher.
39+
pub fn new(handler: EH) -> Self {
40+
EffectDispatcher {
41+
handler,
42+
managed: RwLock::new(vec![]),
43+
_invocation: Default::default(),
44+
}
45+
}
46+
47+
/// Dispatch effect associated with `invocation`.
48+
pub fn dispatch<F>(&self, invocation: &EI, mut f: F)
49+
where
50+
F: FnMut(Option<Vec<EI::Event>>),
51+
{
52+
if let Some(effect) = self.handler.create(invocation) {
53+
let effect = Rc::new(effect);
54+
55+
if invocation.managed() {
56+
let mut managed = self.managed.write();
57+
managed.push(effect.clone());
58+
}
59+
60+
// Placeholder for effect invocation.
61+
effect.run(|events| {
62+
// Try remove effect from list of managed.
63+
self.remove_managed_effect(&effect);
64+
65+
// Notify about effect run completion.
66+
// Placeholder for effect events processing (pass to effects handler).
67+
f(events);
68+
});
69+
} else if invocation.cancelling() {
70+
self.cancel_effect(invocation);
71+
72+
// Placeholder for effect events processing (pass to effects handler).
73+
f(None);
74+
}
75+
}
76+
77+
/// Handle effect cancellation.
78+
///
79+
/// Effects with managed lifecycle can be cancelled by corresponding effect
80+
/// invocations.
81+
fn cancel_effect(&self, invocation: &EI) {
82+
let mut managed = self.managed.write();
83+
if let Some(position) = managed.iter().position(|e| invocation.cancelling_effect(e)) {
84+
managed.remove(position).cancel();
85+
}
86+
}
87+
88+
/// Remove managed effect.
89+
fn remove_managed_effect(&self, effect: &EF) {
90+
let mut managed = self.managed.write();
91+
if let Some(position) = managed.iter().position(|ef| ef.id() == effect.id()) {
92+
managed.remove(position);
93+
}
94+
}
95+
}
96+
97+
#[cfg(test)]
98+
mod should {
99+
use super::*;
100+
use crate::core::event_engine::Event;
101+
102+
enum TestEvent {}
103+
104+
impl Event for TestEvent {
105+
fn id(&self) -> &str {
106+
"no_id"
107+
}
108+
}
109+
110+
enum TestEffect {
111+
One,
112+
Two,
113+
Three,
114+
}
115+
116+
impl Effect for TestEffect {
117+
type Invocation = TestInvocation;
118+
119+
fn id(&self) -> String {
120+
match self {
121+
Self::One => "EFFECT_ONE".into(),
122+
Self::Two => "EFFECT_TWO".into(),
123+
Self::Three => "EFFECT_THREE".into(),
124+
}
125+
}
126+
127+
fn run<F>(&self, mut f: F)
128+
where
129+
F: FnMut(Option<Vec<TestEvent>>),
130+
{
131+
match self {
132+
Self::Three => {}
133+
_ => f(None),
134+
}
135+
}
136+
137+
fn cancel(&self) {
138+
// Do nothing.
139+
}
140+
}
141+
142+
enum TestInvocation {
143+
One,
144+
Two,
145+
Three,
146+
CancelThree,
147+
}
148+
149+
impl EffectInvocation for TestInvocation {
150+
type Effect = TestEffect;
151+
type Event = TestEvent;
152+
153+
fn id(&self) -> &str {
154+
match self {
155+
Self::One => "EFFECT_ONE_INVOCATION",
156+
Self::Two => "EFFECT_TWO_INVOCATION",
157+
Self::Three => "EFFECT_THREE_INVOCATION",
158+
Self::CancelThree => "EFFECT_THREE_CANCEL_INVOCATION",
159+
}
160+
}
161+
162+
fn managed(&self) -> bool {
163+
matches!(self, Self::Two | Self::Three)
164+
}
165+
166+
fn cancelling(&self) -> bool {
167+
matches!(self, Self::CancelThree)
168+
}
169+
170+
fn cancelling_effect(&self, effect: &Self::Effect) -> bool {
171+
match self {
172+
TestInvocation::CancelThree => matches!(effect, TestEffect::Three),
173+
_ => false,
174+
}
175+
}
176+
}
177+
178+
struct TestEffectHandler {}
179+
180+
impl EffectHandler<TestInvocation, TestEffect> for TestEffectHandler {
181+
fn create(&self, invocation: &TestInvocation) -> Option<TestEffect> {
182+
match invocation {
183+
TestInvocation::One => Some(TestEffect::One),
184+
TestInvocation::Two => Some(TestEffect::Two),
185+
TestInvocation::Three => Some(TestEffect::Three),
186+
_ => None,
187+
}
188+
}
189+
}
190+
191+
#[test]
192+
fn run_not_managed_effect() {
193+
let mut called = false;
194+
let dispatcher = EffectDispatcher::new(TestEffectHandler {});
195+
dispatcher.dispatch(&TestInvocation::One, |_| {
196+
called = true;
197+
});
198+
199+
assert!(called, "Expected to call effect for TestInvocation::One");
200+
assert_eq!(
201+
dispatcher.managed.read().len(),
202+
0,
203+
"Non managed effects shouldn't be stored"
204+
);
205+
}
206+
207+
#[test]
208+
fn run_managed_effect() {
209+
let mut called = false;
210+
let dispatcher = EffectDispatcher::new(TestEffectHandler {});
211+
dispatcher.dispatch(&TestInvocation::Two, |_| {
212+
called = true;
213+
});
214+
215+
assert!(called, "Expected to call effect for TestInvocation::Two");
216+
assert_eq!(
217+
dispatcher.managed.read().len(),
218+
0,
219+
"Managed effect should be removed on completion"
220+
);
221+
}
222+
223+
#[test]
224+
fn cancel_managed_effect() {
225+
let mut called_managed = false;
226+
let mut cancelled_managed = false;
227+
let dispatcher = EffectDispatcher::new(TestEffectHandler {});
228+
dispatcher.dispatch(&TestInvocation::Three, |_| {
229+
called_managed = true;
230+
});
231+
232+
assert!(
233+
!called_managed,
234+
"Expected that effect for TestInvocation::Three won't be called"
235+
);
236+
assert_eq!(
237+
dispatcher.managed.read().len(),
238+
1,
239+
"Managed effect shouldn't complete run because doesn't have completion call"
240+
);
241+
242+
dispatcher.dispatch(&TestInvocation::CancelThree, |_| {
243+
cancelled_managed = true;
244+
});
245+
246+
assert!(
247+
cancelled_managed,
248+
"Expected to call effect for TestInvocation::CancelThree"
249+
);
250+
assert!(
251+
!called_managed,
252+
"Expected that effect for TestInvocation::Three won't be called"
253+
);
254+
assert_eq!(
255+
dispatcher.managed.read().len(),
256+
0,
257+
"Managed effect should be cancelled"
258+
);
259+
}
260+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
use crate::core::event_engine::{Effect, EffectInvocation};
2+
3+
pub(crate) trait EffectHandler<I, EF>
4+
where
5+
I: EffectInvocation,
6+
EF: Effect,
7+
{
8+
/// Create effect using information of effect `invocation`.
9+
fn create(&self, invocation: &I) -> Option<EF>;
10+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::core::event_engine::{Effect, Event};
2+
3+
/// Effect invocation trait.
4+
///
5+
/// Invocation is an intention to run an effect. Effect dispatcher uses intents
6+
/// to schedule actual effect invocation.
7+
pub(crate) trait EffectInvocation {
8+
type Effect: Effect;
9+
type Event: Event;
10+
11+
/// Unique effect invocation identifier.
12+
fn id(&self) -> &str;
13+
14+
/// Whether invoked effect lifetime should be managed by dispatcher or not.
15+
fn managed(&self) -> bool;
16+
17+
/// Whether effect invocation cancels managed effect or not.
18+
fn cancelling(&self) -> bool;
19+
20+
/// Whether effect invocation cancels specific managed effect or not.
21+
fn cancelling_effect(&self, effect: &Self::Effect) -> bool;
22+
}

src/core/event_engine/event.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/// Event engine external event.
2+
///
3+
/// State machine uses events to calculate transition path and list of effects
4+
/// invocations.
5+
///
6+
/// Types which are expected to be used as events should implement this trait.
7+
pub(crate) trait Event {
8+
/// Event identifier.
9+
fn id(&self) -> &str;
10+
}

0 commit comments

Comments
 (0)