Skip to content

Commit

Permalink
Initial support for interface binding
Browse files Browse the repository at this point in the history
  • Loading branch information
Quackdoc committed Jan 20, 2025
1 parent b34c246 commit a57a951
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 20 deletions.
27 changes: 20 additions & 7 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,7 @@ pub struct DhtConfig {
pub bootstrap_addrs: Option<Vec<String>>,
pub routing_table: Option<RoutingTable>,
pub listen_addr: Option<SocketAddr>,
pub interface_name: Option<String>,
pub peer_store: Option<PeerStore>,
pub cancellation_token: Option<CancellationToken>,
}
Expand All @@ -1153,14 +1154,26 @@ impl DhtState {
#[inline(never)]
pub fn with_config(mut config: DhtConfig) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
async move {
let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr)
let socket: UdpSocket = match config.listen_addr {
Some(addr) => {
let sock = UdpSocket::bind(addr)
.await
.with_context(|| format!("error binding socket, address {addr}")),
None => UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding socket, address 0.0.0.0:0"),
}?;
.with_context(|| format!("error binding socket, address {addr}"))?;
if let Some(iname) = config.interface_name {
sock.bind_device(Some(iname.as_bytes()))?;
}
sock
},
None => {
let sock = UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding socket, address 0.0.0.0:0")?;
if let Some(iname) = config.interface_name {
sock.bind_device(Some(iname.as_bytes()))?;
}
sock
},
};

