Skip to content

Commit f4e41d5

Browse files
feat(downloads): concurrently download components
Some notifications needed to be updated to include the download URL, enabling the identification of the component being downloaded. This was necessary for accurate progress reporting of each component.
1 parent e43263a commit f4e41d5

File tree

8 files changed

+141
-76
lines changed

8 files changed

+141
-76
lines changed

src/cli/download_tracker.rs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
2-
use std::time::Duration;
2+
use std::collections::HashMap;
33

44
use crate::dist::Notification as In;
55
use crate::notifications::Notification;
@@ -13,8 +13,8 @@ use crate::utils::Notification as Un;
1313
pub(crate) struct DownloadTracker {
1414
/// MultiProgress bar for the downloads.
1515
multi_progress_bars: MultiProgress,
16-
/// ProgressBar for the current download.
17-
progress_bar: ProgressBar,
16+
/// Mapping of URLs being downloaded to their corresponding progress bars.
17+
file_progress_bars: HashMap<String, ProgressBar>,
1818
}
1919

2020
impl DownloadTracker {
@@ -28,22 +28,35 @@ impl DownloadTracker {
2828

2929
Self {
3030
multi_progress_bars,
31-
progress_bar: ProgressBar::hidden(),
31+
file_progress_bars: HashMap::new(),
3232
}
3333
}
3434

3535
pub(crate) fn handle_notification(&mut self, n: &Notification<'_>) -> bool {
3636
match *n {
37-
Notification::Install(In::Utils(Un::DownloadContentLengthReceived(content_len))) => {
38-
self.content_length_received(content_len);
37+
Notification::Install(In::Utils(Un::DownloadContentLengthReceived(
38+
content_len,
39+
url,
40+
))) => {
41+
if let Some(url) = url {
42+
self.content_length_received(content_len, url);
43+
}
3944
true
4045
}
41-
Notification::Install(In::Utils(Un::DownloadDataReceived(data))) => {
42-
self.data_received(data.len());
46+
Notification::Install(In::Utils(Un::DownloadDataReceived(data, url))) => {
47+
if let Some(url) = url {
48+
self.data_received(data.len(), url);
49+
}
4350
true
4451
}
45-
Notification::Install(In::Utils(Un::DownloadFinished)) => {
46-
self.download_finished();
52+
Notification::Install(In::Utils(Un::DownloadFinished(url))) => {
53+
if let Some(url) = url {
54+
self.download_finished(url);
55+
}
56+
true
57+
}
58+
Notification::Install(In::DownloadingComponent(component, _, _, url)) => {
59+
self.create_progress_bar(component.to_string(), url.to_string());
4760
true
4861
}
4962
Notification::Install(In::Utils(Un::DownloadPushUnit(_))) => true,
@@ -53,30 +66,44 @@ impl DownloadTracker {
5366
}
5467
}
5568

56-
/// Sets the length for a new ProgressBar and gives it a style.
57-
pub(crate) fn content_length_received(&mut self, content_len: u64) {
58-
self.progress_bar.set_length(content_len);
59-
self.progress_bar.set_style(
69+
/// Creates a new ProgressBar for the given component.
70+
pub(crate) fn create_progress_bar(&mut self, component: String, url: String) {
71+
let pb = ProgressBar::hidden();
72+
pb.set_style(
6073
ProgressStyle::with_template(
61-
"[{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})",
74+
"{msg:>12.bold} [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})",
6275
)
6376
.unwrap()
6477
.progress_chars("## "),
6578
);
79+
pb.set_message(component);
80+
self.multi_progress_bars.add(pb.clone());
81+
self.file_progress_bars.insert(url, pb);
82+
}
83+
84+
/// Sets the length for a new ProgressBar and gives it a style.
85+
pub(crate) fn content_length_received(&mut self, content_len: u64, url: &str) {
86+
if let Some(pb) = self.file_progress_bars.get(url) {
87+
pb.set_length(content_len);
88+
}
6689
}
6790

6891
/// Notifies self that data of size `len` has been received.
69-
pub(crate) fn data_received(&mut self, len: usize) {
70-
if self.progress_bar.is_hidden() && self.progress_bar.elapsed() >= Duration::from_secs(1) {
71-
self.multi_progress_bars.add(self.progress_bar.clone());
92+
pub(crate) fn data_received(&mut self, len: usize, url: &str) {
93+
if let Some(pb) = self.file_progress_bars.get(url) {
94+
pb.inc(len as u64);
7295
}
73-
self.progress_bar.inc(len as u64);
7496
}
7597

7698
/// Notifies self that the download has finished.
77-
pub(crate) fn download_finished(&mut self) {
78-
self.progress_bar.finish_and_clear();
79-
self.multi_progress_bars.remove(&self.progress_bar);
80-
self.progress_bar = ProgressBar::hidden();
99+
pub(crate) fn download_finished(&mut self, url: &str) {
100+
let Some(pb) = self.file_progress_bars.get(url) else {
101+
return;
102+
};
103+
pb.set_style(
104+
ProgressStyle::with_template("{msg:>12.bold} downloaded {total_bytes} in {elapsed}")
105+
.unwrap(),
106+
);
107+
pb.finish();
81108
}
82109
}

src/cli/self_update/windows.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,10 @@ pub(crate) async fn try_install_msvc(
274274
let download_tracker = Arc::new(Mutex::new(DownloadTracker::new_with_display_progress(
275275
true, process,
276276
)));
277-
download_tracker.lock().unwrap().download_finished();
277+
download_tracker
278+
.lock()
279+
.unwrap()
280+
.download_finished(visual_studio_url.as_str());
278281

279282
info!("downloading Visual Studio installer");
280283
download_file(

src/diskio/threaded.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,11 @@ impl Executor for Threaded<'_> {
263263
// pretend to have bytes to deliver.
264264
let mut prev_files = self.n_files.load(Ordering::Relaxed);
265265
if let Some(handler) = self.notify_handler {
266-
handler(Notification::DownloadFinished);
266+
handler(Notification::DownloadFinished(None));
267267
handler(Notification::DownloadPushUnit(Unit::IO));
268268
handler(Notification::DownloadContentLengthReceived(
269269
prev_files as u64,
270+
None,
270271
));
271272
}
272273
if prev_files > 50 {
@@ -284,12 +285,15 @@ impl Executor for Threaded<'_> {
284285
current_files = self.n_files.load(Ordering::Relaxed);
285286
let step_count = prev_files - current_files;
286287
if let Some(handler) = self.notify_handler {
287-
handler(Notification::DownloadDataReceived(&buf[0..step_count]));
288+
handler(Notification::DownloadDataReceived(
289+
&buf[0..step_count],
290+
None,
291+
));
288292
}
289293
}
290294
self.pool.join();
291295
if let Some(handler) = self.notify_handler {
292-
handler(Notification::DownloadFinished);
296+
handler(Notification::DownloadFinished(None));
293297
handler(Notification::DownloadPopUnit);
294298
}
295299
// close the feedback channel so that blocking reads on it can

src/dist/manifestation.rs

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ mod tests;
66

77
use std::path::Path;
88

9-
use anyhow::{Context, Result, anyhow, bail};
9+
use anyhow::{Context, Error, Result, anyhow, bail};
10+
use futures_util::stream::StreamExt;
1011
use tokio_retry::{RetryIf, strategy::FixedInterval};
12+
use tracing::info;
1113

1214
use crate::dist::component::{
1315
Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction,
@@ -153,6 +155,7 @@ impl Manifestation {
153155
let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new();
154156
let mut things_downloaded: Vec<String> = Vec::new();
155157
let components = update.components_urls_and_hashes(new_manifest)?;
158+
let components_len = components.len();
156159

157160
const DEFAULT_MAX_RETRIES: usize = 3;
158161
let max_retries: usize = download_cfg
@@ -162,41 +165,61 @@ impl Manifestation {
162165
.and_then(|s| s.parse().ok())
163166
.unwrap_or(DEFAULT_MAX_RETRIES);
164167

165-
for (component, format, url, hash) in components {
168+
info!("downloading component(s)");
169+
for (component, _, url, _) in components.clone() {
166170
(download_cfg.notify_handler)(Notification::DownloadingComponent(
167171
&component.short_name(new_manifest),
168172
&self.target_triple,
169173
component.target.as_ref(),
174+
&url,
170175
));
171-
let url = if altered {
172-
url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str())
173-
} else {
174-
url
175-
};
176-
177-
let url_url = utils::parse_url(&url)?;
178-
179-
let downloaded_file = RetryIf::spawn(
180-
FixedInterval::from_millis(0).take(max_retries),
181-
|| download_cfg.download(&url_url, &hash),
182-
|e: &anyhow::Error| {
183-
// retry only known retriable cases
184-
match e.downcast_ref::<RustupError>() {
185-
Some(RustupError::BrokenPartialFile)
186-
| Some(RustupError::DownloadingFile { .. }) => {
187-
(download_cfg.notify_handler)(Notification::RetryingDownload(&url));
188-
true
189-
}
190-
_ => false,
191-
}
192-
},
193-
)
194-
.await
195-
.with_context(|| RustupError::ComponentDownloadFailed(component.name(new_manifest)))?;
196-
197-
things_downloaded.push(hash);
176+
}
198177

199-
things_to_install.push((component, format, downloaded_file));
178+
let component_stream =
179+
tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| {
180+
async move {
181+
let url = if altered {
182+
url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str())
183+
} else {
184+
url
185+
};
186+
187+
let url_url = utils::parse_url(&url)?;
188+
189+
let downloaded_file = RetryIf::spawn(
190+
FixedInterval::from_millis(0).take(max_retries),
191+
|| download_cfg.download(&url_url, &hash),
192+
|e: &anyhow::Error| {
193+
// retry only known retriable cases
194+
match e.downcast_ref::<RustupError>() {
195+
Some(RustupError::BrokenPartialFile)
196+
| Some(RustupError::DownloadingFile { .. }) => {
197+
(download_cfg.notify_handler)(Notification::RetryingDownload(
198+
&url,
199+
));
200+
true
201+
}
202+
_ => false,
203+
}
204+
},
205+
)
206+
.await
207+
.with_context(|| {
208+
RustupError::ComponentDownloadFailed(component.name(new_manifest))
209+
})?;
210+
Ok::<_, Error>((component, format, downloaded_file, hash))
211+
}
212+
});
213+
if components_len > 0 {
214+
let results = component_stream
215+
.buffered(components_len)
216+
.collect::<Vec<_>>()
217+
.await;
218+
for result in results {
219+
let (component, format, downloaded_file, hash) = result?;
220+
things_downloaded.push(hash);
221+
things_to_install.push((component, format, downloaded_file));
222+
}
200223
}
201224

202225
// Begin transaction
@@ -452,6 +475,7 @@ impl Manifestation {
452475
"rust",
453476
&self.target_triple,
454477
Some(&self.target_triple),
478+
&url,
455479
));
456480

457481
use std::path::PathBuf;

src/dist/notifications.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub enum Notification<'a> {
2323
ExtensionNotInstalled(&'a str),
2424
NonFatalError(&'a anyhow::Error),
2525
MissingInstalledComponent(&'a str),
26-
DownloadingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>),
26+
DownloadingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>, &'a str),
2727
InstallingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>),
2828
RemovingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>),
2929
RemovingOldComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>),
@@ -61,7 +61,7 @@ impl Notification<'_> {
6161
| FileAlreadyDownloaded
6262
| DownloadingLegacyManifest => NotificationLevel::Debug,
6363
Extracting(_, _)
64-
| DownloadingComponent(_, _, _)
64+
| DownloadingComponent(_, _, _, _)
6565
| InstallingComponent(_, _, _)
6666
| RemovingComponent(_, _, _)
6767
| RemovingOldComponent(_, _, _)
@@ -107,7 +107,7 @@ impl Display for Notification<'_> {
107107
MissingInstalledComponent(c) => {
108108
write!(f, "during uninstall component {c} was not found")
109109
}
110-
DownloadingComponent(c, h, t) => {
110+
DownloadingComponent(c, h, t, _) => {
111111
if Some(h) == t.as_ref() || t.is_none() {
112112
write!(f, "downloading component '{c}'")
113113
} else {

src/download/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,13 @@ async fn download_file_(
108108

109109
match msg {
110110
Event::DownloadContentLengthReceived(len) => {
111-
notify_handler(Notification::DownloadContentLengthReceived(len));
111+
notify_handler(Notification::DownloadContentLengthReceived(
112+
len,
113+
Some(url.as_str()),
114+
));
112115
}
113116
Event::DownloadDataReceived(data) => {
114-
notify_handler(Notification::DownloadDataReceived(data));
117+
notify_handler(Notification::DownloadDataReceived(data, Some(url.as_str())));
115118
}
116119
Event::ResumingPartialDownload => {
117120
notify_handler(Notification::ResumingPartialDownload);
@@ -205,7 +208,7 @@ async fn download_file_(
205208
.download_to_path(url, path, resume_from_partial, Some(callback))
206209
.await;
207210

208-
notify_handler(Notification::DownloadFinished);
211+
notify_handler(Notification::DownloadFinished(Some(url.as_str())));
209212

210213
res
211214
}

src/utils/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ impl<'a> FileReaderWithProgress<'a> {
506506

507507
// Inform the tracker of the file size
508508
let flen = fh.metadata()?.len();
509-
(notify_handler)(Notification::DownloadContentLengthReceived(flen));
509+
(notify_handler)(Notification::DownloadContentLengthReceived(flen, None));
510510

511511
let fh = BufReader::with_capacity(8 * 1024 * 1024, fh);
512512

@@ -525,10 +525,13 @@ impl io::Read for FileReaderWithProgress<'_> {
525525
Ok(nbytes) => {
526526
self.nbytes += nbytes as u64;
527527
if nbytes != 0 {
528-
(self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes]));
528+
(self.notify_handler)(Notification::DownloadDataReceived(
529+
&buf[0..nbytes],
530+
None,
531+
));
529532
}
530533
if (nbytes == 0) || (self.flen == self.nbytes) {
531-
(self.notify_handler)(Notification::DownloadFinished);
534+
(self.notify_handler)(Notification::DownloadFinished(None));
532535
}
533536
Ok(nbytes)
534537
}

0 commit comments

Comments
 (0)