From 94c37f55157e6fb08e8792f012a8dfcb5494ae08 Mon Sep 17 00:00:00 2001
From: Omer Shamash
Date: Sun, 23 Feb 2025 13:51:34 +0200
Subject: [PATCH 1/2] feat: add support for ReadMulti opcode (6.7+)

---
Review notes (ignored by `git am`):
- build() now stores `flags` through `rw_flags` instead of `msg_flags`. Both
  are members of the same union, so the bits written are unchanged.
- Replaced the MSG_WAITALL remark, which does not apply to a read opcode.
- Removed the blank line between the doc comment and `pub struct`.

 src/opcode.rs | 41 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/src/opcode.rs b/src/opcode.rs
index 72ee8464..ab820a74 100644
--- a/src/opcode.rs
+++ b/src/opcode.rs
@@ -900,6 +900,47 @@ opcode! {
     }
 }
 
+opcode! {
+    /// Read multiple times from a file, equivalent to `pread(2)`.
+    ///
+    /// Parameter:
+    ///     buf_group: The id of the provided buffer pool to use for each received chunk.
+    ///
+    /// `flags` is stored in the submission entry's `rw_flags` field.
+    ///
+    /// The multishot version allows the application to issue a single read request, which
+    /// repeatedly posts a CQE when data is available. Each CQE will take a buffer out of a
+    /// provided buffer pool for receiving. The application should check the flags of each CQE,
+    /// regardless of its result. If a posted CQE does not have the `IORING_CQE_F_MORE` flag set
+    /// then the multishot read will be done and the application should issue a new request.
+    ///
+    /// Multishot read is available since kernel 6.7.
+    /// Multishot read is suggested since kernel 6.7.2, see: <https://github.com/axboe/liburing/issues/1041>
+    pub struct ReadMulti {
+        fd: { impl sealed::UseFixed },
+        buf_group: { u16 },
+        ;;
+        ioprio: u16 = 0,
+        flags: i32 = 0
+    }
+
+    pub const CODE = sys::IORING_OP_READ_MULTISHOT;
+
+    pub fn build(self) -> Entry {
+        let ReadMulti { fd, buf_group, flags, ioprio } = self;
+
+        let mut sqe = sqe_zeroed();
+        sqe.opcode = Self::CODE;
+        assign_fd!(sqe.fd = fd);
+        // Read opcodes use the `rw_flags` member of this union; `msg_flags` is the socket one.
+        sqe.__bindgen_anon_3.rw_flags = flags as _;
+        sqe.__bindgen_anon_4.buf_group = buf_group;
+        sqe.flags |= crate::squeue::Flags::BUFFER_SELECT.bits();
+        sqe.ioprio = ioprio;
+        Entry(sqe)
+    }
+}
+
 opcode!
{
     /// Issue the equivalent of a `pread(2)` or `pwrite(2)` system call
     ///

From 403d61b3b002298c1f5dd533831e5fe6450ed96f Mon Sep 17 00:00:00 2001
From: Omer Shamash
Date: Tue, 25 Feb 2025 10:45:23 +0200
Subject: [PATCH 2/2] feat: add test for ReadMulti opcode

---
Review notes (ignored by `git am`):
- Both pipe ends are now held as `OwnedFd`, so they are closed on every exit
  path; previously the read end was never closed.
- Pipe/close failures now carry the OS error as the source of the anyhow error.
- `cqes` is annotated as `Vec<cqueue::Entry>`.
- The `index` blob hashes below were not regenerated for the revised content.

 io-uring-test/src/main.rs     |   1 +
 io-uring-test/src/tests/fs.rs | 125 ++++++++++++++++++++++++++++++++++
 2 files changed, 126 insertions(+)

diff --git a/io-uring-test/src/main.rs b/io-uring-test/src/main.rs
index 088c774f..4ac7dd9f 100644
--- a/io-uring-test/src/main.rs
+++ b/io-uring-test/src/main.rs
@@ -91,6 +91,7 @@ fn test(
 
     // fs
     tests::fs::test_file_write_read(&mut ring, &test)?;
+    tests::fs::test_pipe_write_read_multi(&mut ring, &test)?;
     tests::fs::test_file_writev_readv(&mut ring, &test)?;
     tests::fs::test_file_cur_pos(&mut ring, &test)?;
     tests::fs::test_file_fsync(&mut ring, &test)?;
diff --git a/io-uring-test/src/tests/fs.rs b/io-uring-test/src/tests/fs.rs
index ad12901a..d6fb2d46 100644
--- a/io-uring-test/src/tests/fs.rs
+++ b/io-uring-test/src/tests/fs.rs
@@ -27,6 +27,131 @@ pub fn test_file_write_read(
     Ok(())
 }
 
+pub fn test_pipe_write_read_multi(
+    ring: &mut IoUring,
+    test: &Test,
+) -> anyhow::Result<()> {
+    use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
+
+    require!(
+        test;
+        test.probe.is_supported(opcode::Write::CODE);
+        test.probe.is_supported(opcode::ReadMulti::CODE);
+        test.probe.is_supported(opcode::ProvideBuffers::CODE);
+    );
+
+    println!("test pipe_write_read_multi");
+
+    // Create a pipe for testing
+    let mut pipefds = [0; 2];
+    // SAFETY: `pipefds` is a valid, writable two-element array.
+    let res = unsafe { libc::pipe(pipefds.as_mut_ptr()) };
+    if res < 0 {
+        return Err(anyhow::Error::new(std::io::Error::last_os_error())
+            .context("Failed to create pipe"));
+    }
+
+    // Take ownership of both ends so they are closed on every exit path,
+    // including `?` returns and failed assertions.
+    // SAFETY: `pipe` succeeded, so both descriptors are open and nothing else
+    // owns them.
+    let (read_end, write_end) = unsafe {
+        (
+            OwnedFd::from_raw_fd(pipefds[0]),
+            OwnedFd::from_raw_fd(pipefds[1]),
+        )
+    };
+
+    let read_fd = types::Fd(read_end.as_raw_fd());
+    let write_fd = types::Fd(write_end.as_raw_fd());
+
+    // Prepare data to write and buffers to read into
+    let mut input = vec![0xde; 1024];
+    input.extend_from_slice(&[0xad; 256]);
+
+    // NOTE: three buffers are provided because the final EOF completion also
+    // takes up a buffer, in addition to the two data chunks.
+    let mut bufs = vec![0; 3 * 1024];
+
+    // Provide buffers for multi-shot reads
+    let provide_bufs_e = opcode::ProvideBuffers::new(bufs.as_mut_ptr(), 1024, 3, 0xbeef, 0);
+
+    unsafe {
+        ring.submission()
+            .push(&provide_bufs_e.build().user_data(0x31).into())
+            .expect("queue is full");
+    }
+
+    ring.submit_and_wait(1)?;
+
+    let cqe: cqueue::Entry = ring.completion().next().expect("cqueue is empty").into();
+    assert_eq!(cqe.user_data(), 0x31);
+    assert_eq!(cqe.result(), 0);
+
+    // Write data to the pipe
+    let write_e = opcode::Write::new(write_fd, input.as_ptr(), input.len() as _);
+
+    unsafe {
+        ring.submission()
+            .push(&write_e.build().user_data(0x32).into())
+            .expect("queue is full");
+    }
+
+    ring.submit_and_wait(1)?;
+
+    let cqe: cqueue::Entry = ring.completion().next().expect("cqueue is empty").into();
+    assert_eq!(cqe.user_data(), 0x32);
+    assert_eq!(cqe.result(), input.len() as i32);
+
+    // Issue multi-shot read using a buf_group with 1024 length buffers
+    let read_multi_e = opcode::ReadMulti::new(read_fd, 0xbeef)
+        .build()
+        .user_data(0x33)
+        .into();
+
+    unsafe {
+        ring.submission()
+            .push(&read_multi_e)
+            .expect("queue is full");
+    }
+
+    // Close the write end to ensure we get an EOF completion. Ownership is
+    // released first so the descriptor is not closed again on drop.
+    // SAFETY: `into_raw_fd` hands back an open descriptor that nothing else
+    // will close.
+    let cres = unsafe { libc::close(write_end.into_raw_fd()) };
+    if cres < 0 {
+        return Err(anyhow::Error::new(std::io::Error::last_os_error())
+            .context("Failed to close file descriptor"));
+    }
+
+    ring.submit_and_wait(3)?;
+
+    let cqes: Vec<cqueue::Entry> = ring.completion().map(Into::into).collect();
+    assert_eq!(cqes.len(), 3);
+
+    // First read should get 1024 bytes
+    assert_eq!(cqes[0].user_data(), 0x33);
+    assert_eq!(cqes[0].result(), 1024);
+    assert!(cqueue::more(cqes[0].flags()));
+    assert_eq!(cqueue::buffer_select(cqes[0].flags()), Some(0));
+    assert_eq!(&bufs[..1024], &input[..1024]);
+
+    // Second read should get the remaining 256 bytes
+    assert_eq!(cqes[1].user_data(), 0x33);
+    assert_eq!(cqes[1].result(), 256);
+    assert!(cqueue::more(cqes[1].flags()));
+    assert_eq!(cqueue::buffer_select(cqes[1].flags()), Some(1));
+    assert_eq!(&bufs[1024..][..256], &input[1024..][..256]);
+
+    // Final completion should indicate EOF
+    assert_eq!(cqes[2].user_data(), 0x33);
+    assert!(!cqueue::more(cqes[2].flags()));
+    assert_eq!(cqes[2].result(), 0); // 0 indicates EOF for read operations
+
+    Ok(())
+}
+
 pub fn test_file_writev_readv(
     ring: &mut IoUring,
     test: &Test,