Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions io-uring-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ fn test<S: squeue::EntryMarker, C: cqueue::EntryMarker>(

// fs
tests::fs::test_file_write_read(&mut ring, &test)?;
tests::fs::test_pipe_write_read_multi(&mut ring, &test)?;
tests::fs::test_file_writev_readv(&mut ring, &test)?;
tests::fs::test_file_cur_pos(&mut ring, &test)?;
tests::fs::test_file_fsync(&mut ring, &test)?;
Expand Down
105 changes: 105 additions & 0 deletions io-uring-test/src/tests/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,111 @@ pub fn test_file_write_read<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
Ok(())
}

/// Exercises `opcode::ReadMulti` (multishot read) on a pipe with provided buffers.
///
/// The test:
/// 1. registers three 1024-byte buffers under buffer group `0xbeef`,
/// 2. writes 1280 bytes (1024 x `0xde` followed by 256 x `0xad`) into the pipe,
/// 3. queues a single multishot read on the read end and closes the write end,
/// 4. expects exactly three completions for that one request: 1024 bytes, 256 bytes,
///    and a final `0` (EOF) completion without the "more" flag.
///
/// The test is skipped by `require!` unless the probe reports support for `Write`,
/// `ReadMulti` and `ProvideBuffers`.
///
/// # Errors
///
/// Returns an error if the pipe cannot be created, the write end cannot be closed,
/// or submitting to the ring fails. Unexpected completions fail via `assert!`.
pub fn test_pipe_write_read_multi<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
    ring: &mut IoUring<S, C>,
    test: &Test,
) -> anyhow::Result<()> {
    /// Closes the wrapped descriptor when dropped, so neither pipe end leaks on an
    /// early return or a failed assertion. A negative value means "already closed".
    struct FdGuard(libc::c_int);

    impl Drop for FdGuard {
        fn drop(&mut self) {
            if self.0 >= 0 {
                // SAFETY: the guard is the only owner of this descriptor and it has not
                // been closed yet (it would have been set to a negative value otherwise).
                unsafe { libc::close(self.0) };
            }
        }
    }

    require!(
        test;
        test.probe.is_supported(opcode::Write::CODE);
        test.probe.is_supported(opcode::ReadMulti::CODE);
        test.probe.is_supported(opcode::ProvideBuffers::CODE);
    );

    println!("test pipe_write_read_multi");

    // Create a pipe for testing
    let mut pipefds = [0; 2];
    // SAFETY: `pipefds` is a valid, writable array of two descriptors.
    let res = unsafe { libc::pipe(pipefds.as_mut_ptr()) };
    if res < 0 {
        return Err(anyhow::anyhow!("Failed to create pipe"));
    }

    // From here on both ends are closed automatically on every exit path.
    let read_guard = FdGuard(pipefds[0]);
    let mut write_guard = FdGuard(pipefds[1]);

    let read_fd = types::Fd(read_guard.0);
    let write_fd = types::Fd(write_guard.0);

    // Prepare data to write and buffers to read into
    let mut input = vec![0xde; 1024];
    input.extend_from_slice(&[0xad; 256]);

    // NOTE: three buffers are provided because the final EOF completion also
    // consumes a buffer, in addition to the two that receive data.
    let mut bufs = vec![0; 3 * 1024];

    // Provide buffers for multi-shot reads
    let provide_bufs_e = opcode::ProvideBuffers::new(bufs.as_mut_ptr(), 1024, 3, 0xbeef, 0);

    // SAFETY: `bufs` stays alive and is not moved until the end of this function.
    unsafe {
        ring.submission()
            .push(&provide_bufs_e.build().user_data(0x31).into())
            .expect("queue is full");
    }

    ring.submit_and_wait(1)?;

    let cqe: cqueue::Entry = ring.completion().next().expect("cqueue is empty").into();
    assert_eq!(cqe.user_data(), 0x31);
    assert_eq!(cqe.result(), 0);

    // Write data to the pipe
    let write_e = opcode::Write::new(write_fd, input.as_ptr(), input.len() as _);

    // SAFETY: `input` outlives the write, which is awaited right below.
    unsafe {
        ring.submission()
            .push(&write_e.build().user_data(0x32).into())
            .expect("queue is full");
    }

    ring.submit_and_wait(1)?;

    let cqe: cqueue::Entry = ring.completion().next().expect("cqueue is empty").into();
    assert_eq!(cqe.user_data(), 0x32);
    assert_eq!(cqe.result(), input.len() as i32);

    // Issue multi-shot read using a buf_group with 1024 length buffers
    let read_multi_e = opcode::ReadMulti::new(read_fd, 0xbeef)
        .build()
        .user_data(0x33)
        .into();

    // SAFETY: the request only references the read descriptor and the buffer group,
    // both of which remain valid until the completions are reaped below.
    unsafe {
        ring.submission()
            .push(&read_multi_e)
            .expect("queue is full");
    }

    // Close the write end to ensure we get an EOF completion. The guard is disarmed
    // first so the descriptor is never closed twice, even if `close` reports an error.
    let raw_write = std::mem::replace(&mut write_guard.0, -1);
    // SAFETY: `raw_write` is the still-open write end obtained from `pipe` above.
    let cres = unsafe { libc::close(raw_write) };
    if cres < 0 {
        return Err(anyhow::anyhow!("Failed to close file descriptor"));
    }

    ring.submit_and_wait(3)?;

    let cqes: Vec<cqueue::Entry> = ring.completion().map(Into::into).collect();
    assert_eq!(cqes.len(), 3);

    // First read should get 1024 bytes
    assert_eq!(cqes[0].user_data(), 0x33);
    assert_eq!(cqes[0].result(), 1024);
    assert!(cqueue::more(cqes[0].flags()));
    assert_eq!(cqueue::buffer_select(cqes[0].flags()), Some(0));
    assert_eq!(&bufs[..1024], &input[..1024]);

    // Second read should get the remaining 256 bytes
    assert_eq!(cqes[1].user_data(), 0x33);
    assert_eq!(cqes[1].result(), 256);
    assert!(cqueue::more(cqes[1].flags()));
    assert_eq!(cqueue::buffer_select(cqes[1].flags()), Some(1));
    assert_eq!(&bufs[1024..][..256], &input[1024..][..256]);

    // Final completion should indicate EOF
    assert_eq!(cqes[2].user_data(), 0x33);
    assert!(!cqueue::more(cqes[2].flags()));
    assert_eq!(cqes[2].result(), 0); // 0 indicates EOF for read operations

    // `read_guard` closes the read end here; the original never closed it.
    Ok(())
}