let listen_addr = socket
.local_addr()
Expand Down
68 changes: 59 additions & 9 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
borrow::Cow,
collections::{HashMap, HashSet},
io::Read,
net::SocketAddr,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{atomic::AtomicUsize, Arc},
time::Duration,
Expand Down Expand Up @@ -55,7 +55,7 @@ use parking_lot::RwLock;
use peer_binary_protocol::Handshake;
use serde::{Deserialize, Serialize};
use tokio::{
net::{TcpListener, TcpStream},
net::{TcpListener, TcpStream, TcpSocket},
sync::Notify,
};
use tokio_util::sync::{CancellationToken, DropGuard};
Expand Down Expand Up @@ -396,6 +396,9 @@ pub struct SessionOptions {
pub peer_opts: Option<PeerConnectionOptions>,

pub listen_port_range: Option<std::ops::Range<u16>>,
/// Configure the interface to bind to on linux and fuscia systems
pub interface_bind: Option<String>,

pub enable_upnp_port_forwarding: bool,

// If you set this to something, all writes to disk will happen in background and be
Expand Down Expand Up @@ -423,16 +426,55 @@ pub struct SessionOptions {

async fn create_tcp_listener(
port_range: std::ops::Range<u16>,
interface_bind: Option<String>,
) -> anyhow::Result<(TcpListener, u16)> {
for port in port_range.clone() {
match TcpListener::bind(("0.0.0.0", port)).await {
Ok(l) => return Ok((l, port)),
Err(e) => {
debug!("error listening on port {port}: {e:#}")
#[cfg(any(target_os = "linux", target_os = "fuchsia", target_os = "android"))]
{
match interface_bind {
Some(interface) => {
warn!("running on specific interface");
for port in port_range.clone() {
let addr: SocketAddr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 35213
);
let socket = TcpSocket::new_v4()?;
socket.bind_device(Some(interface.as_bytes()))?;
socket.bind(addr)?;
match socket.listen(1024) {
Ok(l) => return Ok((l, port)),
Err(e) => {
error!("error listening on port {port}: {e:#}")
}
}
}
bail!("no free TCP ports in range {port_range:?}");
},
None => {
for port in port_range.clone() {
match TcpListener::bind(("0.0.0.0", 35213)).await {
Ok(l) => return Ok((l, port)),
Err(e) => {
debug!("error listening on port {port}: {e:#}")
}
}
}
bail!("no free TCP ports in range {port_range:?}");
}
}
}

#[cfg(not(any(target_os = "linux", target_os = "fuchsia", target_os = "android")))]
{
for port in port_range.clone() {
match TcpListener::bind(("0.0.0.0", port)).await {
Ok(l) => return Ok((l, port)),
Err(e) => {
debug!("error listening on port {port}: {e:#}")
}
}
}
bail!("no free TCP ports in range {port_range:?}");
}
bail!("no free TCP ports in range {port_range:?}");
}

fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[String]) -> anyhow::Result<Bytes> {
Expand Down Expand Up @@ -498,7 +540,7 @@ impl Session {

let (tcp_listener, tcp_listen_port) =
if let Some(port_range) = opts.listen_port_range.clone() {
let (l, p) = create_tcp_listener(port_range)
let (l, p) = create_tcp_listener(port_range, opts.interface_bind.clone())
.await
.context("error listening on TCP")?;
info!("Listening on 0.0.0.0:{p} for incoming peer connections");
Expand All @@ -513,6 +555,7 @@ impl Session {
let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig {
cancellation_token: Some(token.child_token()),
interface_name: opts.interface_bind.clone(),
..Default::default()
})
.await
Expand Down Expand Up @@ -599,6 +642,12 @@ impl Session {
.context("error creating socks5 proxy for HTTP")?;
reqwest::Client::builder().proxy(proxy)
} else {
#[cfg(any(target_os = "linux", target_os = "fuchsia", target_os = "android"))]
match opts.interface_bind {
Some(interface) => reqwest::Client::builder().interface(&interface),
None => reqwest::Client::builder()
}
#[cfg(not(any(target_os = "linux", target_os = "fuchsia", target_os = "android")))]
reqwest::Client::builder()
};

Expand Down Expand Up @@ -1321,6 +1370,7 @@ impl Session {
force_tracker_interval,
announce_port,
self.reqwest_client.clone(),
//interface_name
);

let initial_peers_rx = if initial_peers.is_empty() {
Expand Down
15 changes: 14 additions & 1 deletion crates/rqbit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ struct Opts {
env = "RQBIT_TCP_LISTEN_MAX_PORT"
)]
tcp_listen_max_port: u16,

/// Configure the interface for rqbit to bind to..
/// Currently only usable on Linux, Android, and Fuscia
#[arg(
long = "interface-name",
env = "RQBIT_BIND_INTERFACE"
)]
interface_bind: Option<String>,

/// If set, will try to publish the chosen port through upnp on your router.
#[arg(
Expand Down Expand Up @@ -458,10 +466,15 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
..Default::default()
}),
listen_port_range: if !opts.disable_tcp_listen {
Some(opts.tcp_listen_min_port..opts.tcp_listen_max_port)
if opts.tcp_listen_min_port == opts.tcp_listen_max_port || opts.tcp_listen_min_port > opts.tcp_listen_max_port {
Some(opts.tcp_listen_min_port..opts.tcp_listen_min_port+1)
} else {
Some(opts.tcp_listen_min_port..opts.tcp_listen_max_port)
}
} else {
None
},
interface_bind: opts.interface_bind,
enable_upnp_port_forwarding: !opts.disable_upnp_port_forward,
defer_writes_up_to: opts.defer_writes_up_to,
default_storage_factory: Some({
Expand Down
8 changes: 5 additions & 3 deletions crates/tracker_comms/src/tracker_comms_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ impl UdpTrackerRequester {
pub async fn new(addr: impl ToSocketAddrs) -> anyhow::Result<Self> {
let sock = tokio::net::UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding UDP socket")?;
.context("error binding UDP socket for tracker")?;

//TODO: Figure out how to set this nicely
//sock.bind_device(Some(b"INTERFACE-NAME")).context("error setting UDP socket to interface")?;
sock.connect(addr)
.await
.context("error connecting UDP socket")?;

.context("error connecting UDP socket for tracker")?;
let tid = new_transaction_id();
let mut write_buf = Vec::new();
let mut read_buf = vec![0u8; 4096];
Expand Down
7 changes: 7 additions & 0 deletions crates/upnp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ pub async fn discover_once(
tx: &UnboundedSender<UpnpDiscoverResponse>,
kind: &str,
timeout: Duration,
interface_name: Option<String>,
) -> anyhow::Result<()> {
let socket = tokio::net::UdpSocket::bind("0.0.0.0:0")
.await
Expand All @@ -343,6 +344,9 @@ pub async fn discover_once(
.send_to(message.as_bytes(), SSDP_MULTICAST_IP)
.await
.context("failed to send SSDP search request")?;
if let Some(iname) = interface_name {
socket.bind_device(Some(iname.as_bytes()))?;
}

let mut buffer = [0; 2048];

Expand Down Expand Up @@ -378,6 +382,7 @@ pub struct UpnpPortForwarderOptions {
pub lease_duration: Duration,
pub discover_interval: Duration,
pub discover_timeout: Duration,
pub interface_name: Option<String>,
}

impl Default for UpnpPortForwarderOptions {
Expand All @@ -386,6 +391,7 @@ impl Default for UpnpPortForwarderOptions {
discover_interval: Duration::from_secs(60),
discover_timeout: Duration::from_secs(10),
lease_duration: Duration::from_secs(60),
interface_name: None,
}
}
}
Expand Down Expand Up @@ -425,6 +431,7 @@ impl UpnpPortForwarder {
tx,
SSDP_SEARCH_WAN_IPCONNECTION_ST,
self.opts.discover_timeout,
self.opts.interface_name.clone(),
)
.await
}
Expand Down

0 comments on commit a57a951

Please sign in to comment.