diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index f645156f..767b8182 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -112,6 +112,8 @@ pub struct Session { reqwest_client: reqwest::Client, connector: Arc, + concurrent_initialize_semaphore: Arc, + // This is stored for all tasks to stop when session is dropped. _cancellation_token_drop_guard: DropGuard, } @@ -375,6 +377,9 @@ pub struct SessionOptions { // socks5://[username:password@]host:port pub socks_proxy_url: Option, + + // how many concurrent torrent initializations can happen + pub concurrent_init_limit: Option, } async fn create_tcp_listener( @@ -562,6 +567,7 @@ impl Session { default_storage_factory: opts.default_storage_factory, reqwest_client, connector: stream_connector, + concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) }); if let Some(mut disk_write_rx) = disk_write_rx { @@ -1085,7 +1091,12 @@ impl Session { let _ = span.enter(); managed_torrent - .start(peer_rx, opts.paused, self.cancellation_token.child_token()) + .start( + peer_rx, + opts.paused, + self.cancellation_token.child_token(), + self.concurrent_initialize_semaphore.clone(), + ) .context("error starting torrent")?; } @@ -1233,7 +1244,12 @@ impl Session { self.tcp_listen_port, handle.info().options.force_tracker_interval, )?; - handle.start(peer_rx, false, self.cancellation_token.child_token())?; + handle.start( + peer_rx, + false, + self.cancellation_token.child_token(), + self.concurrent_initialize_semaphore.clone(), + )?; self.try_update_persistence_metadata(handle).await; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 8f75dff1..0fe1a818 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -208,6 +208,7 @@ impl ManagedTorrent { peer_rx: Option, start_paused: bool, live_cancellation_token: CancellationToken, + init_semaphore: Arc, ) -> anyhow::Result<()> { let mut g = self.locked.write(); @@ -283,10 +284,16 @@ impl ManagedTorrent { let t = self.clone(); let span = self.info().span.clone(); let token = live_cancellation_token.clone(); + spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), async move { + let _permit = init_semaphore + .acquire() + .await + .context("bug: concurrent init semaphore was closed")?; + match init.check().await { Ok(paused) => { let mut g = t.locked.write(); @@ -344,7 +351,12 @@ impl ManagedTorrent { drop(g); // Recurse. - self.start(peer_rx, start_paused, live_cancellation_token) + self.start( + peer_rx, + start_paused, + live_cancellation_token, + init_semaphore, + ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 19fedb7e..870a056e 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -122,6 +122,10 @@ struct Opts { /// Alternatively, set this as an environment variable RQBIT_SOCKS_PROXY_URL #[arg(long)] socks_url: Option, + + /// How many torrents can be initializing (rehashing) at the same time + #[arg(long, default_value = "5")] + concurrent_init_limit: usize, } #[derive(Parser)] @@ -335,6 +339,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { } }), socks_proxy_url: socks_url, + concurrent_init_limit: Some(opts.concurrent_init_limit), }; let stats_printer = |session: Arc| async move {