Skip to content

Commit 5e8b2df

Browse files
committed
restructure code
1 parent a96556c commit 5e8b2df

File tree

6 files changed

+138
-24
lines changed

6 files changed

+138
-24
lines changed

tokio/src/fs/mocks.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
use mockall::mock;
33

44
use crate::sync::oneshot;
5+
#[cfg(all(test, unix))]
6+
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
57
use std::{
68
cell::RefCell,
79
collections::VecDeque,
@@ -89,6 +91,14 @@ impl Write for &'_ MockFile {
8991
}
9092
}
9193

94+
#[cfg(all(test, unix))]
95+
impl From<MockFile> for OwnedFd {
96+
#[inline]
97+
fn from(file: MockFile) -> OwnedFd {
98+
unsafe { OwnedFd::from_raw_fd(file.as_raw_fd()) }
99+
}
100+
}
101+
92102
tokio_thread_local! {
93103
static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new())
94104
}

tokio/src/fs/write.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Re
4242

4343
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
4444
async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> {
45-
use crate::{fs::OpenOptions, runtime::driver::op::Op};
46-
use std::os::fd::AsFd;
45+
use crate::{fs::OpenOptions, io::uring::utils::SharedFd, runtime::driver::op::Op};
4746

4847
let file = OpenOptions::new()
4948
.write(true)
@@ -52,15 +51,16 @@ async fn write_uring(path: &Path, contents: OwnedBuf) -> io::Result<()> {
5251
.open(path)
5352
.await?;
5453

54+
let fd = SharedFd::try_from(file).expect("unexpected in-flight operation detected");
55+
5556
let mut pos = 0;
56-
let fd = file.as_fd();
57-
let mut buf = contents.as_ref();
57+
let mut buf = contents;
5858
while !buf.is_empty() {
59-
let n = Op::write_at(fd, buf, pos)?.await? as usize;
59+
let n = Op::write_at(fd.clone(), buf.clone(), pos)?.await? as usize;
6060
if n == 0 {
6161
return Err(io::ErrorKind::WriteZero.into());
6262
}
63-
buf = &buf[n..];
63+
buf.advance(n);
6464
pos += n as u64;
6565
}
6666

tokio/src/io/uring/utils.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,35 @@
1+
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
12
use std::os::unix::ffi::OsStrExt;
3+
use std::sync::Arc;
24
use std::{ffi::CString, io, path::Path};
35

46
pub(crate) fn cstr(p: &Path) -> io::Result<CString> {
57
Ok(CString::new(p.as_os_str().as_bytes())?)
68
}
9+
10+
/// A reference-counted file descriptor.
11+
#[derive(Clone, Debug)]
12+
pub(crate) struct SharedFd {
13+
inner: Arc<OwnedFd>,
14+
}
15+
16+
impl SharedFd {
17+
pub(crate) fn new(fd: OwnedFd) -> SharedFd {
18+
SharedFd {
19+
inner: Arc::new(fd),
20+
}
21+
}
22+
}
23+
24+
impl AsRawFd for SharedFd {
25+
fn as_raw_fd(&self) -> RawFd {
26+
self.inner.as_raw_fd()
27+
}
28+
}
29+
30+
impl TryFrom<crate::fs::File> for SharedFd {
31+
type Error = crate::fs::File;
32+
fn try_from(value: crate::fs::File) -> Result<Self, Self::Error> {
33+
Ok(SharedFd::new(OwnedFd::from(value.try_into_std()?)))
34+
}
35+
}

tokio/src/io/uring/write.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
2-
use io_uring::{opcode, types};
3-
use std::{
4-
cmp, io,
5-
os::fd::{AsRawFd, BorrowedFd},
1+
use crate::{
2+
io::uring::utils::SharedFd,
3+
runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op},
4+
util::as_ref::OwnedBuf,
65
};
6+
use io_uring::{opcode, types};
7+
use std::{io, os::fd::AsRawFd};
78

89
#[derive(Debug)]
9-
pub(crate) struct Write;
10+
pub(crate) struct Write {
11+
_fd: SharedFd,
12+
_buf: OwnedBuf,
13+
}
1014

1115
impl Completable for Write {
1216
// The number of bytes written.
@@ -24,17 +28,18 @@ impl Cancellable for Write {
2428
}
2529

2630
impl Op<Write> {
27-
pub(crate) fn write_at(fd: BorrowedFd<'_>, buf: &[u8], offset: u64) -> io::Result<Self> {
31+
pub(crate) fn write_at(fd: SharedFd, buf: OwnedBuf, offset: u64) -> io::Result<Self> {
2832
// There is a cap on how many bytes we can write in a single uring write operation.
2933
// ref: https://github.com/axboe/liburing/discussions/497
30-
let len: u32 = cmp::min(buf.len(), u32::MAX as usize) as u32;
34+
let len: u32 = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
3135

32-
let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ptr(), len)
36+
let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), buf.as_ref().as_ptr(), len)
3337
.offset(offset)
3438
.build();
3539

36-
// SAFETY: TODO
37-
let op = unsafe { Op::new(sqe, Write) };
40+
// SAFETY: `fd` and `buf` are owned by this `Op`, ensuring that these params are valid
41+
// until `Op::drop` gets called.
42+
let op = unsafe { Op::new(sqe, Write { _buf: buf, _fd: fd }) };
3843
Ok(op)
3944
}
4045
}

tokio/src/runtime/driver/op.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ pub(crate) enum CancelData {
1717
// so `#[allow(dead_code)]` is needed.
1818
#[allow(dead_code)] Open,
1919
),
20-
Write(Write),
20+
Write(
21+
// This field isn't accessed directly, but it holds cancellation data,
22+
// so `#[allow(dead_code)]` is needed.
23+
#[allow(dead_code)] Write,
24+
),
2125
}
2226

