Skip to content

Commit 177252d

Browse files
authored
feat(retry-policy): add request retry policy (#144)
Add `RequestRetryPolicy` for delay computation between failed requests retry. fix(reqwest-response): fix missing headers Fix missing `headers` in `RequestResponse` struct when used with `reqwest` crate.
1 parent b5ff792 commit 177252d

File tree

3 files changed

+315
-4
lines changed

3 files changed

+315
-4
lines changed

src/core/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ pub mod transport_response;
3737
#[doc(inline)]
3838
pub use serialize::Serialize;
3939

40+
#[doc(inline)]
41+
pub use retry_policy::RequestRetryPolicy;
42+
pub mod retry_policy;
43+
4044
#[cfg(any(feature = "publish", feature = "access"))]
4145
pub mod headers;
4246

src/core/retry_policy.rs

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
//! # Request retry policy
2+
//!
3+
//! This module contains the [`RequestRetryPolicy`] struct.
4+
//! It is used to calculate delays between failed requests to the [`PubNub API`]
5+
//! for next retry attempt.
6+
//! It is intended to be used by the [`pubnub`] crate.
7+
//!
8+
//! [`PubNub API`]: https://www.pubnub.com/docs
9+
//! [`pubnub`]: ../index.html
10+
//!
11+
use crate::core::TransportResponse;
12+
13+
/// Request retry policy.
14+
///
15+
///
16+
pub enum RequestRetryPolicy {
17+
/// Requests shouldn't be tried again.
18+
None,
19+
20+
/// Retry the request after the same amount of time.
21+
Linear {
22+
/// The delay between failed retry attempts.
23+
delay: u32,
24+
25+
/// Number of times a request can be retried.
26+
max_retry: u32,
27+
},
28+
29+
/// Retry the request using exponential amount of time.
30+
Exponential {
31+
/// Minimum delay between failed retry attempts.
32+
min_delay: u32,
33+
34+
/// Maximum delay between failed retry attempts.
35+
max_delay: u32,
36+
37+
/// Number of times a request can be retried.
38+
max_retry: u32,
39+
},
40+
}
41+
42+
impl RequestRetryPolicy {
43+
#[allow(dead_code)]
44+
pub(crate) fn retry_delay(&self, attempt: &u32, response: &TransportResponse) -> Option<u32> {
45+
match response.status {
46+
// Respect service requested delay.
47+
429 => (!matches!(self, Self::None))
48+
.then(|| response.headers.get("retry-after"))
49+
.flatten()
50+
.and_then(|value| value.parse::<u32>().ok()),
51+
500..=599 => match self {
52+
RequestRetryPolicy::None => None,
53+
RequestRetryPolicy::Linear { delay, max_retry } => {
54+
(*attempt).le(max_retry).then_some(*delay)
55+
}
56+
RequestRetryPolicy::Exponential {
57+
min_delay,
58+
max_delay,
59+
max_retry,
60+
} => (*attempt)
61+
.le(max_retry)
62+
.then_some((*min_delay).pow(*attempt).min(*max_delay)),
63+
},
64+
_ => None,
65+
}
66+
}
67+
}
68+
69+
impl Default for RequestRetryPolicy {
70+
fn default() -> Self {
71+
Self::Exponential {
72+
min_delay: 2,
73+
max_delay: 300,
74+
max_retry: 2,
75+
}
76+
}
77+
}
78+
79+
#[cfg(test)]
80+
mod should {
81+
use super::*;
82+
use crate::lib::collections::HashMap;
83+
84+
fn client_error_response() -> TransportResponse {
85+
TransportResponse {
86+
status: 400,
87+
..Default::default()
88+
}
89+
}
90+
91+
fn too_many_requests_error_response() -> TransportResponse {
92+
TransportResponse {
93+
status: 429,
94+
headers: HashMap::from([("retry-after".into(), "150".into())]),
95+
..Default::default()
96+
}
97+
}
98+
99+
fn server_error_response() -> TransportResponse {
100+
TransportResponse {
101+
status: 500,
102+
..Default::default()
103+
}
104+
}
105+
106+
#[test]
107+
fn create_exponential_by_default() {
108+
let policy: RequestRetryPolicy = Default::default();
109+
assert!(matches!(policy, RequestRetryPolicy::Exponential { .. }));
110+
}
111+
112+
mod none_policy {
113+
use super::*;
114+
115+
#[test]
116+
fn return_none_delay_for_client_error_response() {
117+
assert_eq!(
118+
RequestRetryPolicy::None.retry_delay(&1, &client_error_response()),
119+
None
120+
);
121+
}
122+
123+
#[test]
124+
fn return_none_delay_for_server_error_response() {
125+
assert_eq!(
126+
RequestRetryPolicy::None.retry_delay(&1, &server_error_response()),
127+
None
128+
);
129+
}
130+
131+
#[test]
132+
fn return_none_delay_for_too_many_requests_error_response() {
133+
assert_eq!(
134+
RequestRetryPolicy::None.retry_delay(&1, &too_many_requests_error_response()),
135+
None
136+
);
137+
}
138+
}
139+
140+
mod linear_policy {
141+
use super::*;
142+
143+
#[test]
144+
fn return_none_delay_for_client_error_response() {
145+
let policy = RequestRetryPolicy::Linear {
146+
delay: 10,
147+
max_retry: 5,
148+
};
149+
150+
assert_eq!(policy.retry_delay(&1, &client_error_response()), None);
151+
}
152+
153+
#[test]
154+
fn return_same_delay_for_server_error_response() {
155+
let expected_delay = 10;
156+
let policy = RequestRetryPolicy::Linear {
157+
delay: expected_delay,
158+
max_retry: 5,
159+
};
160+
161+
assert_eq!(
162+
policy.retry_delay(&1, &server_error_response()),
163+
Some(expected_delay)
164+
);
165+
166+
assert_eq!(
167+
policy.retry_delay(&2, &server_error_response()),
168+
Some(expected_delay)
169+
);
170+
}
171+
172+
#[test]
173+
fn return_none_delay_when_reach_max_retry_for_server_error_response() {
174+
let expected_delay = 10;
175+
let policy = RequestRetryPolicy::Linear {
176+
delay: expected_delay,
177+
max_retry: 2,
178+
};
179+
180+
assert_eq!(
181+
policy.retry_delay(&2, &server_error_response()),
182+
Some(expected_delay)
183+
);
184+
185+
assert_eq!(policy.retry_delay(&3, &server_error_response()), None);
186+
}
187+
188+
#[test]
189+
fn return_service_delay_for_too_many_requests_error_response() {
190+
let policy = RequestRetryPolicy::Linear {
191+
delay: 10,
192+
max_retry: 2,
193+
};
194+
195+
// 150 is from 'server_error_response' `Retry-After` header.
196+
assert_eq!(
197+
policy.retry_delay(&2, &too_many_requests_error_response()),
198+
Some(150)
199+
);
200+
}
201+
}
202+
203+
mod exponential_policy {
204+
use super::*;
205+
206+
#[test]
207+
fn return_none_delay_for_client_error_response() {
208+
let expected_delay = 8;
209+
let policy = RequestRetryPolicy::Exponential {
210+
min_delay: expected_delay,
211+
max_delay: 100,
212+
max_retry: 2,
213+
};
214+
215+
assert_eq!(policy.retry_delay(&1, &client_error_response()), None);
216+
}
217+
218+
#[test]
219+
fn return_exponential_delay_for_server_error_response() {
220+
let expected_delay = 8;
221+
let policy = RequestRetryPolicy::Exponential {
222+
min_delay: expected_delay,
223+
max_delay: 100,
224+
max_retry: 2,
225+
};
226+
227+
assert_eq!(
228+
policy.retry_delay(&1, &server_error_response()),
229+
Some(expected_delay)
230+
);
231+
232+
assert_eq!(
233+
policy.retry_delay(&2, &server_error_response()),
234+
Some(expected_delay.pow(2))
235+
);
236+
}
237+
238+
#[test]
239+
fn return_none_delay_when_reach_max_retry_for_server_error_response() {
240+
let expected_delay = 8;
241+
let policy = RequestRetryPolicy::Exponential {
242+
min_delay: expected_delay,
243+
max_delay: 100,
244+
max_retry: 2,
245+
};
246+
247+
assert_eq!(
248+
policy.retry_delay(&2, &server_error_response()),
249+
Some(expected_delay.pow(2))
250+
);
251+
252+
assert_eq!(policy.retry_delay(&3, &server_error_response()), None);
253+
}
254+
255+
#[test]
256+
fn return_max_delay_when_reach_max_value_for_server_error_response() {
257+
let expected_delay = 8;
258+
let max_delay = 50;
259+
let policy = RequestRetryPolicy::Exponential {
260+
min_delay: expected_delay,
261+
max_delay,
262+
max_retry: 5,
263+
};
264+
265+
assert_eq!(
266+
policy.retry_delay(&1, &server_error_response()),
267+
Some(expected_delay)
268+
);
269+
270+
assert_eq!(
271+
policy.retry_delay(&2, &server_error_response()),
272+
Some(max_delay)
273+
);
274+
}
275+
276+
#[test]
277+
fn return_service_delay_for_too_many_requests_error_response() {
278+
let policy = RequestRetryPolicy::Exponential {
279+
min_delay: 10,
280+
max_delay: 100,
281+
max_retry: 2,
282+
};
283+
284+
// 150 is from 'server_error_response' `Retry-After` header.
285+
assert_eq!(
286+
policy.retry_delay(&2, &too_many_requests_error_response()),
287+
Some(150)
288+
);
289+
}
290+
}
291+
}

src/transport/reqwest.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,15 @@ impl Transport for TransportReqwest {
8989
details: e.to_string(),
9090
})?;
9191

