Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fastresume: check at least one piece from each file + windows fix #235

Merged
merged 2 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/librqbit/src/bitv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use bitvec::{
vec::BitVec,
view::{AsBits, AsMutBits},
};
use tracing::trace;

pub trait BitV: Send + Sync {
fn as_slice(&self) -> &BitSlice<u8, Msb0>;
Expand All @@ -24,6 +25,12 @@ pub struct MmapBitV {
mmap: memmap2::MmapMut,
}

impl Drop for MmapBitV {
fn drop(&mut self) {
trace!("dropping MmapBitV, this should unmap the .bitv file")
}
}

impl MmapBitV {
pub fn new(file: File) -> anyhow::Result<Self> {
let mmap =
Expand Down
16 changes: 8 additions & 8 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,14 +1251,6 @@ impl Session {
debug!("error pausing torrent before deletion: {e:?}")
}

if let Some(p) = self.persistence.as_ref() {
if let Err(e) = p.delete(id).await {
error!(error=?e, "error deleting torrent from persistence database");
} else {
debug!(?id, "deleted torrent from persistence database")
}
}

let storage = removed
.with_state_mut(|s| match s.take() {
ManagedTorrentState::Initializing(p) => p.files.take().ok(),
Expand All @@ -1277,6 +1269,14 @@ impl Session {
.map(Ok)
.unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared()));

if let Some(p) = self.persistence.as_ref() {
if let Err(e) = p.delete(id).await {
error!(error=?e, "error deleting torrent from persistence database");
} else {
debug!(?id, "deleted torrent from persistence database")
}
}

match (storage, delete_files) {
(Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"),
(Ok(storage), true) => {
Expand Down
55 changes: 44 additions & 11 deletions crates/librqbit/src/torrent_state/initializing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,60 @@ impl TorrentStateInitializing {

use rand::seq::SliceRandom;

let mut have_pieces = hp
.as_slice()
.iter_ones()
.filter_map(|i| self.shared.lengths.validate_piece_index(i.try_into().ok()?))
.collect_vec();
have_pieces.shuffle(&mut rand::thread_rng());
let mut to_validate = BF::from_boxed_slice(
vec![0u8; self.shared.lengths.piece_bitfield_bytes()].into_boxed_slice(),
);
let mut queue = hp.as_slice().to_owned();

// Validate at least one piece from each file, if we claim we have it.
for fi in self.shared.file_infos.iter() {
let prange = fi.piece_range_usize();
let offset = prange.start;
for piece_id in hp
.as_slice()
.get(fi.piece_range_usize())
.into_iter()
.flat_map(|s| s.iter_ones())
.map(|pid| pid + offset)
.take(1)
{
to_validate.set(piece_id, true);
queue.set(piece_id, false);
}
}

// Validate a certain threshold of fastresume pieces with decreasing probability of actual disk reads.
for (tmp_id, hpiece) in have_pieces.iter().enumerate() {
// For all the remaining pieces we claim we have, validate them with decreasing probability.
let mut queue = queue.iter_ones().collect_vec();
queue.shuffle(&mut rand::thread_rng());
for (tmp_id, piece_id) in queue.into_iter().enumerate() {
let denom: u32 = (tmp_id + 1).min(50).try_into().unwrap();
if rand::thread_rng().gen_ratio(1, denom) && fo.check_piece(*hpiece).is_err() {
if rand::thread_rng().gen_ratio(1, denom) {
to_validate.set(piece_id, true);
}
}

let to_validate_count = to_validate.count_ones();
for (id, piece_id) in to_validate
.iter_ones()
.filter_map(|id| {
self.shared
.lengths
.validate_piece_index(id.try_into().ok()?)
})
.enumerate()
{
if fo.check_piece(piece_id).is_err() {
return true;
}

#[allow(clippy::cast_possible_truncation)]
let progress = (self.shared.lengths.total_length() as f64
/ have_pieces.len() as f64
* (tmp_id + 1) as f64) as u64;
/ to_validate_count as f64
* (id + 1) as f64) as u64;
let progress = progress.min(self.shared.lengths.total_length());
self.checked_bytes.store(progress, Ordering::Relaxed);
}

false
});

Expand Down