-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PEX - sharing peers with othes #261
Changes from all commits
0529d60
b8d2c2f
94b4a13
82914ba
051ada4
ff1959b
b989f01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,7 +69,7 @@ use librqbit_core::{ | |
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; | ||
use peer_binary_protocol::{ | ||
extended::{ | ||
handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, | ||
self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, | ||
}, | ||
Handshake, Message, MessageOwned, Piece, Request, | ||
}; | ||
|
@@ -838,6 +838,93 @@ impl TorrentStateLive { | |
.take_while(|r| r.is_ok()) | ||
.last(); | ||
} | ||
|
||
async fn task_send_pex_to_peer( | ||
self: Arc<Self>, | ||
peer_addr: SocketAddr, | ||
tx: PeerTx, | ||
) -> anyhow::Result<()> { | ||
let mut sent_peers_live: HashSet<SocketAddr> = HashSet::new(); | ||
const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) | ||
const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); // As per BEP 11 recommended interval is min 60 seconds | ||
let mut delay = Duration::from_secs(10); // Wait 10 seconds before sending the first message to assure that peer will stay with us | ||
|
||
loop { | ||
tokio::select! { | ||
_ = tx.closed() => return Ok(()), | ||
_ = tokio::time::sleep(delay) => {}, | ||
|
||
} | ||
delay = PEX_MESSAGE_INTERVAL; | ||
|
||
let addrs_live_to_sent = self | ||
.peers | ||
.states | ||
.iter() | ||
.filter_map(|e| { | ||
let peer = e.value(); | ||
let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); | ||
|
||
if *addr != peer_addr { | ||
let has_outgoing_connections = peer | ||
.stats | ||
.counters | ||
.outgoing_connections | ||
.load(Ordering::Relaxed) | ||
> 0; // As per BEP 11 share only those we were able to connect | ||
if peer.state.is_live() | ||
&& has_outgoing_connections | ||
&& !sent_peers_live.contains(addr) | ||
{ | ||
Some(*addr) | ||
} else { | ||
None | ||
} | ||
} else { | ||
None | ||
} | ||
}) | ||
.take(MAX_SENT_PEERS) | ||
.collect::<HashSet<_>>(); | ||
|
||
let addrs_closed_to_sent = sent_peers_live | ||
.iter() | ||
.filter(|addr| { | ||
self.peers | ||
.states | ||
.get(addr) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're locking the state twice, and it may race - e.g. it'll be live in first pass where you are computing "addrs_live_to_sent" and then not live later, or the other way around. It's hard to reason about what will happen in this case, esp. when below you're trying to provide strong guarantees like "it's assured by mutual exclusion of two above sets". So overall, how about refactor this for simplicity:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see - but I think it's not the case here because liveness check actually happens only once, depending on content of
So in any case in each loop run only one liveness check happens, which is relevant for logic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My main point was that the logic is hard to read, and I proposed a simpler way to write it. All the state that you need to keep in your mind to understand why the liveness check is happening only once might serve as proof of that :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think that logic is difficult - it's just all about Proposed change will work exactly same, I think. Only we will need always to collect all live peers, which is not "needed", we just need those, that are not in |
||
.map(|p| !p.value().state.is_live()) | ||
.unwrap_or(true) | ||
}) | ||
.copied() | ||
.take(MAX_SENT_PEERS) | ||
.collect::<HashSet<_>>(); | ||
|
||
// BEP 11 - Dont send closed if they are now in live | ||
// it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, | ||
// and addrs_closed_to_sent are only filtered addresses from sent_peers_live | ||
|
||
if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { | ||
debug!( | ||
"sending PEX with {} live ({:?})and {} closed peers", | ||
addrs_live_to_sent.len(), | ||
addrs_live_to_sent, | ||
addrs_closed_to_sent.len() | ||
); | ||
let pex_msg = | ||
extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); | ||
let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); | ||
let msg = Message::Extended(ext_msg); | ||
|
||
if tx.send(WriterRequest::Message(msg)).is_err() { | ||
return Ok(()); // Peer disconnected | ||
} | ||
|
||
sent_peers_live.extend(&addrs_live_to_sent); | ||
sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
struct PeerHandlerLocked { | ||
|
@@ -963,8 +1050,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { | |
} | ||
|
||
fn on_extended_handshake(&self, hs: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> { | ||
if let Some(peer_pex_msg_id) = hs.ut_pex() { | ||
trace!("peer supports pex at {peer_pex_msg_id}"); | ||
if let Some(_peer_pex_msg_id) = hs.ut_pex() { | ||
self.state.clone().spawn( | ||
error_span!( | ||
parent: self.state.torrent.span.clone(), | ||
"sending_pex_to_peer", | ||
peer = self.addr.to_string() | ||
), | ||
self.state | ||
.clone() | ||
.task_send_pex_to_peer(self.addr, self.tx.clone()), | ||
); | ||
} | ||
// Lets update outgoing Socket address for incoming connection | ||
if self.incoming { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio::time::interval would be cleaner as it serves the same purpose as sleeping in a loop but is a bit nicer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Main reasons for using
sleep
:From this perspective sleep seems easier to use.