Merge pull request #284 from ikatson/pex-send-updates
PEX - sharing peers with others [continue #261]
ikatson authored Dec 4, 2024
2 parents fb4e4c5 + fb760b2 commit a1de63e
Showing 5 changed files with 263 additions and 100 deletions.
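The change wires ut_pex (BEP 11) support into the live torrent state: for each peer that advertises ut_pex in its extended handshake, a periodic task works out which of our live outgoing peers the remote has not been told about yet, and which previously announced peers have since gone away, and sends those as added/dropped deltas. The following is a minimal sketch of that per-tick delta computation, not part of the commit; the name `pex_delta` and the use of bare `HashSet<SocketAddr>` inputs are illustrative assumptions.

use std::collections::HashSet;
use std::net::SocketAddr;

// BEP 11 caps each PEX message at 50 peers.
const MAX_SENT_PEERS: usize = 50;

// Hypothetical helper mirroring the per-tick delta computation:
// "connected" = live peers the remote has not been told about yet,
// "dropped"   = peers we previously announced that are no longer live.
fn pex_delta(
    live: &HashSet<SocketAddr>,
    announced: &HashSet<SocketAddr>,
) -> (Vec<SocketAddr>, Vec<SocketAddr>) {
    let connected: Vec<SocketAddr> = live
        .difference(announced)
        .take(MAX_SENT_PEERS)
        .copied()
        .collect();
    let dropped: Vec<SocketAddr> = announced
        .difference(live)
        .take(MAX_SENT_PEERS)
        .copied()
        .collect();
    (connected, dropped)
}

Because the two sets are disjoint by construction, a peer can never appear as both added and dropped in the same message, which is what the comment inside the new task refers to.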
131 changes: 97 additions & 34 deletions crates/librqbit/src/torrent_state/live/mod.rs
@@ -69,11 +69,10 @@ use librqbit_core::{
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{
extended::{
handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
},
Handshake, Message, MessageOwned, Piece, Request,
};
use peers::stats::atomic::AggregatePeerStatsAtomic;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, OwnedSemaphorePermit, Semaphore,
@@ -258,6 +257,7 @@ impl TorrentStateLive {
session_stats: session_stats.clone(),
stats: Default::default(),
states: Default::default(),
live_outgoing_peers: Default::default(),
},
locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker),
@@ -337,10 +337,6 @@ impl TorrentStateLive {
spawn_with_cancel(span, self.cancellation_token.clone(), fut);
}

fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] {
[&self.peers.stats, &self.peers.session_stats.peers]
}

pub fn down_speed_estimator(&self) -> &SpeedEstimator {
&self.down_speed_estimator
}
@@ -373,21 +369,21 @@ impl TorrentStateLive {
let counters = match self.peers.states.entry(checked_peer.addr) {
Entry::Occupied(mut occ) => {
let peer = occ.get_mut();
peer.state
.incoming_connection(
Id20::new(checked_peer.handshake.peer_id),
tx.clone(),
&self.peer_stats(),
)
.context("peer already existed")?;
peer.incoming_connection(
Id20::new(checked_peer.handshake.peer_id),
tx.clone(),
&self.peers,
)
.context("peer already existed")?;
peer.stats.counters.clone()
}
Entry::Vacant(vac) => {
atomic_inc(&self.peers.stats.seen);
let peer = Peer::new_live_for_incoming_connection(
*vac.key(),
Id20::new(checked_peer.handshake.peer_id),
tx.clone(),
&self.peer_stats(),
&self.peers,
);
let counters = peer.stats.counters.clone();
vac.insert(peer);
@@ -619,8 +615,7 @@ impl TorrentStateLive {

fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state
.connecting_to_live(Id20::new(h.peer_id), &self.peer_stats());
p.connecting_to_live(Id20::new(h.peer_id), &self.peers);
});
}

@@ -677,7 +672,7 @@ impl TorrentStateLive {
.peers
.states
.iter()
.filter(|e| filter.state.matches(e.value().state.get()))
.filter(|e| filter.state.matches(e.value().get_state()))
.map(|e| (e.key().to_string(), e.value().into()))
.collect(),
}
@@ -812,9 +807,9 @@ impl TorrentStateLive {

fn disconnect_all_peers_that_have_full_torrent(&self) {
for mut pe in self.peers.states.iter_mut() {
if let PeerState::Live(l) = pe.value().state.get() {
if let PeerState::Live(l) = pe.value().get_state() {
if l.has_full_torrent(self.lengths.total_pieces() as usize) {
let prev = pe.value_mut().state.set_not_needed(&self.peer_stats());
let prev = pe.value_mut().set_not_needed(&self.peers);
let _ = prev
.take_live_no_counters()
.unwrap()
@@ -829,15 +824,75 @@
self.peers
.states
.iter_mut()
.filter_map(|mut p| {
let known_addr = *p.key();
p.value_mut()
.reconnect_not_needed_peer(known_addr, &self.peer_stats())
})
.filter_map(|mut p| p.value_mut().reconnect_not_needed_peer(&self.peers))
.map(|socket_addr| self.peer_queue_tx.send(socket_addr))
.take_while(|r| r.is_ok())
.last();
}

async fn task_send_pex_to_peer(
self: Arc<Self>,
_peer_addr: SocketAddr,
tx: PeerTx,
) -> anyhow::Result<()> {
// As per BEP 11 we should not send more than 50 peers at once
// (here it also applies to the first message, which should be fine since we rarely have more anyway)
const MAX_SENT_PEERS: usize = 50;
// As per BEP 11, the recommended interval is at least 60 seconds
const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60);

let mut connected = Vec::with_capacity(MAX_SENT_PEERS);
let mut dropped = Vec::with_capacity(MAX_SENT_PEERS);
let mut peer_view_of_live_peers = HashSet::new();

// Wait 10 seconds before sending the first message to make sure the peer will stay with us
tokio::time::sleep(Duration::from_secs(10)).await;

let mut interval = tokio::time::interval(PEX_MESSAGE_INTERVAL);

loop {
interval.tick().await;

{
let live_peers = self.peers.live_outgoing_peers.read();
connected.clear();
dropped.clear();

connected.extend(
live_peers
.difference(&peer_view_of_live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
dropped.extend(
peer_view_of_live_peers
.difference(&live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
}

// BEP 11: don't report a peer as dropped if it is currently live.
// This is guaranteed by the mutual exclusion of the two sets above: `connected` only holds
// live peers the remote has not been told about yet, and `dropped` only holds previously
// announced peers that are no longer live.

if !connected.is_empty() || !dropped.is_empty() {
let pex_msg = extended::ut_pex::UtPex::from_addrs(&connected, &dropped);
let ext_msg = extended::ExtendedMessage::UtPex(pex_msg);
if tx
.send(WriterRequest::Message(Message::Extended(ext_msg)))
.is_err()
{
return Ok(()); // Peer disconnected
}

for addr in &dropped {
peer_view_of_live_peers.remove(addr);
}
peer_view_of_live_peers.extend(connected.iter().copied());
}
}
}
}

struct PeerHandlerLocked {
@@ -963,8 +1018,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
}

fn on_extended_handshake(&self, hs: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
if let Some(peer_pex_msg_id) = hs.ut_pex() {
trace!("peer supports pex at {peer_pex_msg_id}");
if let Some(_peer_pex_msg_id) = hs.ut_pex() {
self.state.clone().spawn(
error_span!(
parent: self.state.torrent.span.clone(),
"sending_pex_to_peer",
peer = self.addr.to_string()
),
self.state
.clone()
.task_send_pex_to_peer(self.addr, self.tx.clone()),
);
}
// Let's update the outgoing socket address for incoming connections
if self.incoming {
@@ -1020,7 +1084,6 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
impl PeerHandler {
fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> {
let peers = &self.state.peers;
let pstats = self.state.peer_stats();
let handle = self.addr;
let mut pe = match peers.states.get_mut(&handle) {
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
@@ -1029,7 +1092,7 @@ impl PeerHandler {
return Ok(());
}
};
let prev = pe.value_mut().state.take(&pstats);
let prev = pe.value_mut().take_state(peers);

match prev {
PeerState::Connecting(_) => {}
@@ -1048,7 +1111,7 @@ impl PeerHandler {
}
PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above.
pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(());
}
s @ PeerState::Queued | s @ PeerState::Dead => {
@@ -1064,7 +1127,7 @@ impl PeerHandler {
Some(e) => e,
None => {
trace!("peer died without errors, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(());
}
};
@@ -1073,11 +1136,11 @@

if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(());
}

pe.value_mut().state.set(PeerState::Dead, &pstats);
pe.value_mut().set_state(PeerState::Dead, peers);

if self.incoming {
// do not retry incoming peers
@@ -1108,9 +1171,9 @@ impl PeerHandler {
self.state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
match peer.state.get() {
match peer.get_state() {
PeerState::Dead => {
peer.state.set(PeerState::Queued, &self.state.peer_stats())
peer.set_state(PeerState::Queued, &self.state.peers)
}
other => bail!(
"peer is in unexpected state: {}. Expected dead",
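For reference, BEP 11 transports the added/dropped lists as compact byte strings inside a bencoded extension-message dictionary, where each IPv4 peer occupies 6 bytes: the 4 address bytes followed by the port in network byte order. In the diff above that encoding is handled by `UtPex::from_addrs`; the helper below is only an illustrative sketch of the wire format, and the name `compact_ipv4` is an assumption.

use std::net::SocketAddrV4;

// Illustrative only: encode IPv4 peers in BEP 11 compact form
// (4 address bytes + 2 port bytes, big-endian, per peer).
fn compact_ipv4(addrs: &[SocketAddrV4]) -> Vec<u8> {
    let mut out = Vec::with_capacity(addrs.len() * 6);
    for a in addrs {
        out.extend_from_slice(&a.ip().octets());
        out.extend_from_slice(&a.port().to_be_bytes());
    }
    out
}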