Skip to content

Commit d688de4

Browse files
committed
refactor(pm): use read streaming write
1 parent 0adbc71 commit d688de4

File tree

1 file changed

+56
-162
lines changed

1 file changed

+56
-162
lines changed

crates/pm/src/util/downloader.rs

Lines changed: 56 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ use futures::StreamExt;
44
use once_cell::sync::Lazy;
55
use reqwest::StatusCode;
66
use reqwest::{Client, Response};
7+
use std::collections::HashSet;
78
use std::{fs::Permissions, os::unix::fs::PermissionsExt, path::Path};
8-
use tokio::{
9-
fs::{File, set_permissions},
10-
io::AsyncReadExt,
11-
};
9+
use tokio::fs::{File, set_permissions};
1210
use tokio_retry::RetryIf;
1311
use tokio_tar::Archive;
1412
use tokio_util::io::StreamReader;
@@ -103,9 +101,6 @@ pub async fn download(url: &str, dest: &Path) -> Result<()> {
103101

104102
// Stream-based unpacking directly from HTTP Response
105103
async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> {
106-
use std::sync::Arc;
107-
use tokio::sync::{Semaphore, mpsc};
108-
109104
tokio::fs::create_dir_all(dest)
110105
.await
111106
.with_context(|| format!("Failed to create destination directory: {}", dest.display()))?;
@@ -116,164 +111,71 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()>
116111
.map(|result| result.map_err(std::io::Error::other));
117112
let stream_reader = StreamReader::new(stream);
118113

119-
// Create pipeline processing channels
120-
let (entry_tx, mut entry_rx) = mpsc::channel::<ExtractedEntry>(500);
121-
122114
let dest = dest.to_path_buf();
123115

124-
// Stage 1: Streaming tar extraction
125-
let extraction_task = {
126-
let entry_tx = entry_tx.clone();
127-
let dest = dest.clone();
128-
129-
tokio::spawn(async move {
130-
// Create streaming gzip decoder
131-
let gzip_decoder = GzipDecoder::new(stream_reader);
132-
let mut tar_archive = Archive::new(gzip_decoder);
133-
let mut entries = tar_archive.entries()?;
134-
135-
while let Some(entry_result) = entries.next().await {
136-
let mut entry = entry_result.with_context(|| "Failed to read tar entry")?;
137-
let path = entry
138-
.path()
139-
.with_context(|| "Failed to get entry path")?
140-
.into_owned();
141-
let full_path = dest.join(&path);
142-
let is_dir = entry.header().entry_type().is_dir();
143-
144-
// Only process files, skip directories (they'll be created when writing files)
145-
if !is_dir {
146-
// Stream file content
147-
let mut content = Vec::new();
148-
entry.read_to_end(&mut content).await.with_context(|| {
149-
format!(
150-
"Failed to read file content from tar entry: {}",
151-
path.display()
152-
)
153-
})?;
154-
155-
// Extract file permission mode
156-
let mode = entry.header().mode().unwrap_or(0o644);
157-
158-
let size = content.len();
159-
let extracted_entry = ExtractedEntry {
160-
path: full_path,
161-
content,
162-
size,
163-
mode,
164-
};
165-
166-
if entry_tx.send(extracted_entry).await.is_err() {
167-
break;
168-
}
169-
}
170-
}
171-
172-
Ok::<(), anyhow::Error>(())
173-
})
174-
};
175-
176-
// Stage 2: Concurrent file writing with cached directory creation
177-
let file_writing_task = {
178-
tokio::spawn(async move {
179-
use dashmap::DashSet;
180-
181-
let semaphore = Arc::new(Semaphore::new(16));
182-
let created_dirs = Arc::new(DashSet::<std::path::PathBuf>::new());
183-
let mut write_tasks = Vec::new();
184-
let mut batch_size = 0;
185-
let mut total_bytes = 0;
186-
const MAX_BATCH_SIZE: usize = 100;
187-
const MAX_BATCH_BYTES: usize = 50 * 1024 * 1024; // 50MB
188-
189-
while let Some(entry) = entry_rx.recv().await {
190-
let semaphore = Arc::clone(&semaphore);
191-
let created_dirs = Arc::clone(&created_dirs);
192-
batch_size += 1;
193-
total_bytes += entry.size;
194-
195-
let task = tokio::spawn(async move {
196-
let _permit = semaphore.acquire().await.unwrap();
197-
198-
// Ensure parent directory exists using cache
199-
if let Some(parent) = entry.path.parent() {
200-
let parent_path = parent.to_path_buf();
201-
202-
// Check cache first to avoid duplicate directory creation
203-
if !created_dirs.contains(&parent_path) {
204-
if let Err(e) = tokio::fs::create_dir_all(&parent_path).await {
205-
log_verbose(&format!(
206-
"Failed to create parent dir {}: {}",
207-
parent_path.display(),
208-
e
209-
));
210-
return Err(anyhow::anyhow!(
211-
"Failed to create parent directory: {}",
212-
e
213-
)
214-
.context(format!("Parent directory: {}", parent_path.display())));
215-
}
216-
217-
created_dirs.insert(parent_path);
218-
}
219-
}
220-
221-
// Write file content
222-
if let Err(e) = tokio::fs::write(&entry.path, &entry.content).await {
223-
log_verbose(&format!(
224-
"Failed to write file {}: {}",
225-
entry.path.display(),
226-
e
227-
));
228-
return Err(anyhow::anyhow!("Write failed: {}", e)
229-
.context(format!("File path: {}", entry.path.display())));
230-
}
231-
232-
// Set original file permissions from tar entry
233-
let permissions = Permissions::from_mode(entry.mode);
234-
if let Err(e) = tokio::fs::set_permissions(&entry.path, permissions).await {
116+
let mut created_dirs = HashSet::<std::path::PathBuf>::new();
117+
118+
// Create streaming gzip decoder
119+
let gzip_decoder = GzipDecoder::new(stream_reader);
120+
let mut tar_archive = Archive::new(gzip_decoder);
121+
let mut entries = tar_archive.entries()?;
122+
123+
while let Some(entry_result) = entries.next().await {
124+
let mut entry = entry_result.with_context(|| "Failed to read tar entry")?;
125+
let path = entry
126+
.path()
127+
.with_context(|| "Failed to get entry path")?
128+
.into_owned();
129+
let full_path = &dest.join(&path);
130+
let is_dir = entry.header().entry_type().is_dir();
131+
132+
// Only process files, skip directories (they'll be created when writing files)
133+
if !is_dir {
134+
// Extract file permission mode
135+
let mode = entry.header().mode().unwrap_or(0o644);
136+
137+
if let Some(parent) = full_path.parent() {
138+
let parent_path = parent.to_path_buf();
139+
140+
// Check cache first to avoid duplicate directory creation
141+
if !created_dirs.contains(&parent_path) {
142+
if let Err(e) = tokio::fs::create_dir_all(&parent_path).await {
235143
log_verbose(&format!(
236-
"Failed to set permissions {}: {}",
237-
entry.path.display(),
144+
"Failed to create parent dir {}: {}",
145+
parent_path.display(),
238146
e
239147
));
148+
return Err(anyhow::anyhow!("Failed to create parent directory: {}", e)
149+
.context(format!("Parent directory: {}", parent_path.display())));
240150
}
241151

242-
Ok::<(), anyhow::Error>(())
243-
});
244-
245-
write_tasks.push(task);
246-
247-
// Process in batches to manage memory and concurrency
248-
if batch_size >= MAX_BATCH_SIZE
249-
|| total_bytes >= MAX_BATCH_BYTES
250-
|| entry_rx.is_empty()
251-
{
252-
for task in write_tasks.drain(..) {
253-
task.await??;
254-
}
255-
batch_size = 0;
256-
total_bytes = 0;
152+
created_dirs.insert(parent_path);
257153
}
258154
}
259155

260-
// Wait for remaining tasks
261-
for task in write_tasks {
262-
task.await??;
263-
}
264-
265-
Ok::<(), anyhow::Error>(())
266-
})
267-
};
268-
269-
// Close sender channel
270-
drop(entry_tx);
271-
272-
// Wait for both stages to complete
273-
let (extract_result, write_result) = tokio::try_join!(extraction_task, file_writing_task)?;
156+
let mut file = tokio::fs::OpenOptions::new()
157+
.create(true)
158+
.truncate(true)
159+
.write(true)
160+
.open(&full_path)
161+
.await
162+
.context(format!("Failed to open file {}", full_path.display()))?;
274163

275-
extract_result?;
276-
write_result?;
164+
tokio::io::copy(&mut entry, &mut file)
165+
.await
166+
.context(format!("Failed to write file {}", full_path.display()))?;
167+
168+
// Set original file permissions from tar entry
169+
let permissions = Permissions::from_mode(mode);
170+
if let Err(e) = tokio::fs::set_permissions(&full_path, permissions).await {
171+
log_verbose(&format!(
172+
"Failed to set permissions {}: {}",
173+
full_path.display(),
174+
e
175+
));
176+
}
177+
}
178+
}
277179

278180
// Set directory permissions and create resolution marker
279181
set_permissions(&dest, Permissions::from_mode(0o755))
@@ -286,14 +188,6 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()>
286188
Ok(())
287189
}
288190

289-
#[derive(Debug)]
290-
struct ExtractedEntry {
291-
path: std::path::PathBuf,
292-
content: Vec<u8>,
293-
size: usize,
294-
mode: u32, // File permission mode
295-
}
296-
297191
#[cfg(test)]
298192
mod tests {
299193
use super::*;

0 commit comments

Comments
 (0)