Skip to content

Commit 5f5ec19

Browse files
FranciscoTGouveiarami3l
authored andcommitted
fix(downloads): honor the RUSTUP_CONCURRENT_DOWNLOADS by always having "n" concurrent downloads
Using this environment variable with a value between 2 and 5 would mean that concurrency was not being totally maximized as there could be less than n futures running at a some point in time. See: https://docs.rs/futures-util/0.3.31/futures_util/stream/trait.StreamExt.html#method.buffered Adding a semaphore allows US to control how many futures are running at a time, fixing this problem and ensuring that the env var is always honored and the downloads are maximizing concurrency.
1 parent 2d429e8 commit 5f5ec19

File tree

2 files changed

+31
-15
lines changed

2 files changed

+31
-15
lines changed

src/cli/rustup_mode.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
path::{Path, PathBuf},
77
process::ExitStatus,
88
str::FromStr,
9+
sync::Arc,
910
time::Duration,
1011
};
1112

@@ -16,6 +17,7 @@ use console::style;
1617
use futures_util::stream::StreamExt;
1718
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
1819
use itertools::Itertools;
20+
use tokio::sync::Semaphore;
1921
use tracing::{info, trace, warn};
2022
use tracing_subscriber::{EnvFilter, Registry, reload::Handle};
2123

@@ -799,13 +801,15 @@ async fn check_updates(cfg: &Cfg<'_>, opts: CheckOpts) -> Result<utils::ExitCode
799801
let use_colors = matches!(t.color_choice(), ColorChoice::Auto | ColorChoice::Always);
800802
let mut update_available = false;
801803
let channels = cfg.list_channels()?;
802-
let num_channels = cfg.process.concurrent_downloads().unwrap_or(channels.len());
804+
let channels_len = channels.len();
805+
let num_channels = cfg.process.concurrent_downloads().unwrap_or(channels_len);
803806

804807
// Ensure that `.buffered()` is never called with 0 as this will cause a hang.
805808
// See: https://github.com/rust-lang/futures-rs/pull/1194#discussion_r209501774
806809
if num_channels > 0 {
807810
let multi_progress_bars =
808811
MultiProgress::with_draw_target(cfg.process.progress_draw_target());
812+
let semaphore = Arc::new(Semaphore::new(num_channels));
809813
let channels = tokio_stream::iter(channels.into_iter()).map(|(name, distributable)| {
810814
let pb = multi_progress_bars.add(ProgressBar::new(1));
811815
pb.set_style(
@@ -815,7 +819,10 @@ async fn check_updates(cfg: &Cfg<'_>, opts: CheckOpts) -> Result<utils::ExitCode
815819
);
816820
pb.set_message(format!("{name}"));
817821
pb.enable_steady_tick(Duration::from_millis(100));
822+
823+
let sem = semaphore.clone();
818824
async move {
825+
let _permit = sem.acquire().await.unwrap();
819826
let current_version = distributable.show_version()?;
820827
let dist_version = distributable.show_dist_version().await?;
821828
let mut update_a = false;
@@ -867,7 +874,7 @@ async fn check_updates(cfg: &Cfg<'_>, opts: CheckOpts) -> Result<utils::ExitCode
867874
// `indicatif`.
868875
let channels = if !multi_progress_bars.is_hidden() {
869876
channels
870-
.buffer_unordered(num_channels)
877+
.buffer_unordered(channels_len)
871878
.collect::<Vec<_>>()
872879
.await
873880
} else {

src/dist/manifestation.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::path::Path;
88

99
use anyhow::{Context, Result, anyhow, bail};
1010
use futures_util::stream::StreamExt;
11+
use std::sync::Arc;
12+
use tokio::sync::Semaphore;
1113
use tracing::info;
1214

1315
use crate::dist::component::{
@@ -154,10 +156,11 @@ impl Manifestation {
154156
let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new();
155157
let mut things_downloaded: Vec<String> = Vec::new();
156158
let components = update.components_urls_and_hashes(new_manifest)?;
159+
let components_len = components.len();
157160
let num_channels = download_cfg
158161
.process
159162
.concurrent_downloads()
160-
.unwrap_or(components.len());
163+
.unwrap_or(components_len);
161164

162165
const DEFAULT_MAX_RETRIES: usize = 3;
163166
let max_retries: usize = download_cfg
@@ -177,23 +180,29 @@ impl Manifestation {
177180
));
178181
}
179182

183+
let semaphore = Arc::new(Semaphore::new(num_channels));
180184
let component_stream =
181185
tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| {
182-
self.download_component(
183-
component,
184-
format,
185-
url,
186-
hash,
187-
altered,
188-
tmp_cx,
189-
download_cfg,
190-
max_retries,
191-
new_manifest,
192-
)
186+
let sem = semaphore.clone();
187+
async move {
188+
let _permit = sem.acquire().await.unwrap();
189+
self.download_component(
190+
component,
191+
format,
192+
url,
193+
hash,
194+
altered,
195+
tmp_cx,
196+
download_cfg,
197+
max_retries,
198+
new_manifest,
199+
)
200+
.await
201+
}
193202
});
194203
if num_channels > 0 {
195204
let results = component_stream
196-
.buffered(num_channels)
205+
.buffered(components_len)
197206
.collect::<Vec<_>>()
198207
.await;
199208
for result in results {

0 commit comments

Comments
 (0)