2327
#[derive(Debug)]

tokio/src/util/as_ref.rs

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,69 @@
1+
use std::sync::Arc;
2+
3+
#[cfg(feature = "io-util")]
4+
use bytes::Buf;
5+
16
use super::typeid;
27

3-
#[derive(Debug)]
8+
#[derive(Debug, Clone)]
49
pub(crate) enum OwnedBuf {
5-
Vec(Vec<u8>),
10+
Vec(VecBuf),
611
#[cfg(feature = "io-util")]
712
Bytes(bytes::Bytes),
813
}
914

15+
#[allow(unused)]
16+
impl OwnedBuf {
17+
pub(crate) fn len(&self) -> usize {
18+
match self {
19+
OwnedBuf::Vec(vec) => vec.len(),
20+
#[cfg(feature = "io-util")]
21+
OwnedBuf::Bytes(b) => b.len(),
22+
}
23+
}
24+
25+
pub(crate) fn advance(&mut self, n: usize) {
26+
match self {
27+
OwnedBuf::Vec(vec) => vec.advance(n),
28+
#[cfg(feature = "io-util")]
29+
OwnedBuf::Bytes(b) => b.advance(n),
30+
}
31+
}
32+
33+
pub(crate) fn is_empty(&self) -> bool {
34+
match self {
35+
Self::Vec(vec) => vec.is_empty(),
36+
#[cfg(feature = "io-util")]
37+
Self::Bytes(bytes) => bytes.is_empty(),
38+
}
39+
}
40+
}
41+
42+
/// A Vec-like, reference-counted buffer.
43+
#[derive(Debug, Clone)]
44+
pub(crate) struct VecBuf {
45+
data: Arc<[u8]>,
46+
pos: usize,
47+
}
48+
49+
impl VecBuf {
50+
fn len(&self) -> usize {
51+
self.data.len().saturating_sub(self.pos)
52+
}
53+
54+
fn advance(&mut self, n: usize) {
55+
self.pos = self.pos.saturating_add(n);
56+
}
57+
58+
fn is_empty(&self) -> bool {
59+
self.data.len().saturating_sub(self.pos) == 0
60+
}
61+
}
62+
1063
impl AsRef<[u8]> for OwnedBuf {
1164
fn as_ref(&self) -> &[u8] {
1265
match self {
13-
Self::Vec(vec) => vec,
66+
Self::Vec(VecBuf { data, pos }) => &data[*pos..],
1467
#[cfg(feature = "io-util")]
1568
Self::Bytes(bytes) => bytes,
1669
}
@@ -19,12 +72,22 @@ impl AsRef<[u8]> for OwnedBuf {
1972

2073
pub(crate) fn upgrade<B: AsRef<[u8]>>(buf: B) -> OwnedBuf {
2174
let buf = match unsafe { typeid::try_transmute::<B, Vec<u8>>(buf) } {
22-
Ok(vec) => return OwnedBuf::Vec(vec),
75+
Ok(vec) => {
76+
return OwnedBuf::Vec(VecBuf {
77+
data: vec.into(),
78+
pos: 0,
79+
});
80+
}
2381
Err(original_buf) => original_buf,
2482
};
2583

2684
let buf = match unsafe { typeid::try_transmute::<B, String>(buf) } {
27-
Ok(string) => return OwnedBuf::Vec(string.into_bytes()),
85+
Ok(string) => {
86+
return OwnedBuf::Vec(VecBuf {
87+
data: string.into_bytes().into(),
88+
pos: 0,
89+
});
90+
}
2891
Err(original_buf) => original_buf,
2992
};
3093

@@ -34,5 +97,8 @@ pub(crate) fn upgrade<B: AsRef<[u8]>>(buf: B) -> OwnedBuf {
3497
Err(original_buf) => original_buf,
3598
};
3699

37-
OwnedBuf::Vec(buf.as_ref().to_owned())
100+
OwnedBuf::Vec(VecBuf {
101+
data: buf.as_ref().to_owned().into(),
102+
pos: 0,
103+
})
38104
}

0 commit comments

Comments
 (0)