Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major refactoring #35

Merged
merged 40 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8e50829
1/n [exponential backoff peers]: refactor "tx" to be once per connect…
ikatson Nov 17, 2023
55e692d
2/n Wrap PeerState into peer
ikatson Nov 17, 2023
6ebf212
Peers now reconnect!
ikatson Nov 17, 2023
2203ffe
Fixed bugs
ikatson Nov 18, 2023
db12bba
Not even sure what I'm doing
ikatson Nov 18, 2023
48a1482
Changed log to tracing
ikatson Nov 19, 2023
d39479a
Small refactorings
ikatson Nov 19, 2023
a745257
Split up a couple methods
ikatson Nov 19, 2023
38c9902
Change peer states to dashmap
ikatson Nov 19, 2023
adf3eef
Removed a couple deadlocks
ikatson Nov 19, 2023
19c3fd7
Uninline some methods for easier backtrace debugging
ikatson Nov 19, 2023
2b84202
Debugged one more deadlock (or it was the same one)
ikatson Nov 19, 2023
1a55936
Nothing
ikatson Nov 19, 2023
ff71ade
timed existence for lock time debugging
ikatson Nov 19, 2023
2ad2881
timed existence for lock time debugging
ikatson Nov 19, 2023
3f0c4b7
Fix the NotNeeded warning
ikatson Nov 19, 2023
124c605
more counters
ikatson Nov 19, 2023
17d4082
Timing on_received_message
ikatson Nov 19, 2023
4b3da0b
Trying to see why it hangs for a bit sometimes
ikatson Nov 19, 2023
0c6781b
Saving
ikatson Nov 19, 2023
b891cd4
Remove a couple unused methods
ikatson Nov 19, 2023
b40d33b
Fix a bug with wrong number of queued peers
ikatson Nov 19, 2023
98dbecf
Fix a logging bug
ikatson Nov 19, 2023
0170b19
Remove inline-nevers
ikatson Nov 19, 2023
adb98a2
Add more docs
ikatson Nov 19, 2023
2ebbc0a
Add examples
ikatson Nov 19, 2023
d6cef09
Remove an instance of double-locking in the same scope
ikatson Nov 19, 2023
0c89ee9
Add parameter with_peers to stats_snapshot while its slow
ikatson Nov 19, 2023
22ea146
Starting to implement aggregate peer stats
ikatson Nov 19, 2023
aa99872
WTF is going on with counters
ikatson Nov 20, 2023
88c2f9e
Finally fixed a stupid tracing bug with counters
ikatson Nov 20, 2023
1238593
Remove old slow peer stats computation
ikatson Nov 20, 2023
4a331d9
Make default peer connect timeout less = 2s
ikatson Nov 20, 2023
a9794de
New bug showing up when torrent is downloading super fast
ikatson Nov 20, 2023
1de690a
nothing
ikatson Nov 20, 2023
34ee9d9
Remove Option<BF> to just BF
ikatson Nov 20, 2023
2695a8e
Preventively fixed other race conditions
ikatson Nov 20, 2023
e2f909c
Add some documentation
ikatson Nov 20, 2023
b751b98
Update versions to 3.0.0 as theres a lot of changes internally
ikatson Nov 20, 2023
1b68b0e
Remove unnecessary debugging files
ikatson Nov 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 188 additions & 75 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ members = [
panic = "abort"

[profile.release]
panic = "abort"
panic = "abort"
debug = true
19 changes: 7 additions & 12 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
- [x] Selective file downloading (mostly done)
- [x] Proper counting of how much is left, and how much is downloaded

- [x] Send bitfield at the start if I have something
- [x] use the "update_hash" function in piece checking
- [ ] signaling when file is done

- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent

- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them.
- [ ] per-file stats
- [ ] per-peer stats

- [x] slow peers cause slowness in the end, need the "end of game" algorithm
- [x (partial)] per-peer stats
- [x] use some concurrent hashmap e.g. flurry or dashmap
- [x] tracing instead of logging. Debugging peers: RUST_LOG=[{peer=.*}]=debug
test-log for tests
- [ ] reopen read only is bugged:
expected to be able to write to disk: error writing to file 0 (""The.Creator.2023.D.AMZN.WEB-DLRip.1.46Gb.MegaPeer.avi"")

someday:
- [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager)
5 changes: 5 additions & 0 deletions crates/buffers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// This crate used for making working with &[u8] or Vec<u8> generic in other parts of librqbit,
// for nicer display of binary data etc.
//
// Not useful outside of librqbit.

use serde::{Deserialize, Deserializer};

use clone_to_owned::CloneToOwned;
Expand Down
8 changes: 8 additions & 0 deletions crates/clone_to_owned/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
// These are helpers for objects that can be borrowed, but can be made owned while changing the type.
// The difference between e.g. Cow and CloneToOwned, is that we can implement it recursively for owned types.
//
// E.g. HashMap<&str, &str> can be converted to HashMap<String, String>.
//
// This lets us express types like TorrentMetaInfo<&[u8]> for zero-copy metadata about a bencode buffer in memory,
// but to have one-line conversion for it into TorrentMetaInfo<Vec<u8>> so that we can store it later somewhere.

use std::collections::HashMap;

pub trait CloneToOwned {
Expand Down
8 changes: 4 additions & 4 deletions crates/dht/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "librqbit-dht"
version = "2.2.2"
version = "3.0.0"
edition = "2018"
description = "DHT implementation, used in rqbit torrent client."
license = "Apache-2.0"
Expand All @@ -25,14 +25,14 @@ hex = "0.4"
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
anyhow = "1"
parking_lot = "0.12"
log = "0.4"
pretty_env_logger = "0.5"
tracing = "0.1"
futures = "0.3"
rand = "0.8"
indexmap = "2"
directories = "5"

clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.0.0"}

[dev-dependencies]
tracing-subscriber = "0.3"
8 changes: 4 additions & 4 deletions crates/dht/src/main.rs → crates/dht/examples/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::{str::FromStr, time::Duration};

use anyhow::Context;
use librqbit_dht::{Dht, Id20};
use log::info;
use tokio_stream::StreamExt;
use tracing::info;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::init();

let info_hash = Id20::from_str("64a980abe6e448226bb930ba061592e44c3781a1").unwrap();
tracing_subscriber::fmt::init();

let dht = Dht::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash).await?;

Expand Down Expand Up @@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> {

let peer_printer = async {
while let Some(peer) = stream.next().await {
log::info!("peer found: {}", peer)
info!("peer found: {}", peer)
}
Ok(())
};
Expand Down
4 changes: 2 additions & 2 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use bencode::ByteString;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use indexmap::IndexSet;
use librqbit_core::{id20::Id20, peer_id::generate_peer_id};
use log::{debug, info, trace, warn};
use parking_lot::RwLock;
use rand::Rng;
use serde::Serialize;
Expand All @@ -26,6 +25,7 @@ use tokio::{
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::{debug, info, trace, warn};

#[derive(Debug, Serialize)]
pub struct DhtStats {
Expand Down Expand Up @@ -449,7 +449,7 @@ async fn run_framer(
Err(_) => break,
}
}
Err(e) => log::debug!("{}: error deserializing incoming message: {}", addr, e),
Err(e) => debug!("{}: error deserializing incoming message: {}", addr, e),
}
}
Err::<(), _>(anyhow::anyhow!(
Expand Down
2 changes: 1 addition & 1 deletion crates/dht/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::path::{Path, PathBuf};
use std::time::Duration;

use anyhow::Context;
use log::{debug, error, info, trace, warn};
use tokio::spawn;
use tracing::{debug, error, info, trace, warn};

use crate::dht::{Dht, DhtConfig};
use crate::routing_table::RoutingTable;
Expand Down
2 changes: 1 addition & 1 deletion crates/dht/src/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::{
};

use librqbit_core::id20::Id20;
use log::debug;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tracing::debug;

#[derive(Debug, Clone, Serialize, Deserialize)]
enum BucketTreeNodeData {
Expand Down
15 changes: 9 additions & 6 deletions crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "librqbit"
version = "2.2.2"
version = "3.0.0-beta.0"
authors = ["Igor Katson <[email protected]>"]
edition = "2018"
description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it."
Expand All @@ -13,6 +13,7 @@ readme = "README.md"

[features]
default = ["sha1-system", "default-tls"]
timed_existence = []
sha1-system = ["sha1w/sha1-system"]
sha1-openssl = ["sha1w/sha1-openssl"]
sha1-rust = ["sha1w/sha1-rust"]
Expand All @@ -22,11 +23,11 @@ rust-tls = ["reqwest/rustls-tls"]
[dependencies]
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"}
librqbit-core = {path = "../librqbit_core", version = "2.2.2"}
librqbit-core = {path = "../librqbit_core", version = "3.0.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "2.2.2"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.0.0"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="2.2.2"}
dht = {path = "../dht", package="librqbit-dht", version="3.0.0"}

tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
axum = {version = "0.6"}
Expand All @@ -43,7 +44,7 @@ byteorder = "1"
bincode = "1"
bitvec = "1"
parking_lot = "0.12"
log = "0.4"
tracing = "0.1.40"
size_format = "1"
rand = "0.8"

Expand All @@ -55,7 +56,9 @@ uuid = {version = "1.2", features = ["v4"]}
futures = "0.3"
url = "2"
hex = "0.4"
backoff = "0.4.0"
dashmap = "5.5.3"

[dev-dependencies]
futures = {version = "0.3"}
pretty_env_logger = "0.5"
tracing-subscriber = "0.3"
30 changes: 1 addition & 29 deletions crates/librqbit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,10 @@
A torrent library 100% written in rust

## Basic example
See ```examples``` folder.
This is a simple program on how to use this library
This program will just download a simple torrent file with a Magnet link

```rust
use std::error::Error;
use std::path::PathBuf;
use librqbit::session::{AddTorrentResponse, Session};
use librqbit::spawn_utils::BlockingSpawner;

const MAGNET_LINK: &str = "magnet:?..."; // Put your magnet link here

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>>{

// Create the session
let session = Session::new("C:\\Anime".parse().unwrap(), BlockingSpawner::new(false)).await?;

// Add the torrent to the session
let handle = match session.add_torrent(MAGNET_LINK, None).await {
Ok(AddTorrentResponse::Added(handle)) => {
Ok(handle)
},
Err(e) => {
eprintln!("Something goes wrong when downloading torrent : {:?}", e);
Err(())
}
_ => Err(())
}.expect("Failed to add torrent to the session");

// Wait until the download is completed
handle.wait_until_completed().await?;

Ok(())
}
```
69 changes: 69 additions & 0 deletions crates/librqbit/examples/ubuntu.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// For production-grade code look at rqbit::main(), which does the same but has more options.
//
// Usage:
// cargo run --release --example ubuntu /tmp/ubuntu/

use std::time::Duration;

use anyhow::Context;
use librqbit::session::{AddTorrentOptions, AddTorrentResponse, Session};
use tracing::info;

// This is ubuntu-21.04-live-server-amd64.iso.torrent
// You can also pass filenames and URLs to add_torrent().
const MAGNET_LINK: &str = "magnet:?xt=urn:btih:cab507494d02ebb1178b38f2e9d7be299c86b862";

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// Output logs to console.
tracing_subscriber::fmt::init();

let output_dir = std::env::args()
.nth(1)
.expect("the first argument should be the output directory");

// Create the session
let session = Session::new(output_dir.into(), Default::default())
.await
.context("error creating session")?;

// Add the torrent to the session
let handle = match session
.add_torrent(
MAGNET_LINK,
Some(AddTorrentOptions {
// Set this to true to allow writing on top of existing files.
// If the file is partially downloaded, librqbit will only download the
// missing pieces.
//
// Otherwise it will throw an error that the file exists.
overwrite: false,
..Default::default()
}),
)
.await
.context("error adding torrent")?
{
AddTorrentResponse::Added(handle) => handle,
// For a brand new session other variants won't happen.
_ => unreachable!(),
};

// Print stats periodically.
tokio::spawn({
let handle = handle.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let stats = handle.torrent_state().stats_snapshot();
info!("stats: {stats:?}");
}
}
});

// Wait until the download is completed
handle.wait_until_completed().await?;
info!("torrent downloaded");

Ok(())
}
19 changes: 5 additions & 14 deletions crates/librqbit/src/chunk_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex};
use log::{debug, info};
use peer_binary_protocol::Piece;
use tracing::{debug, info};

