crates/pm/src/util/downloader.rs (223 changes: 63 additions, 160 deletions)
@@ -4,11 +4,9 @@ use futures::StreamExt;
 use once_cell::sync::Lazy;
 use reqwest::StatusCode;
 use reqwest::{Client, Response};
+use std::collections::HashSet;
 use std::{fs::Permissions, os::unix::fs::PermissionsExt, path::Path};
-use tokio::{
-    fs::{File, set_permissions},
-    io::AsyncReadExt,
-};
+use tokio::fs::{File, set_permissions};
 use tokio_retry::RetryIf;
 use tokio_tar::Archive;
 use tokio_util::io::StreamReader;
@@ -103,9 +101,7 @@ pub async fn download(url: &str, dest: &Path) -> Result<()> {
 
 // Stream-based unpacking directly from HTTP Response
 async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> {
-    use std::sync::Arc;
-    use tokio::sync::{Semaphore, mpsc};
-
+    let mut join_set = tokio::task::JoinSet::new();
     tokio::fs::create_dir_all(dest)
         .await
         .with_context(|| format!("Failed to create destination directory: {}", dest.display()))?;
@@ -116,164 +112,79 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()>
         .map(|result| result.map_err(std::io::Error::other));
     let stream_reader = StreamReader::new(stream);
 
-    // Create pipeline processing channels
-    let (entry_tx, mut entry_rx) = mpsc::channel::<ExtractedEntry>(500);
-
-    let dest = dest.to_path_buf();
-
-    // Stage 1: Streaming tar extraction
-    let extraction_task = {
-        let entry_tx = entry_tx.clone();
-        let dest = dest.clone();
-
-        tokio::spawn(async move {
-            // Create streaming gzip decoder
-            let gzip_decoder = GzipDecoder::new(stream_reader);
-            let mut tar_archive = Archive::new(gzip_decoder);
-            let mut entries = tar_archive.entries()?;
-
-            while let Some(entry_result) = entries.next().await {
-                let mut entry = entry_result.with_context(|| "Failed to read tar entry")?;
-                let path = entry
-                    .path()
-                    .with_context(|| "Failed to get entry path")?
-                    .into_owned();
-                let full_path = dest.join(&path);
-                let is_dir = entry.header().entry_type().is_dir();
-
-                // Only process files, skip directories (they'll be created when writing files)
-                if !is_dir {
-                    // Stream file content
-                    let mut content = Vec::new();
-                    entry.read_to_end(&mut content).await.with_context(|| {
-                        format!(
-                            "Failed to read file content from tar entry: {}",
-                            path.display()
-                        )
-                    })?;
-
-                    // Extract file permission mode
-                    let mode = entry.header().mode().unwrap_or(0o644);
-
-                    let size = content.len();
-                    let extracted_entry = ExtractedEntry {
-                        path: full_path,
-                        content,
-                        size,
-                        mode,
-                    };
-
-                    if entry_tx.send(extracted_entry).await.is_err() {
-                        break;
-                    }
-                }
-            }
-
-            Ok::<(), anyhow::Error>(())
-        })
-    };
-
-    // Stage 2: Concurrent file writing with cached directory creation
-    let file_writing_task = {
-        tokio::spawn(async move {
-            use dashmap::DashSet;
-
-            let semaphore = Arc::new(Semaphore::new(16));
-            let created_dirs = Arc::new(DashSet::<std::path::PathBuf>::new());
-            let mut write_tasks = Vec::new();
-            let mut batch_size = 0;
-            let mut total_bytes = 0;
-            const MAX_BATCH_SIZE: usize = 100;
-            const MAX_BATCH_BYTES: usize = 50 * 1024 * 1024; // 50MB
-
-            while let Some(entry) = entry_rx.recv().await {
-                let semaphore = Arc::clone(&semaphore);
-                let created_dirs = Arc::clone(&created_dirs);
-                batch_size += 1;
-                total_bytes += entry.size;
-
-                let task = tokio::spawn(async move {
-                    let _permit = semaphore.acquire().await.unwrap();
-
-                    // Ensure parent directory exists using cache
-                    if let Some(parent) = entry.path.parent() {
-                        let parent_path = parent.to_path_buf();
-
-                        // Check cache first to avoid duplicate directory creation
-                        if !created_dirs.contains(&parent_path) {
-                            if let Err(e) = tokio::fs::create_dir_all(&parent_path).await {
-                                log_verbose(&format!(
-                                    "Failed to create parent dir {}: {}",
-                                    parent_path.display(),
-                                    e
-                                ));
-                                return Err(anyhow::anyhow!(
-                                    "Failed to create parent directory: {}",
-                                    e
-                                )
-                                .context(format!("Parent directory: {}", parent_path.display())));
-                            }
-
-                            created_dirs.insert(parent_path);
-                        }
-                    }
-
-                    // Write file content
-                    if let Err(e) = tokio::fs::write(&entry.path, &entry.content).await {
-                        log_verbose(&format!(
-                            "Failed to write file {}: {}",
-                            entry.path.display(),
-                            e
-                        ));
-                        return Err(anyhow::anyhow!("Write failed: {}", e)
-                            .context(format!("File path: {}", entry.path.display())));
-                    }
-
-                    // Set original file permissions from tar entry
-                    let permissions = Permissions::from_mode(entry.mode);
-                    if let Err(e) = tokio::fs::set_permissions(&entry.path, permissions).await {
-                        log_verbose(&format!(
-                            "Failed to set permissions {}: {}",
-                            entry.path.display(),
-                            e
-                        ));
-                    }
-
-                    Ok::<(), anyhow::Error>(())
-                });
-
-                write_tasks.push(task);
-
-                // Process in batches to manage memory and concurrency
-                if batch_size >= MAX_BATCH_SIZE
-                    || total_bytes >= MAX_BATCH_BYTES
-                    || entry_rx.is_empty()
-                {
-                    for task in write_tasks.drain(..) {
-                        task.await??;
-                    }
-                    batch_size = 0;
-                    total_bytes = 0;
-                }
-            }
-
-            // Wait for remaining tasks
-            for task in write_tasks {
-                task.await??;
-            }
-
-            Ok::<(), anyhow::Error>(())
-        })
-    };
-
-    // Close sender channel
-    drop(entry_tx);
-
-    // Wait for both stages to complete
-    let (extract_result, write_result) = tokio::try_join!(extraction_task, file_writing_task)?;
-
-    extract_result?;
-    write_result?;
+    let mut created_dirs = HashSet::<std::path::PathBuf>::new();
+
+    // Create streaming gzip decoder
+    let gzip_decoder = GzipDecoder::new(stream_reader);
+    let mut tar_archive = Archive::new(gzip_decoder);
+    let mut entries = tar_archive.entries()?;
+
+    while let Some(entry_result) = entries.next().await {
+        let mut entry = entry_result.with_context(|| "Failed to read tar entry")?;
+        let path = entry
+            .path()
+            .with_context(|| "Failed to get entry path")?
+            .into_owned();
+        let full_path = &dest.join(&path);
+        let is_dir = entry.header().entry_type().is_dir();
+
+        // Only process files, skip directories (they'll be created when writing files)
+        if !is_dir {
+            // Extract file permission mode
+            let mode = entry.header().mode().unwrap_or(0o644);
+
+            if let Some(parent) = full_path.parent() {
+                let parent_path = parent.to_path_buf();
+
+                // Check cache first to avoid duplicate directory creation
+                if !created_dirs.contains(&parent_path) {
+                    if let Err(e) = tokio::fs::create_dir_all(&parent_path).await {
+                        log_verbose(&format!(
+                            "Failed to create parent dir {}: {}",
+                            parent_path.display(),
+                            e
+                        ));
+                        return Err(anyhow::anyhow!("Failed to create parent directory: {}", e)
+                            .context(format!("Parent directory: {}", parent_path.display())));
+                    }
+
+                    created_dirs.insert(parent_path);
+                }
+            }
+
+            let mut file = tokio::fs::OpenOptions::new()
+                .create(true)
+                .truncate(true)
+                .write(true)
+                .open(&full_path)
+                .await
+                .context(format!("Failed to open file {}", full_path.display()))?;
+
+            {
+                let full_path = full_path.clone();
+                join_set.spawn(async move {
+                    if let Err(err) = tokio::io::copy(&mut entry, &mut file).await {
+                        log_verbose(&format!("Failed to write file {}", full_path.display()));
+                        panic!("{err}")
+                    }
+                });
+            }
+
+            // Set original file permissions from tar entry
+            let permissions = Permissions::from_mode(mode);
+            if let Err(e) = tokio::fs::set_permissions(&full_path, permissions).await {
+                log_verbose(&format!(
+                    "Failed to set permissions {}: {}",
+                    full_path.display(),
+                    e
+                ));
+            }
+        }
+    }
+
+    join_set.join_all().await;
 
     // Set directory permissions and create resolution marker
     set_permissions(&dest, Permissions::from_mode(0o755))
@@ -286,14 +197,6 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()>
     Ok(())
 }
 
-#[derive(Debug)]
-struct ExtractedEntry {
-    path: std::path::PathBuf,
-    content: Vec<u8>,
-    size: usize,
-    mode: u32, // File permission mode
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;