From 0529d6077a2fd1eefbf535df7e672304fe08b888 Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 21 Oct 2024 13:12:05 +0200 Subject: [PATCH 1/5] Sending PEX - initial concept --- crates/librqbit/src/torrent_state/live/mod.rs | 83 ++++++++++++++++++- .../src/extended/ut_pex.rs | 83 ++++++++++++++++++- 2 files changed, 161 insertions(+), 5 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 220c6371..65db14b7 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -68,7 +68,7 @@ use librqbit_core::{ use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ extended::{ - handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, + self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, }, Handshake, Message, MessageOwned, Piece, Request, }; @@ -795,6 +795,74 @@ impl TorrentStateLive { .take_while(|r| r.is_ok()) .last(); } + + async fn task_send_pex_to_peer( + self: Arc, + peer_addr: SocketAddr, + tx: PeerTx, + ) -> anyhow::Result<()> { + let mut sent_peers_live: HashSet = HashSet::new(); + const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) + loop { + let addrs_live_to_sent = self + .peers + .states + .iter() + .filter_map(|e| { + let peer = e.value(); + let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); + + if *addr != peer_addr { + if peer.state.is_live() && !sent_peers_live.contains(addr) { + Some(*addr) + } else { + None + } + } else { + None + } + }) + .take(50) + .collect::>(); + + let addrs_closed_to_sent = sent_peers_live + .iter() + .filter(|addr| { + self.peers + .states + .get(addr) + .map(|p| !p.value().state.is_live()) + .unwrap_or(true) + }) + .copied() + .take(MAX_SENT_PEERS) + .collect::>(); + + // BEP 11 - Dont send closed if they are now in live + // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, + // and addrs_closed_to_sent are only filtered addresses from sent_peers_live + + if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { + let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); + let msg = Message::Extended(ext_msg); + + tx.send(WriterRequest::Message(msg))?; + + debug!(peer=?peer_addr, "sending PEX with {} live and {} closed peers", addrs_live_to_sent.len(), addrs_closed_to_sent.len()); + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + + + } + + tokio::select! { + _ = tx.closed() => return Ok(()), + _ = tokio::time::sleep(Duration::from_secs(60)) => {}, + + } + } + } } struct PeerHandlerLocked { @@ -920,8 +988,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } fn on_extended_handshake(&self, hs: &ExtendedHandshake) -> anyhow::Result<()> { - if let Some(peer_pex_msg_id) = hs.ut_pex() { - trace!("peer supports pex at {peer_pex_msg_id}"); + if let Some(_peer_pex_msg_id) = hs.ut_pex() { + self.state.clone().spawn( + error_span!( + parent: self.state.torrent.span.clone(), + "sending_pex_to_peer", + peer = self.addr.to_string() + ), + self.state + .clone() + .task_send_pex_to_peer(self.addr, self.tx.clone()), + ); } // Lets update outgoing Socket address for incoming connection if self.incoming { diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 8e1b0a8a..0dc6b15b 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -1,7 +1,8 @@ use std::net::{IpAddr, SocketAddr}; +use buffers::ByteBufOwned; use byteorder::{ByteOrder, BE}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Serialize}; @@ -121,9 +122,58 @@ where } } +impl UtPex { + + pub fn from_addrs<'a, I,J>(addrs_live: I, addrs_closed: J) -> Self + where + I: IntoIterator, + J: IntoIterator, + { + + + fn addrs_to_bytes<'a,I>(addrs: I) -> (Option, Option) + where + I: IntoIterator, + { + let mut ipv4_addrs = BytesMut::new(); + let mut ipv6_addrs = BytesMut::new(); + for addr in addrs { + match addr { + SocketAddr::V4(v4) => { + ipv4_addrs.extend_from_slice(&v4.ip().octets()); + ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); + } + SocketAddr::V6(v6) => { + ipv6_addrs.extend_from_slice(&v6.ip().octets()); + ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + } + } + } + + let freeze = |buf: BytesMut| -> Option { if !buf.is_empty() {Some(buf.freeze().into())} else {None} }; + + (freeze(ipv4_addrs), freeze(ipv6_addrs)) + } + + let (added, added6) = addrs_to_bytes(addrs_live); + let (dropped, dropped6) = addrs_to_bytes(addrs_closed); + + + Self { + added, + added6, + dropped, + dropped6, + ..Default::default() + } + + } + +} + #[cfg(test)] mod tests { - use bencode::from_bytes; + use bencode::{bencode_serialize_to_writer, from_bytes}; use buffers::ByteBuf; use super::*; @@ -154,4 +204,33 @@ mod tests { ); assert_eq!(0, addrs[1].flags); } + + #[test] + fn test_pex_roundtrip() { + let a1 = "185.159.157.20:46439".parse::().unwrap(); + let a2 = "151.249.105.134:4240".parse::().unwrap(); + //IPV6 + let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439".parse::().unwrap(); + let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240".parse::().unwrap(); + + let addrs = vec![a1, aa1, a2, aa2]; + let pex = UtPex::from_addrs(&addrs, &addrs); + let mut bytes = Vec::new(); + bencode_serialize_to_writer(&pex, &mut bytes).unwrap(); + let pex2 = from_bytes::>(&bytes).unwrap(); + assert_eq!(4, pex2.added_peers().count()); + assert_eq!(pex.added_peers().count(), pex2.added_peers().count()); + let addrs2: Vec<_> = pex2.added_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + let addrs2: Vec<_> = pex2.dropped_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + + + } } From b8d2c2f22c2e7a1011e515a32cd0a04bb9599d06 Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 21 Oct 2024 17:05:52 +0200 Subject: [PATCH 2/5] Fix missing fn --- crates/librqbit/src/torrent_state/live/peer/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index b0895f8e..3c6e6af0 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -148,6 +148,10 @@ impl PeerStateNoMut { } } + pub fn is_live(&self) -> bool { + matches!(&self.0, PeerState::Live(_)) + } + pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { match &mut self.0 { PeerState::Live(l) => Some(l), From 94b4a135d470292252666690eaffc9a23afb3c6c Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 23 Oct 2024 12:57:29 +0200 Subject: [PATCH 3/5] Include only addresses with known working connections --- crates/librqbit/src/torrent_state/live/mod.rs | 48 +++++++++------ .../src/extended/ut_pex.rs | 58 ++++++++++--------- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 65db14b7..4b52d377 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -803,7 +803,17 @@ impl TorrentStateLive { ) -> anyhow::Result<()> { let mut sent_peers_live: HashSet = HashSet::new(); const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) + const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); // As per BEP 11 recommended interval is min 60 seconds + let mut delay = Duration::from_secs(10); // Wait 10 seconds before sending the first message to assure that peer will stay with us + loop { + tokio::select! { + _ = tx.closed() => return Ok(()), + _ = tokio::time::sleep(delay) => {}, + + } + delay = PEX_MESSAGE_INTERVAL; + let addrs_live_to_sent = self .peers .states @@ -813,7 +823,16 @@ impl TorrentStateLive { let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); if *addr != peer_addr { - if peer.state.is_live() && !sent_peers_live.contains(addr) { + let has_outgoing_connections = peer + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0; // As per BEP 11 share only those we were able to connect + if peer.state.is_live() + && has_outgoing_connections + && !sent_peers_live.contains(addr) + { Some(*addr) } else { None @@ -822,7 +841,7 @@ impl TorrentStateLive { None } }) - .take(50) + .take(50) .collect::>(); let addrs_closed_to_sent = sent_peers_live @@ -838,28 +857,21 @@ impl TorrentStateLive { .take(MAX_SENT_PEERS) .collect::>(); - // BEP 11 - Dont send closed if they are now in live - // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, + // BEP 11 - Dont send closed if they are now in live + // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // and addrs_closed_to_sent are only filtered addresses from sent_peers_live if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + debug!(peer=?peer_addr, "sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); + let pex_msg = + extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); let msg = Message::Extended(ext_msg); - tx.send(WriterRequest::Message(msg))?; - - debug!(peer=?peer_addr, "sending PEX with {} live and {} closed peers", addrs_live_to_sent.len(), addrs_closed_to_sent.len()); - sent_peers_live.extend(&addrs_live_to_sent); - sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); - - - } - - tokio::select! { - _ = tx.closed() => return Ok(()), - _ = tokio::time::sleep(Duration::from_secs(60)) => {}, - + if tx.send(WriterRequest::Message(msg)).is_ok() { + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + } } } } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 0dc6b15b..a7c24b9f 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -123,42 +123,44 @@ where } impl UtPex { - - pub fn from_addrs<'a, I,J>(addrs_live: I, addrs_closed: J) -> Self + pub fn from_addrs<'a, I, J>(addrs_live: I, addrs_closed: J) -> Self where I: IntoIterator, J: IntoIterator, { - - - fn addrs_to_bytes<'a,I>(addrs: I) -> (Option, Option) + fn addrs_to_bytes<'a, I>(addrs: I) -> (Option, Option) where I: IntoIterator, - { - let mut ipv4_addrs = BytesMut::new(); - let mut ipv6_addrs = BytesMut::new(); - for addr in addrs { - match addr { - SocketAddr::V4(v4) => { - ipv4_addrs.extend_from_slice(&v4.ip().octets()); - ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); - } - SocketAddr::V6(v6) => { - ipv6_addrs.extend_from_slice(&v6.ip().octets()); - ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + { + let mut ipv4_addrs = BytesMut::new(); + let mut ipv6_addrs = BytesMut::new(); + for addr in addrs { + match addr { + SocketAddr::V4(v4) => { + ipv4_addrs.extend_from_slice(&v4.ip().octets()); + ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); + } + SocketAddr::V6(v6) => { + ipv6_addrs.extend_from_slice(&v6.ip().octets()); + ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + } } } - } - let freeze = |buf: BytesMut| -> Option { if !buf.is_empty() {Some(buf.freeze().into())} else {None} }; + let freeze = |buf: BytesMut| -> Option { + if !buf.is_empty() { + Some(buf.freeze().into()) + } else { + None + } + }; - (freeze(ipv4_addrs), freeze(ipv6_addrs)) - } + (freeze(ipv4_addrs), freeze(ipv6_addrs)) + } let (added, added6) = addrs_to_bytes(addrs_live); let (dropped, dropped6) = addrs_to_bytes(addrs_closed); - Self { added, added6, @@ -166,9 +168,7 @@ impl UtPex { dropped6, ..Default::default() } - } - } #[cfg(test)] @@ -210,8 +210,12 @@ mod tests { let a1 = "185.159.157.20:46439".parse::().unwrap(); let a2 = "151.249.105.134:4240".parse::().unwrap(); //IPV6 - let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439".parse::().unwrap(); - let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240".parse::().unwrap(); + let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439" + .parse::() + .unwrap(); + let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240" + .parse::() + .unwrap(); let addrs = vec![a1, aa1, a2, aa2]; let pex = UtPex::from_addrs(&addrs, &addrs); @@ -230,7 +234,5 @@ mod tests { assert_eq!(a2, addrs2[1].addr); assert_eq!(aa1, addrs2[2].addr); assert_eq!(aa2, addrs2[3].addr); - - } } From 82914bac27226b1d10157832ee66aeb7af67ca18 Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 23 Oct 2024 13:42:52 +0200 Subject: [PATCH 4/5] Logging --- crates/librqbit/src/torrent_state/live/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 4b52d377..72adc960 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -862,7 +862,7 @@ impl TorrentStateLive { // and addrs_closed_to_sent are only filtered addresses from sent_peers_live if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - debug!(peer=?peer_addr, "sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); + debug!("sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); @@ -1806,6 +1806,7 @@ impl PeerHandler { B: AsRef<[u8]> + std::fmt::Debug, { // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... + debug!("received PEX message with {} added peers and {} dropped peers", msg.added_peers().count(), msg.dropped_peers().count()); msg.dropped_peers() .chain(msg.added_peers()) .for_each(|peer| { From ff1959b6801ab0bb388e8486813578436c4cdc09 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 9 Nov 2024 11:42:48 +0100 Subject: [PATCH 5/5] Small fixes base on comments in PR #261 --- crates/librqbit/src/torrent_state/live/mod.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 72adc960..9ff93566 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -841,7 +841,7 @@ impl TorrentStateLive { None } }) - .take(50) + .take(MAX_SENT_PEERS) .collect::>(); let addrs_closed_to_sent = sent_peers_live @@ -862,16 +862,23 @@ impl TorrentStateLive { // and addrs_closed_to_sent are only filtered addresses from sent_peers_live if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - debug!("sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); + debug!( + "sending PEX with {} live ({:?})and {} closed peers", + addrs_live_to_sent.len(), + addrs_live_to_sent, + addrs_closed_to_sent.len() + ); let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); let msg = Message::Extended(ext_msg); - if tx.send(WriterRequest::Message(msg)).is_ok() { - sent_peers_live.extend(&addrs_live_to_sent); - sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + if tx.send(WriterRequest::Message(msg)).is_err() { + return Ok(()); // Peer disconnected } + + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); } } } @@ -1806,7 +1813,6 @@ impl PeerHandler { B: AsRef<[u8]> + std::fmt::Debug, { // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... - debug!("received PEX message with {} added peers and {} dropped peers", msg.added_peers().count(), msg.dropped_peers().count()); msg.dropped_peers() .chain(msg.added_peers()) .for_each(|peer| {