Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod reader;

use self::recursive::{Context, Method};
pub mod recursive;
pub mod reflink;

async fn handle_replace(
msg_tx: Arc<TokioMutex<Sender<Message>>>,
Expand Down
137 changes: 72 additions & 65 deletions src/operation/recursive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Instant;
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc};
use walkdir::WalkDir;

use super::reflink::reflink_copy;
use super::{copy_unique_path, Controller, OperationSelection, ReplaceResult};

pub enum Method {
Expand Down Expand Up @@ -270,82 +271,88 @@ impl Op {
}
}

let (from_file, metadata, mut to_file) = futures::try_join!(
async {
compio::fs::OpenOptions::new()
.read(true)
.open(&self.from)
.await
},
compio::fs::metadata(&self.from),
// This is atomic and ensures `to` is not created by any other process
async {
compio::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&self.to)
.await
}
)?;

progress.total_bytes = Some(metadata.len());
(ctx.on_progress)(self, &progress);
if let Err(err) = to_file.set_permissions(metadata.permissions()).await {
// This error is not propagated upwards as some filesystems do not support setting permissions
log::warn!("failed to set permissions for {:?}: {}", self.to, err);
}
// Try reflink copy first (copy-on-write), fall back to standard copy
match reflink_copy(&self.from, &self.to).await {
Ok(_) => {}
Err(_) => {
let (from_file, metadata, mut to_file) = futures::try_join!(
async {
compio::fs::OpenOptions::new()
.read(true)
.open(&self.from)
.await
},
compio::fs::metadata(&self.from),
// This is atomic and ensures `to` is not created by any other process
async {
compio::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&self.to)
.await
}
)?;

// Prevent spamming the progress callbacks.
let mut last_progress_update = Instant::now();
// io_uring/IOCP requires transferring ownership of the buffer to the kernel.
let mut buf_in = std::mem::take(&mut ctx.buf);
// Track where the current read/write position is at.
let mut pos = 0;
progress.total_bytes = Some(metadata.len());
(ctx.on_progress)(self, &progress);
if let Err(err) = to_file.set_permissions(metadata.permissions()).await {
// This error is not propagated upwards as some filesystems do not support setting permissions
log::warn!("failed to set permissions for {:?}: {}", self.to, err);
}

loop {
let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
// Prevent spamming the progress callbacks.
let mut last_progress_update = Instant::now();
// io_uring/IOCP requires transferring ownership of the buffer to the kernel.
let mut buf_in = std::mem::take(&mut ctx.buf);
// Track where the current read/write position is at.
let mut pos = 0;

loop {
let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;

let count = match result {
Ok(0) => {
ctx.buf = buf_out;
break;
}
Ok(count) => count,
Err(why) => {
ctx.buf = buf_out;
return Err(why.into());
}
};

let count = match result {
Ok(0) => {
ctx.buf = buf_out;
break;
}
Ok(count) => count,
Err(why) => {
ctx.buf = buf_out;
return Err(why.into());
}
};
let BufResult(result, buf_out_slice) =
to_file.write_at(buf_out.slice(..count), pos).await;
let buf_out = buf_out_slice.into_inner();

let BufResult(result, buf_out_slice) =
to_file.write_at(buf_out.slice(..count), pos).await;
let buf_out = buf_out_slice.into_inner();
if let Err(why) = result {
ctx.buf = buf_out;
return Err(why.into());
}

if let Err(why) = result {
ctx.buf = buf_out;
return Err(why.into());
}
progress.current_bytes += count as u64;
pos += count as u64;

progress.current_bytes += count as u64;
pos += count as u64;
// Avoid spamming progress messages too early.
let current = Instant::now();
if current.duration_since(last_progress_update).as_millis() > 49 {
last_progress_update = current;
(ctx.on_progress)(self, &progress);

// Avoid spamming progress messages too early.
let current = Instant::now();
if current.duration_since(last_progress_update).as_millis() > 49 {
last_progress_update = current;
(ctx.on_progress)(self, &progress);
// Also check if the progress was cancelled.
if let Err(why) = ctx.controller.check().await {
ctx.buf = buf_out;
return Err(why.into());
}
}

// Also check if the progress was cancelled.
if let Err(why) = ctx.controller.check().await {
ctx.buf = buf_out;
return Err(why.into());
buf_in = buf_out;
}
}

buf_in = buf_out;
to_file.sync_all().await?;
}
}

to_file.sync_all().await?;
}
OpKind::Move { cross_device_copy } => {
// Remove `to` if overwriting and it is an existing file
Expand Down
47 changes: 47 additions & 0 deletions src/operation/reflink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::io;
use std::path::Path;

/// Attempt to perform a reflink (copy-on-write) copy of a file
pub async fn reflink_copy(from: &Path, to: &Path) -> Result<(), io::Error> {
// Use blocking IO for the reflink operation since compio doesn't expose ioctl
let from_path = from.to_path_buf();
let to_path = to.to_path_buf();

tokio::task::spawn_blocking(move || {
#[cfg(target_os = "linux")]
{
use libc::{ioctl, FICLONE};
use std::fs::File;
use std::os::unix::io::AsRawFd;

let src_file = File::open(&from_path)?;
let dst_file = File::create(&to_path)?;

let src_fd = src_file.as_raw_fd();
let dst_fd = dst_file.as_raw_fd();

// Call the FICLONE ioctl to perform the reflink copy
let result = unsafe { ioctl(dst_fd, FICLONE, src_fd) };

if result == 0 {
Ok(())
} else {
// Clean up on failure - remove the empty file
let _ = std::fs::remove_file(&to_path);
let errno = unsafe { *libc::__errno_location() };
Err(io::Error::from_raw_os_error(errno))
}
}

// For non-Linux systems, return an error indicating reflink is not supported
#[cfg(not(target_os = "linux"))]
{
Err(io::Error::new(
io::ErrorKind::Unsupported,
"Reflink copy not supported on this platform",
))
}
})
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
}