Skip to content

Commit 8b848ea

Browse files
Xavraxparfeon
andauthored
add reconnection effect (#143)
Co-authored-by: Serhii Mamontov <[email protected]>
1 parent 177252d commit 8b848ea

File tree

6 files changed

+158
-13
lines changed

6 files changed

+158
-13
lines changed

src/dx/subscribe/event_engine/effect_handler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub(crate) type HandshakeFunction = fn(
1414
channel_groups: &Option<Vec<String>>,
1515
attempt: u8,
1616
reason: Option<PubNubError>,
17-
) -> Vec<SubscribeEvent>;
17+
) -> Result<Vec<SubscribeEvent>, PubNubError>;
1818

1919
pub(crate) type ReceiveFunction = fn(
2020
channels: &Option<Vec<String>>,
@@ -66,6 +66,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
6666
channel_groups: channel_groups.clone(),
6767
attempts: *attempts,
6868
reason: reason.clone(),
69+
executor: self.handshake,
6970
}),
7071
SubscribeEffectInvocation::Receive {
7172
channels,

src/dx/subscribe/event_engine/effects/handshake.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use crate::dx::subscribe::event_engine::{effect_handler::HandshakeFunction, SubscribeEvent};
2-
use crate::lib::alloc::{string::String, vec::Vec};
2+
use crate::lib::alloc::{string::String, vec, vec::Vec};
33

44
pub(super) fn execute(
55
channels: &Option<Vec<String>>,
66
channel_groups: &Option<Vec<String>>,
77
executor: HandshakeFunction,
88
) -> Option<Vec<SubscribeEvent>> {
9-
Some(executor(channels, channel_groups, 0, None))
9+
Some(
10+
executor(channels, channel_groups, 0, None)
11+
.unwrap_or_else(|err| vec![SubscribeEvent::HandshakeFailure { reason: err }]),
12+
)
1013
}
1114

1215
#[cfg(test)]
@@ -21,18 +24,18 @@ mod should {
2124
channel_groups: &Option<Vec<String>>,
2225
attempt: u8,
2326
reason: Option<PubNubError>,
24-
) -> Vec<SubscribeEvent> {
27+
) -> Result<Vec<SubscribeEvent>, PubNubError> {
2528
assert_eq!(channels, &Some(vec!["ch1".to_string()]));
2629
assert_eq!(channel_groups, &Some(vec!["cg1".to_string()]));
2730
assert_eq!(attempt, 0);
2831
assert_eq!(reason, None);
2932

30-
vec![SubscribeEvent::HandshakeSuccess {
33+
Ok(vec![SubscribeEvent::HandshakeSuccess {
3134
cursor: SubscribeCursor {
3235
timetoken: 0,
3336
region: 0,
3437
},
35-
}]
38+
}])
3639
}
3740

3841
let result = execute(
@@ -42,6 +45,33 @@ mod should {
4245
);
4346

4447
assert!(result.is_some());
45-
assert!(!result.unwrap().is_empty())
48+
assert!(matches!(
49+
result.unwrap().first().unwrap(),
50+
&SubscribeEvent::HandshakeSuccess { .. }
51+
))
52+
}
53+
54+
#[test]
55+
fn return_handskahe_failure_event_on_err() {
56+
fn mock_handshake_function(
57+
_channels: &Option<Vec<String>>,
58+
_channel_groups: &Option<Vec<String>>,
59+
_attempt: u8,
60+
_reason: Option<PubNubError>,
61+
) -> Result<Vec<SubscribeEvent>, PubNubError> {
62+
Err(PubNubError::Transport {
63+
details: "test".into(),
64+
})
65+
}
66+
67+
let binding = execute(
68+
&Some(vec!["ch1".to_string()]),
69+
&Some(vec!["cg1".to_string()]),
70+
mock_handshake_function,
71+
)
72+
.unwrap();
73+
let result = &binding[0];
74+
75+
assert!(matches!(result, &SubscribeEvent::HandshakeFailure { .. }));
4676
}
4777
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use crate::lib::alloc::{string::String, vec, vec::Vec};
2+
use crate::{
3+
core::PubNubError,
4+
dx::subscribe::event_engine::{effect_handler::HandshakeFunction, SubscribeEvent},
5+
};
6+
7+
pub(super) fn execute(
8+
channels: &Option<Vec<String>>,
9+
channel_groups: &Option<Vec<String>>,
10+
attempt: u8,
11+
reason: PubNubError,
12+
executor: HandshakeFunction,
13+
) -> Option<Vec<SubscribeEvent>> {
14+
Some(
15+
executor(channels, channel_groups, attempt, Some(reason))
16+
.unwrap_or_else(|err| vec![SubscribeEvent::HandshakeReconnectFailure { reason: err }]),
17+
)
18+
}
19+
20+
#[cfg(test)]
21+
mod should {
22+
use super::*;
23+
use crate::{core::PubNubError, dx::subscribe::SubscribeCursor};
24+
25+
#[test]
26+
fn initialize_handshake_reconnect_attempt() {
27+
fn mock_handshake_function(
28+
channels: &Option<Vec<String>>,
29+
channel_groups: &Option<Vec<String>>,
30+
attempt: u8,
31+
reason: Option<PubNubError>,
32+
) -> Result<Vec<SubscribeEvent>, PubNubError> {
33+
assert_eq!(channels, &Some(vec!["ch1".to_string()]));
34+
assert_eq!(channel_groups, &Some(vec!["cg1".to_string()]));
35+
assert_eq!(attempt, 1);
36+
assert_eq!(
37+
reason.unwrap(),
38+
PubNubError::Transport {
39+
details: "test".into(),
40+
}
41+
);
42+
43+
Ok(vec![SubscribeEvent::HandshakeSuccess {
44+
cursor: SubscribeCursor {
45+
timetoken: 0,
46+
region: 0,
47+
},
48+
}])
49+
}
50+
51+
let result = execute(
52+
&Some(vec!["ch1".to_string()]),
53+
&Some(vec!["cg1".to_string()]),
54+
1,
55+
PubNubError::Transport {
56+
details: "test".into(),
57+
},
58+
mock_handshake_function,
59+
);
60+
61+
assert!(result.is_some());
62+
assert!(!result.unwrap().is_empty())
63+
}
64+
65+
#[test]
66+
fn return_handskahe_failure_event_on_err() {
67+
fn mock_handshake_function(
68+
_channels: &Option<Vec<String>>,
69+
_channel_groups: &Option<Vec<String>>,
70+
_attempt: u8,
71+
_reason: Option<PubNubError>,
72+
) -> Result<Vec<SubscribeEvent>, PubNubError> {
73+
Err(PubNubError::Transport {
74+
details: "test".into(),
75+
})
76+
}
77+
78+
let binding = execute(
79+
&Some(vec!["ch1".to_string()]),
80+
&Some(vec!["cg1".to_string()]),
81+
1,
82+
PubNubError::Transport {
83+
details: "test".into(),
84+
},
85+
mock_handshake_function,
86+
)
87+
.unwrap();
88+
let result = &binding[0];
89+
90+
assert!(matches!(
91+
result,
92+
&SubscribeEvent::HandshakeReconnectFailure { .. }
93+
));
94+
}
95+
}

src/dx/subscribe/event_engine/effects/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
use super::effect_handler::HandshakeFunction;
99

1010
mod handshake;
11+
mod handshake_reconnection;
1112

1213
/// Subscription state machine effects.
1314
#[allow(dead_code)]
@@ -53,6 +54,11 @@ pub(crate) enum SubscribeEffect {
5354

5455
/// Initial subscribe attempt failure reason.
5556
reason: PubNubError,
57+
58+
/// Executor function.
59+
///
60+
/// Function which will be used to execute initial subscription.
61+
executor: HandshakeFunction,
5662
},
5763

5864
/// Receive updates effect invocation.
@@ -136,6 +142,19 @@ impl Effect for SubscribeEffect {
136142
channel_groups,
137143
executor,
138144
} => handshake::execute(channels, channel_groups, *executor),
145+
SubscribeEffect::HandshakeReconnect {
146+
channels,
147+
channel_groups,
148+
attempts,
149+
reason,
150+
executor,
151+
} => handshake_reconnection::execute(
152+
channels,
153+
channel_groups,
154+
*attempts,
155+
reason.clone(), // TODO: Does run function need to borrow self? Or we can consume it?
156+
*executor,
157+
),
139158
_ => {
140159
/* TODO: Implement other effects */
141160
None

src/dx/subscribe/event_engine/state.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ impl SubscribeState {
319319
Self::HandshakeReconnecting {
320320
channels: channels.clone(),
321321
channel_groups: channel_groups.clone(),
322-
attempts: 0,
322+
attempts: 1,
323323
reason: reason.clone(),
324324
},
325325
None,
@@ -663,7 +663,7 @@ impl State for SubscribeState {
663663
SubscribeEvent::HandshakeFailure { reason } => {
664664
self.handshake_failure_transition(reason)
665665
}
666-
SubscribeEvent::HandshakeReconnectFailure { reason } => {
666+
SubscribeEvent::HandshakeReconnectFailure { reason, .. } => {
667667
self.handshake_reconnect_failure_transition(reason)
668668
}
669669
SubscribeEvent::HandshakeReconnectGiveUp { reason } => {
@@ -715,9 +715,9 @@ mod should {
715715
_channel_groups: &Option<Vec<String>>,
716716
_attempt: u8,
717717
_reason: Option<PubNubError>,
718-
) -> Vec<SubscribeEvent> {
718+
) -> Result<Vec<SubscribeEvent>, PubNubError> {
719719
// Do nothing.
720-
vec![]
720+
Ok(vec![])
721721
}
722722

723723
fn receive_function(
@@ -819,7 +819,7 @@ mod should {
819819
SubscribeState::HandshakeReconnecting {
820820
channels: Some(vec!["ch1".to_string()]),
821821
channel_groups: Some(vec!["gr1".to_string()]),
822-
attempts: 0,
822+
attempts: 1,
823823
reason: PubNubError::Transport { details: "Test reason".to_string() },
824824
};
825825
"to handshake reconnect on handshake failure"

src/dx/subscribe/subscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl Subscription {
2929
T: Transport,
3030
{
3131
// TODO: implementation is a part of the different task
32-
let handshake: HandshakeFunction = |_, _, _, _| vec![];
32+
let handshake: HandshakeFunction = |_, _, _, _| Ok(vec![]);
3333

3434
let receive: ReceiveFunction = |&_, &_, &_, _, _| vec![];
3535

0 commit comments

Comments
 (0)