From a57a9518476b75d20be157a04cce669a36f8224c Mon Sep 17 00:00:00 2001 From: Mitchel Stewart Date: Mon, 20 Jan 2025 02:30:42 -0500 Subject: [PATCH] Initial support for interface binding --- crates/dht/src/dht.rs | 27 ++++++-- crates/librqbit/src/session.rs | 68 ++++++++++++++++--- crates/rqbit/src/main.rs | 15 +++- crates/tracker_comms/src/tracker_comms_udp.rs | 8 ++- crates/upnp/src/lib.rs | 7 ++ 5 files changed, 105 insertions(+), 20 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 72d6297f..b8a415ac 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -1138,6 +1138,7 @@ pub struct DhtConfig { pub bootstrap_addrs: Option>, pub routing_table: Option, pub listen_addr: Option, + pub interface_name: Option, pub peer_store: Option, pub cancellation_token: Option, } @@ -1153,14 +1154,26 @@ impl DhtState { #[inline(never)] pub fn with_config(mut config: DhtConfig) -> BoxFuture<'static, anyhow::Result>> { async move { - let socket = match config.listen_addr { - Some(addr) => UdpSocket::bind(addr) + let socket: UdpSocket = match config.listen_addr { + Some(addr) => { + let sock = UdpSocket::bind(addr) .await - .with_context(|| format!("error binding socket, address {addr}")), - None => UdpSocket::bind("0.0.0.0:0") - .await - .context("error binding socket, address 0.0.0.0:0"), - }?; + .with_context(|| format!("error binding socket, address {addr}"))?; + if let Some(iname) = config.interface_name { + sock.bind_device(Some(iname.as_bytes()))?; + } + sock + }, + None => { + let sock = UdpSocket::bind("0.0.0.0:0") + .await + .context("error binding socket, address 0.0.0.0:0")?; + if let Some(iname) = config.interface_name { + sock.bind_device(Some(iname.as_bytes()))?; + } + sock + }, + }; let listen_addr = socket .local_addr() diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 6606f7ae..03b1202c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, collections::{HashMap, HashSet}, io::Read, - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, sync::{atomic::AtomicUsize, Arc}, time::Duration, @@ -55,7 +55,7 @@ use parking_lot::RwLock; use peer_binary_protocol::Handshake; use serde::{Deserialize, Serialize}; use tokio::{ - net::{TcpListener, TcpStream}, + net::{TcpListener, TcpStream, TcpSocket}, sync::Notify, }; use tokio_util::sync::{CancellationToken, DropGuard}; @@ -396,6 +396,9 @@ pub struct SessionOptions { pub peer_opts: Option, pub listen_port_range: Option>, + /// Configure the interface to bind to on linux and fuscia systems + pub interface_bind: Option, + pub enable_upnp_port_forwarding: bool, // If you set this to something, all writes to disk will happen in background and be @@ -423,16 +426,55 @@ pub struct SessionOptions { async fn create_tcp_listener( port_range: std::ops::Range, + interface_bind: Option, ) -> anyhow::Result<(TcpListener, u16)> { - for port in port_range.clone() { - match TcpListener::bind(("0.0.0.0", port)).await { - Ok(l) => return Ok((l, port)), - Err(e) => { - debug!("error listening on port {port}: {e:#}") + #[cfg(any(target_os = "linux", target_os = "fuchsia", target_os = "android"))] + { + match interface_bind { + Some(interface) => { + warn!("running on specific interface"); + for port in port_range.clone() { + let addr: SocketAddr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 35213 + ); + let socket = TcpSocket::new_v4()?; + socket.bind_device(Some(interface.as_bytes()))?; + socket.bind(addr)?; + match socket.listen(1024) { + Ok(l) => return Ok((l, port)), + Err(e) => { + error!("error listening on port {port}: {e:#}") + } + } + } + bail!("no free TCP ports in range {port_range:?}"); + }, + None => { + for port in port_range.clone() { + match TcpListener::bind(("0.0.0.0", 35213)).await { + Ok(l) => return Ok((l, port)), + Err(e) => { + debug!("error listening on port {port}: {e:#}") + } + } + } + bail!("no free TCP ports in range {port_range:?}"); + } + } + } + + #[cfg(not(any(target_os = "linux", target_os = "fuchsia", target_os = "android")))] + { + for port in port_range.clone() { + match TcpListener::bind(("0.0.0.0", port)).await { + Ok(l) => return Ok((l, port)), + Err(e) => { + debug!("error listening on port {port}: {e:#}") + } } } + bail!("no free TCP ports in range {port_range:?}"); } - bail!("no free TCP ports in range {port_range:?}"); } fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[String]) -> anyhow::Result { @@ -498,7 +540,7 @@ impl Session { let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range.clone() { - let (l, p) = create_tcp_listener(port_range) + let (l, p) = create_tcp_listener(port_range, opts.interface_bind.clone()) .await .context("error listening on TCP")?; info!("Listening on 0.0.0.0:{p} for incoming peer connections"); @@ -513,6 +555,7 @@ impl Session { let dht = if opts.disable_dht_persistence { DhtBuilder::with_config(DhtConfig { cancellation_token: Some(token.child_token()), + interface_name: opts.interface_bind.clone(), ..Default::default() }) .await @@ -599,6 +642,12 @@ impl Session { .context("error creating socks5 proxy for HTTP")?; reqwest::Client::builder().proxy(proxy) } else { + #[cfg(any(target_os = "linux", target_os = "fuchsia", target_os = "android"))] + match opts.interface_bind { + Some(interface) => reqwest::Client::builder().interface(&interface), + None => reqwest::Client::builder() + } + #[cfg(not(any(target_os = "linux", target_os = "fuchsia", target_os = "android")))] reqwest::Client::builder() }; @@ -1321,6 +1370,7 @@ impl Session { force_tracker_interval, announce_port, self.reqwest_client.clone(), + //interface_name ); let initial_peers_rx = if initial_peers.is_empty() { diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 3089ad87..594182b7 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -146,6 +146,14 @@ struct Opts { env = "RQBIT_TCP_LISTEN_MAX_PORT" )] tcp_listen_max_port: u16, + + /// Configure the interface for rqbit to bind to.. + /// Currently only usable on Linux, Android, and Fuscia + #[arg( + long = "interface-name", + env = "RQBIT_BIND_INTERFACE" + )] + interface_bind: Option, /// If set, will try to publish the chosen port through upnp on your router. #[arg( @@ -458,10 +466,15 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> ..Default::default() }), listen_port_range: if !opts.disable_tcp_listen { - Some(opts.tcp_listen_min_port..opts.tcp_listen_max_port) + if opts.tcp_listen_min_port == opts.tcp_listen_max_port || opts.tcp_listen_min_port > opts.tcp_listen_max_port { + Some(opts.tcp_listen_min_port..opts.tcp_listen_min_port+1) + } else { + Some(opts.tcp_listen_min_port..opts.tcp_listen_max_port) + } } else { None }, + interface_bind: opts.interface_bind, enable_upnp_port_forwarding: !opts.disable_upnp_port_forward, defer_writes_up_to: opts.defer_writes_up_to, default_storage_factory: Some({ diff --git a/crates/tracker_comms/src/tracker_comms_udp.rs b/crates/tracker_comms/src/tracker_comms_udp.rs index bbfa5e86..2dad5216 100644 --- a/crates/tracker_comms/src/tracker_comms_udp.rs +++ b/crates/tracker_comms/src/tracker_comms_udp.rs @@ -184,11 +184,13 @@ impl UdpTrackerRequester { pub async fn new(addr: impl ToSocketAddrs) -> anyhow::Result { let sock = tokio::net::UdpSocket::bind("0.0.0.0:0") .await - .context("error binding UDP socket")?; + .context("error binding UDP socket for tracker")?; + + //TODO: Figure out how to set this nicely + //sock.bind_device(Some(b"INTERFACE-NAME")).context("error setting UDP socket to interface")?; sock.connect(addr) .await - .context("error connecting UDP socket")?; - + .context("error connecting UDP socket for tracker")?; let tid = new_transaction_id(); let mut write_buf = Vec::new(); let mut read_buf = vec![0u8; 4096]; diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs index 04dd94e3..27ec96ac 100644 --- a/crates/upnp/src/lib.rs +++ b/crates/upnp/src/lib.rs @@ -334,6 +334,7 @@ pub async fn discover_once( tx: &UnboundedSender, kind: &str, timeout: Duration, + interface_name: Option, ) -> anyhow::Result<()> { let socket = tokio::net::UdpSocket::bind("0.0.0.0:0") .await @@ -343,6 +344,9 @@ pub async fn discover_once( .send_to(message.as_bytes(), SSDP_MULTICAST_IP) .await .context("failed to send SSDP search request")?; + if let Some(iname) = interface_name { + socket.bind_device(Some(iname.as_bytes()))?; + } let mut buffer = [0; 2048]; @@ -378,6 +382,7 @@ pub struct UpnpPortForwarderOptions { pub lease_duration: Duration, pub discover_interval: Duration, pub discover_timeout: Duration, + pub interface_name: Option, } impl Default for UpnpPortForwarderOptions { @@ -386,6 +391,7 @@ impl Default for UpnpPortForwarderOptions { discover_interval: Duration::from_secs(60), discover_timeout: Duration::from_secs(10), lease_duration: Duration::from_secs(60), + interface_name: None, } } } @@ -425,6 +431,7 @@ impl UpnpPortForwarder { tx, SSDP_SEARCH_WAN_IPCONNECTION_ST, self.opts.discover_timeout, + self.opts.interface_name.clone(), ) .await }