pub fn test_file_writev_readv<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
ring: &mut IoUring<S, C>,
test: &Test,
Expand Down
41 changes: 41 additions & 0 deletions src/opcode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,47 @@ opcode! {
}
}

opcode! {
/// Read multiple times from a file, equivalent to `pread(2)`.
///
/// Parameter:
/// buf_group: The id of the provided buffer pool to use for each received chunk.
///
/// MSG_WAITALL should not be set in flags.
///
/// The multishot version allows the application to issue a single read request, which
/// repeatedly posts a CQE when data is available. Each CQE will take a buffer out of a
/// provided buffer pool for receiving. The application should check the flags of each CQE,
/// regardless of its result. If a posted CQE does not have the IORING_CQE_F_MORE flag set then
/// the multishot read will be done and the application should issue a new request.
///
/// Multishot read is available since kernel 6.7.
/// Using it is only recommended from kernel 6.7.2 onwards; see <https://github.com/axboe/liburing/issues/1041>.

pub struct ReadMulti {
fd: { impl sealed::UseFixed },
buf_group: { u16 },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to provide a way to specify len and offset?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it should behave exactly like RecvMulti: you cannot specify anything other than the fd, the buffer group to use, and flags (none of which, I think, are used on the kernel side right now).

Copy link
Member

@quininer quininer Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

;;
ioprio: u16 = 0,
flags: i32 = 0
}

pub const CODE = sys::IORING_OP_READ_MULTISHOT;

pub fn build(self) -> Entry {
let ReadMulti { fd, buf_group, flags, ioprio } = self;

let mut sqe = sqe_zeroed();
sqe.opcode = Self::CODE;
assign_fd!(sqe.fd = fd);
sqe.__bindgen_anon_3.msg_flags = flags as _;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do these flags mean, and why are they written to msg_flags?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: the API is modelled on RecvMulti.
The flags are there in case the kernel maintainers add support for custom flags that we would like to use; they default to 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the kernel doesn't read msg_flags, right?

sqe.__bindgen_anon_4.buf_group = buf_group;
sqe.flags |= crate::squeue::Flags::BUFFER_SELECT.bits();
sqe.ioprio = ioprio;
Entry(sqe)
}
}

opcode! {
/// Issue the equivalent of a `pread(2)` or `pwrite(2)` system call
///
Expand Down
Loading