diff --git a/src/operation/mod.rs b/src/operation/mod.rs index ce2ac6d1..d28b28ae 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -29,6 +29,7 @@ pub mod reader; use self::recursive::{Context, Method}; pub mod recursive; +pub mod reflink; async fn handle_replace( msg_tx: Arc>>, diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index bc2fdc33..0048801f 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -7,6 +7,7 @@ use std::time::Instant; use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc}; use walkdir::WalkDir; +use super::reflink::reflink_copy; use super::{copy_unique_path, Controller, OperationSelection, ReplaceResult}; pub enum Method { @@ -270,82 +271,88 @@ impl Op { } } - let (from_file, metadata, mut to_file) = futures::try_join!( - async { - compio::fs::OpenOptions::new() - .read(true) - .open(&self.from) - .await - }, - compio::fs::metadata(&self.from), - // This is atomic and ensures `to` is not created by any other process - async { - compio::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&self.to) - .await - } - )?; - - progress.total_bytes = Some(metadata.len()); - (ctx.on_progress)(self, &progress); - if let Err(err) = to_file.set_permissions(metadata.permissions()).await { - // This error is not propagated upwards as some filesystems do not support setting permissions - log::warn!("failed to set permissions for {:?}: {}", self.to, err); - } + // Try reflink copy first (copy-on-write), fall back to standard copy + match reflink_copy(&self.from, &self.to).await { + Ok(_) => {} + Err(_) => { + let (from_file, metadata, mut to_file) = futures::try_join!( + async { + compio::fs::OpenOptions::new() + .read(true) + .open(&self.from) + .await + }, + compio::fs::metadata(&self.from), + // This is atomic and ensures `to` is not created by any other process + async { + compio::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&self.to) + .await + } + )?; - // Prevent spamming the progress callbacks. - let mut last_progress_update = Instant::now(); - // io_uring/IOCP requires transferring ownership of the buffer to the kernel. - let mut buf_in = std::mem::take(&mut ctx.buf); - // Track where the current read/write position is at. - let mut pos = 0; + progress.total_bytes = Some(metadata.len()); + (ctx.on_progress)(self, &progress); + if let Err(err) = to_file.set_permissions(metadata.permissions()).await { + // This error is not propagated upwards as some filesystems do not support setting permissions + log::warn!("failed to set permissions for {:?}: {}", self.to, err); + } - loop { - let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await; + // Prevent spamming the progress callbacks. + let mut last_progress_update = Instant::now(); + // io_uring/IOCP requires transferring ownership of the buffer to the kernel. + let mut buf_in = std::mem::take(&mut ctx.buf); + // Track where the current read/write position is at. + let mut pos = 0; + + loop { + let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await; + + let count = match result { + Ok(0) => { + ctx.buf = buf_out; + break; + } + Ok(count) => count, + Err(why) => { + ctx.buf = buf_out; + return Err(why.into()); + } + }; - let count = match result { - Ok(0) => { - ctx.buf = buf_out; - break; - } - Ok(count) => count, - Err(why) => { - ctx.buf = buf_out; - return Err(why.into()); - } - }; + let BufResult(result, buf_out_slice) = + to_file.write_at(buf_out.slice(..count), pos).await; + let buf_out = buf_out_slice.into_inner(); - let BufResult(result, buf_out_slice) = - to_file.write_at(buf_out.slice(..count), pos).await; - let buf_out = buf_out_slice.into_inner(); + if let Err(why) = result { + ctx.buf = buf_out; + return Err(why.into()); + } - if let Err(why) = result { - ctx.buf = buf_out; - return Err(why.into()); - } + progress.current_bytes += count as u64; + pos += count as u64; - progress.current_bytes += count as u64; - pos += count as u64; + // Avoid spamming progress messages too early. + let current = Instant::now(); + if current.duration_since(last_progress_update).as_millis() > 49 { + last_progress_update = current; + (ctx.on_progress)(self, &progress); - // Avoid spamming progress messages too early. - let current = Instant::now(); - if current.duration_since(last_progress_update).as_millis() > 49 { - last_progress_update = current; - (ctx.on_progress)(self, &progress); + // Also check if the progress was cancelled. + if let Err(why) = ctx.controller.check().await { + ctx.buf = buf_out; + return Err(why.into()); + } + } - // Also check if the progress was cancelled. - if let Err(why) = ctx.controller.check().await { - ctx.buf = buf_out; - return Err(why.into()); + buf_in = buf_out; } - } - buf_in = buf_out; + to_file.sync_all().await?; + } } - - to_file.sync_all().await?; } OpKind::Move { cross_device_copy } => { // Remove `to` if overwriting and it is an existing file diff --git a/src/operation/reflink.rs b/src/operation/reflink.rs new file mode 100644 index 00000000..c6134520 --- /dev/null +++ b/src/operation/reflink.rs @@ -0,0 +1,47 @@ +use std::io; +use std::path::Path; + +/// Attempt to perform a reflink (copy-on-write) copy of a file +pub async fn reflink_copy(from: &Path, to: &Path) -> Result<(), io::Error> { + // Use blocking IO for the reflink operation since compio doesn't expose ioctl + let from_path = from.to_path_buf(); + let to_path = to.to_path_buf(); + + tokio::task::spawn_blocking(move || { + #[cfg(target_os = "linux")] + { + use libc::{ioctl, FICLONE}; + use std::fs::File; + use std::os::unix::io::AsRawFd; + + let src_file = File::open(&from_path)?; + let dst_file = File::create(&to_path)?; + + let src_fd = src_file.as_raw_fd(); + let dst_fd = dst_file.as_raw_fd(); + + // Call the FICLONE ioctl to perform the reflink copy + let result = unsafe { ioctl(dst_fd, FICLONE, src_fd) }; + + if result == 0 { + Ok(()) + } else { + // Clean up on failure - remove the empty file + let _ = std::fs::remove_file(&to_path); + let errno = unsafe { *libc::__errno_location() }; + Err(io::Error::from_raw_os_error(errno)) + } + } + + // For non-Linux systems, return an error indicating reflink is not supported + #[cfg(not(target_os = "linux"))] + { + Err(io::Error::new( + io::ErrorKind::Unsupported, + "Reflink copy not supported on this platform", + )) + } + }) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? +}