From 94e600d8eeffca656cb0836564217d26f21ded18 Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Mon, 1 Sep 2025 23:33:48 +0900 Subject: [PATCH 01/14] fs: support io_uring in `fs::write` --- tokio/src/fs/write.rs | 43 ++++++++++++++++++++++++++++++++++ tokio/src/io/uring/mod.rs | 1 + tokio/src/io/uring/write.rs | 42 +++++++++++++++++++++++++++++++++ tokio/src/runtime/driver/op.rs | 2 ++ tokio/tests/fs_write.rs | 16 +++++++++++++ 5 files changed, 104 insertions(+) create mode 100644 tokio/src/io/uring/write.rs create mode 100644 tokio/tests/fs_write.rs diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index f5d18e84366..13ed9fa5376 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -24,6 +24,49 @@ use std::{io, path::Path}; /// # } /// ``` pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Result<()> { + #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle.check_and_init()? { + return write_uring(path, contents.as_ref()).await; + } + } + + write_spawn_blocking(path, contents).await +} + +#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] +async fn write_uring(path: impl AsRef, mut buf: &[u8]) -> io::Result<()> { + use crate::{fs::OpenOptions, runtime::driver::op::Op}; + use std::os::fd::AsFd; + + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path.as_ref()) + .await?; + + let mut pos = 0; + let fd = file.as_fd(); + while !buf.is_empty() { + let n = Op::write_at(fd, buf, pos)?.await? as usize; + if n == 0 { + return Err(io::ErrorKind::WriteZero.into()); + } + buf = &buf[n..]; + pos += n as u64; + } + + Ok(()) +} + +async fn write_spawn_blocking(path: P, contents: C) -> io::Result<()> +where + P: AsRef, + C: AsRef<[u8]>, +{ let path = path.as_ref().to_owned(); let contents = crate::util::as_ref::upgrade(contents); diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index e5ac85af604..4899d0a4a0b 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,2 +1,3 @@ pub(crate) mod open; pub(crate) mod utils; +pub(crate) mod write; diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs new file mode 100644 index 00000000000..df3badb9bb9 --- /dev/null +++ b/tokio/src/io/uring/write.rs @@ -0,0 +1,42 @@ +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use io_uring::{opcode, types}; +use std::{ + cmp, io, + os::fd::{AsRawFd, BorrowedFd}, +}; + +#[derive(Debug)] +pub(crate) struct Write; + +impl Completable for Write { + // The number of bytes written. + type Output = u32; + + fn complete(self, cqe: CqeResult) -> io::Result { + cqe.result + } +} + +impl Cancellable for Write { + fn cancel(self) -> CancelData { + CancelData::Write(self) + } +} + +impl Op { + pub(crate) fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result { + // There is a cap on how many bytes we can write in a single uring write operation. + // ref: https://github.com/axboe/liburing/discussions/497 + let len: u32 = cmp::min(buf.len(), u32::MAX as usize) as u32; + + let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ptr(), len) + .offset(offset) + .build(); + + // SAFETY: `fd` and `buf` is valid until this future completes. + // If this operation is cancelled, drop of `Op` occurs before dropping + // `fd` or `buf`, ensuring the safety contract holds. + let op = unsafe { Op::new(sqe, Write) }; + Ok(op) + } +} diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 94afe163a13..81ad0493cb4 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -1,4 +1,5 @@ use crate::io::uring::open::Open; +use crate::io::uring::write::Write; use crate::runtime::Handle; use io_uring::cqueue; use io_uring::squeue::Entry; @@ -16,6 +17,7 @@ pub(crate) enum CancelData { // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] Open, ), + Write(Write), } #[derive(Debug)] diff --git a/tokio/tests/fs_write.rs b/tokio/tests/fs_write.rs new file mode 100644 index 00000000000..a125e040875 --- /dev/null +++ b/tokio/tests/fs_write.rs @@ -0,0 +1,16 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", not(target_os = "wasi")))] // WASI does not support all fs operations + +use tempfile::tempdir; +use tokio::fs; + +#[tokio::test] +async fn write() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.txt"); + + fs::write(&path, "Hello, World!").await.unwrap(); + + let contents = fs::read_to_string(&path).await.unwrap(); + assert_eq!(contents, "Hello, World!"); +} From da5b9e192d376277c2892718771fd6b9d7a5aa3d Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Wed, 3 Sep 2025 17:21:43 +0900 Subject: [PATCH 02/14] own resources & restructure code --- tokio/src/fs/mocks.rs | 10 +++++ tokio/src/fs/write.rs | 31 +++++++------- tokio/src/io/uring/utils.rs | 29 +++++++++++++ tokio/src/io/uring/write.rs | 32 ++++++++------ tokio/src/runtime/driver/op.rs | 6 ++- tokio/src/util/as_ref.rs | 78 +++++++++++++++++++++++++++++++--- 6 files changed, 149 insertions(+), 37 deletions(-) diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index 5343fb90b38..537ca2b1131 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -2,6 +2,8 @@ use mockall::mock; use crate::sync::oneshot; +#[cfg(all(test, unix))] +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::{ cell::RefCell, collections::VecDeque, @@ -89,6 +91,14 @@ impl Write for &'_ MockFile { } } +#[cfg(all(test, unix))] +impl From for OwnedFd { + #[inline] + fn from(file: MockFile) -> OwnedFd { + unsafe { OwnedFd::from_raw_fd(file.as_raw_fd()) } + } +} + tokio_thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 13ed9fa5376..44099211cda 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -1,4 +1,5 @@ use crate::fs::asyncify; +use crate::util::as_ref::OwnedBuf; use std::{io, path::Path}; @@ -24,12 +25,15 @@ use std::{io, path::Path}; /// # } /// ``` pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Result<()> { + let path = path.as_ref(); + let contents = crate::util::as_ref::upgrade(contents); + #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] { let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); if driver_handle.check_and_init()? { - return write_uring(path, contents.as_ref()).await; + return write_uring(path, contents).await; } } @@ -37,38 +41,33 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re } #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] -async fn write_uring(path: impl AsRef, mut buf: &[u8]) -> io::Result<()> { - use crate::{fs::OpenOptions, runtime::driver::op::Op}; - use std::os::fd::AsFd; +async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> { + use crate::{fs::OpenOptions, io::uring::utils::SharedFd, runtime::driver::op::Op}; let file = OpenOptions::new() .write(true) .create(true) .truncate(true) - .open(path.as_ref()) + .open(path) .await?; + let fd = SharedFd::try_from(file).expect("unexpected in-flight operation detected"); + let mut pos = 0; - let fd = file.as_fd(); + let mut buf = contents; while !buf.is_empty() { - let n = Op::write_at(fd, buf, pos)?.await? as usize; + let n = Op::write_at(fd.clone(), buf.clone(), pos)?.await? as usize; if n == 0 { return Err(io::ErrorKind::WriteZero.into()); } - buf = &buf[n..]; + buf.advance(n); pos += n as u64; } Ok(()) } -async fn write_spawn_blocking(path: P, contents: C) -> io::Result<()> -where - P: AsRef, - C: AsRef<[u8]>, -{ - let path = path.as_ref().to_owned(); - let contents = crate::util::as_ref::upgrade(contents); - +async fn write_spawn_blocking(path: &Path, contents: OwnedBuf) -> io::Result<()> { + let path = path.to_owned(); asyncify(move || std::fs::write(path, contents)).await } diff --git a/tokio/src/io/uring/utils.rs b/tokio/src/io/uring/utils.rs index e30e7a5ddc4..e2f6d04abe6 100644 --- a/tokio/src/io/uring/utils.rs +++ b/tokio/src/io/uring/utils.rs @@ -1,6 +1,35 @@ +use std::os::fd::{AsRawFd, OwnedFd, RawFd}; use std::os::unix::ffi::OsStrExt; +use std::sync::Arc; use std::{ffi::CString, io, path::Path}; pub(crate) fn cstr(p: &Path) -> io::Result { Ok(CString::new(p.as_os_str().as_bytes())?) } + +/// A reference-counted handle to the `OwnedFd`. +#[derive(Clone, Debug)] +pub(crate) struct SharedFd { + inner: Arc, +} + +impl SharedFd { + pub(crate) fn new(fd: OwnedFd) -> SharedFd { + SharedFd { + inner: Arc::new(fd), + } + } +} + +impl AsRawFd for SharedFd { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl TryFrom for SharedFd { + type Error = crate::fs::File; + fn try_from(value: crate::fs::File) -> Result { + Ok(SharedFd::new(OwnedFd::from(value.try_into_std()?))) + } +} diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index df3badb9bb9..17bba4c8bb6 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -1,17 +1,22 @@ -use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; -use io_uring::{opcode, types}; -use std::{ - cmp, io, - os::fd::{AsRawFd, BorrowedFd}, +use crate::{ + io::uring::utils::SharedFd, + runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}, + util::as_ref::OwnedBuf, }; +use io_uring::{opcode, types}; +use std::{io, os::fd::AsRawFd}; +// This holds a strong ref to the resources such as `fd`, `buf`, +// preventing these resources from being dropped while the operation is in-flight. #[derive(Debug)] -pub(crate) struct Write; +pub(crate) struct Write { + _fd: SharedFd, + _buf: OwnedBuf, +} impl Completable for Write { // The number of bytes written. type Output = u32; - fn complete(self, cqe: CqeResult) -> io::Result { cqe.result } @@ -24,19 +29,18 @@ impl Cancellable for Write { } impl Op { - pub(crate) fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result { + pub(crate) fn write_at(fd: SharedFd, buf: OwnedBuf, offset: u64) -> io::Result { // There is a cap on how many bytes we can write in a single uring write operation. // ref: https://github.com/axboe/liburing/discussions/497 - let len: u32 = cmp::min(buf.len(), u32::MAX as usize) as u32; + let len: u32 = std::cmp::min(buf.len(), u32::MAX as usize) as u32; - let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ptr(), len) + let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ref().as_ptr(), len) .offset(offset) .build(); - // SAFETY: `fd` and `buf` is valid until this future completes. - // If this operation is cancelled, drop of `Op` occurs before dropping - // `fd` or `buf`, ensuring the safety contract holds. - let op = unsafe { Op::new(sqe, Write) }; + // SAFETY: `fd` and `buf` are owned by this `Op`, ensuring that these params are valid + // until `Op::drop` gets called. + let op = unsafe { Op::new(sqe, Write { _buf: buf, _fd: fd }) }; Ok(op) } } diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 81ad0493cb4..12544d3a37b 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -17,7 +17,11 @@ pub(crate) enum CancelData { // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] Open, ), - Write(Write), + Write( + // This field isn't accessed directly, but it holds cancellation data, + // so `#[allow(dead_code)]` is needed. + #[allow(dead_code)] Write, + ), } #[derive(Debug)] diff --git a/tokio/src/util/as_ref.rs b/tokio/src/util/as_ref.rs index 464975d6070..61feca08cbd 100644 --- a/tokio/src/util/as_ref.rs +++ b/tokio/src/util/as_ref.rs @@ -1,16 +1,69 @@ +use std::sync::Arc; + +#[cfg(feature = "io-util")] +use bytes::Buf; + use super::typeid; -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum OwnedBuf { - Vec(Vec), + Vec(VecBuf), #[cfg(feature = "io-util")] Bytes(bytes::Bytes), } +#[allow(unused)] +impl OwnedBuf { + pub(crate) fn len(&self) -> usize { + match self { + OwnedBuf::Vec(vec) => vec.len(), + #[cfg(feature = "io-util")] + OwnedBuf::Bytes(b) => b.len(), + } + } + + pub(crate) fn advance(&mut self, n: usize) { + match self { + OwnedBuf::Vec(vec) => vec.advance(n), + #[cfg(feature = "io-util")] + OwnedBuf::Bytes(b) => b.advance(n), + } + } + + pub(crate) fn is_empty(&self) -> bool { + match self { + Self::Vec(vec) => vec.is_empty(), + #[cfg(feature = "io-util")] + Self::Bytes(bytes) => bytes.is_empty(), + } + } +} + +/// A Vec-like, reference-counted buffer. +#[derive(Debug, Clone)] +pub(crate) struct VecBuf { + data: Arc<[u8]>, + pos: usize, +} + +impl VecBuf { + fn len(&self) -> usize { + self.data.len().saturating_sub(self.pos) + } + + fn advance(&mut self, n: usize) { + self.pos = self.pos.saturating_add(n); + } + + fn is_empty(&self) -> bool { + self.data.len().saturating_sub(self.pos) == 0 + } +} + impl AsRef<[u8]> for OwnedBuf { fn as_ref(&self) -> &[u8] { match self { - Self::Vec(vec) => vec, + Self::Vec(VecBuf { data, pos }) => &data[*pos..], #[cfg(feature = "io-util")] Self::Bytes(bytes) => bytes, } @@ -19,12 +72,22 @@ impl AsRef<[u8]> for OwnedBuf { pub(crate) fn upgrade>(buf: B) -> OwnedBuf { let buf = match unsafe { typeid::try_transmute::>(buf) } { - Ok(vec) => return OwnedBuf::Vec(vec), + Ok(vec) => { + return OwnedBuf::Vec(VecBuf { + data: vec.into(), + pos: 0, + }); + } Err(original_buf) => original_buf, }; let buf = match unsafe { typeid::try_transmute::(buf) } { - Ok(string) => return OwnedBuf::Vec(string.into_bytes()), + Ok(string) => { + return OwnedBuf::Vec(VecBuf { + data: string.into_bytes().into(), + pos: 0, + }); + } Err(original_buf) => original_buf, }; @@ -34,5 +97,8 @@ pub(crate) fn upgrade>(buf: B) -> OwnedBuf { Err(original_buf) => original_buf, }; - OwnedBuf::Vec(buf.as_ref().to_owned()) + OwnedBuf::Vec(VecBuf { + data: buf.as_ref().to_owned().into(), + pos: 0, + }) } From 7c88b4ac018c0e383c07faa77668b65e91ee52af Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Thu, 4 Sep 2025 23:16:20 +0900 Subject: [PATCH 03/14] remove refcounted types --- tokio/src/fs/mocks.rs | 10 ----- tokio/src/fs/write.rs | 14 +++--- tokio/src/io/uring/utils.rs | 29 ------------- tokio/src/io/uring/write.rs | 30 ++++++------- tokio/src/runtime/driver/op.rs | 6 +-- tokio/src/util/as_ref.rs | 78 +++------------------------------- 6 files changed, 26 insertions(+), 141 deletions(-) diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index 537ca2b1131..5343fb90b38 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -2,8 +2,6 @@ use mockall::mock; use crate::sync::oneshot; -#[cfg(all(test, unix))] -use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::{ cell::RefCell, collections::VecDeque, @@ -91,14 +89,6 @@ impl Write for &'_ MockFile { } } -#[cfg(all(test, unix))] -impl From for OwnedFd { - #[inline] - fn from(file: MockFile) -> OwnedFd { - unsafe { OwnedFd::from_raw_fd(file.as_raw_fd()) } - } -} - tokio_thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 44099211cda..8ee4ead50a5 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -1,5 +1,4 @@ -use crate::fs::asyncify; -use crate::util::as_ref::OwnedBuf; +use crate::{fs::asyncify, util::as_ref::OwnedBuf}; use std::{io, path::Path}; @@ -42,7 +41,8 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> { - use crate::{fs::OpenOptions, io::uring::utils::SharedFd, runtime::driver::op::Op}; + use crate::{fs::OpenOptions, runtime::driver::op::Op}; + use std::os::fd::AsFd; let file = OpenOptions::new() .write(true) @@ -51,16 +51,16 @@ async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> { .open(path) .await?; - let fd = SharedFd::try_from(file).expect("unexpected in-flight operation detected"); + let fd = file.as_fd(); let mut pos = 0; - let mut buf = contents; + let mut buf = contents.as_ref(); while !buf.is_empty() { - let n = Op::write_at(fd.clone(), buf.clone(), pos)?.await? as usize; + let n = Op::write_at(fd, buf, pos)?.await? as usize; if n == 0 { return Err(io::ErrorKind::WriteZero.into()); } - buf.advance(n); + buf = &buf[n..]; pos += n as u64; } diff --git a/tokio/src/io/uring/utils.rs b/tokio/src/io/uring/utils.rs index e2f6d04abe6..e30e7a5ddc4 100644 --- a/tokio/src/io/uring/utils.rs +++ b/tokio/src/io/uring/utils.rs @@ -1,35 +1,6 @@ -use std::os::fd::{AsRawFd, OwnedFd, RawFd}; use std::os::unix::ffi::OsStrExt; -use std::sync::Arc; use std::{ffi::CString, io, path::Path}; pub(crate) fn cstr(p: &Path) -> io::Result { Ok(CString::new(p.as_os_str().as_bytes())?) } - -/// A reference-counted handle to the `OwnedFd`. -#[derive(Clone, Debug)] -pub(crate) struct SharedFd { - inner: Arc, -} - -impl SharedFd { - pub(crate) fn new(fd: OwnedFd) -> SharedFd { - SharedFd { - inner: Arc::new(fd), - } - } -} - -impl AsRawFd for SharedFd { - fn as_raw_fd(&self) -> RawFd { - self.inner.as_raw_fd() - } -} - -impl TryFrom for SharedFd { - type Error = crate::fs::File; - fn try_from(value: crate::fs::File) -> Result { - Ok(SharedFd::new(OwnedFd::from(value.try_into_std()?))) - } -} diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 17bba4c8bb6..334473435a3 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -1,18 +1,12 @@ -use crate::{ - io::uring::utils::SharedFd, - runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}, - util::as_ref::OwnedBuf, -}; +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; use io_uring::{opcode, types}; -use std::{io, os::fd::AsRawFd}; +use std::{ + cmp, io, + os::fd::{AsRawFd, BorrowedFd}, +}; -// This holds a strong ref to the resources such as `fd`, `buf`, -// preventing these resources from being dropped while the operation is in-flight. #[derive(Debug)] -pub(crate) struct Write { - _fd: SharedFd, - _buf: OwnedBuf, -} +pub(crate) struct Write; impl Completable for Write { // The number of bytes written. @@ -29,18 +23,18 @@ impl Cancellable for Write { } impl Op { - pub(crate) fn write_at(fd: SharedFd, buf: OwnedBuf, offset: u64) -> io::Result { + pub(crate) fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result { // There is a cap on how many bytes we can write in a single uring write operation. // ref: https://github.com/axboe/liburing/discussions/497 - let len: u32 = std::cmp::min(buf.len(), u32::MAX as usize) as u32; + let len: u32 = cmp::min(buf.len(), u32::MAX as usize) as u32; - let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ref().as_ptr(), len) + let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ptr(), len) .offset(offset) .build(); - // SAFETY: `fd` and `buf` are owned by this `Op`, ensuring that these params are valid - // until `Op::drop` gets called. - let op = unsafe { Op::new(sqe, Write { _buf: buf, _fd: fd }) }; + // SAFETY: `fd` and `buf` are owned by caller function, ensuring these params are + // valid until operation completes. + let op = unsafe { Op::new(sqe, Write) }; Ok(op) } } diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 12544d3a37b..81ad0493cb4 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -17,11 +17,7 @@ pub(crate) enum CancelData { // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] Open, ), - Write( - // This field isn't accessed directly, but it holds cancellation data, - // so `#[allow(dead_code)]` is needed. - #[allow(dead_code)] Write, - ), + Write(Write), } #[derive(Debug)] diff --git a/tokio/src/util/as_ref.rs b/tokio/src/util/as_ref.rs index 61feca08cbd..464975d6070 100644 --- a/tokio/src/util/as_ref.rs +++ b/tokio/src/util/as_ref.rs @@ -1,69 +1,16 @@ -use std::sync::Arc; - -#[cfg(feature = "io-util")] -use bytes::Buf; - use super::typeid; -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) enum OwnedBuf { - Vec(VecBuf), + Vec(Vec), #[cfg(feature = "io-util")] Bytes(bytes::Bytes), } -#[allow(unused)] -impl OwnedBuf { - pub(crate) fn len(&self) -> usize { - match self { - OwnedBuf::Vec(vec) => vec.len(), - #[cfg(feature = "io-util")] - OwnedBuf::Bytes(b) => b.len(), - } - } - - pub(crate) fn advance(&mut self, n: usize) { - match self { - OwnedBuf::Vec(vec) => vec.advance(n), - #[cfg(feature = "io-util")] - OwnedBuf::Bytes(b) => b.advance(n), - } - } - - pub(crate) fn is_empty(&self) -> bool { - match self { - Self::Vec(vec) => vec.is_empty(), - #[cfg(feature = "io-util")] - Self::Bytes(bytes) => bytes.is_empty(), - } - } -} - -/// A Vec-like, reference-counted buffer. -#[derive(Debug, Clone)] -pub(crate) struct VecBuf { - data: Arc<[u8]>, - pos: usize, -} - -impl VecBuf { - fn len(&self) -> usize { - self.data.len().saturating_sub(self.pos) - } - - fn advance(&mut self, n: usize) { - self.pos = self.pos.saturating_add(n); - } - - fn is_empty(&self) -> bool { - self.data.len().saturating_sub(self.pos) == 0 - } -} - impl AsRef<[u8]> for OwnedBuf { fn as_ref(&self) -> &[u8] { match self { - Self::Vec(VecBuf { data, pos }) => &data[*pos..], + Self::Vec(vec) => vec, #[cfg(feature = "io-util")] Self::Bytes(bytes) => bytes, } @@ -72,22 +19,12 @@ impl AsRef<[u8]> for OwnedBuf { pub(crate) fn upgrade>(buf: B) -> OwnedBuf { let buf = match unsafe { typeid::try_transmute::>(buf) } { - Ok(vec) => { - return OwnedBuf::Vec(VecBuf { - data: vec.into(), - pos: 0, - }); - } + Ok(vec) => return OwnedBuf::Vec(vec), Err(original_buf) => original_buf, }; let buf = match unsafe { typeid::try_transmute::(buf) } { - Ok(string) => { - return OwnedBuf::Vec(VecBuf { - data: string.into_bytes().into(), - pos: 0, - }); - } + Ok(string) => return OwnedBuf::Vec(string.into_bytes()), Err(original_buf) => original_buf, }; @@ -97,8 +34,5 @@ pub(crate) fn upgrade>(buf: B) -> OwnedBuf { Err(original_buf) => original_buf, }; - OwnedBuf::Vec(VecBuf { - data: buf.as_ref().to_owned().into(), - pos: 0, - }) + OwnedBuf::Vec(buf.as_ref().to_owned()) } From 05c8c17e06ca243dadd44d1467447fe6e4ed8dc9 Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Sat, 6 Sep 2025 02:03:10 +0900 Subject: [PATCH 04/14] make `write_at` unsafe fn and add more safety comments --- tokio/src/fs/write.rs | 10 +++++++++- tokio/src/io/uring/write.rs | 11 ++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 8ee4ead50a5..bf7fbd19544 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -56,7 +56,15 @@ async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> { let mut pos = 0; let mut buf = contents.as_ref(); while !buf.is_empty() { - let n = Op::write_at(fd, buf, pos)?.await? as usize; + // SAFETY: + // If the operation completes successfully, `fd` and `buf` are still + // alive within the scope of this function, so remain valid. + // + // In the case of cancellation, local variables within the scope of + // this `async fn` are dropped in the reverse order of their declaration. + // Therefore, `Op` is dropped before `fd` and `buf`, ensuring that the + // operation finishes gracefully before these resources are dropped. + let n = unsafe { Op::write_at(fd, buf, pos) }?.await? as usize; if n == 0 { return Err(io::ErrorKind::WriteZero.into()); } diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 334473435a3..95e4dce91f6 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -23,7 +23,12 @@ impl Cancellable for Write { } impl Op { - pub(crate) fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result { + /// # SAFETY + /// + /// The caller must ensure that `fd` and `buf` remain valid until the + /// operation finishes (or gets cancelled) and the `Op::drop` completes. + /// Otherwise, the kernel could access freed memory, breaking soundness. + pub(crate) unsafe fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result { // There is a cap on how many bytes we can write in a single uring write operation. // ref: https://github.com/axboe/liburing/discussions/497 let len: u32 = cmp::min(buf.len(), u32::MAX as usize) as u32; @@ -32,8 +37,8 @@ impl Op { .offset(offset) .build(); - // SAFETY: `fd` and `buf` are owned by caller function, ensuring these params are - // valid until operation completes. + // SAFETY: `fd` and `buf` valid until the operation completes or gets cancelled + // and the `Op::drop` completes. let op = unsafe { Op::new(sqe, Write) }; Ok(op) } From 508c2fffcb8ffaab0bc9e6593b7f492ef8277e09 Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Mon, 8 Sep 2025 18:59:44 +0900 Subject: [PATCH 05/14] fix Buffer ownership issue --- tokio/src/fs/mocks.rs | 10 +++++++ tokio/src/fs/write.rs | 35 +++++++++++----------- tokio/src/io/uring/write.rs | 55 +++++++++++++++++++++------------- tokio/src/runtime/driver/op.rs | 6 +++- tokio/src/util/as_ref.rs | 16 ++++++++++ 5 files changed, 83 insertions(+), 39 deletions(-) diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index 5343fb90b38..537ca2b1131 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -2,6 +2,8 @@ use mockall::mock; use crate::sync::oneshot; +#[cfg(all(test, unix))] +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::{ cell::RefCell, collections::VecDeque, @@ -89,6 +91,14 @@ impl Write for &'_ MockFile { } } +#[cfg(all(test, unix))] +impl From for OwnedFd { + #[inline] + fn from(file: MockFile) -> OwnedFd { + unsafe { OwnedFd::from_raw_fd(file.as_raw_fd()) } + } +} + tokio_thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index bf7fbd19544..2b8dd1b8d77 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -40,9 +40,9 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re } #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] -async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> { +async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { use crate::{fs::OpenOptions, runtime::driver::op::Op}; - use std::os::fd::AsFd; + use std::os::fd::OwnedFd; let file = OpenOptions::new() .write(true) @@ -51,25 +51,26 @@ async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> { .open(path) .await?; - let fd = file.as_fd(); + let mut fd: OwnedFd = file + .try_into_std() + .expect("unexpected in-flight operation detected") + .into(); - let mut pos = 0; - let mut buf = contents.as_ref(); - while !buf.is_empty() { - // SAFETY: - // If the operation completes successfully, `fd` and `buf` are still - // alive within the scope of this function, so remain valid. - // - // In the case of cancellation, local variables within the scope of - // this `async fn` are dropped in the reverse order of their declaration. - // Therefore, `Op` is dropped before `fd` and `buf`, ensuring that the - // operation finishes gracefully before these resources are dropped. - let n = unsafe { Op::write_at(fd, buf, pos) }?.await? as usize; + let total: usize = buf.len(); + let mut offset: usize = 0; + while offset < total { + // There is a cap on how many bytes we can write in a single uring write operation. + // ref: https://github.com/axboe/liburing/discussions/497 + let len = std::cmp::min(total - offset, u32::MAX as usize) as u32; + + let (n, _buf, _fd) = Op::write_at(fd, buf, offset, len, offset as u64)?.await?; if n == 0 { return Err(io::ErrorKind::WriteZero.into()); } - buf = &buf[n..]; - pos += n as u64; + + buf = _buf; + fd = _fd; + offset += n as usize; } Ok(()) diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 95e4dce91f6..ad876d706bb 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -1,18 +1,23 @@ -use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use crate::{ + runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}, + util::as_ref::OwnedBuf, +}; use io_uring::{opcode, types}; use std::{ - cmp, io, - os::fd::{AsRawFd, BorrowedFd}, + io, + os::fd::{AsRawFd, OwnedFd}, }; #[derive(Debug)] -pub(crate) struct Write; +pub(crate) struct Write { + buf: OwnedBuf, + fd: OwnedFd, +} impl Completable for Write { - // The number of bytes written. - type Output = u32; + type Output = (u32, OwnedBuf, OwnedFd); fn complete(self, cqe: CqeResult) -> io::Result { - cqe.result + Ok((cqe.result?, self.buf, self.fd)) } } @@ -23,23 +28,31 @@ impl Cancellable for Write { } impl Op { - /// # SAFETY - /// - /// The caller must ensure that `fd` and `buf` remain valid until the - /// operation finishes (or gets cancelled) and the `Op::drop` completes. - /// Otherwise, the kernel could access freed memory, breaking soundness. - pub(crate) unsafe fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result { - // There is a cap on how many bytes we can write in a single uring write operation. - // ref: https://github.com/axboe/liburing/discussions/497 - let len: u32 = cmp::min(buf.len(), u32::MAX as usize) as u32; + /// Issue a write that starts at `buf_offset` within `buf` and writes `len` bytes + /// into `file` at `file_offset`. + pub(crate) fn write_at( + fd: OwnedFd, + buf: OwnedBuf, + buf_offset: usize, + len: u32, + file_offset: u64, + ) -> io::Result { + // Check if `buf_offset` stays in bounds of the allocation + debug_assert!(buf_offset + len as usize <= buf.len()); + + // SAFETY: + // - `buf_offset` stays in bounds of the allocation. + // - `buf` is derived from an actual allocation, and the entire memory + // range is in bounds of that allocation. + let ptr = unsafe { buf.as_ptr().add(buf_offset) }; - let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ptr(), len) - .offset(offset) + let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), ptr, len) + .offset(file_offset) .build(); - // SAFETY: `fd` and `buf` valid until the operation completes or gets cancelled - // and the `Op::drop` completes. - let op = unsafe { Op::new(sqe, Write) }; + // SAFETY: parameters of the entry, such as `fd` and `buf`, are valid + // until this operation completes. + let op = unsafe { Op::new(sqe, Write { buf, fd }) }; Ok(op) } } diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 81ad0493cb4..12544d3a37b 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -17,7 +17,11 @@ pub(crate) enum CancelData { // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] Open, ), - Write(Write), + Write( + // This field isn't accessed directly, but it holds cancellation data, + // so `#[allow(dead_code)]` is needed. + #[allow(dead_code)] Write, + ), } #[derive(Debug)] diff --git a/tokio/src/util/as_ref.rs b/tokio/src/util/as_ref.rs index 464975d6070..35f7a109901 100644 --- a/tokio/src/util/as_ref.rs +++ b/tokio/src/util/as_ref.rs @@ -7,6 +7,22 @@ pub(crate) enum OwnedBuf { Bytes(bytes::Bytes), } +impl OwnedBuf { + cfg_tokio_uring! { + pub(crate) fn len(&self) -> usize { + match self { + Self::Vec(vec) => vec.len(), + #[cfg(feature = "io-util")] + Self::Bytes(bytes) => bytes.len(), + } + } + + pub(crate) fn as_ptr(&self) -> *const u8 { + self.as_ref().as_ptr() + } + } +} + impl AsRef<[u8]> for OwnedBuf { fn as_ref(&self) -> &[u8] { match self { From 4fc67646bef1b5a20c3bd474c66a09474002a49e Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 11 Sep 2025 11:44:38 +0900 Subject: [PATCH 06/14] address review: reduce duplication of the #[allow(dead_code)] --- tokio/src/runtime/driver/op.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 12544d3a37b..413dd4a83f4 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -10,18 +10,13 @@ use std::task::Poll; use std::task::Waker; use std::{io, mem}; +// This field isn't accessed directly, but it holds cancellation data, +// so `#[allow(dead_code)]` is needed. +#[allow(dead_code)] #[derive(Debug)] pub(crate) enum CancelData { - Open( - // This field isn't accessed directly, but it holds cancellation data, - // so `#[allow(dead_code)]` is needed. - #[allow(dead_code)] Open, - ), - Write( - // This field isn't accessed directly, but it holds cancellation data, - // so `#[allow(dead_code)]` is needed. - #[allow(dead_code)] Write, - ), + Open(Open), + Write(Write), } #[derive(Debug)] From a3ea51673a18541d47371e6137b7550f86109eef Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 11 Sep 2025 12:28:34 +0900 Subject: [PATCH 07/14] address review: remove OwnedBuf impl --- tokio/src/fs/write.rs | 2 +- tokio/src/io/uring/write.rs | 4 ++-- tokio/src/util/as_ref.rs | 16 ---------------- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 2b8dd1b8d77..26e5e749eae 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -56,7 +56,7 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { .expect("unexpected in-flight operation detected") .into(); - let total: usize = buf.len(); + let total: usize = buf.as_ref().len(); let mut offset: usize = 0; while offset < total { // There is a cap on how many bytes we can write in a single uring write operation. diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index ad876d706bb..4f95b81bde3 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -38,13 +38,13 @@ impl Op { file_offset: u64, ) -> io::Result { // Check if `buf_offset` stays in bounds of the allocation - debug_assert!(buf_offset + len as usize <= buf.len()); + debug_assert!(buf_offset + len as usize <= buf.as_ref().len()); // SAFETY: // - `buf_offset` stays in bounds of the allocation. // - `buf` is derived from an actual allocation, and the entire memory // range is in bounds of that allocation. - let ptr = unsafe { buf.as_ptr().add(buf_offset) }; + let ptr = unsafe { buf.as_ref().as_ptr().add(buf_offset) }; let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), ptr, len) .offset(file_offset) diff --git a/tokio/src/util/as_ref.rs b/tokio/src/util/as_ref.rs index 35f7a109901..464975d6070 100644 --- a/tokio/src/util/as_ref.rs +++ b/tokio/src/util/as_ref.rs @@ -7,22 +7,6 @@ pub(crate) enum OwnedBuf { Bytes(bytes::Bytes), } -impl OwnedBuf { - cfg_tokio_uring! { - pub(crate) fn len(&self) -> usize { - match self { - Self::Vec(vec) => vec.len(), - #[cfg(feature = "io-util")] - Self::Bytes(bytes) => bytes.len(), - } - } - - pub(crate) fn as_ptr(&self) -> *const u8 { - self.as_ref().as_ptr() - } - } -} - impl AsRef<[u8]> for OwnedBuf { fn as_ref(&self) -> &[u8] { match self { From ff699c05944693fc91d2270e49a078af7a5297ce Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 11 Sep 2025 12:36:34 +0900 Subject: [PATCH 08/14] address review: remove unsafe --- tokio/src/io/uring/write.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 4f95b81bde3..8973fe7a94c 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -37,14 +37,7 @@ impl Op { len: u32, file_offset: u64, ) -> io::Result { - // Check if `buf_offset` stays in bounds of the allocation - debug_assert!(buf_offset + len as usize <= buf.as_ref().len()); - - // SAFETY: - // - `buf_offset` stays in bounds of the allocation. - // - `buf` is derived from an actual allocation, and the entire memory - // range is in bounds of that allocation. - let ptr = unsafe { buf.as_ref().as_ptr().add(buf_offset) }; + let ptr = buf.as_ref()[buf_offset..buf_offset + len as usize].as_ptr(); let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), ptr, len) .offset(file_offset) From c2ed6599b2a33c3aa5a79f14d373d341206ab8bf Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 11 Sep 2025 13:18:46 +0900 Subject: [PATCH 09/14] address review: move clamping logic --- tokio/src/fs/write.rs | 14 ++++++-------- tokio/src/io/uring/write.rs | 5 +++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 26e5e749eae..01c9bebc881 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -57,20 +57,18 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { .into(); let total: usize = buf.as_ref().len(); - let mut offset: usize = 0; - while offset < total { - // There is a cap on how many bytes we can write in a single uring write operation. - // ref: https://github.com/axboe/liburing/discussions/497 - let len = std::cmp::min(total - offset, u32::MAX as usize) as u32; - - let (n, _buf, _fd) = Op::write_at(fd, buf, offset, len, offset as u64)?.await?; + let mut buf_offset: usize = 0; + let mut file_offset: u64 = 0; + while buf_offset < total { + let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await?; if n == 0 { return Err(io::ErrorKind::WriteZero.into()); } buf = _buf; fd = _fd; - offset += n as usize; + buf_offset += n as usize; + file_offset += n as u64; } Ok(()) diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 8973fe7a94c..92acae94791 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -28,15 +28,16 @@ impl Cancellable for Write { } impl Op { - /// Issue a write that starts at `buf_offset` within `buf` and writes `len` bytes + /// Issue a write that starts at `buf_offset` within `buf` and writes some bytes /// into `file` at `file_offset`. pub(crate) fn write_at( fd: OwnedFd, buf: OwnedBuf, buf_offset: usize, - len: u32, file_offset: u64, ) -> io::Result { + let len = u32::try_from(buf.as_ref().len() - buf_offset).unwrap_or(u32::MAX); + let ptr = buf.as_ref()[buf_offset..buf_offset + len as usize].as_ptr(); let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), ptr, len) From af612ae52efadd5f15a3e5b9204d3a794ea45f90 Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 11 Sep 2025 16:01:11 +0900 Subject: [PATCH 10/14] add comment --- tokio/src/io/uring/write.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 92acae94791..9332bd0071a 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -36,6 +36,8 @@ impl Op { buf_offset: usize, file_offset: u64, ) -> io::Result { + // There is a cap on how many bytes we can write in a single uring write operation. + // ref: https://github.com/axboe/liburing/discussions/497 let len = u32::try_from(buf.as_ref().len() - buf_offset).unwrap_or(u32::MAX); let ptr = buf.as_ref()[buf_offset..buf_offset + len as usize].as_ptr(); From 02486978d1a6c976b3e8b06051015b6df72cecf9 Mon Sep 17 00:00:00 2001 From: Qi Date: Mon, 29 Sep 2025 23:51:20 +0800 Subject: [PATCH 11/14] ci: freeze rustc on nightly-2025-01-25 in `netlify.toml` (#7652) Signed-off-by: ADD-SP --- netlify.toml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/netlify.toml b/netlify.toml index e6e7343de24..c36d26b0864 100644 --- a/netlify.toml +++ b/netlify.toml @@ -1,6 +1,15 @@ [build] + # TODO: unfreeze toolchain + # error[E0557]: feature has been removed + # --> /opt/buildhome/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/lock_api-0.4.13/src/lib.rs:89:29 + # | + # 89 | #![cfg_attr(docsrs, feature(doc_auto_cfg))] + # | ^^^^^^^^^^^^ feature has been removed + # | + # = note: removed in 1.58.0; see Date: Tue, 30 Sep 2025 19:53:19 +0800 Subject: [PATCH 12/14] fs: emit compilation error without `tokio_unstable` for `io-uring` (#7634) Signed-off-by: ADD-SP --- .cirrus.yml | 12 ++++- .github/workflows/ci.yml | 106 ++++++++++++++++++++++----------------- tokio-macros/src/lib.rs | 6 +-- tokio/src/lib.rs | 3 ++ 4 files changed, 75 insertions(+), 52 deletions(-) diff --git a/.cirrus.yml b/.cirrus.yml index 0148ad414d8..82fd7a7436d 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -6,6 +6,8 @@ env: RUST_STABLE: stable RUST_NIGHTLY: nightly-2025-01-25 RUSTFLAGS: -D warnings + # This excludes unstable features like io_uring, which require '--cfg tokio_unstable'. + TOKIO_STABLE_FEATURES: full,test-util # Test FreeBSD in a full VM on cirrus-ci.com. Test the i686 target too, in the # same VM. The binary will be built in 32-bit mode, but will execute on a @@ -23,7 +25,13 @@ task: rustc --version test_script: - . $HOME/.cargo/env - - cargo test --all --all-features + - cargo test --all --features $TOKIO_STABLE_FEATURES + # Free the disk space before the next build, + # otherwise cirrus-ci complains about "No space left on device". + - cargo clean + # Enable all unstable features, including io_uring, because it supports + # x86_64 FreeBSD. + - RUSTFLAGS="$RUSTFLAGS --cfg tokio_unstable" RUSTDOCFLAGS="$RUSTDOCFLAGS --cfg tokio_unstable" cargo test --all --all-features task: name: FreeBSD docs @@ -55,4 +63,4 @@ task: rustc --version test_script: - . $HOME/.cargo/env - - cargo test --all --all-features --target i686-unknown-freebsd + - cargo test --all --features $TOKIO_STABLE_FEATURES --target i686-unknown-freebsd diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 911600f92b9..8d028574ed1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,6 +29,9 @@ env: # - tokio-test/Cargo.toml # - tokio-stream/Cargo.toml rust_min: '1.70' + # This excludes unstable features like io_uring, + # which require '--cfg tokio_unstable'. + TOKIO_STABLE_FEATURES: "full,test-util" defaults: run: @@ -73,15 +76,13 @@ jobs: - uses: Swatinem/rust-cache@v2 - # Run `tokio` with `full` features. This excludes testing utilities which + # Run `tokio` with stable features. This excludes testing utilities which # can alter the runtime behavior of Tokio. - name: test tokio full run: | set -euxo pipefail - # We use `--features "full,test-util"` instead of `--all-features` since - # `--all-features` includes `io_uring`, which is not available on all targets. - cargo nextest run --features full,test-util - cargo test --doc --features full,test-util + cargo nextest run --features full + cargo test --doc --features full working-directory: tokio test-workspace-all-features: @@ -107,12 +108,11 @@ jobs: - uses: Swatinem/rust-cache@v2 - # Test **all** crates in the workspace with all features. - - name: test all --all-features + - name: test --features ${{ env.TOKIO_STABLE_FEATURES }} run: | set -euxo pipefail - cargo nextest run --workspace --all-features - cargo test --doc --workspace --all-features + cargo nextest run --workspace --features $TOKIO_STABLE_FEATURES + cargo test --doc --workspace --features $TOKIO_STABLE_FEATURES test-workspace-all-features-panic-abort: needs: basics @@ -137,10 +137,15 @@ jobs: - uses: Swatinem/rust-cache@v2 - - name: test all --all-features panic=abort + - name: test --features ${{ env.TOKIO_STABLE_FEATURES }} panic=abort run: | set -euxo pipefail - RUSTFLAGS="$RUSTFLAGS -C panic=abort -Zpanic-abort-tests" cargo nextest run --workspace --exclude tokio-macros --exclude tests-build --all-features --tests + RUSTFLAGS="$RUSTFLAGS -C panic=abort -Zpanic-abort-tests" cargo nextest run \ + --workspace \ + --exclude tokio-macros \ + --exclude tests-build \ + --features $TOKIO_STABLE_FEATURES \ + --tests test-integration-tests-per-feature: needs: basics @@ -204,8 +209,9 @@ jobs: run: sed -i '/\[features\]/a plsend = ["parking_lot/send_guard"]' tokio/Cargo.toml - uses: Swatinem/rust-cache@v2 - - name: Check tests with all features enabled - run: cargo check --workspace --all-features --tests + + - name: Check tests --features ${{ env.TOKIO_STABLE_FEATURES }} + run: cargo check --workspace --tests --features $TOKIO_STABLE_FEATURES valgrind: name: valgrind @@ -247,12 +253,11 @@ jobs: strategy: matrix: include: - # We use `--features "full,test-util"` instead of `--all-features` since - # `--all-features` includes `io_uring`, which is not available on all targets. - - { os: windows-latest, features: "full,test-util" } - - { os: ubuntu-latest, features: "full,test-util" } - - { os: ubuntu-latest, features: "full,test-util,io-uring" } - - { os: macos-latest, features: "full,test-util" } + - { os: windows-latest, extra_features: "" } + - { os: ubuntu-latest, extra_features: "" } + # only Linux supports io_uring + - { os: ubuntu-latest, extra_features: io-uring } + - { os: macos-latest, extra_features: "" } steps: - uses: actions/checkout@v5 - name: Install Rust ${{ env.rust_stable }} @@ -270,8 +275,8 @@ jobs: - name: test tokio full --cfg unstable run: | set -euxo pipefail - cargo nextest run --features ${{ matrix.features }} - cargo test --doc --features ${{ matrix.features }} + cargo nextest run --features $TOKIO_STABLE_FEATURES,${{ matrix.extra_features }} + cargo test --doc --features $TOKIO_STABLE_FEATURES,${{ matrix.extra_features }} working-directory: tokio env: RUSTFLAGS: --cfg tokio_unstable -Dwarnings @@ -304,8 +309,11 @@ jobs: - name: test tokio full --cfg unstable --cfg taskdump run: | set -euxo pipefail - cargo nextest run --all-features - cargo test --doc --all-features + # taskdump is an unstable feature, but it can only be enabled + # by --cfg tokio_taskdump, not by a feature flag, so we can + # use $TOKIO_STABLE_FEATURES here. + cargo nextest run --features $TOKIO_STABLE_FEATURES + cargo test --doc --features $TOKIO_STABLE_FEATURES working-directory: tokio env: RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings @@ -332,7 +340,8 @@ jobs: with: tool: cargo-nextest - uses: Swatinem/rust-cache@v2 - # Run `tokio` with "unstable" and "taskdump" cfg flags. + # Since the internal-mt-counters feature is only for debugging purposes, + # we can enable all features including unstable. - name: check tokio full --cfg unstable --cfg internal-mt-counters run: | set -euxo pipefail @@ -405,7 +414,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: miri-doc-test run: | - cargo miri test --doc --all-features --no-fail-fast + cargo miri test --doc --features $TOKIO_STABLE_FEATURES --no-fail-fast working-directory: tokio env: MIRIFLAGS: -Zmiri-disable-isolation -Zmiri-strict-provenance -Zmiri-retag-fields @@ -426,7 +435,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: asan - run: cargo test --workspace --all-features --target x86_64-unknown-linux-gnu --tests -- --test-threads 1 --nocapture + run: cargo test --workspace --features $TOKIO_STABLE_FEATURES --target x86_64-unknown-linux-gnu --tests -- --test-threads 1 --nocapture env: RUSTFLAGS: -Z sanitizer=address --cfg tokio_no_tuning_tests # Ignore `trybuild` errors as they are irrelevant and flaky on nightly @@ -444,6 +453,9 @@ jobs: rust-toolchain: ${{ env.rust_stable }} package: tokio release-type: minor + feature-group: only-explicit-features + # We don't care about the semver of unstable tokio features. + features: ${{ env.TOKIO_STABLE_FEATURES }} - name: Check semver for rest of the workspace if: ${{ !startsWith(github.event.pull_request.base.ref, 'tokio-1.') }} uses: obi1kenobi/cargo-semver-checks-action@v2 @@ -669,9 +681,9 @@ jobs: strategy: matrix: include: - - { name: "", rustflags: "" } - - { name: "--unstable", rustflags: "--cfg tokio_unstable -Dwarnings" } - - { name: "--unstable --taskdump", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump" } + - { name: "", rustflags: "", exclude_features: "io-uring" } + - { name: "--unstable", rustflags: "--cfg tokio_unstable -Dwarnings", exclude_features: "" } + - { name: "--unstable --taskdump", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump", exclude_features: "" } steps: - uses: actions/checkout@v5 - name: Install Rust ${{ env.rust_nightly }} @@ -684,7 +696,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: check --feature-powerset ${{ matrix.name }} - run: cargo hack check --all --feature-powerset --depth 2 --keep-going + run: cargo hack check --all --feature-powerset --exclude-features "${{ matrix.exclude_features }}" --depth 2 --keep-going env: RUSTFLAGS: ${{ matrix.rustflags }} @@ -698,17 +710,19 @@ jobs: with: toolchain: ${{ env.rust_min }} - uses: Swatinem/rust-cache@v2 - - name: "check --workspace --all-features" + - name: "cargo check" run: | if [[ "${{ github.event.pull_request.base.ref }}" =~ ^tokio-1\..* ]]; then # Only check `tokio` crate as the PR is backporting to an earlier tokio release. - cargo check -p tokio --all-features + + cargo check -p tokio --features $TOKIO_STABLE_FEATURES else # Check all crates in the workspace - cargo check --workspace --all-features + + cargo check -p tokio --features $TOKIO_STABLE_FEATURES + # Other crates doesn't have unstable features, so we can use --all-features. + cargo check -p tokio-macros -p tokio-stream -p tokio-util -p tokio-test --all-features fi - env: - RUSTFLAGS: "" # remove -Dwarnings minimal-versions: name: minimal-versions @@ -724,14 +738,15 @@ jobs: uses: taiki-e/install-action@cargo-hack - uses: Swatinem/rust-cache@v2 - - name: "check --all-features -Z minimal-versions" + - name: "check -Z minimal-versions" run: | # Remove dev-dependencies from Cargo.toml to prevent the next `cargo update` # from determining minimal versions based on dev-dependencies. cargo hack --remove-dev-deps --workspace # Update Cargo.lock to minimal version dependencies. cargo update -Z minimal-versions - cargo hack check --all-features --ignore-private + cargo hack check -p tokio --features $TOKIO_STABLE_FEATURES --ignore-private + cargo hack check -p tokio-macros -p tokio-stream -p tokio-util -p tokio-test --all-features --ignore-private - name: "check --all-features --unstable -Z minimal-versions" env: RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings @@ -766,11 +781,6 @@ jobs: clippy: name: clippy runs-on: ubuntu-latest - strategy: - matrix: - rustflags: - - "" - - "--cfg tokio_unstable --cfg tokio_taskdump -Dwarnings" steps: - uses: actions/checkout@v5 - name: Install Rust ${{ env.rust_clippy }} @@ -780,10 +790,12 @@ jobs: components: clippy - uses: Swatinem/rust-cache@v2 # Run clippy - - name: "clippy --all ${{ matrix.rustflags }}" - run: cargo clippy --all --tests --all-features --no-deps + - name: "clippy --all --features ${{ env.TOKIO_STABLE_FEATURES }}" + run: cargo clippy --all --tests --no-deps --features $TOKIO_STABLE_FEATURES + - name: "clippy --all --all-features --unstable" + run: cargo clippy --all --tests --no-deps --all-features env: - RUSTFLAGS: ${{ matrix.rustflags }} + RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings docs: name: docs @@ -970,7 +982,7 @@ jobs: toolchain: ${{ env.rust_nightly }} target: x86_64-unknown-redox - name: check tokio on redox - run: cargo check --target x86_64-unknown-redox --all-features + run: cargo check --target x86_64-unknown-redox --features $TOKIO_STABLE_FEATURES working-directory: tokio wasm32-unknown-unknown: @@ -1090,7 +1102,7 @@ jobs: with: tool: cargo-check-external-types@0.1.13 - name: check-external-types - run: cargo check-external-types --all-features + run: cargo check-external-types --features $TOKIO_STABLE_FEATURES working-directory: tokio check-fuzzing: diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 207727fe15a..647888bf513 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -282,7 +282,7 @@ use proc_macro::TokenStream; /// fn main() { /// tokio::runtime::Builder::new_current_thread() /// .enable_all() -/// .unhandled_panic(UnhandledPanic::ShutdownRuntime) +/// .unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime) /// .build() /// .unwrap() /// .block_on(async { @@ -539,7 +539,7 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { /// panic!("This panic will shutdown the runtime."); /// }).await; /// } -/// # #[cfg(not(tokio_unstable))] +/// /// # fn main() { } /// ``` /// @@ -560,7 +560,7 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { /// }).await; /// }) /// } -/// # #[cfg(not(tokio_unstable))] +/// /// # fn main() { } /// ``` /// diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 33ee9eb87f2..9ee0ca94234 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -480,6 +480,9 @@ compile_error! { ))] compile_error!("Only features sync,macros,io-util,rt,time are supported on wasm."); +#[cfg(all(not(tokio_unstable), feature = "io-uring"))] +compile_error!("The `io-uring` feature requires `--cfg tokio_unstable`."); + #[cfg(all(not(tokio_unstable), tokio_taskdump))] compile_error!("The `tokio_taskdump` feature requires `--cfg tokio_unstable`."); From 95edd8515ee863ac550849af0a741c889ebdf314 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 30 Sep 2025 14:35:46 +0200 Subject: [PATCH 13/14] docs: fix some docs links (#7654) --- tokio-util/src/codec/mod.rs | 2 +- tokio/src/runtime/mod.rs | 1 - tokio/src/sync/broadcast.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs index a03c0b943d4..43ed38c9270 100644 --- a/tokio-util/src/codec/mod.rs +++ b/tokio-util/src/codec/mod.rs @@ -313,7 +313,7 @@ //! [`AsyncWrite`]: tokio::io::AsyncWrite //! [`Stream`]: futures_core::Stream //! [`Sink`]: futures_sink::Sink -//! [`SinkExt`]: futures::sink::SinkExt +//! [`SinkExt`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close //! [`FramedRead`]: struct@crate::codec::FramedRead //! [`FramedWrite`]: struct@crate::codec::FramedWrite diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3a74dda2d73..a70e76aa29c 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -177,7 +177,6 @@ //! [`tokio::main`]: ../attr.main.html //! [runtime builder]: crate::runtime::Builder //! [`Runtime::new`]: crate::runtime::Runtime::new -//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler //! [`Builder::enable_io`]: crate::runtime::Builder::enable_io //! [`Builder::enable_time`]: crate::runtime::Builder::enable_time //! [`Builder::enable_all`]: crate::runtime::Builder::enable_all diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 8a1cb5af55e..05dad6393d9 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1170,7 +1170,7 @@ impl Receiver { /// Returns true if there aren't any messages in the channel that the [`Receiver`] /// has yet to receive. /// - /// [`Receiver]: create::sync::broadcast::Receiver + /// [`Receiver`]: crate::sync::broadcast::Receiver /// /// # Examples /// From 9f0b8f3243f538bf930a8c18ce605b1a740ccaf7 Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Tue, 30 Sep 2025 21:52:55 +0900 Subject: [PATCH 14/14] fix cfg --- tokio/src/fs/write.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 01c9bebc881..543f97fd40a 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -27,7 +27,13 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re let path = path.as_ref(); let contents = crate::util::as_ref::upgrade(contents); - #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] { let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); @@ -39,7 +45,13 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re write_spawn_blocking(path, contents).await } -#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))] +#[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { use crate::{fs::OpenOptions, runtime::driver::op::Op}; use std::os::fd::OwnedFd;