Skip to content

Commit a1a9ac0

Browse files
committed
tokio-test: add tokio_test::sink_mock
1 parent 20c1fdc commit a1a9ac0

File tree

4 files changed

+294
-10
lines changed

4 files changed

+294
-10
lines changed

tokio-test/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ async-stream = "0.3.3"
2323

2424
bytes = "1.0.0"
2525
futures-core = "0.3.0"
26+
futures-sink = "0.3.0"
2627

2728
[dev-dependencies]
2829
tokio = { version = "1.2.0", path = "../tokio", features = ["full"] }
29-
futures-util = "0.3.0"
30+
futures-util = { version = "0.3.0", features = ["sink"] }
3031

3132
[package.metadata.docs.rs]
3233
all-features = true

tokio-test/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//! Tokio and Futures based testing utilities
1313
1414
pub mod io;
15+
pub mod sink_mock;
1516
pub mod stream_mock;
1617

1718
mod macros;

tokio-test/src/sink_mock.rs

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
#![cfg(not(loom))]
2+
3+
//! A mock sink implementing [`Sink`].
4+
//!
5+
//! # Overview
6+
//! This module provides a `SinkMock` that can be used to test code that interacts with sinks.
7+
//! It allows you to mock the behavior of a sink and control the items it expects and the waiting
8+
//! intervals required between items.
9+
//!
10+
//! # Usage
11+
//! To use the `SinkMock`, you need to create a builder using [`SinkMockBuilder`].
12+
//! The builder allows you to enqueue actions such as
13+
//! requiring items or requiring a pause between items.
14+
//!
15+
//! # Example
16+
//!
17+
//! ```rust
18+
//! use tokio_test::sink_mock::SinkMockBuilder;
19+
//! use futures_util::SinkExt;
20+
//! use std::time::Duration;
21+
//!
22+
//! async fn test_sink_mock_wait() {
23+
//! let mut sink_mock = SinkMockBuilder::new()
24+
//! .require(1)
25+
//! .require_wait(Duration::from_millis(300))
26+
//! .require(2)
27+
//! .build();
28+
//!
29+
//! assert_eq!(sink_mock.send(1).await, Ok(()));
30+
//! tokio::time::sleep(Duration::from_millis(300)).await;
31+
//! assert_eq!(sink_mock.send(2).await, Ok(()));
32+
//! }
33+
//! ```
34+
35+
use std::{
36+
collections::VecDeque,
37+
pin::Pin,
38+
task::Poll,
39+
time::{Duration, Instant},
40+
};
41+
42+
use futures_sink::Sink;
43+
44+
#[derive(Debug, Clone, PartialEq, Eq)]
45+
enum Action<T, E> {
46+
Consume(T),
47+
ConsumeWithError(T, E),
48+
Pause(Duration),
49+
}
50+
51+
/// A builder for [`SinkMock`].
52+
#[derive(Debug, Clone)]
53+
pub struct SinkMockBuilder<T, E> {
54+
actions: VecDeque<Action<T, E>>,
55+
}
56+
57+
impl<T: Unpin, E: Unpin> SinkMockBuilder<T, E> {
58+
/// Create a new empty [`SinkMockBuilder`].
59+
pub fn new() -> Self {
60+
SinkMockBuilder::default()
61+
}
62+
63+
/// Queue an item to be required by the [`Sink`].
64+
pub fn require(mut self, value: T) -> Self {
65+
self.actions.push_back(Action::Consume(value));
66+
self
67+
}
68+
69+
/// Queue an item to be required by the [`Sink`],
70+
/// which shall produce the given error when polled.
71+
pub fn require_with_error(mut self, value: T, error: E) -> Self {
72+
let action = Action::ConsumeWithError(value, error);
73+
self.actions.push_back(action);
74+
self
75+
}
76+
77+
/// Queue the sink to require waiting for a while before receiving another value.
78+
pub fn require_wait(mut self, duration: Duration) -> Self {
79+
self.actions.push_back(Action::Pause(duration));
80+
self
81+
}
82+
83+
/// Build the [`SinkMock`].
84+
pub fn build(self) -> SinkMock<T, E> {
85+
SinkMock {
86+
actions: self.actions,
87+
last_action: Instant::now(),
88+
}
89+
}
90+
}
91+
92+
impl<T: Unpin, E: Unpin> Default for SinkMockBuilder<T, E> {
93+
fn default() -> Self {
94+
SinkMockBuilder {
95+
actions: VecDeque::new(),
96+
}
97+
}
98+
}
99+
100+
/// A mock sink implementing [`Sink`].
101+
///
102+
/// See [`SinkMockBuilder`] for more information.
103+
#[derive(Debug)]
104+
pub struct SinkMock<T, E> {
105+
actions: VecDeque<Action<T, E>>,
106+
last_action: Instant,
107+
}
108+
109+
impl<T: Unpin + Eq + std::fmt::Debug, E: Unpin> Sink<T> for SinkMock<T, E> {
110+
type Error = E;
111+
112+
fn poll_ready(
113+
self: Pin<&mut Self>,
114+
_cx: &mut std::task::Context<'_>,
115+
) -> std::task::Poll<Result<(), Self::Error>> {
116+
Poll::Ready(Ok(()))
117+
}
118+
119+
// Requires `Eq + std::fmt::Debug` due to usage of `assert_eq!`.
120+
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
121+
loop {
122+
let Some(action) = self.actions.pop_front() else {
123+
panic!("Sink does not expect any items");
124+
};
125+
match action {
126+
Action::Pause(duration) => {
127+
let now = Instant::now();
128+
if (self.last_action + duration) <= now {
129+
self.last_action = now;
130+
continue;
131+
} else {
132+
panic!("Sink received item too early");
133+
}
134+
}
135+
Action::Consume(queued_item) => {
136+
assert_eq!(item, queued_item);
137+
self.last_action = Instant::now();
138+
break Ok(());
139+
}
140+
Action::ConsumeWithError(queued_item, queued_error) => {
141+
assert_eq!(item, queued_item);
142+
self.last_action = Instant::now();
143+
break Err(queued_error);
144+
}
145+
}
146+
}
147+
}
148+
149+
fn poll_flush(
150+
self: Pin<&mut Self>,
151+
_cx: &mut std::task::Context<'_>,
152+
) -> std::task::Poll<Result<(), Self::Error>> {
153+
Poll::Ready(Ok(()))
154+
}
155+
156+
fn poll_close(
157+
mut self: Pin<&mut Self>,
158+
_cx: &mut std::task::Context<'_>,
159+
) -> std::task::Poll<Result<(), Self::Error>> {
160+
self.try_close();
161+
Poll::Ready(Ok(()))
162+
}
163+
}
164+
165+
impl<T, E> Drop for SinkMock<T, E> {
166+
fn drop(&mut self) {
167+
// Avoid double panicking to make debugging easier.
168+
if std::thread::panicking() {
169+
return;
170+
}
171+
self.try_close();
172+
}
173+
}
174+
175+
impl<T, E> SinkMock<T, E> {
176+
fn try_close(&mut self) {
177+
loop {
178+
let Some(action) = self.actions.pop_front() else {
179+
break;
180+
};
181+
match action {
182+
Action::Pause(duration) => {
183+
let now = Instant::now();
184+
if (self.last_action + duration) <= now {
185+
self.last_action += duration;
186+
continue;
187+
} else {
188+
panic!("Sink closed too early");
189+
}
190+
}
191+
Action::Consume(..) | Action::ConsumeWithError(..) => {
192+
panic!("Sink expects more items")
193+
}
194+
}
195+
}
196+
}
197+
}
198+
199+
#[cfg(test)]
200+
mod test {
201+
202+
use crate::sink_mock::{SinkMock, SinkMockBuilder};
203+
use futures_util::SinkExt;
204+
use std::time::Duration;
205+
206+
#[test]
207+
#[should_panic(expected = "Sink expects more items")]
208+
fn dropping_nonempty_sink_panics() {
209+
let sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new().require(1).build();
210+
drop(sink_mock);
211+
}
212+
213+
#[tokio::test]
214+
#[should_panic(expected = "Sink does not expect any items")]
215+
async fn empty_sink_panics_on_send() {
216+
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new().build();
217+
let _ = sink_mock.send(1).await;
218+
}
219+
220+
#[tokio::test]
221+
#[should_panic(expected = "Sink received item too early")]
222+
async fn should_reject_values_when_sent_too_early() {
223+
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
224+
.require_wait(Duration::from_secs(1))
225+
.build();
226+
227+
sink_mock.send(1).await.unwrap();
228+
}
229+
230+
#[test]
231+
#[should_panic(expected = "Sink closed too early")]
232+
fn paused_sink_panics_on_drop() {
233+
let sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
234+
.require_wait(Duration::from_secs(1))
235+
.build();
236+
237+
drop(sink_mock);
238+
}
239+
240+
#[tokio::test]
241+
#[should_panic(expected = "Sink closed too early")]
242+
async fn paused_sink_panics_on_close() {
243+
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
244+
.require_wait(Duration::from_secs(1))
245+
.build();
246+
247+
sink_mock.close().await.unwrap();
248+
}
249+
250+
#[tokio::test]
251+
async fn should_yield_error() {
252+
let mut sink_mock = SinkMockBuilder::new()
253+
.require_with_error(1, "oh no")
254+
.require_with_error(2, "well...")
255+
.require_wait(Duration::from_millis(500))
256+
.require_with_error(3, "ok.")
257+
.build();
258+
259+
assert_eq!(sink_mock.send(1).await.unwrap_err(), "oh no");
260+
assert_eq!(sink_mock.send(2).await.unwrap_err(), "well...");
261+
tokio::time::sleep(Duration::from_millis(500)).await;
262+
assert_eq!(sink_mock.send(3).await.unwrap_err(), "ok.");
263+
}
264+
265+
#[tokio::test]
266+
async fn should_sum_pause_durations() {
267+
let mut sink_mock = SinkMockBuilder::<i32, ()>::new()
268+
.require_wait(Duration::from_millis(1))
269+
.require_wait(Duration::from_millis(1))
270+
.require_wait(Duration::from_millis(1))
271+
.build();
272+
273+
tokio::time::sleep(Duration::from_millis(3)).await;
274+
275+
let _ = sink_mock.close().await;
276+
}
277+
278+
#[tokio::test]
279+
async fn should_require_value_after_waiting() {
280+
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
281+
.require(1)
282+
.require_wait(Duration::from_millis(300))
283+
.require(3)
284+
.build();
285+
286+
sink_mock.send(1).await.unwrap();
287+
tokio::time::sleep(Duration::from_millis(300)).await;
288+
sink_mock.send(3).await.unwrap();
289+
}
290+
}

tokio-test/src/stream_mock.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! A mock stream implementing [`Stream`].
44
//!
55
//! # Overview
6-
//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
6+
//! This module provides a `StreamMock` that can be used to test code that interacts with streams.
77
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
88
//! intervals between items.
99
//!
@@ -67,14 +67,6 @@ impl<T: Unpin> StreamMockBuilder<T> {
6767
self
6868
}
6969

70-
// Queue an item to be consumed by the sink,
71-
// commented out until Sink is implemented.
72-
//
73-
// pub fn consume(mut self, value: T) -> Self {
74-
// self.actions.push_back(Action::Consume(value));
75-
// self
76-
// }
77-
7870
/// Queue the stream to wait for a duration
7971
pub fn wait(mut self, duration: Duration) -> Self {
8072
self.actions.push_back(Action::Wait(duration));

0 commit comments

Comments
 (0)