Skip to content

Commit

Permalink
Refactor for readability
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Nov 28, 2024
1 parent 14e7fe0 commit 9665d00
Showing 1 changed file with 56 additions and 63 deletions.
119 changes: 56 additions & 63 deletions crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,91 +841,84 @@ impl TorrentStateLive {

async fn task_send_pex_to_peer(
self: Arc<Self>,
peer_addr: SocketAddr,
_peer_addr: SocketAddr,
tx: PeerTx,
) -> anyhow::Result<()> {
let mut sent_peers_live: HashSet<SocketAddr> = HashSet::new();
// As per BEP 11 we should not send more than 50 peers at once
// (here it also applies to fist message, should be OK as we anyhow really have more)
const MAX_SENT_PEERS: usize = 50;
// As per BEP 11 recommended interval is min 60 seconds
const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60);

let mut live_peers = HashSet::new();
let mut connected = Vec::with_capacity(MAX_SENT_PEERS);
let mut dropped = Vec::with_capacity(MAX_SENT_PEERS);
let mut peer_view_of_live_peers = HashSet::new();

// Wait 10 seconds before sending the first message to assure that peer will stay with us
let mut delay = Duration::from_secs(10);
tokio::time::sleep(Duration::from_secs(10)).await;

loop {
tokio::select! {
_ = tx.closed() => return Ok(()),
_ = tokio::time::sleep(delay) => {},
let mut interval = tokio::time::interval(PEX_MESSAGE_INTERVAL);

loop {
interval.tick().await;

// TODO: store them in a shared place
// Fill in live_peers
for ps in self.peers.states.iter() {
let peer = ps.value();
let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key());

// As per BEP 11 share only those we were able to connect
let has_outgoing_connections = peer
.stats
.counters
.outgoing_connections
.load(Ordering::Relaxed)
> 0;

let is_live = has_outgoing_connections && ps.value().state.is_live();
if is_live {
live_peers.insert(addr);
} else {
live_peers.remove(&addr);
}
}
delay = PEX_MESSAGE_INTERVAL;

let addrs_live_to_sent = self
.peers
.states
.iter()
.filter_map(|e| {
let peer = e.value();
let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key());

if *addr != peer_addr {
let has_outgoing_connections = peer
.stats
.counters
.outgoing_connections
.load(Ordering::Relaxed)
> 0; // As per BEP 11 share only those we were able to connect
if peer.state.is_live()
&& has_outgoing_connections
&& !sent_peers_live.contains(addr)
{
Some(*addr)
} else {
None
}
} else {
None
}
})
.take(MAX_SENT_PEERS)
.collect::<HashSet<_>>();
connected.clear();
dropped.clear();

let addrs_closed_to_sent = sent_peers_live
.iter()
.filter(|addr| {
self.peers
.states
.get(addr)
.map(|p| !p.value().state.is_live())
.unwrap_or(true)
})
.copied()
.take(MAX_SENT_PEERS)
.collect::<HashSet<_>>();
connected.extend(
live_peers
.difference(&peer_view_of_live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
dropped.extend(
peer_view_of_live_peers
.difference(&live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);

// BEP 11 - Dont send closed if they are now in live
// it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent,
// and addrs_closed_to_sent are only filtered addresses from sent_peers_live

if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() {
debug!(
"sending PEX with {} live ({:?})and {} closed peers",
addrs_live_to_sent.len(),
addrs_live_to_sent,
addrs_closed_to_sent.len()
);
let pex_msg =
extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent);
if !connected.is_empty() || !dropped.is_empty() {
let pex_msg = extended::ut_pex::UtPex::from_addrs(&connected, &dropped);
let ext_msg = extended::ExtendedMessage::UtPex(pex_msg);
let msg = Message::Extended(ext_msg);

if tx.send(WriterRequest::Message(msg)).is_err() {
if tx
.send(WriterRequest::Message(Message::Extended(ext_msg)))
.is_err()
{
return Ok(()); // Peer disconnected
}

sent_peers_live.extend(&addrs_live_to_sent);
sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr));
for addr in &dropped {
peer_view_of_live_peers.remove(addr);
}
peer_view_of_live_peers.extend(connected.iter().copied());
}
}
}
Expand Down

0 comments on commit 9665d00

Please sign in to comment.