PEX - sharing peers with others #261

Closed · wants to merge 7 commits
102 changes: 99 additions & 3 deletions crates/librqbit/src/torrent_state/live/mod.rs
@@ -69,7 +69,7 @@ use librqbit_core::{
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{
    extended::{
-        handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
+        self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
    },
    Handshake, Message, MessageOwned, Piece, Request,
};
@@ -838,6 +838,93 @@ impl TorrentStateLive {
            .take_while(|r| r.is_ok())
            .last();
    }

    async fn task_send_pex_to_peer(
        self: Arc<Self>,
        peer_addr: SocketAddr,
        tx: PeerTx,
    ) -> anyhow::Result<()> {
        let mut sent_peers_live: HashSet<SocketAddr> = HashSet::new();
        const MAX_SENT_PEERS: usize = 50; // As per BEP 11, do not send more than 50 peers at once (here it also applies to the first message, which should be OK as we rarely have more anyway)
        const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); // As per BEP 11, the recommended interval is at least 60 seconds
        let mut delay = Duration::from_secs(10); // Wait 10 seconds before sending the first message, to make sure the peer stays with us
Owner:

tokio::time::interval would be cleaner: it serves the same purpose as sleeping in a loop but reads a bit nicer.

Contributor Author:

Main reasons for using sleep:

  • initial wait - we do not want to start immediately after the peer connects, as a peer can quickly disconnect, and we only want to send PEX to 'stable' peers. interval fires its first tick immediately.
  • changing wait period - it is 10 seconds now, then 60, but it could be extended further - I was thinking about growing the interval up to 600 seconds or something like that.

From this perspective sleep seems easier to use.
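
For illustration, a minimal sketch (not code from this PR) of how tokio::time::interval_at could cover the initial-delay concern; a changing period would still require recreating the interval, which is where plain sleep stays simpler:

```rust
use std::time::Duration;
use tokio::time::{interval_at, Instant};

// Sketch only: reuses the 10s initial wait / 60s period discussed above.
// Unlike `interval`, `interval_at` defers the first tick.
async fn pex_ticks() {
    let start = Instant::now() + Duration::from_secs(10);
    let mut ticker = interval_at(start, Duration::from_secs(60));
    loop {
        ticker.tick().await; // first tick after 10s, then every 60s
        // ... build and send the PEX message here ...
    }
}
```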


        loop {
            tokio::select! {
                _ = tx.closed() => return Ok(()),
                _ = tokio::time::sleep(delay) => {},
            }
            delay = PEX_MESSAGE_INTERVAL;

            let addrs_live_to_sent = self
                .peers
                .states
                .iter()
                .filter_map(|e| {
                    let peer = e.value();
                    let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key());

                    if *addr != peer_addr {
                        let has_outgoing_connections = peer
                            .stats
                            .counters
                            .outgoing_connections
                            .load(Ordering::Relaxed)
                            > 0; // As per BEP 11, share only peers we were able to connect to
                        if peer.state.is_live()
                            && has_outgoing_connections
                            && !sent_peers_live.contains(addr)
                        {
                            Some(*addr)
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                })
                .take(MAX_SENT_PEERS)
                .collect::<HashSet<_>>();

            let addrs_closed_to_sent = sent_peers_live
                .iter()
                .filter(|addr| {
                    self.peers
                        .states
                        .get(addr)
Owner:

you're locking the state twice, and it may race - e.g. a peer will be live in the first pass where you are computing "addrs_live_to_sent" and then not live later, or the other way around.

It's hard to reason about what will happen in this case, esp. when below you're trying to provide strong guarantees like "it's assured by mutual exclusion of the two above sets".

So overall, how about refactoring this for simplicity (a sketch follows the list):

  1. store the live peers' view in "sent_peers_live" as you are doing now.
  2. for each loop iteration
    1. collect all live addrs into a HashSet (not bounded by 50, but bounded by the semaphore anyway, 128 if I remember correctly). Let's name it "all_live"
    2. connected = (all - sent).take(50)
    3. dropped = (sent - all).take(50)
    4. sent = sent + connected - dropped
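
For illustration, a minimal sketch (not part of this PR) of the proposed set algebra, assuming all_live and sent are already collected as HashSets:

```rust
use std::collections::HashSet;
use std::net::SocketAddr;

// connected = (all_live - sent).take(max); dropped = (sent - all_live).take(max).
// The caller then updates its view: sent = sent + connected - dropped.
fn pex_delta(
    all_live: &HashSet<SocketAddr>,
    sent: &HashSet<SocketAddr>,
    max: usize,
) -> (HashSet<SocketAddr>, HashSet<SocketAddr>) {
    let connected: HashSet<_> = all_live.difference(sent).copied().take(max).collect();
    let dropped: HashSet<_> = sent.difference(all_live).copied().take(max).collect();
    (connected, dropped)
}
```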

Contributor Author:

I see - but I think that's not the case here, because the liveness check actually happens only once, depending on the content of sent_peers_live, which reflects the peer addresses already sent:

  • if an address is in sent_peers_live, then it's effectively checked for liveness only on line 853 .map(|p| !p.value().state.is_live()) (for closed peers), because line 834 && !sent_peers_live.contains(addr) ultimately excludes it from the addrs_live_to_sent set.
  • if an address is not in sent_peers_live, it's checked only for addrs_live_to_sent on line 832 if peer.state.is_live(), and the second check for closed peers does not happen at all, because that one iterates over sent_peers_live.

So in any case only one liveness check happens per loop run, which is what matters for the logic.

Owner:

My main point was that the logic is hard to read, and I proposed a simpler way to write it. All the state that you need to keep in your mind to understand why the liveness check is happening only once might serve as proof of that :)

Contributor Author:

I do not think the logic is difficult - it's all about sent_peers_live, which we need anyway: if an address is not in it, we check whether we can add a new address that is live and not yet in it; if an address is in it, we check whether we should remove it because it is no longer live. That's it.

The proposed change will work exactly the same, I think. Only we would always need to collect all live peers, which is not strictly needed - we just need those that are not in sent_peers_live.

                        .map(|p| !p.value().state.is_live())
                        .unwrap_or(true)
                })
                .copied()
                .take(MAX_SENT_PEERS)
                .collect::<HashSet<_>>();

            // BEP 11 - don't send peers as closed if they are now live.
            // This is assured by the mutual exclusion of the two sets above: if an address is in sent_peers_live it cannot be in addrs_live_to_sent,
            // and addrs_closed_to_sent contains only addresses filtered out of sent_peers_live.

            if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() {
                debug!(
                    "sending PEX with {} live ({:?}) and {} closed peers",
                    addrs_live_to_sent.len(),
                    addrs_live_to_sent,
                    addrs_closed_to_sent.len()
                );
                let pex_msg =
                    extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent);
                let ext_msg = extended::ExtendedMessage::UtPex(pex_msg);
                let msg = Message::Extended(ext_msg);

                if tx.send(WriterRequest::Message(msg)).is_err() {
                    return Ok(()); // Peer disconnected
                }

                sent_peers_live.extend(&addrs_live_to_sent);
                sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr));
            }
        }
    }
}

struct PeerHandlerLocked {
@@ -963,8 +1050,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
    }

    fn on_extended_handshake(&self, hs: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
-        if let Some(peer_pex_msg_id) = hs.ut_pex() {
-            trace!("peer supports pex at {peer_pex_msg_id}");
+        if let Some(_peer_pex_msg_id) = hs.ut_pex() {
+            self.state.clone().spawn(
+                error_span!(
+                    parent: self.state.torrent.span.clone(),
+                    "sending_pex_to_peer",
+                    peer = self.addr.to_string()
+                ),
+                self.state
+                    .clone()
+                    .task_send_pex_to_peer(self.addr, self.tx.clone()),
+            );
        }
        // Let's update the outgoing socket address for the incoming connection
        if self.incoming {
4 changes: 4 additions & 0 deletions crates/librqbit/src/torrent_state/live/peer/mod.rs
@@ -148,6 +148,10 @@ impl PeerStateNoMut {
        }
    }

    pub fn is_live(&self) -> bool {
        matches!(&self.0, PeerState::Live(_))
    }

    pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
        match &mut self.0 {
            PeerState::Live(l) => Some(l),
85 changes: 83 additions & 2 deletions crates/peer_binary_protocol/src/extended/ut_pex.rs
@@ -1,7 +1,8 @@
use std::net::{IpAddr, SocketAddr};

use buffers::ByteBufOwned;
use byteorder::{ByteOrder, BE};
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
use clone_to_owned::CloneToOwned;
use serde::{Deserialize, Serialize};

@@ -121,9 +122,58 @@
    }
}

impl UtPex<ByteBufOwned> {
    pub fn from_addrs<'a, I, J>(addrs_live: I, addrs_closed: J) -> Self
    where
        I: IntoIterator<Item = &'a SocketAddr>,
        J: IntoIterator<Item = &'a SocketAddr>,
    {
        fn addrs_to_bytes<'a, I>(addrs: I) -> (Option<ByteBufOwned>, Option<ByteBufOwned>)
        where
            I: IntoIterator<Item = &'a SocketAddr>,
        {
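            // Note (added for clarity, not in the PR): BEP 11's compact format packs each
            // IPv4 peer as 4 address bytes + a 2-byte big-endian port, and each IPv6 peer
            // as 16 address bytes + the same 2-byte port, concatenated into one string.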
            let mut ipv4_addrs = BytesMut::new();
            let mut ipv6_addrs = BytesMut::new();
            for addr in addrs {
                match addr {
                    SocketAddr::V4(v4) => {
                        ipv4_addrs.extend_from_slice(&v4.ip().octets());
                        ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes());
                    }
                    SocketAddr::V6(v6) => {
                        ipv6_addrs.extend_from_slice(&v6.ip().octets());
                        ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes());
                    }
                }
            }

            let freeze = |buf: BytesMut| -> Option<ByteBufOwned> {
                if !buf.is_empty() {
                    Some(buf.freeze().into())
                } else {
                    None
                }
            };

            (freeze(ipv4_addrs), freeze(ipv6_addrs))
        }

        let (added, added6) = addrs_to_bytes(addrs_live);
        let (dropped, dropped6) = addrs_to_bytes(addrs_closed);

        Self {
            added,
            added6,
            dropped,
            dropped6,
            ..Default::default()
        }
    }
}

#[cfg(test)]
mod tests {
-    use bencode::from_bytes;
+    use bencode::{bencode_serialize_to_writer, from_bytes};
    use buffers::ByteBuf;

    use super::*;
@@ -154,4 +204,35 @@ mod tests {
        );
        assert_eq!(0, addrs[1].flags);
    }

    #[test]
    fn test_pex_roundtrip() {
        let a1 = "185.159.157.20:46439".parse::<SocketAddr>().unwrap();
        let a2 = "151.249.105.134:4240".parse::<SocketAddr>().unwrap();
        // IPv6
        let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439"
            .parse::<SocketAddr>()
            .unwrap();
        let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240"
            .parse::<SocketAddr>()
            .unwrap();

        let addrs = vec![a1, aa1, a2, aa2];
        let pex = UtPex::from_addrs(&addrs, &addrs);
        let mut bytes = Vec::new();
        bencode_serialize_to_writer(&pex, &mut bytes).unwrap();
        let pex2 = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();
        assert_eq!(4, pex2.added_peers().count());
        assert_eq!(pex.added_peers().count(), pex2.added_peers().count());
        let addrs2: Vec<_> = pex2.added_peers().collect();
        assert_eq!(a1, addrs2[0].addr);
        assert_eq!(a2, addrs2[1].addr);
        assert_eq!(aa1, addrs2[2].addr);
        assert_eq!(aa2, addrs2[3].addr);
        let addrs2: Vec<_> = pex2.dropped_peers().collect();
        assert_eq!(a1, addrs2[0].addr);
        assert_eq!(a2, addrs2[1].addr);
        assert_eq!(aa1, addrs2[2].addr);
        assert_eq!(aa2, addrs2[3].addr);
    }
}
}