From 131e178634fbd7425ff51d7f8e33455b04b7a4d3 Mon Sep 17 00:00:00 2001 From: Rafael Bachmann Date: Sun, 2 Mar 2025 18:47:56 +0100 Subject: [PATCH 1/3] tokio-test: add tokio_test::sink_mock --- tokio-test/Cargo.toml | 3 +- tokio-test/src/lib.rs | 1 + tokio-test/src/sink_mock.rs | 290 ++++++++++++++++++++++++++++++++++ tokio-test/src/stream_mock.rs | 10 +- 4 files changed, 294 insertions(+), 10 deletions(-) create mode 100644 tokio-test/src/sink_mock.rs diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index c8d998fd368..f2293da979b 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -23,10 +23,11 @@ async-stream = "0.3.3" bytes = "1.0.0" futures-core = "0.3.0" +futures-sink = "0.3.0" [dev-dependencies] tokio = { version = "1.2.0", path = "../tokio", features = ["full"] } -futures-util = "0.3.0" +futures-util = { version = "0.3.0", features = ["sink"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 87e63861210..1b4c6fad17a 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -12,6 +12,7 @@ //! Tokio and Futures based testing utilities pub mod io; +pub mod sink_mock; pub mod stream_mock; mod macros; diff --git a/tokio-test/src/sink_mock.rs b/tokio-test/src/sink_mock.rs new file mode 100644 index 00000000000..bf7377510e4 --- /dev/null +++ b/tokio-test/src/sink_mock.rs @@ -0,0 +1,290 @@ +#![cfg(not(loom))] + +//! A mock sink implementing [`Sink`]. +//! +//! # Overview +//! This module provides a `SinkMock` that can be used to test code that interacts with sinks. +//! It allows you to mock the behavior of a sink and control the items it expects and the waiting +//! intervals required between items. +//! +//! # Usage +//! To use the `SinkMock`, you need to create a builder using [`SinkMockBuilder`]. +//! The builder allows you to enqueue actions such as +//! requiring items or requiring a pause between items. +//! +//! # Example +//! +//! ```rust +//! use tokio_test::sink_mock::SinkMockBuilder; +//! use futures_util::SinkExt; +//! use std::time::Duration; +//! +//! async fn test_sink_mock_wait() { +//! let mut sink_mock = SinkMockBuilder::new() +//! .require(1) +//! .require_wait(Duration::from_millis(300)) +//! .require(2) +//! .build(); +//! +//! assert_eq!(sink_mock.send(1).await, Ok(())); +//! tokio::time::sleep(Duration::from_millis(300)).await; +//! assert_eq!(sink_mock.send(2).await, Ok(())); +//! } +//! ``` + +use std::{ + collections::VecDeque, + pin::Pin, + task::Poll, + time::{Duration, Instant}, +}; + +use futures_sink::Sink; + +#[derive(Debug, Clone, PartialEq, Eq)] +enum Action { + Consume(T), + ConsumeWithError(T, E), + Pause(Duration), +} + +/// A builder for [`SinkMock`]. +#[derive(Debug, Clone)] +pub struct SinkMockBuilder { + actions: VecDeque>, +} + +impl SinkMockBuilder { + /// Create a new empty [`SinkMockBuilder`]. + pub fn new() -> Self { + SinkMockBuilder::default() + } + + /// Queue an item to be required by the [`Sink`]. + pub fn require(mut self, value: T) -> Self { + self.actions.push_back(Action::Consume(value)); + self + } + + /// Queue an item to be required by the [`Sink`], + /// which shall produce the given error when polled. + pub fn require_with_error(mut self, value: T, error: E) -> Self { + let action = Action::ConsumeWithError(value, error); + self.actions.push_back(action); + self + } + + /// Queue the sink to require waiting for a while before receiving another value. + pub fn require_wait(mut self, duration: Duration) -> Self { + self.actions.push_back(Action::Pause(duration)); + self + } + + /// Build the [`SinkMock`]. + pub fn build(self) -> SinkMock { + SinkMock { + actions: self.actions, + last_action: Instant::now(), + } + } +} + +impl Default for SinkMockBuilder { + fn default() -> Self { + SinkMockBuilder { + actions: VecDeque::new(), + } + } +} + +/// A mock sink implementing [`Sink`]. +/// +/// See [`SinkMockBuilder`] for more information. +#[derive(Debug)] +pub struct SinkMock { + actions: VecDeque>, + last_action: Instant, +} + +impl Sink for SinkMock { + type Error = E; + + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(Ok(())) + } + + // Requires `Eq + std::fmt::Debug` due to usage of `assert_eq!`. + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + loop { + let Some(action) = self.actions.pop_front() else { + panic!("Sink does not expect any items"); + }; + match action { + Action::Pause(duration) => { + let now = Instant::now(); + if (self.last_action + duration) <= now { + self.last_action = now; + continue; + } else { + panic!("Sink received item too early"); + } + } + Action::Consume(queued_item) => { + assert_eq!(item, queued_item); + self.last_action = Instant::now(); + break Ok(()); + } + Action::ConsumeWithError(queued_item, queued_error) => { + assert_eq!(item, queued_item); + self.last_action = Instant::now(); + break Err(queued_error); + } + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close( + mut self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.try_close(); + Poll::Ready(Ok(())) + } +} + +impl Drop for SinkMock { + fn drop(&mut self) { + // Avoid double panicking to make debugging easier. + if std::thread::panicking() { + return; + } + self.try_close(); + } +} + +impl SinkMock { + fn try_close(&mut self) { + loop { + let Some(action) = self.actions.pop_front() else { + break; + }; + match action { + Action::Pause(duration) => { + let now = Instant::now(); + if (self.last_action + duration) <= now { + self.last_action += duration; + continue; + } else { + panic!("Sink closed too early"); + } + } + Action::Consume(..) | Action::ConsumeWithError(..) => { + panic!("Sink expects more items") + } + } + } + } +} + +#[cfg(test)] +mod test { + + use crate::sink_mock::{SinkMock, SinkMockBuilder}; + use futures_util::SinkExt; + use std::time::Duration; + + #[test] + #[should_panic(expected = "Sink expects more items")] + fn dropping_nonempty_sink_panics() { + let sink_mock: SinkMock = SinkMockBuilder::new().require(1).build(); + drop(sink_mock); + } + + #[tokio::test] + #[should_panic(expected = "Sink does not expect any items")] + async fn empty_sink_panics_on_send() { + let mut sink_mock: SinkMock = SinkMockBuilder::new().build(); + let _ = sink_mock.send(1).await; + } + + #[tokio::test] + #[should_panic(expected = "Sink received item too early")] + async fn should_reject_values_when_sent_too_early() { + let mut sink_mock: SinkMock = SinkMockBuilder::new() + .require_wait(Duration::from_secs(1)) + .build(); + + sink_mock.send(1).await.unwrap(); + } + + #[test] + #[should_panic(expected = "Sink closed too early")] + fn paused_sink_panics_on_drop() { + let sink_mock: SinkMock = SinkMockBuilder::new() + .require_wait(Duration::from_secs(1)) + .build(); + + drop(sink_mock); + } + + #[tokio::test] + #[should_panic(expected = "Sink closed too early")] + async fn paused_sink_panics_on_close() { + let mut sink_mock: SinkMock = SinkMockBuilder::new() + .require_wait(Duration::from_secs(1)) + .build(); + + sink_mock.close().await.unwrap(); + } + + #[tokio::test] + async fn should_yield_error() { + let mut sink_mock = SinkMockBuilder::new() + .require_with_error(1, "oh no") + .require_with_error(2, "well...") + .require_wait(Duration::from_millis(500)) + .require_with_error(3, "ok.") + .build(); + + assert_eq!(sink_mock.send(1).await.unwrap_err(), "oh no"); + assert_eq!(sink_mock.send(2).await.unwrap_err(), "well..."); + tokio::time::sleep(Duration::from_millis(500)).await; + assert_eq!(sink_mock.send(3).await.unwrap_err(), "ok."); + } + + #[tokio::test] + async fn should_sum_pause_durations() { + let mut sink_mock = SinkMockBuilder::::new() + .require_wait(Duration::from_millis(1)) + .require_wait(Duration::from_millis(1)) + .require_wait(Duration::from_millis(1)) + .build(); + + tokio::time::sleep(Duration::from_millis(3)).await; + + let _ = sink_mock.close().await; + } + + #[tokio::test] + async fn should_require_value_after_waiting() { + let mut sink_mock: SinkMock = SinkMockBuilder::new() + .require(1) + .require_wait(Duration::from_millis(300)) + .require(3) + .build(); + + sink_mock.send(1).await.unwrap(); + tokio::time::sleep(Duration::from_millis(300)).await; + sink_mock.send(3).await.unwrap(); + } +} diff --git a/tokio-test/src/stream_mock.rs b/tokio-test/src/stream_mock.rs index 50808596ac4..3a20670903b 100644 --- a/tokio-test/src/stream_mock.rs +++ b/tokio-test/src/stream_mock.rs @@ -3,7 +3,7 @@ //! A mock stream implementing [`Stream`]. //! //! # Overview -//! This crate provides a `StreamMock` that can be used to test code that interacts with streams. +//! This module provides a `StreamMock` that can be used to test code that interacts with streams. //! It allows you to mock the behavior of a stream and control the items it yields and the waiting //! intervals between items. //! @@ -67,14 +67,6 @@ impl StreamMockBuilder { self } - // Queue an item to be consumed by the sink, - // commented out until Sink is implemented. - // - // pub fn consume(mut self, value: T) -> Self { - // self.actions.push_back(Action::Consume(value)); - // self - // } - /// Queue the stream to wait for a duration pub fn wait(mut self, duration: Duration) -> Self { self.actions.push_back(Action::Wait(duration)); From dbd9ec6d84bf27bed921c377a98226188e114c4a Mon Sep 17 00:00:00 2001 From: Rafael Bachmann Date: Sun, 2 Mar 2025 18:48:11 +0100 Subject: [PATCH 2/3] sync: fix doc link typo cr|e|ate --- tokio/src/sync/broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 09a99022d9e..d879738c41d 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1173,7 +1173,7 @@ impl Receiver { /// Returns true if there aren't any messages in the channel that the [`Receiver`] /// has yet to receive. /// - /// [`Receiver]: create::sync::broadcast::Receiver + /// [`Receiver]: crate::sync::broadcast::Receiver /// /// # Examples /// From ef1d25f8f188ca6a7ae7dc3a8967230ac9826710 Mon Sep 17 00:00:00 2001 From: Rafael Bachmann Date: Sun, 2 Mar 2025 20:27:40 +0100 Subject: [PATCH 3/3] docs: clarify commit message requirements --- CONTRIBUTING.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 69efa24a95f..bc1b1538e02 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -397,14 +397,14 @@ A good commit message should describe what changed and why. 1. The first line should: + * be prefixed with the name of the module being changed; usually this is the + same as the M-* label on the PR + * start with an imperative verb * contain a short description of the change (preferably 50 characters or less, and no more than 72 characters) * be entirely in lowercase with the exception of proper nouns, acronyms, and the words that refer to code, like function/variable names - * start with an imperative verb * not have a period at the end - * be prefixed with the name of the module being changed; usually this is the - same as the M-* label on the PR Examples: