diff --git a/crates/pm/src/util/downloader.rs b/crates/pm/src/util/downloader.rs index 5ac1a8652..1ac2465af 100644 --- a/crates/pm/src/util/downloader.rs +++ b/crates/pm/src/util/downloader.rs @@ -4,11 +4,9 @@ use futures::StreamExt; use once_cell::sync::Lazy; use reqwest::StatusCode; use reqwest::{Client, Response}; +use std::collections::HashSet; use std::{fs::Permissions, os::unix::fs::PermissionsExt, path::Path}; -use tokio::{ - fs::{File, set_permissions}, - io::AsyncReadExt, -}; +use tokio::fs::{File, set_permissions}; use tokio_retry::RetryIf; use tokio_tar::Archive; use tokio_util::io::StreamReader; @@ -103,9 +101,7 @@ pub async fn download(url: &str, dest: &Path) -> Result<()> { // Stream-based unpacking directly from HTTP Response async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> { - use std::sync::Arc; - use tokio::sync::{Semaphore, mpsc}; - + let mut join_set = tokio::task::JoinSet::new(); tokio::fs::create_dir_all(dest) .await .with_context(|| format!("Failed to create destination directory: {}", dest.display()))?; @@ -116,164 +112,79 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> .map(|result| result.map_err(std::io::Error::other)); let stream_reader = StreamReader::new(stream); - // Create pipeline processing channels - let (entry_tx, mut entry_rx) = mpsc::channel::(500); - let dest = dest.to_path_buf(); - // Stage 1: Streaming tar extraction - let extraction_task = { - let entry_tx = entry_tx.clone(); - let dest = dest.clone(); - - tokio::spawn(async move { - // Create streaming gzip decoder - let gzip_decoder = GzipDecoder::new(stream_reader); - let mut tar_archive = Archive::new(gzip_decoder); - let mut entries = tar_archive.entries()?; - - while let Some(entry_result) = entries.next().await { - let mut entry = entry_result.with_context(|| "Failed to read tar entry")?; - let path = entry - .path() - .with_context(|| "Failed to get entry path")? - .into_owned(); - let full_path = dest.join(&path); - let is_dir = entry.header().entry_type().is_dir(); - - // Only process files, skip directories (they'll be created when writing files) - if !is_dir { - // Stream file content - let mut content = Vec::new(); - entry.read_to_end(&mut content).await.with_context(|| { - format!( - "Failed to read file content from tar entry: {}", - path.display() - ) - })?; - - // Extract file permission mode - let mode = entry.header().mode().unwrap_or(0o644); - - let size = content.len(); - let extracted_entry = ExtractedEntry { - path: full_path, - content, - size, - mode, - }; - - if entry_tx.send(extracted_entry).await.is_err() { - break; - } - } - } - - Ok::<(), anyhow::Error>(()) - }) - }; - - // Stage 2: Concurrent file writing with cached directory creation - let file_writing_task = { - tokio::spawn(async move { - use dashmap::DashSet; - - let semaphore = Arc::new(Semaphore::new(16)); - let created_dirs = Arc::new(DashSet::::new()); - let mut write_tasks = Vec::new(); - let mut batch_size = 0; - let mut total_bytes = 0; - const MAX_BATCH_SIZE: usize = 100; - const MAX_BATCH_BYTES: usize = 50 * 1024 * 1024; // 50MB - - while let Some(entry) = entry_rx.recv().await { - let semaphore = Arc::clone(&semaphore); - let created_dirs = Arc::clone(&created_dirs); - batch_size += 1; - total_bytes += entry.size; - - let task = tokio::spawn(async move { - let _permit = semaphore.acquire().await.unwrap(); - - // Ensure parent directory exists using cache - if let Some(parent) = entry.path.parent() { - let parent_path = parent.to_path_buf(); - - // Check cache first to avoid duplicate directory creation - if !created_dirs.contains(&parent_path) { - if let Err(e) = tokio::fs::create_dir_all(&parent_path).await { - log_verbose(&format!( - "Failed to create parent dir {}: {}", - parent_path.display(), - e - )); - return Err(anyhow::anyhow!( - "Failed to create parent directory: {}", - e - ) - .context(format!("Parent directory: {}", parent_path.display()))); - } - - created_dirs.insert(parent_path); - } - } - - // Write file content - if let Err(e) = tokio::fs::write(&entry.path, &entry.content).await { - log_verbose(&format!( - "Failed to write file {}: {}", - entry.path.display(), - e - )); - return Err(anyhow::anyhow!("Write failed: {}", e) - .context(format!("File path: {}", entry.path.display()))); - } - - // Set original file permissions from tar entry - let permissions = Permissions::from_mode(entry.mode); - if let Err(e) = tokio::fs::set_permissions(&entry.path, permissions).await { + let mut created_dirs = HashSet::::new(); + + // Create streaming gzip decoder + let gzip_decoder = GzipDecoder::new(stream_reader); + let mut tar_archive = Archive::new(gzip_decoder); + let mut entries = tar_archive.entries()?; + + while let Some(entry_result) = entries.next().await { + let mut entry = entry_result.with_context(|| "Failed to read tar entry")?; + let path = entry + .path() + .with_context(|| "Failed to get entry path")? + .into_owned(); + let full_path = &dest.join(&path); + let is_dir = entry.header().entry_type().is_dir(); + + // Only process files, skip directories (they'll be created when writing files) + if !is_dir { + // Extract file permission mode + let mode = entry.header().mode().unwrap_or(0o644); + + if let Some(parent) = full_path.parent() { + let parent_path = parent.to_path_buf(); + + // Check cache first to avoid duplicate directory creation + if !created_dirs.contains(&parent_path) { + if let Err(e) = tokio::fs::create_dir_all(&parent_path).await { log_verbose(&format!( - "Failed to set permissions {}: {}", - entry.path.display(), + "Failed to create parent dir {}: {}", + parent_path.display(), e )); + return Err(anyhow::anyhow!("Failed to create parent directory: {}", e) + .context(format!("Parent directory: {}", parent_path.display()))); } - Ok::<(), anyhow::Error>(()) - }); - - write_tasks.push(task); - - // Process in batches to manage memory and concurrency - if batch_size >= MAX_BATCH_SIZE - || total_bytes >= MAX_BATCH_BYTES - || entry_rx.is_empty() - { - for task in write_tasks.drain(..) { - task.await??; - } - batch_size = 0; - total_bytes = 0; + created_dirs.insert(parent_path); } } - // Wait for remaining tasks - for task in write_tasks { - task.await??; + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&full_path) + .await + .context(format!("Failed to open file {}", full_path.display()))?; + + { + let full_path = full_path.clone(); + join_set.spawn(async move { + if let Err(err) = tokio::io::copy(&mut entry, &mut file).await { + log_verbose(&format!("Failed to write file {}", full_path.display())); + panic!("{err}") + } + }); } - Ok::<(), anyhow::Error>(()) - }) - }; - - // Close sender channel - drop(entry_tx); - - // Wait for both stages to complete - let (extract_result, write_result) = tokio::try_join!(extraction_task, file_writing_task)?; + // Set original file permissions from tar entry + let permissions = Permissions::from_mode(mode); + if let Err(e) = tokio::fs::set_permissions(&full_path, permissions).await { + log_verbose(&format!( + "Failed to set permissions {}: {}", + full_path.display(), + e + )); + } + } + } - extract_result?; - write_result?; + join_set.join_all().await; // Set directory permissions and create resolution marker set_permissions(&dest, Permissions::from_mode(0o755)) @@ -286,14 +197,6 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> Ok(()) } -#[derive(Debug)] -struct ExtractedEntry { - path: std::path::PathBuf, - content: Vec, - size: usize, - mode: u32, // File permission mode -} - #[cfg(test)] mod tests { use super::*;