diff --git a/.cirrus.yml b/.cirrus.yml index 0148ad414d8..82fd7a7436d 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -6,6 +6,8 @@ env: RUST_STABLE: stable RUST_NIGHTLY: nightly-2025-01-25 RUSTFLAGS: -D warnings + # This excludes unstable features like io_uring, which require '--cfg tokio_unstable'. + TOKIO_STABLE_FEATURES: full,test-util # Test FreeBSD in a full VM on cirrus-ci.com. Test the i686 target too, in the # same VM. The binary will be built in 32-bit mode, but will execute on a @@ -23,7 +25,13 @@ task: rustc --version test_script: - . $HOME/.cargo/env - - cargo test --all --all-features + - cargo test --all --features $TOKIO_STABLE_FEATURES + # Free the disk space before the next build, + # otherwise cirrus-ci complains about "No space left on device". + - cargo clean + # Enable all unstable features, including io_uring, because it supports + # x86_64 FreeBSD. + - RUSTFLAGS="$RUSTFLAGS --cfg tokio_unstable" RUSTDOCFLAGS="$RUSTDOCFLAGS --cfg tokio_unstable" cargo test --all --all-features task: name: FreeBSD docs @@ -55,4 +63,4 @@ task: rustc --version test_script: - . $HOME/.cargo/env - - cargo test --all --all-features --target i686-unknown-freebsd + - cargo test --all --features $TOKIO_STABLE_FEATURES --target i686-unknown-freebsd diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 911600f92b9..8d028574ed1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,6 +29,9 @@ env: # - tokio-test/Cargo.toml # - tokio-stream/Cargo.toml rust_min: '1.70' + # This excludes unstable features like io_uring, + # which require '--cfg tokio_unstable'. + TOKIO_STABLE_FEATURES: "full,test-util" defaults: run: @@ -73,15 +76,13 @@ jobs: - uses: Swatinem/rust-cache@v2 - # Run `tokio` with `full` features. This excludes testing utilities which + # Run `tokio` with stable features. This excludes testing utilities which # can alter the runtime behavior of Tokio. - name: test tokio full run: | set -euxo pipefail - # We use `--features "full,test-util"` instead of `--all-features` since - # `--all-features` includes `io_uring`, which is not available on all targets. - cargo nextest run --features full,test-util - cargo test --doc --features full,test-util + cargo nextest run --features full + cargo test --doc --features full working-directory: tokio test-workspace-all-features: @@ -107,12 +108,11 @@ jobs: - uses: Swatinem/rust-cache@v2 - # Test **all** crates in the workspace with all features. - - name: test all --all-features + - name: test --features ${{ env.TOKIO_STABLE_FEATURES }} run: | set -euxo pipefail - cargo nextest run --workspace --all-features - cargo test --doc --workspace --all-features + cargo nextest run --workspace --features $TOKIO_STABLE_FEATURES + cargo test --doc --workspace --features $TOKIO_STABLE_FEATURES test-workspace-all-features-panic-abort: needs: basics @@ -137,10 +137,15 @@ jobs: - uses: Swatinem/rust-cache@v2 - - name: test all --all-features panic=abort + - name: test --features ${{ env.TOKIO_STABLE_FEATURES }} panic=abort run: | set -euxo pipefail - RUSTFLAGS="$RUSTFLAGS -C panic=abort -Zpanic-abort-tests" cargo nextest run --workspace --exclude tokio-macros --exclude tests-build --all-features --tests + RUSTFLAGS="$RUSTFLAGS -C panic=abort -Zpanic-abort-tests" cargo nextest run \ + --workspace \ + --exclude tokio-macros \ + --exclude tests-build \ + --features $TOKIO_STABLE_FEATURES \ + --tests test-integration-tests-per-feature: needs: basics @@ -204,8 +209,9 @@ jobs: run: sed -i '/\[features\]/a plsend = ["parking_lot/send_guard"]' tokio/Cargo.toml - uses: Swatinem/rust-cache@v2 - - name: Check tests with all features enabled - run: cargo check --workspace --all-features --tests + + - name: Check tests --features ${{ env.TOKIO_STABLE_FEATURES }} + run: cargo check --workspace --tests --features $TOKIO_STABLE_FEATURES valgrind: name: valgrind @@ -247,12 +253,11 @@ jobs: strategy: matrix: include: - # We use `--features "full,test-util"` instead of `--all-features` since - # `--all-features` includes `io_uring`, which is not available on all targets. - - { os: windows-latest, features: "full,test-util" } - - { os: ubuntu-latest, features: "full,test-util" } - - { os: ubuntu-latest, features: "full,test-util,io-uring" } - - { os: macos-latest, features: "full,test-util" } + - { os: windows-latest, extra_features: "" } + - { os: ubuntu-latest, extra_features: "" } + # only Linux supports io_uring + - { os: ubuntu-latest, extra_features: io-uring } + - { os: macos-latest, extra_features: "" } steps: - uses: actions/checkout@v5 - name: Install Rust ${{ env.rust_stable }} @@ -270,8 +275,8 @@ jobs: - name: test tokio full --cfg unstable run: | set -euxo pipefail - cargo nextest run --features ${{ matrix.features }} - cargo test --doc --features ${{ matrix.features }} + cargo nextest run --features $TOKIO_STABLE_FEATURES,${{ matrix.extra_features }} + cargo test --doc --features $TOKIO_STABLE_FEATURES,${{ matrix.extra_features }} working-directory: tokio env: RUSTFLAGS: --cfg tokio_unstable -Dwarnings @@ -304,8 +309,11 @@ jobs: - name: test tokio full --cfg unstable --cfg taskdump run: | set -euxo pipefail - cargo nextest run --all-features - cargo test --doc --all-features + # taskdump is an unstable feature, but it can only be enabled + # by --cfg tokio_taskdump, not by a feature flag, so we can + # use $TOKIO_STABLE_FEATURES here. + cargo nextest run --features $TOKIO_STABLE_FEATURES + cargo test --doc --features $TOKIO_STABLE_FEATURES working-directory: tokio env: RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings @@ -332,7 +340,8 @@ jobs: with: tool: cargo-nextest - uses: Swatinem/rust-cache@v2 - # Run `tokio` with "unstable" and "taskdump" cfg flags. + # Since the internal-mt-counters feature is only for debugging purposes, + # we can enable all features including unstable. - name: check tokio full --cfg unstable --cfg internal-mt-counters run: | set -euxo pipefail @@ -405,7 +414,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: miri-doc-test run: | - cargo miri test --doc --all-features --no-fail-fast + cargo miri test --doc --features $TOKIO_STABLE_FEATURES --no-fail-fast working-directory: tokio env: MIRIFLAGS: -Zmiri-disable-isolation -Zmiri-strict-provenance -Zmiri-retag-fields @@ -426,7 +435,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: asan - run: cargo test --workspace --all-features --target x86_64-unknown-linux-gnu --tests -- --test-threads 1 --nocapture + run: cargo test --workspace --features $TOKIO_STABLE_FEATURES --target x86_64-unknown-linux-gnu --tests -- --test-threads 1 --nocapture env: RUSTFLAGS: -Z sanitizer=address --cfg tokio_no_tuning_tests # Ignore `trybuild` errors as they are irrelevant and flaky on nightly @@ -444,6 +453,9 @@ jobs: rust-toolchain: ${{ env.rust_stable }} package: tokio release-type: minor + feature-group: only-explicit-features + # We don't care about the semver of unstable tokio features. + features: ${{ env.TOKIO_STABLE_FEATURES }} - name: Check semver for rest of the workspace if: ${{ !startsWith(github.event.pull_request.base.ref, 'tokio-1.') }} uses: obi1kenobi/cargo-semver-checks-action@v2 @@ -669,9 +681,9 @@ jobs: strategy: matrix: include: - - { name: "", rustflags: "" } - - { name: "--unstable", rustflags: "--cfg tokio_unstable -Dwarnings" } - - { name: "--unstable --taskdump", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump" } + - { name: "", rustflags: "", exclude_features: "io-uring" } + - { name: "--unstable", rustflags: "--cfg tokio_unstable -Dwarnings", exclude_features: "" } + - { name: "--unstable --taskdump", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump", exclude_features: "" } steps: - uses: actions/checkout@v5 - name: Install Rust ${{ env.rust_nightly }} @@ -684,7 +696,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: check --feature-powerset ${{ matrix.name }} - run: cargo hack check --all --feature-powerset --depth 2 --keep-going + run: cargo hack check --all --feature-powerset --exclude-features "${{ matrix.exclude_features }}" --depth 2 --keep-going env: RUSTFLAGS: ${{ matrix.rustflags }} @@ -698,17 +710,19 @@ jobs: with: toolchain: ${{ env.rust_min }} - uses: Swatinem/rust-cache@v2 - - name: "check --workspace --all-features" + - name: "cargo check" run: | if [[ "${{ github.event.pull_request.base.ref }}" =~ ^tokio-1\..* ]]; then # Only check `tokio` crate as the PR is backporting to an earlier tokio release. - cargo check -p tokio --all-features + + cargo check -p tokio --features $TOKIO_STABLE_FEATURES else # Check all crates in the workspace - cargo check --workspace --all-features + + cargo check -p tokio --features $TOKIO_STABLE_FEATURES + # Other crates doesn't have unstable features, so we can use --all-features. + cargo check -p tokio-macros -p tokio-stream -p tokio-util -p tokio-test --all-features fi - env: - RUSTFLAGS: "" # remove -Dwarnings minimal-versions: name: minimal-versions @@ -724,14 +738,15 @@ jobs: uses: taiki-e/install-action@cargo-hack - uses: Swatinem/rust-cache@v2 - - name: "check --all-features -Z minimal-versions" + - name: "check -Z minimal-versions" run: | # Remove dev-dependencies from Cargo.toml to prevent the next `cargo update` # from determining minimal versions based on dev-dependencies. cargo hack --remove-dev-deps --workspace # Update Cargo.lock to minimal version dependencies. cargo update -Z minimal-versions - cargo hack check --all-features --ignore-private + cargo hack check -p tokio --features $TOKIO_STABLE_FEATURES --ignore-private + cargo hack check -p tokio-macros -p tokio-stream -p tokio-util -p tokio-test --all-features --ignore-private - name: "check --all-features --unstable -Z minimal-versions" env: RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings @@ -766,11 +781,6 @@ jobs: clippy: name: clippy runs-on: ubuntu-latest - strategy: - matrix: - rustflags: - - "" - - "--cfg tokio_unstable --cfg tokio_taskdump -Dwarnings" steps: - uses: actions/checkout@v5 - name: Install Rust ${{ env.rust_clippy }} @@ -780,10 +790,12 @@ jobs: components: clippy - uses: Swatinem/rust-cache@v2 # Run clippy - - name: "clippy --all ${{ matrix.rustflags }}" - run: cargo clippy --all --tests --all-features --no-deps + - name: "clippy --all --features ${{ env.TOKIO_STABLE_FEATURES }}" + run: cargo clippy --all --tests --no-deps --features $TOKIO_STABLE_FEATURES + - name: "clippy --all --all-features --unstable" + run: cargo clippy --all --tests --no-deps --all-features env: - RUSTFLAGS: ${{ matrix.rustflags }} + RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings docs: name: docs @@ -970,7 +982,7 @@ jobs: toolchain: ${{ env.rust_nightly }} target: x86_64-unknown-redox - name: check tokio on redox - run: cargo check --target x86_64-unknown-redox --all-features + run: cargo check --target x86_64-unknown-redox --features $TOKIO_STABLE_FEATURES working-directory: tokio wasm32-unknown-unknown: @@ -1090,7 +1102,7 @@ jobs: with: tool: cargo-check-external-types@0.1.13 - name: check-external-types - run: cargo check-external-types --all-features + run: cargo check-external-types --features $TOKIO_STABLE_FEATURES working-directory: tokio check-fuzzing: diff --git a/netlify.toml b/netlify.toml index e6e7343de24..c36d26b0864 100644 --- a/netlify.toml +++ b/netlify.toml @@ -1,6 +1,15 @@ [build] + # TODO: unfreeze toolchain + # error[E0557]: feature has been removed + # --> /opt/buildhome/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/lock_api-0.4.13/src/lib.rs:89:29 + # | + # 89 | #![cfg_attr(docsrs, feature(doc_auto_cfg))] + # | ^^^^^^^^^^^^ feature has been removed + # | + # = note: removed in 1.58.0; see TokenStream { /// panic!("This panic will shutdown the runtime."); /// }).await; /// } -/// # #[cfg(not(tokio_unstable))] +/// /// # fn main() { } /// ``` /// @@ -560,7 +560,7 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { /// }).await; /// }) /// } -/// # #[cfg(not(tokio_unstable))] +/// /// # fn main() { } /// ``` /// diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs index a03c0b943d4..43ed38c9270 100644 --- a/tokio-util/src/codec/mod.rs +++ b/tokio-util/src/codec/mod.rs @@ -313,7 +313,7 @@ //! [`AsyncWrite`]: tokio::io::AsyncWrite //! [`Stream`]: futures_core::Stream //! [`Sink`]: futures_sink::Sink -//! [`SinkExt`]: futures::sink::SinkExt +//! [`SinkExt`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close //! [`FramedRead`]: struct@crate::codec::FramedRead //! [`FramedWrite`]: struct@crate::codec::FramedWrite diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index 3a8ac2bdd27..ae5d7e5368e 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -2,6 +2,8 @@ use mockall::mock; use crate::sync::oneshot; +#[cfg(all(test, unix))] +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::{ cell::RefCell, collections::VecDeque, @@ -96,6 +98,14 @@ impl Write for &'_ MockFile { } } +#[cfg(all(test, unix))] +impl From for OwnedFd { + #[inline] + fn from(file: MockFile) -> OwnedFd { + unsafe { OwnedFd::from_raw_fd(file.as_raw_fd()) } + } +} + tokio_thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index f5d18e84366..543f97fd40a 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -1,4 +1,4 @@ -use crate::fs::asyncify; +use crate::{fs::asyncify, util::as_ref::OwnedBuf}; use std::{io, path::Path}; @@ -24,8 +24,69 @@ use std::{io, path::Path}; /// # } /// ``` pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Result<()> { - let path = path.as_ref().to_owned(); + let path = path.as_ref(); let contents = crate::util::as_ref::upgrade(contents); + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle.check_and_init()? { + return write_uring(path, contents).await; + } + } + + write_spawn_blocking(path, contents).await +} + +#[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] +async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { + use crate::{fs::OpenOptions, runtime::driver::op::Op}; + use std::os::fd::OwnedFd; + + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await?; + + let mut fd: OwnedFd = file + .try_into_std() + .expect("unexpected in-flight operation detected") + .into(); + + let total: usize = buf.as_ref().len(); + let mut buf_offset: usize = 0; + let mut file_offset: u64 = 0; + while buf_offset < total { + let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await?; + if n == 0 { + return Err(io::ErrorKind::WriteZero.into()); + } + + buf = _buf; + fd = _fd; + buf_offset += n as usize; + file_offset += n as u64; + } + + Ok(()) +} + +async fn write_spawn_blocking(path: &Path, contents: OwnedBuf) -> io::Result<()> { + let path = path.to_owned(); asyncify(move || std::fs::write(path, contents)).await } diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index e5ac85af604..4899d0a4a0b 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,2 +1,3 @@ pub(crate) mod open; pub(crate) mod utils; +pub(crate) mod write; diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs new file mode 100644 index 00000000000..9332bd0071a --- /dev/null +++ b/tokio/src/io/uring/write.rs @@ -0,0 +1,54 @@ +use crate::{ + runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}, + util::as_ref::OwnedBuf, +}; +use io_uring::{opcode, types}; +use std::{ + io, + os::fd::{AsRawFd, OwnedFd}, +}; + +#[derive(Debug)] +pub(crate) struct Write { + buf: OwnedBuf, + fd: OwnedFd, +} + +impl Completable for Write { + type Output = (u32, OwnedBuf, OwnedFd); + fn complete(self, cqe: CqeResult) -> io::Result { + Ok((cqe.result?, self.buf, self.fd)) + } +} + +impl Cancellable for Write { + fn cancel(self) -> CancelData { + CancelData::Write(self) + } +} + +impl Op { + /// Issue a write that starts at `buf_offset` within `buf` and writes some bytes + /// into `file` at `file_offset`. + pub(crate) fn write_at( + fd: OwnedFd, + buf: OwnedBuf, + buf_offset: usize, + file_offset: u64, + ) -> io::Result { + // There is a cap on how many bytes we can write in a single uring write operation. + // ref: https://github.com/axboe/liburing/discussions/497 + let len = u32::try_from(buf.as_ref().len() - buf_offset).unwrap_or(u32::MAX); + + let ptr = buf.as_ref()[buf_offset..buf_offset + len as usize].as_ptr(); + + let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), ptr, len) + .offset(file_offset) + .build(); + + // SAFETY: parameters of the entry, such as `fd` and `buf`, are valid + // until this operation completes. + let op = unsafe { Op::new(sqe, Write { buf, fd }) }; + Ok(op) + } +} diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 33ee9eb87f2..9ee0ca94234 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -480,6 +480,9 @@ compile_error! { ))] compile_error!("Only features sync,macros,io-util,rt,time are supported on wasm."); +#[cfg(all(not(tokio_unstable), feature = "io-uring"))] +compile_error!("The `io-uring` feature requires `--cfg tokio_unstable`."); + #[cfg(all(not(tokio_unstable), tokio_taskdump))] compile_error!("The `tokio_taskdump` feature requires `--cfg tokio_unstable`."); diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 94afe163a13..413dd4a83f4 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -1,4 +1,5 @@ use crate::io::uring::open::Open; +use crate::io::uring::write::Write; use crate::runtime::Handle; use io_uring::cqueue; use io_uring::squeue::Entry; @@ -9,13 +10,13 @@ use std::task::Poll; use std::task::Waker; use std::{io, mem}; +// This field isn't accessed directly, but it holds cancellation data, +// so `#[allow(dead_code)]` is needed. +#[allow(dead_code)] #[derive(Debug)] pub(crate) enum CancelData { - Open( - // This field isn't accessed directly, but it holds cancellation data, - // so `#[allow(dead_code)]` is needed. - #[allow(dead_code)] Open, - ), + Open(Open), + Write(Write), } #[derive(Debug)] diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3a74dda2d73..a70e76aa29c 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -177,7 +177,6 @@ //! [`tokio::main`]: ../attr.main.html //! [runtime builder]: crate::runtime::Builder //! [`Runtime::new`]: crate::runtime::Runtime::new -//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler //! [`Builder::enable_io`]: crate::runtime::Builder::enable_io //! [`Builder::enable_time`]: crate::runtime::Builder::enable_time //! [`Builder::enable_all`]: crate::runtime::Builder::enable_all diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 8a1cb5af55e..05dad6393d9 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1170,7 +1170,7 @@ impl Receiver { /// Returns true if there aren't any messages in the channel that the [`Receiver`] /// has yet to receive. /// - /// [`Receiver]: create::sync::broadcast::Receiver + /// [`Receiver`]: crate::sync::broadcast::Receiver /// /// # Examples /// diff --git a/tokio/tests/fs_write.rs b/tokio/tests/fs_write.rs new file mode 100644 index 00000000000..a125e040875 --- /dev/null +++ b/tokio/tests/fs_write.rs @@ -0,0 +1,16 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", not(target_os = "wasi")))] // WASI does not support all fs operations + +use tempfile::tempdir; +use tokio::fs; + +#[tokio::test] +async fn write() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.txt"); + + fs::write(&path, "Hello, World!").await.unwrap(); + + let contents = fs::read_to_string(&path).await.unwrap(); + assert_eq!(contents, "Hello, World!"); +}