92+
let headers = result.headers().clone();
9293
let status = result.status();
9394
result
9495
.bytes()
9596
.await
9697
.map_err(|e| PubNubError::Transport {
9798
details: e.to_string(),
9899
})
99-
.and_then(|bytes| create_result(status, bytes))
100+
.and_then(|bytes| create_result(status, bytes, &headers))
100101
}
101102
}
102103

@@ -204,11 +205,25 @@ fn prepare_url(hostname: &str, path: &str, query_params: &HashMap<String, String
204205
qp
205206
}
206207

207-
fn create_result(status: StatusCode, body: Bytes) -> Result<TransportResponse, PubNubError> {
208+
fn create_result(
209+
status: StatusCode,
210+
body: Bytes,
211+
headers: &HeaderMap,
212+
) -> Result<TransportResponse, PubNubError> {
213+
let headers: HashMap<String, String> =
214+
headers
215+
.iter()
216+
.fold(HashMap::new(), |mut acc, (name, value)| {
217+
if let Ok(value) = value.to_str() {
218+
acc.insert(name.to_string(), value.to_string());
219+
}
220+
acc
221+
});
222+
208223
Ok(TransportResponse {
209224
status: status.as_u16(),
210225
body: (!body.is_empty()).then(|| body.to_vec()),
211-
..Default::default()
226+
headers,
212227
})
213228
}
214229

@@ -324,13 +339,14 @@ pub mod blocking {
324339
details: e.to_string(),
325340
})?;
326341

342+
let headers = result.headers().clone();
327343
let status = result.status();
328344
result
329345
.bytes()
330346
.map_err(|e| PubNubError::Transport {
331347
details: e.to_string(),
332348
})
333-
.and_then(|bytes| create_result(status, bytes))
349+
.and_then(|bytes| create_result(status, bytes, &headers))
334350
}
335351
}
336352

0 commit comments

Comments
 (0)