Skip to content

Commit 62d8fe9

Browse files
committed
fix Buffer ownership issue
1 parent 05c8c17 commit 62d8fe9

File tree

5 files changed

+82
-39
lines changed

5 files changed

+82
-39
lines changed

tokio/src/fs/mocks.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
use mockall::mock;
33

44
use crate::sync::oneshot;
5+
#[cfg(all(test, unix))]
6+
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
57
use std::{
68
cell::RefCell,
79
collections::VecDeque,
@@ -89,6 +91,14 @@ impl Write for &'_ MockFile {
8991
}
9092
}
9193

94+
#[cfg(all(test, unix))]
95+
impl From<MockFile> for OwnedFd {
96+
#[inline]
97+
fn from(file: MockFile) -> OwnedFd {
98+
unsafe { OwnedFd::from_raw_fd(file.as_raw_fd()) }
99+
}
100+
}
101+
92102
tokio_thread_local! {
93103
static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new())
94104
}

tokio/src/fs/write.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Re
4040
}
4141

4242
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
43-
async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> {
43+
async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> {
4444
use crate::{fs::OpenOptions, runtime::driver::op::Op};
45-
use std::os::fd::AsFd;
45+
use std::os::fd::OwnedFd;
4646

4747
let file = OpenOptions::new()
4848
.write(true)
@@ -51,25 +51,27 @@ async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> {
5151
.open(path)
5252
.await?;
5353

54-
let fd = file.as_fd();
54+
let mut fd: OwnedFd = file
55+
.try_into_std()
56+
.expect("unexpected in-flight operation detected")
57+
.into();
5558

56-
let mut pos = 0;
57-
let mut buf = contents.as_ref();
58-
while !buf.is_empty() {
59-
// SAFETY:
60-
// If the operation completes successfully, `fd` and `buf` are still
61-
// alive within the scope of this function, so remain valid.
62-
//
63-
// In the case of cancellation, local variables within the scope of
64-
// this `async fn` are dropped in the reverse order of their declaration.
65-
// Therefore, `Op` is dropped before `fd` and `buf`, ensuring that the
66-
// operation finishes gracefully before these resources are dropped.
67-
let n = unsafe { Op::write_at(fd, buf, pos) }?.await? as usize;
59+
let total: usize = buf.len();
60+
let mut offset: usize = 0;
61+
62+
while offset < total {
63+
// There is a cap on how many bytes we can write in a single uring write operation.
64+
// ref: https://github.com/axboe/liburing/discussions/497
65+
let len = std::cmp::min(total - offset, u32::MAX as usize) as u32;
66+
67+
let (n, _buf, _fd) = Op::write_at(fd, buf, offset, len, offset as u64)?.await?;
6868
if n == 0 {
6969
return Err(io::ErrorKind::WriteZero.into());
7070
}
71-
buf = &buf[n..];
72-
pos += n as u64;
71+
72+
buf = _buf;
73+
fd = _fd;
74+
offset += n as usize;
7375
}
7476

7577
Ok(())

tokio/src/io/uring/write.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1-
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
1+
use crate::{
2+
runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op},
3+
util::as_ref::OwnedBuf,
4+
};
25
use io_uring::{opcode, types};
36
use std::{
4-
cmp, io,
5-
os::fd::{AsRawFd, BorrowedFd},
7+
io,
8+
os::fd::{AsRawFd, OwnedFd},
69
};
710

811
#[derive(Debug)]
9-
pub(crate) struct Write;
12+
pub(crate) struct Write {
13+
buf: OwnedBuf,
14+
fd: OwnedFd,
15+
}
1016

1117
impl Completable for Write {
12-
// The number of bytes written.
13-
type Output = u32;
18+
type Output = (u32, OwnedBuf, OwnedFd);
1419
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
15-
cqe.result
20+
Ok((cqe.result?, self.buf, self.fd))
1621
}
1722
}
1823

@@ -23,23 +28,29 @@ impl Cancellable for Write {
2328
}
2429

2530
impl Op<Write> {
26-
/// # SAFETY
27-
///
28-
/// The caller must ensure that `fd` and `buf` remain valid until the
29-
/// operation finishes (or gets cancelled) and the `Op::drop` completes.
30-
/// Otherwise, the kernel could access freed memory, breaking soundness.
31-
pub(crate) unsafe fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result<Self> {
32-
// There is a cap on how many bytes we can write in a single uring write operation.
33-
// ref: https://github.com/axboe/liburing/discussions/497
34-
let len: u32 = cmp::min(buf.len(), u32::MAX as usize) as u32;
31+
/// Issue a write that starts at `buf_offset` within `buf` and writes `len` bytes
32+
/// into `file` at `file_offset`.
33+
pub(crate) fn write_at(
34+
fd: OwnedFd,
35+
buf: OwnedBuf,
36+
buf_offset: usize,
37+
len: u32,
38+
file_offset: u64,
39+
) -> io::Result<Self> {
40+
// Check if `buf_offset` stays in bounds of the allocation
41+
debug_assert!(buf_offset + len as usize <= buf.len());
3542

36-
let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ptr(), len)
37-
.offset(offset)
43+
// SAFETY:
44+
// - `buf_offset` stays in bounds of the allocation.
45+
// - `buf` is derived from an actual allocation, and the entire memory
46+
// range is in bounds of that allocation.
47+
let ptr = unsafe { buf.as_ptr().add(buf_offset) };
48+
let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), ptr, len)
49+
.offset(file_offset)
3850
.build();
3951

40-
// SAFETY: `fd` and `buf` valid until the operation completes or gets cancelled
41-
// and the `Op::drop` completes.
42-
let op = unsafe { Op::new(sqe, Write) };
52+
// SAFETY: parameters of the entry is valid until this operation completes.
53+
let op = unsafe { Op::new(sqe, Write { buf, fd }) };
4354
Ok(op)
4455
}
4556
}

tokio/src/runtime/driver/op.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ pub(crate) enum CancelData {
1717
// so `#[allow(dead_code)]` is needed.
1818
#[allow(dead_code)] Open,
1919
),
20-
Write(Write),
20+
Write(
21+
// This field isn't accessed directly, but it holds cancellation data,
22+
// so `#[allow(dead_code)]` is needed.
23+
#[allow(dead_code)] Write,
24+
),
2125
}
2226

2327
#[derive(Debug)]

tokio/src/util/as_ref.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,22 @@ pub(crate) enum OwnedBuf {
77
Bytes(bytes::Bytes),
88
}
99

10+
impl OwnedBuf {
11+
cfg_tokio_uring! {
12+
pub(crate) fn len(&self) -> usize {
13+
match self {
14+
Self::Vec(vec) => vec.len(),
15+
#[cfg(feature = "io-util")]
16+
Self::Bytes(bytes) => bytes.len(),
17+
}
18+
}
19+
20+
pub(crate) fn as_ptr(&self) -> *const u8 {
21+
self.as_ref().as_ptr()
22+
}
23+
}
24+
}
25+
1026
impl AsRef<[u8]> for OwnedBuf {
1127
fn as_ref(&self) -> &[u8] {
1228
match self {

0 commit comments

Comments
 (0)