Skip to content

Commit b5ff792

Browse files
Xavraxparfeon
andauthored
add handshake effect (#142)
Co-authored-by: Serhii Mamontov <[email protected]>
1 parent 665b518 commit b5ff792

File tree

7 files changed

+138
-18
lines changed

7 files changed

+138
-18
lines changed

src/dx/subscribe/event_engine/effect_handler.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,22 @@ use crate::{
77
lib::alloc::{string::String, vec::Vec},
88
};
99

10+
use super::SubscribeEvent;
11+
1012
pub(crate) type HandshakeFunction = fn(
11-
channels: Option<Vec<String>>,
12-
channel_groups: Option<Vec<String>>,
13+
channels: &Option<Vec<String>>,
14+
channel_groups: &Option<Vec<String>>,
1315
attempt: u8,
1416
reason: Option<PubNubError>,
15-
);
17+
) -> Vec<SubscribeEvent>;
1618

1719
pub(crate) type ReceiveFunction = fn(
18-
channels: Option<Vec<String>>,
19-
channel_groups: Option<Vec<String>>,
20-
cursor: SubscribeCursor,
20+
channels: &Option<Vec<String>>,
21+
channel_groups: &Option<Vec<String>>,
22+
cursor: &SubscribeCursor,
2123
attempt: u8,
2224
reason: Option<PubNubError>,
23-
);
25+
) -> Vec<SubscribeEvent>;
2426