use crate::type_aliases::BF;

pub struct ChunkTracker {
// This forms the basis of a "queue" to pull from.
// It's set to 1 if we need a piece, but the moment we start requesting a peer,
// it's set to 0.

// Better to rename into piece_queue or smth, and maybe use some other form of a queue.
//
// Initially this is the opposite of "have", until we start making requests.
// An in-flight request is not in "needed", and not in "have".
needed_pieces: BF,

// This has a bit set per each chunk (block) that we have written to the output file.
Expand All @@ -21,6 +22,7 @@ pub struct ChunkTracker {

lengths: Lengths,

// What pieces to download first.
priority_piece_ids: Vec<usize>,
}

Expand Down Expand Up @@ -168,17 +170,6 @@ impl ChunkTracker {
piece.index, chunk_info, chunk_range,
);

// TODO: remove me, it's for debugging
// {
// use std::io::Write;
// let mut f = std::fs::OpenOptions::new()
// .write(true)
// .create(true)
// .open("/tmp/chunks")
// .unwrap();
// write!(f, "{:?}", &self.have).unwrap();
// }

if chunk_range.all() {
return Some(ChunkMarkingResult::Completed);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/librqbit/src/dht_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Context;
use buffers::ByteString;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use log::debug;
use tracing::debug;

use crate::{
peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner,
Expand Down Expand Up @@ -97,7 +97,7 @@ mod tests {
fn init_logging() {
#[allow(unused_must_use)]
LOG_INIT.call_once(|| {
pretty_env_logger::try_init();
// pretty_env_logger::try_init();
})
}

Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/file_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use librqbit_core::{
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
torrent_metainfo::{FileIteratorName, TorrentMetaV1Info},
};
use log::{debug, trace, warn};
use tracing::{debug, trace, warn};
use parking_lot::Mutex;
use peer_binary_protocol::Piece;
use sha1w::ISha1;
Expand Down
Loading