Skip to content

Commit 6151be6

Browse files
committed
refactor(pm): use read streaming write
1 parent 0adbc71 commit 6151be6

File tree

1 file changed

+56
-160
lines changed

1 file changed

+56
-160
lines changed

crates/pm/src/util/downloader.rs

Lines changed: 56 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ use once_cell::sync::Lazy;
55
use reqwest::StatusCode;
66
use reqwest::{Client, Response};
77
use std::{fs::Permissions, os::unix::fs::PermissionsExt, path::Path};
8-
use tokio::{
9-
fs::{File, set_permissions},
10-
io::AsyncReadExt,
11-
};
8+
use tokio::fs::{File, set_permissions};
129
use tokio_retry::RetryIf;
1310
use tokio_tar::Archive;
1411
use tokio_util::io::StreamReader;
@@ -103,8 +100,8 @@ pub async fn download(url: &str, dest: &Path) -> Result<()> {
103100

104101
// Stream-based unpacking directly from HTTP Response
105102
async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> {
103+
use dashmap::DashSet;
106104
use std::sync::Arc;
107-
use tokio::sync::{Semaphore, mpsc};
108105

109106
tokio::fs::create_dir_all(dest)
110107
.await
@@ -116,164 +113,71 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()>
116113
.map(|result| result.map_err(std::io::Error::other));
117114
let stream_reader = StreamReader::new(stream);
118115

119-
// Create pipeline processing channels
120-
let (entry_tx, mut entry_rx) = mpsc::channel::<ExtractedEntry>(500);
121-
122116
let dest = dest.to_path_buf();
123117

124-
// Stage 1: Streaming tar extraction
125-
let extraction_task = {
126-
let entry_tx = entry_tx.clone();
127-
let dest = dest.clone();
128-
129-
tokio::spawn(async move {
130-
// Create streaming gzip decoder
131-
let gzip_decoder = GzipDecoder::new(stream_reader);
132-
let mut tar_archive = Archive::new(gzip_decoder);
133-
let mut entries = tar_archive.entries()?;
134-
135-
while let Some(entry_result) = entries.next().await {
136-
let mut entry = entry_result.with_context(|| "Failed to read tar entry")?;
137-
let path = entry
138-
.path()
139-
.with_context(|| "Failed to get entry path")?
140-
.into_owned();
141-
let full_path = dest.join(&path);
142-
let is_dir = entry.header().entry_type().is_dir();
143-
144-
// Only process files, skip directories (they'll be created when writing files)
145-
if !is_dir {
146-
// Stream file content
147-
let mut content = Vec::new();
148-
entry.read_to_end(&mut content).await.with_context(|| {
149-
format!(
150-
"Failed to read file content from tar entry: {}",
151-
path.display()
152-
)
153-
})?;
154-
155-
// Extract file permission mode
156-
let mode = entry.header().mode().unwrap_or(0o644);
157-
158-
let size = content.len();
159-
let extracted_entry = ExtractedEntry {
160-
path: full_path,
161-
content,
162-
size,
163-
mode,
164-
};
165-
166-
if entry_tx.send(extracted_entry).await.is_err() {
167-
break;
168-
}
169-
}
170-
}
171-
172-
Ok::<(), anyhow::Error>(())
173-
})
174-
};
175-
176-
// Stage 2: Concurrent file writing with cached directory creation
177-
let file_writing_task = {
178-
tokio::spawn(async move {
179-
use dashmap::DashSet;
180-
181-
let semaphore = Arc::new(Semaphore::new(16));
182-
let created_dirs = Arc::new(DashSet::<std::path::PathBuf>::new());
183-
let mut write_tasks = Vec::new();
184-
let mut batch_size = 0;
185-
let mut total_bytes = 0;
186-
const MAX_BATCH_SIZE: usize = 100;
187-
const MAX_BATCH_BYTES: usize = 50 * 1024 * 1024; // 50MB
188-
189-
while let Some(entry) = entry_rx.recv().await {
190-
let semaphore = Arc::clone(&semaphore);
191-
let created_dirs = Arc::clone(&created_dirs);
192-
batch_size += 1;
193-
total_bytes += entry.size;
194-
195-
let task = tokio::spawn(async move {
196-
let _permit = semaphore.acquire().await.unwrap();
197-
198-
// Ensure parent directory exists using cache
199-
if let Some(parent) = entry.path.parent() {
200-
let parent_path = parent.to_path_buf();
201-
202-
// Check cache first to avoid duplicate directory creation
203-
if !created_dirs.contains(&parent_path) {
204-
if let Err(e) = tokio::fs::create_dir_all(&parent_path).await {
205-
log_verbose(&format!(
206-
"Failed to create parent dir {}: {}",
207-
parent_path.display(),
208-
e
209-
));
210-
return Err(anyhow::anyhow!(
211-
"Failed to create parent directory: {}",
212-
e
213-
)
214-
.context(format!("Parent directory: {}", parent_path.display())));
215-
}
216-
217-
created_dirs.insert(parent_path);
218-
}
219-
}
220-
221-
// Write file content
222-
if let Err(e) = tokio::fs::write(&entry.path, &entry.content).await {
223-
log_verbose(&format!(
224-
"Failed to write file {}: {}",
225-
entry.path.display(),
226-
e
227-
));
228-
return Err(anyhow::anyhow!("Write failed: {}", e)
229-
.context(format!("File path: {}", entry.path.display())));
230-
}
231-
232-
// Set original file permissions from tar entry
233-
let permissions = Permissions::from_mode(entry.mode);
234-
if let Err(e) = tokio::fs::set_permissions(&entry.path, permissions).await {
118+
let created_dirs = Arc::new(DashSet::<std::path::PathBuf>::new());
119+
120+
// Create streaming gzip decoder
121+
let gzip_decoder = GzipDecoder::new(stream_reader);
122+
let mut tar_archive = Archive::new(gzip_decoder);
123+
let mut entries = tar_archive.entries()?;
124+
125+
while let Some(entry_result) = entries.next().await {
126+
let mut entry = entry_result.with_context(|| "Failed to read tar entry")?;
127+
let path = entry
128+
.path()
129+
.with_context(|| "Failed to get entry path")?
130+
.into_owned();
131+
let full_path = &dest.join(&path);
132+
let is_dir = entry.header().entry_type().is_dir();
133+
134+
// Only process files, skip directories (they'll be created when writing files)
135+
if !is_dir {
136+
// Extract file permission mode
137+
let mode = entry.header().mode().unwrap_or(0o644);
138+
139+
if let Some(parent) = full_path.parent() {
140+
let parent_path = parent.to_path_buf();
141+
142+
// Check cache first to avoid duplicate directory creation
143+
if !created_dirs.contains(&parent_path) {
144+
if let Err(e) = tokio::fs::create_dir_all(&parent_path).await {
235145
log_verbose(&format!(
236-
"Failed to set permissions {}: {}",
237-
entry.path.display(),
146+
"Failed to create parent dir {}: {}",
147+
parent_path.display(),
238148
e
239149
));
150+
return Err(anyhow::anyhow!("Failed to create parent directory: {}", e)
151+
.context(format!("Parent directory: {}", parent_path.display())));
240152
}
241153

242-
Ok::<(), anyhow::Error>(())
243-
});
244-
245-
write_tasks.push(task);
246-
247-
// Process in batches to manage memory and concurrency
248-
if batch_size >= MAX_BATCH_SIZE
249-
|| total_bytes >= MAX_BATCH_BYTES
250-
|| entry_rx.is_empty()
251-
{
252-
for task in write_tasks.drain(..) {
253-
task.await??;
254-
}
255-
batch_size = 0;
256-
total_bytes = 0;
154+
created_dirs.insert(parent_path);
257155
}
258156
}
259157

260-
// Wait for remaining tasks
261-
for task in write_tasks {
262-
task.await??;
263-
}
264-
265-
Ok::<(), anyhow::Error>(())
266-
})
267-
};
268-
269-
// Close sender channel
270-
drop(entry_tx);
271-
272-
// Wait for both stages to complete
273-
let (extract_result, write_result) = tokio::try_join!(extraction_task, file_writing_task)?;
158+
let mut file = tokio::fs::OpenOptions::new()
159+
.create(true)
160+
.truncate(true)
161+
.write(true)
162+
.open(&full_path)
163+
.await
164+
.context(format!("Failed to open file {}", full_path.display()))?;
274165

275-
extract_result?;
276-
write_result?;
166+
tokio::io::copy(&mut entry, &mut file)
167+
.await
168+
.context(format!("Failed to write file {}", full_path.display()))?;
169+
170+
// Set original file permissions from tar entry
171+
let permissions = Permissions::from_mode(mode);
172+
if let Err(e) = tokio::fs::set_permissions(&full_path, permissions).await {
173+
log_verbose(&format!(
174+
"Failed to set permissions {}: {}",
175+
full_path.display(),
176+
e
177+
));
178+
}
179+
}
180+
}
277181

278182
// Set directory permissions and create resolution marker
279183
set_permissions(&dest, Permissions::from_mode(0o755))
@@ -286,14 +190,6 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()>
286190
Ok(())
287191
}
288192

289-
#[derive(Debug)]
290-
struct ExtractedEntry {
291-
path: std::path::PathBuf,
292-
content: Vec<u8>,
293-
size: usize,
294-
mode: u32, // File permission mode
295-
}
296-
297193
#[cfg(test)]
298194
mod tests {
299195
use super::*;

0 commit comments

Comments
 (0)