2527
/// Subscription effect handler.
2628
///
@@ -52,6 +54,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
5254
} => Some(SubscribeEffect::Handshake {
5355
channels: channels.clone(),
5456
channel_groups: channel_groups.clone(),
57+
executor: self.handshake,
5558
}),
5659
SubscribeEffectInvocation::HandshakeReconnect {
5760
channels,
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use crate::dx::subscribe::event_engine::{effect_handler::HandshakeFunction, SubscribeEvent};
2+
use crate::lib::alloc::{string::String, vec::Vec};
3+
4+
pub(super) fn execute(
5+
channels: &Option<Vec<String>>,
6+
channel_groups: &Option<Vec<String>>,
7+
executor: HandshakeFunction,
8+
) -> Option<Vec<SubscribeEvent>> {
9+
Some(executor(channels, channel_groups, 0, None))
10+
}
11+
12+
#[cfg(test)]
13+
mod should {
14+
use super::*;
15+
use crate::{core::PubNubError, dx::subscribe::SubscribeCursor};
16+
17+
#[test]
18+
fn initialize_handshake_for_first_attempt() {
19+
fn mock_handshake_function(
20+
channels: &Option<Vec<String>>,
21+
channel_groups: &Option<Vec<String>>,
22+
attempt: u8,
23+
reason: Option<PubNubError>,
24+
) -> Vec<SubscribeEvent> {
25+
assert_eq!(channels, &Some(vec!["ch1".to_string()]));
26+
assert_eq!(channel_groups, &Some(vec!["cg1".to_string()]));
27+
assert_eq!(attempt, 0);
28+
assert_eq!(reason, None);
29+
30+
vec![SubscribeEvent::HandshakeSuccess {
31+
cursor: SubscribeCursor {
32+
timetoken: 0,
33+
region: 0,
34+
},
35+
}]
36+
}
37+
38+
let result = execute(
39+
&Some(vec!["ch1".to_string()]),
40+
&Some(vec!["cg1".to_string()]),
41+
mock_handshake_function,
42+
);
43+
44+
assert!(result.is_some());
45+
assert!(!result.unwrap().is_empty())
46+
}
47+
}

src/dx/subscribe/event_engine/effect.rs renamed to src/dx/subscribe/event_engine/effects/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ use crate::{
55
lib::alloc::{string::String, vec::Vec},
66
};
77

8+
use super::effect_handler::HandshakeFunction;
9+
10+
mod handshake;
11+
812
/// Subscription state machine effects.
913
#[allow(dead_code)]
1014
pub(crate) enum SubscribeEffect {
@@ -21,6 +25,11 @@ pub(crate) enum SubscribeEffect {
2125
/// List of channel groups which will be source of real-time updates
2226
/// after initial subscription completion.
2327
channel_groups: Option<Vec<String>>,
28+
29+
/// Executor function.
30+
///
31+
/// Function which will be used to execute initial subscription.
32+
executor: HandshakeFunction,
2433
},
2534

2635
/// Retry initial subscribe effect invocation.
@@ -121,7 +130,19 @@ impl Effect for SubscribeEffect {
121130
F: FnMut(Option<Vec<SubscribeEvent>>),
122131
{
123132
// TODO: Run actual effect implementation. Maybe Effect.run function need change something in arguments.
124-
f(None);
133+
let events = match self {
134+
SubscribeEffect::Handshake {
135+
channels,
136+
channel_groups,
137+
executor,
138+
} => handshake::execute(channels, channel_groups, *executor),
139+
_ => {
140+
/* TODO: Implement other effects */
141+
None
142+
}
143+
};
144+
145+
f(events);
125146
}
126147

127148
fn cancel(&self) {

src/dx/subscribe/event_engine/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
//! Subscribe Event Engine module
22
33
#[doc(inline)]
4-
pub(crate) use effect::SubscribeEffect;
5-
pub(crate) mod effect;
4+
pub(crate) use effects::SubscribeEffect;
5+
pub(crate) mod effects;
66

77
#[doc(inline)]
88
#[allow(unused_imports)]
9-
pub(crate) use effect_handler::SubscribeEffectHandler;
9+
pub(crate) use effect_handler::{HandshakeFunction, ReceiveFunction, SubscribeEffectHandler};
1010
pub(crate) mod effect_handler;
1111

1212
#[doc(inline)]

src/dx/subscribe/event_engine/state.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -711,22 +711,24 @@ mod should {
711711
use test_case::test_case;
712712

713713
fn handshake_function(
714-
_channels: Option<Vec<String>>,
715-
_channel_groups: Option<Vec<String>>,
714+
_channels: &Option<Vec<String>>,
715+
_channel_groups: &Option<Vec<String>>,
716716
_attempt: u8,
717717
_reason: Option<PubNubError>,
718-
) {
718+
) -> Vec<SubscribeEvent> {
719719
// Do nothing.
720+
vec![]
720721
}
721722

722723
fn receive_function(
723-
_channels: Option<Vec<String>>,
724-
_channel_groups: Option<Vec<String>>,
725-
_cursor: SubscribeCursor,
724+
_channels: &Option<Vec<String>>,
725+
_channel_groups: &Option<Vec<String>>,
726+
_cursor: &SubscribeCursor,
726727
_attempt: u8,
727728
_reason: Option<PubNubError>,
728-
) {
729+
) -> Vec<SubscribeEvent> {
729730
// Do nothing.
731+
vec![]
730732
}
731733

732734
fn event_engine(

src/dx/subscribe/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,8 @@ pub(crate) mod event_engine;
66

77
#[doc(inline)]
88
pub use types::{SubscribeCursor, SubscribeStatus};
9+
910
pub mod types;
11+
12+
#[allow(dead_code)]
13+
pub(crate) mod subscription;

src/dx/subscribe/subscription.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use crate::{
2+
core::{blocking::Transport, event_engine::EventEngine},
3+
dx::subscribe::event_engine::{
4+
effect_handler::{HandshakeFunction, ReceiveFunction, SubscribeEffectHandler},
5+
SubscribeState,
6+
},
7+
lib::alloc::vec,
8+
PubNubGenericClient,
9+
};
10+
11+
use super::event_engine::{SubscribeEffect, SubscribeEffectInvocation};
12+
13+
type SubscribeEngine =
14+
EventEngine<SubscribeState, SubscribeEffectHandler, SubscribeEffect, SubscribeEffectInvocation>;
15+
16+
/// Subscription that is responsible for getting messages from PubNub.
17+
///
18+
/// Subscription provides a way to get messages from PubNub. It is responsible
19+
/// for handshake and receiving messages.
20+
///
21+
/// TODO: more description and examples
22+
pub struct Subscription {
23+
engine: SubscribeEngine,
24+
}
25+
26+
impl Subscription {
27+
pub(crate) fn subscribe<T>(_client: PubNubGenericClient<T>) -> Self
28+
where
29+
T: Transport,
30+
{
31+
// TODO: implementation is a part of the different task
32+
let handshake: HandshakeFunction = |_, _, _, _| vec![];
33+
34+
let receive: ReceiveFunction = |&_, &_, &_, _, _| vec![];
35+
36+
Self {
37+
engine: SubscribeEngine::new(
38+
SubscribeEffectHandler::new(handshake, receive),
39+
SubscribeState::Unsubscribed,
40+
),
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)