diff --git a/Cargo.lock b/Cargo.lock index b6b41598..6b7eae70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,6 +146,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom", + "instant", + "rand", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -353,6 +364,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -399,19 +423,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "env_logger" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece" -dependencies = [ - "humantime", - "is-terminal", - "log", - "regex", - "termcolor", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -434,6 +445,12 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "fnv" version = "1.0.7" @@ -678,12 +695,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "0.14.27" @@ -756,21 +767,19 @@ dependencies = [ ] [[package]] -name = "ipnet" -version = "2.9.0" +name = "instant" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] [[package]] -name = "is-terminal" -version = "0.4.9" +name = "ipnet" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" -dependencies = [ - "hermit-abi", - "rustix", - "windows-sys", -] +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itertools" @@ -821,14 +830,16 @@ dependencies = [ [[package]] name = "librqbit" -version = "2.2.2" +version = "3.0.0-beta.0" dependencies = [ "anyhow", "axum", + "backoff", "bincode", "bitvec", "byteorder", "crypto-hash", + "dashmap", "futures", "hex 0.4.3", "http", @@ -839,10 +850,8 @@ dependencies = [ "librqbit-dht", "librqbit-peer-protocol", "librqbit-sha1-wrapper", - "log", "openssl", "parking_lot", - "pretty_env_logger", "rand", "regex", "reqwest", @@ -853,6 +862,8 @@ dependencies = [ "size_format", "tokio", "tokio-stream", + "tracing", + "tracing-subscriber", "url", "urlencoding", "uuid", @@ -883,7 +894,7 @@ version = "2.2.1" [[package]] name = "librqbit-core" -version = "2.2.2" +version = "3.0.0" dependencies = [ "anyhow", "hex 0.4.3", @@ -891,7 +902,6 @@ dependencies = [ "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", - "log", "parking_lot", "serde", "url", @@ -900,7 +910,7 @@ dependencies = [ [[package]] name = "librqbit-dht" -version = "2.2.2" +version = "3.0.0" dependencies = [ "anyhow", "directories", @@ -910,19 +920,19 @@ dependencies = [ 
"librqbit-bencode", "librqbit-clone-to-owned", "librqbit-core", - "log", "parking_lot", - "pretty_env_logger", "rand", "serde", "serde_json", "tokio", "tokio-stream", + "tracing", + "tracing-subscriber", ] [[package]] name = "librqbit-peer-protocol" -version = "2.2.2" +version = "3.0.0" dependencies = [ "anyhow", "bincode", @@ -966,6 +976,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1022,6 +1041,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.2.1" @@ -1174,6 +1203,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1190,10 +1225,13 @@ version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ + "backtrace", "cfg-if", "libc", + "petgraph", "redox_syscall", "smallvec", + "thread-id", "windows-targets", ] @@ -1214,6 +1252,16 @@ version = "2.3.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -1258,16 +1306,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pretty_env_logger" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" -dependencies = [ - "env_logger", - "log", -] - [[package]] name = "proc-macro2" version = "1.0.69" @@ -1350,8 +1388,17 @@ checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1362,9 +1409,15 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -1430,22 +1483,23 @@ dependencies = [ [[package]] name = "rqbit" -version = "2.2.2" +version = "3.0.0-beta.0" dependencies = [ 
"anyhow", "clap", "futures", "librqbit", "librqbit-dht", - "log", + "parking_lot", "parse_duration", - "pretty_env_logger", "regex", "reqwest", "serde", "serde_json", "size_format", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1469,9 +1523,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.8" +version = "0.21.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c" +checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" dependencies = [ "log", "ring", @@ -1622,6 +1676,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "size_format" version = "1.0.2" @@ -1736,15 +1799,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "termcolor" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff1bc3d3f05aff0403e8ac0d92ced918ec05b666a43f83297ccef5bea8a3d449" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "1.0.50" @@ -1765,6 +1819,26 @@ dependencies = [ "syn", ] +[[package]] +name = "thread-id" +version = "4.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0ec81c46e9eb50deaa257be2f148adf052d1fb7701cfd55ccfab2525280b70b" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -1890,9 +1964,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ 
"log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -1900,6 +1986,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -1973,6 +2089,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2098,15 +2220,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" -dependencies = [ - "winapi", -] - [[package]] name = 
"winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 09adc48c..42dc033d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,5 @@ members = [ panic = "abort" [profile.release] -panic = "abort" \ No newline at end of file +panic = "abort" +debug = true \ No newline at end of file diff --git a/TODO.md b/TODO.md index 13194fce..61067186 100644 --- a/TODO.md +++ b/TODO.md @@ -1,16 +1,11 @@ -- [x] Selective file downloading (mostly done) - - [x] Proper counting of how much is left, and how much is downloaded - -- [x] Send bitfield at the start if I have something -- [x] use the "update_hash" function in piece checking -- [ ] signaling when file is done - -- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent - +- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them. - [ ] per-file stats -- [ ] per-peer stats - -- [x] slow peers cause slowness in the end, need the "end of game" algorithm +- [x (partial)] per-peer stats +- [x] use some concurrent hashmap e.g. flurry or dashmap +- [x] tracing instead of logging. Debugging peers: RUST_LOG=[{peer=.*}]=debug + test-log for tests +- [ ] reopen read only is bugged: + expected to be able to write to disk: error writing to file 0 (""The.Creator.2023.D.AMZN.WEB-DLRip.1.46Gb.MegaPeer.avi"") someday: - [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager) \ No newline at end of file diff --git a/crates/buffers/src/lib.rs b/crates/buffers/src/lib.rs index d6ced8e8..b45c891f 100644 --- a/crates/buffers/src/lib.rs +++ b/crates/buffers/src/lib.rs @@ -1,3 +1,8 @@ +// This crate used for making working with &[u8] or Vec generic in other parts of librqbit, +// for nicer display of binary data etc. +// +// Not useful outside of librqbit. 
+ use serde::{Deserialize, Deserializer}; use clone_to_owned::CloneToOwned; diff --git a/crates/clone_to_owned/src/lib.rs b/crates/clone_to_owned/src/lib.rs index e661cd08..561033f8 100644 --- a/crates/clone_to_owned/src/lib.rs +++ b/crates/clone_to_owned/src/lib.rs @@ -1,3 +1,11 @@ +// These are helpers for objects that can be borrowed, but can be made owned while changing the type. +// The difference between e.g. Cow and CloneToOwned, is that we can implement it recursively for owned types. +// +// E.g. HashMap<&str, &str> can be converted to HashMap. +// +// This lets us express types like TorrentMetaInfo<&[u8]> for zero-copy metadata about a bencode buffer in memory, +// but to have one-line conversion for it into TorrentMetaInfo> so that we can store it later somewhere. + use std::collections::HashMap; pub trait CloneToOwned { diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index c9ae8a75..b2f22403 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-dht" -version = "2.2.2" +version = "3.0.0" edition = "2018" description = "DHT implementation, used in rqbit torrent client." 
license = "Apache-2.0" @@ -25,14 +25,14 @@ hex = "0.4" bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} anyhow = "1" parking_lot = "0.12" -log = "0.4" -pretty_env_logger = "0.5" +tracing = "0.1" futures = "0.3" rand = "0.8" indexmap = "2" directories = "5" clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} -librqbit-core = {path="../librqbit_core", version = "2.2.1"} +librqbit-core = {path="../librqbit_core", version = "3.0.0"} [dev-dependencies] +tracing-subscriber = "0.3" \ No newline at end of file diff --git a/crates/dht/src/main.rs b/crates/dht/examples/dht.rs similarity index 94% rename from crates/dht/src/main.rs rename to crates/dht/examples/dht.rs index fc6f5246..469e0f6e 100644 --- a/crates/dht/src/main.rs +++ b/crates/dht/examples/dht.rs @@ -2,14 +2,14 @@ use std::{str::FromStr, time::Duration}; use anyhow::Context; use librqbit_dht::{Dht, Id20}; -use log::info; use tokio_stream::StreamExt; +use tracing::info; #[tokio::main] async fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); - let info_hash = Id20::from_str("64a980abe6e448226bb930ba061592e44c3781a1").unwrap(); + tracing_subscriber::fmt::init(); + let dht = Dht::new().await.context("error initializing DHT")?; let mut stream = dht.get_peers(info_hash).await?; @@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> { let peer_printer = async { while let Some(peer) = stream.next().await { - log::info!("peer found: {}", peer) + info!("peer found: {}", peer) } Ok(()) }; diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index a1b23ba5..970d78ce 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -17,7 +17,6 @@ use bencode::ByteString; use futures::{stream::FuturesUnordered, Stream, StreamExt}; use indexmap::IndexSet; use librqbit_core::{id20::Id20, peer_id::generate_peer_id}; -use log::{debug, info, trace, warn}; use parking_lot::RwLock; use rand::Rng; use serde::Serialize; 
@@ -26,6 +25,7 @@ use tokio::{ sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, }; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; +use tracing::{debug, info, trace, warn}; #[derive(Debug, Serialize)] pub struct DhtStats { @@ -449,7 +449,7 @@ async fn run_framer( Err(_) => break, } } - Err(e) => log::debug!("{}: error deserializing incoming message: {}", addr, e), + Err(e) => debug!("{}: error deserializing incoming message: {}", addr, e), } } Err::<(), _>(anyhow::anyhow!( diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index affc9811..d22ad6b8 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -8,8 +8,8 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use anyhow::Context; -use log::{debug, error, info, trace, warn}; use tokio::spawn; +use tracing::{debug, error, info, trace, warn}; use crate::dht::{Dht, DhtConfig}; use crate::routing_table::RoutingTable; diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 89cd4042..5118c81a 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -4,8 +4,8 @@ use std::{ }; use librqbit_core::id20::Id20; -use log::debug; use serde::{ser::SerializeMap, Deserialize, Serialize}; +use tracing::debug; #[derive(Debug, Clone, Serialize, Deserialize)] enum BucketTreeNodeData { diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index e5e882c1..46c1c159 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit" -version = "2.2.2" +version = "3.0.0-beta.0" authors = ["Igor Katson "] edition = "2018" description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it." 
@@ -13,6 +13,7 @@ readme = "README.md" [features] default = ["sha1-system", "default-tls"] +timed_existence = [] sha1-system = ["sha1w/sha1-system"] sha1-openssl = ["sha1w/sha1-openssl"] sha1-rust = ["sha1w/sha1-rust"] @@ -22,11 +23,11 @@ rust-tls = ["reqwest/rustls-tls"] [dependencies] bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"} -librqbit-core = {path = "../librqbit_core", version = "2.2.2"} +librqbit-core = {path = "../librqbit_core", version = "3.0.0"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} -peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "2.2.2"} +peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.0.0"} sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"} -dht = {path = "../dht", package="librqbit-dht", version="2.2.2"} +dht = {path = "../dht", package="librqbit-dht", version="3.0.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} axum = {version = "0.6"} @@ -43,7 +44,7 @@ byteorder = "1" bincode = "1" bitvec = "1" parking_lot = "0.12" -log = "0.4" +tracing = "0.1.40" size_format = "1" rand = "0.8" @@ -55,7 +56,9 @@ uuid = {version = "1.2", features = ["v4"]} futures = "0.3" url = "2" hex = "0.4" +backoff = "0.4.0" +dashmap = "5.5.3" [dev-dependencies] futures = {version = "0.3"} -pretty_env_logger = "0.5" \ No newline at end of file +tracing-subscriber = "0.3" \ No newline at end of file diff --git a/crates/librqbit/README.md b/crates/librqbit/README.md index dfa4845f..0e33b145 100644 --- a/crates/librqbit/README.md +++ b/crates/librqbit/README.md @@ -3,38 +3,10 @@ A torrent library 100% written in rust ## Basic example +See ```examples``` folder. 
This is a simple program on how to use this library This program will just download a simple torrent file with a Magnet link ```rust -use std::error::Error; -use std::path::PathBuf; -use librqbit::session::{AddTorrentResponse, Session}; -use librqbit::spawn_utils::BlockingSpawner; -const MAGNET_LINK: &str = "magnet:?..."; // Put your magnet link here - -#[tokio::main] -async fn main() -> Result<(), Box>{ - - // Create the session - let session = Session::new("C:\\Anime".parse().unwrap(), BlockingSpawner::new(false)).await?; - - // Add the torrent to the session - let handle = match session.add_torrent(MAGNET_LINK, None).await { - Ok(AddTorrentResponse::Added(handle)) => { - Ok(handle) - }, - Err(e) => { - eprintln!("Something goes wrong when downloading torrent : {:?}", e); - Err(()) - } - _ => Err(()) - }.expect("Failed to add torrent to the session"); - - // Wait until the download is completed - handle.wait_until_completed().await?; - - Ok(()) -} ``` \ No newline at end of file diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs new file mode 100644 index 00000000..68130434 --- /dev/null +++ b/crates/librqbit/examples/ubuntu.rs @@ -0,0 +1,69 @@ +// For production-grade code look at rqbit::main(), which does the same but has more options. +// +// Usage: +// cargo run --release --example ubuntu /tmp/ubuntu/ + +use std::time::Duration; + +use anyhow::Context; +use librqbit::session::{AddTorrentOptions, AddTorrentResponse, Session}; +use tracing::info; + +// This is ubuntu-21.04-live-server-amd64.iso.torrent +// You can also pass filenames and URLs to add_torrent(). +const MAGNET_LINK: &str = "magnet:?xt=urn:btih:cab507494d02ebb1178b38f2e9d7be299c86b862"; + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + // Output logs to console. 
+ tracing_subscriber::fmt::init(); + + let output_dir = std::env::args() + .nth(1) + .expect("the first argument should be the output directory"); + + // Create the session + let session = Session::new(output_dir.into(), Default::default()) + .await + .context("error creating session")?; + + // Add the torrent to the session + let handle = match session + .add_torrent( + MAGNET_LINK, + Some(AddTorrentOptions { + // Set this to true to allow writing on top of existing files. + // If the file is partially downloaded, librqbit will only download the + // missing pieces. + // + // Otherwise it will throw an error that the file exists. + overwrite: false, + ..Default::default() + }), + ) + .await + .context("error adding torrent")? + { + AddTorrentResponse::Added(handle) => handle, + // For a brand new session other variants won't happen. + _ => unreachable!(), + }; + + // Print stats periodically. + tokio::spawn({ + let handle = handle.clone(); + async move { + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + let stats = handle.torrent_state().stats_snapshot(); + info!("stats: {stats:?}"); + } + } + }); + + // Wait until the download is completed + handle.wait_until_completed().await?; + info!("torrent downloaded"); + + Ok(()) +} diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index c0135887..e223d06e 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -1,6 +1,6 @@ use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex}; -use log::{debug, info}; use peer_binary_protocol::Piece; +use tracing::{debug, info}; use crate::type_aliases::BF; @@ -8,8 +8,9 @@ pub struct ChunkTracker { // This forms the basis of a "queue" to pull from. // It's set to 1 if we need a piece, but the moment we start requesting a peer, // it's set to 0. - - // Better to rename into piece_queue or smth, and maybe use some other form of a queue. 
+ // + // Initially this is the opposite of "have", until we start making requests. + // An in-flight request is not in "needed", and not in "have". needed_pieces: BF, // This has a bit set per each chunk (block) that we have written to the output file. @@ -21,6 +22,7 @@ pub struct ChunkTracker { lengths: Lengths, + // What pieces to download first. priority_piece_ids: Vec, } @@ -168,17 +170,6 @@ impl ChunkTracker { piece.index, chunk_info, chunk_range, ); - // TODO: remove me, it's for debugging - // { - // use std::io::Write; - // let mut f = std::fs::OpenOptions::new() - // .write(true) - // .create(true) - // .open("/tmp/chunks") - // .unwrap(); - // write!(f, "{:?}", &self.have).unwrap(); - // } - if chunk_range.all() { return Some(ChunkMarkingResult::Completed); } diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index 56fd46ec..2c9f3fc5 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -4,7 +4,7 @@ use anyhow::Context; use buffers::ByteString; use futures::{stream::FuturesUnordered, Stream, StreamExt}; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; -use log::debug; +use tracing::debug; use crate::{ peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner, @@ -97,7 +97,7 @@ mod tests { fn init_logging() { #[allow(unused_must_use)] LOG_INIT.call_once(|| { - pretty_env_logger::try_init(); + // pretty_env_logger::try_init(); }) } diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index a635fe29..32e2bb3b 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -11,7 +11,7 @@ use librqbit_core::{ lengths::{ChunkInfo, Lengths, ValidPieceIndex}, torrent_metainfo::{FileIteratorName, TorrentMetaV1Info}, }; -use log::{debug, trace, warn}; +use tracing::{debug, trace, warn}; use parking_lot::Mutex; use peer_binary_protocol::Piece; use sha1w::ISha1; diff --git a/crates/librqbit/src/http_api.rs 
b/crates/librqbit/src/http_api.rs index 93deadf5..e63156fa 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -7,12 +7,12 @@ use dht::{Dht, DhtStats}; use http::StatusCode; use librqbit_core::id20::Id20; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; -use log::warn; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; +use tracing::{info, warn}; use axum::Router; @@ -110,7 +110,7 @@ impl HttpApi { .route("/torrents/:id/stats", get(torrent_stats)) .with_state(state); - log::info!("starting HTTP server on {}", addr); + info!("starting HTTP server on {}", addr); axum::Server::try_bind(&addr) .with_context(|| format!("error binding to {addr}"))? .serve(app.into_make_service()) @@ -341,7 +341,10 @@ impl ApiInternal { let mgr = self.mgr_handle(idx)?; Ok(format!( "{:?}", - mgr.torrent_state().lock_read().chunks.get_have_pieces(), + mgr.torrent_state() + .lock_read("api_dump_haves") + .chunks + .get_have_pieces(), )) } } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index ecf4ef15..097d45e7 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -5,7 +5,6 @@ pub mod http_api; pub mod http_api_client; mod http_api_error; pub mod peer_connection; -pub mod peer_handler; pub mod peer_info_reader; pub mod peer_state; pub mod session; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index af35a025..0814a9f8 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -4,13 +4,13 @@ use anyhow::Context; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use librqbit_core::{id20::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id}; -use log::{debug, trace}; use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ExtendedMessage}, serialize_piece_preamble, Handshake, Message, 
MessageBorrowed, MessageDeserializeError, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, }; use tokio::time::timeout; +use tracing::{debug, trace}; use crate::spawn_utils::BlockingSpawner; @@ -31,6 +31,7 @@ pub trait PeerConnectionHandler { pub enum WriterRequest { Message(MessageOwned), ReadChunkRequest(ChunkInfo), + Disconnect, } #[derive(Default, Copy, Clone)] @@ -56,10 +57,10 @@ async fn with_timeout( where E: Into, { - timeout(timeout_value, fut) - .await - .with_context(|| format!("timeout at {timeout_value:?}"))? - .map_err(|e| e.into()) + match timeout(timeout_value, fut).await { + Ok(v) => v.map_err(Into::into), + Err(_) => anyhow::bail!("timeout at {timeout_value:?}"), + } } macro_rules! read_one { @@ -148,11 +149,7 @@ impl PeerConnection { let (h, size) = Handshake::deserialize(&read_buf[..read_so_far]) .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?; - debug!( - "connected peer {}: {:?}", - self.addr, - try_decode_peer_id(Id20(h.peer_id)) - ); + debug!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id))); if h.info_hash != self.info_hash.0 { anyhow::bail!("info hash does not match"); } @@ -169,11 +166,7 @@ impl PeerConnection { if supports_extended { let my_extended = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new())); - trace!( - "sending extended handshake to {}: {:?}", - self.addr, - &my_extended - ); + trace!("sending extended handshake: {:?}", &my_extended); my_extended.serialize(&mut write_buf, None).unwrap(); with_timeout(rwtimeout, conn.write_all(&write_buf)) .await @@ -183,7 +176,7 @@ impl PeerConnection { let (extended, size) = read_one!(conn, read_buf, read_so_far, rwtimeout); match extended { Message::Extended(ExtendedMessage::Handshake(h)) => { - trace!("received from {}: {:?}", self.addr, &h); + trace!("received: {:?}", &h); self.handler.on_extended_handshake(&h)?; extended_handshake = Some(h.clone_to_owned()) } @@ -212,7 +205,7 @@ impl PeerConnection { with_timeout(rwtimeout, 
write_half.write_all(&write_buf[..len])) .await .context("error writing bitfield to peer")?; - debug!("sent bitfield to {}", self.addr); + debug!("sent bitfield"); } } @@ -247,9 +240,12 @@ impl PeerConnection { uploaded_add = Some(chunk.size); full_len } + WriterRequest::Disconnect => { + return Ok(()); + } }; - debug!("sending to {}: {:?}, length={}", self.addr, &req, len); + debug!("sending: {:?}, length={}", &req, len); with_timeout(rwtimeout, write_half.write_all(&write_buf[..len])) .await @@ -269,7 +265,7 @@ impl PeerConnection { let reader = async move { loop { let (message, size) = read_one!(read_half, read_buf, read_so_far, rwtimeout); - trace!("received from {}: {:?}", self.addr, &message); + trace!("received: {:?}", &message); self.handler .on_received_message(message) @@ -290,7 +286,7 @@ impl PeerConnection { r = reader => {r} r = writer => {r} }; - debug!("{}: either reader or writer are done, exiting", self.addr); + debug!("either reader or writer are done, exiting"); r } } diff --git a/crates/librqbit/src/peer_handler.rs b/crates/librqbit/src/peer_handler.rs deleted file mode 100644 index 8b137891..00000000 --- a/crates/librqbit/src/peer_handler.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index 927958a8..fca366cf 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -8,7 +8,6 @@ use librqbit_core::{ lengths::{ceil_div_u64, last_element_size_u64, ChunkInfo}, torrent_metainfo::TorrentMetaV1Info, }; -use log::debug; use parking_lot::{Mutex, RwLock}; use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, @@ -16,6 +15,7 @@ use peer_binary_protocol::{ }; use sha1w::{ISha1, Sha1}; use tokio::sync::mpsc::UnboundedSender; +use tracing::debug; use crate::{ peer_connection::{ @@ -238,7 +238,7 @@ mod tests { fn init_logging() { #[allow(unused_must_use)] 
LOG_INIT.call_once(|| { - pretty_env_logger::try_init(); + // pretty_env_logger::try_init(); }) } diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 27bfa632..d076e2f0 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,9 +1,16 @@ +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Duration; use std::{collections::HashSet, sync::Arc}; +use anyhow::Context; +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; +use serde::Serialize; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::{Notify, Semaphore}; +use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; #[derive(Debug, Hash, PartialEq, Eq)] @@ -21,11 +28,194 @@ impl From<&ChunkInfo> for InflightRequest { } } +// TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak. +pub type PeerRx = UnboundedReceiver; +pub type PeerTx = UnboundedSender; + +pub trait SendMany { + fn send_many(&self, requests: impl IntoIterator) -> anyhow::Result<()>; +} + +impl SendMany for PeerTx { + fn send_many(&self, requests: impl IntoIterator) -> anyhow::Result<()> { + requests + .into_iter() + .try_for_each(|r| self.send(r)) + .context("peer dropped") + } +} + #[derive(Debug)] +pub struct PeerStats { + pub backoff: ExponentialBackoff, +} + +impl Default for PeerStats { + fn default() -> Self { + Self { + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_secs(10)) + .with_multiplier(6.) 
+ .with_max_interval(Duration::from_secs(3600)) + .with_max_elapsed_time(Some(Duration::from_secs(86400))) + .build(), + } + } +} + +#[derive(Debug, Default)] +pub struct Peer { + pub state: PeerStateNoMut, + pub stats: PeerStats, +} + +#[derive(Debug, Default, Serialize)] +pub struct AggregatePeerStatsAtomic { + pub queued: AtomicU32, + pub connecting: AtomicU32, + pub live: AtomicU32, + pub seen: AtomicU32, + pub dead: AtomicU32, + pub not_needed: AtomicU32, +} + +pub fn atomic_inc(c: &AtomicU32) -> u32 { + c.fetch_add(1, Ordering::Relaxed) +} + +pub fn atomic_dec(c: &AtomicU32) -> u32 { + c.fetch_sub(1, Ordering::Relaxed) +} + +impl AggregatePeerStatsAtomic { + pub fn counter(&self, state: &PeerState) -> &AtomicU32 { + match state { + PeerState::Connecting(_) => &self.connecting, + PeerState::Live(_) => &self.live, + PeerState::Queued => &self.queued, + PeerState::Dead => &self.dead, + PeerState::NotNeeded => &self.not_needed, + } + } + + pub fn inc(&self, state: &PeerState) { + atomic_inc(self.counter(state)); + } + + pub fn dec(&self, state: &PeerState) { + atomic_dec(self.counter(state)); + } + + pub fn incdec(&self, old: &PeerState, new: &PeerState) { + self.dec(old); + self.inc(new); + } +} + +#[derive(Debug, Default)] pub enum PeerState { + #[default] + // Will be tried to be connected as soon as possible. Queued, - Connecting, + Connecting(PeerTx), Live(LivePeerState), + // There was an error, and it's waiting for exponential backoff. + Dead, + // We don't need to do anything with the peer any longer. + // The peer has the full torrent, and we have the full torrent, so no need + // to keep talking to it. 
+ NotNeeded, +} + +impl std::fmt::Display for PeerState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} + +impl PeerState { + pub fn name(&self) -> &'static str { + match self { + PeerState::Queued => "queued", + PeerState::Connecting(_) => "connecting", + PeerState::Live(_) => "live", + PeerState::Dead => "dead", + PeerState::NotNeeded => "not needed", + } + } + + pub fn take_live_no_counters(self) -> Option { + match self { + PeerState::Live(l) => Some(l), + _ => None, + } + } +} + +#[derive(Debug, Default)] +pub struct PeerStateNoMut(PeerState); + +impl PeerStateNoMut { + pub fn get(&self) -> &PeerState { + &self.0 + } + + pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + self.set(Default::default(), counters) + } + + pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState { + counters.incdec(&self.0, &new); + std::mem::replace(&mut self.0, new) + } + + pub fn get_live(&self) -> Option<&LivePeerState> { + match &self.0 { + PeerState::Live(l) => Some(l), + _ => None, + } + } + + pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { + match &mut self.0 { + PeerState::Live(l) => Some(l), + _ => None, + } + } + + pub fn queued_to_connecting(&mut self, counters: &AggregatePeerStatsAtomic) -> Option { + if let PeerState::Queued = &self.0 { + let (tx, rx) = unbounded_channel(); + self.set(PeerState::Connecting(tx), counters); + Some(rx) + } else { + None + } + } + pub fn connecting_to_live( + &mut self, + peer_id: Id20, + counters: &AggregatePeerStatsAtomic, + ) -> Option<&mut LivePeerState> { + if let PeerState::Connecting(_) = &self.0 { + let tx = match self.take(counters) { + PeerState::Connecting(tx) => tx, + _ => unreachable!(), + }; + self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters); + self.get_live_mut() + } else { + None + } + } + + pub fn to_dead(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + 
self.set(PeerState::Dead, counters) + } + + pub fn to_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + self.set(PeerState::NotNeeded, counters) + } } #[derive(Debug)] @@ -33,22 +223,45 @@ pub struct LivePeerState { pub peer_id: Id20, pub i_am_choked: bool, pub peer_interested: bool, + + // This is used to limit the number of chunk requests we send to a peer at a time. pub requests_sem: Arc, + + // This is used to unpause processes after we were choked. pub have_notify: Arc, - pub bitfield: Option, + + // This is used to track the pieces the peer has. + pub bitfield: BF, + + // This is used to only request a piece from a peer once when stealing from others. + // So that you don't steal then re-steal the same piece in a loop. + pub previously_requested_pieces: BF, + + // When the peer sends us data this is used to track if we asked for it. pub inflight_requests: HashSet, + + // The main channel to send requests to peer. + pub tx: PeerTx, } impl LivePeerState { - pub fn new(peer_id: Id20) -> Self { + pub fn new(peer_id: Id20, tx: PeerTx) -> Self { LivePeerState { peer_id, i_am_choked: true, peer_interested: false, - bitfield: None, + bitfield: BF::new(), + previously_requested_pieces: BF::new(), have_notify: Arc::new(Notify::new()), requests_sem: Arc::new(Semaphore::new(0)), inflight_requests: Default::default(), + tx, } } + + pub fn has_full_torrent(&self, total_pieces: usize) -> bool { + self.bitfield + .get(0..total_pieces) + .map_or(false, |s| s.all()) + } } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 07230039..8033572f 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -8,10 +8,10 @@ use librqbit_core::{ peer_id::generate_peer_id, torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, }; -use log::{debug, info, warn}; use parking_lot::RwLock; use reqwest::Url; use tokio_stream::StreamExt; +use tracing::{debug, info, span, warn, Level}; use 
crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, @@ -251,7 +251,10 @@ impl Session { torrent_from_file(url)? }; let dht_rx = match self.dht.as_ref() { - Some(dht) => Some(dht.get_peers(torrent.info_hash).await?), + Some(dht) => { + debug!("reading peers for {:?} from DHT", torrent.info_hash); + Some(dht.get_peers(torrent.info_hash).await?) + } None => None, }; let trackers = torrent @@ -402,7 +405,7 @@ impl Session { } if let Some(mut dht_peer_rx) = dht_peer_rx { - spawn("DHT peer adder", { + spawn(span!(Level::INFO, "dht_peer_adder"), { let handle = handle.clone(); async move { while let Some(peer) = dht_peer_rx.next().await { diff --git a/crates/librqbit/src/spawn_utils.rs b/crates/librqbit/src/spawn_utils.rs index 8a52faeb..e108254c 100644 --- a/crates/librqbit/src/spawn_utils.rs +++ b/crates/librqbit/src/spawn_utils.rs @@ -1,22 +1,22 @@ -use std::fmt::Display; +use tracing::{debug, error, trace, Instrument}; -use log::{debug, error}; - -pub fn spawn( - name: N, +pub fn spawn( + span: tracing::Span, fut: impl std::future::Future> + Send + 'static, ) { - debug!("starting task \"{}\"", &name); - tokio::spawn(async move { + let fut = async move { + trace!("started"); match fut.await { Ok(_) => { - debug!("task \"{}\" finished", &name); + debug!("finished"); } Err(e) => { - error!("error in task \"{}\": {:#}", &name, e) + error!("{:#}", e) } } - }); + } + .instrument(span.or_current()); + tokio::spawn(fut); } #[derive(Clone, Copy, Debug)] @@ -38,3 +38,14 @@ impl BlockingSpawner { f() } } + +impl Default for BlockingSpawner { + fn default() -> Self { + let allow_block_in_place = match tokio::runtime::Handle::current().runtime_flavor() { + tokio::runtime::RuntimeFlavor::CurrentThread => false, + tokio::runtime::RuntimeFlavor::MultiThread => true, + _ => true, + }; + Self::new(allow_block_in_place) + } +} diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index d3826e8d..c8157d4f 100644 --- 
a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -14,11 +14,11 @@ use librqbit_core::{ id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Info, }; -use log::{debug, info, warn}; use parking_lot::Mutex; use reqwest::Url; use sha1w::Sha1; use size_format::SizeFormatterBinary as SF; +use tracing::{debug, info, span, warn, Level}; use crate::{ chunk_tracker::ChunkTracker, @@ -116,9 +116,10 @@ impl TorrentManagerHandle { pub fn add_tracker(&self, url: Url) -> bool { let mgr = self.manager.clone(); if mgr.trackers.lock().insert(url.clone()) { - spawn(format!("tracker monitor {url}"), async move { - mgr.single_tracker_monitor(url).await - }); + spawn( + span!(Level::ERROR, "tracker_monitor", url = url.to_string()), + async move { mgr.single_tracker_monitor(url).await }, + ); true } else { false @@ -289,12 +290,12 @@ impl TorrentManager { options, }); - spawn("speed estimator updater", { + spawn(span!(Level::ERROR, "speed_estimator_updater"), { let state = mgr.state.clone(); async move { loop { let stats = state.stats_snapshot(); - let fetched = state.stats_snapshot().fetched_bytes; + let fetched = stats.fetched_bytes; let needed = state.initially_needed(); // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. let remaining = needed diff --git a/crates/librqbit/src/torrent_state.rs index 2bb39a68..bb46f9fa 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -1,5 +1,46 @@ +// The main logic of rqbit is here - connecting to peers, reading and writing messages +// to them, tracking peer state etc. +// +// ## Architecture +// There are many tasks cooperating to download the torrent. Tasks communicate both with message passing +// and shared memory. +// +// ### Shared locked state +// Shared state is accessed by almost all actors through RwLocks.
+// +// There's one source of truth (TorrentStateLocked) for which chunks we have, need, and which peers we are waiting for them from. +// +// Peer states that are important to the outsiders (tasks other than manage_peer) are in a sharded hash-map (DashMap) +// +// ### Tasks (actors) +// Peer adder task: +// - spawns new peers as they become known. It pulls them from a queue. The queue is filled in by DHT and torrent trackers. +// Also gets updated when peers are reconnecting after errors. +// +// Each peer has at least 2 tasks: +// - "manage_peer" - this talks to the peer over network and calls callbacks on PeerHandler. The callbacks are not async, +// and are supposed to finish quickly (apart from writing to disk, which is accounted for as "spawn_blocking"). +// - "peer_chunk_requester" - this continuously sends requests for chunks to the peer. +// it MAY steal chunks/pieces from other peers, which helps route around slow or stalled peers. +// +// ## Peer lifecycle +// State transitions: +// - queued (initial state) -> connecting +// - connecting -> live +// - ANY STATE -> dead (on error) +// - ANY STATE -> not_needed (when we don't need to talk to the peer anymore) +// +// When the peer dies, it's rescheduled with exponential backoff. +// +// > NOTE: deadlock notice: +// > peers and stateLocked are behind 2 different locks. +// > if you lock them in different order, this may deadlock. +// > +// > so don't lock them both at the same time at all, or at the worst lock them in the +// > same order (peers one first, then the global one).
+ use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, fs::File, net::SocketAddr, path::PathBuf, @@ -10,17 +51,18 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::Context; +use anyhow::{bail, Context}; +use backoff::backoff::Backoff; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; +use dashmap::DashMap; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ id20::Id20, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, torrent_metainfo::TorrentMetaV1Info, }; -use log::{debug, info, trace, warn}; -use parking_lot::{Mutex, RwLock, RwLockReadGuard}; +use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, }; @@ -33,6 +75,7 @@ use tokio::{ }, time::timeout, }; +use tracing::{debug, error, info, span, trace, warn, Level}; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker}, @@ -40,7 +83,10 @@ use crate::{ peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, - peer_state::{InflightRequest, LivePeerState, PeerState}, + peer_state::{ + atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerRx, + PeerState, PeerTx, SendMany, + }, spawn_utils::{spawn, BlockingSpawner}, type_aliases::{PeerHandle, BF}, }; @@ -52,123 +98,149 @@ pub struct InflightPiece { #[derive(Default)] pub struct PeerStates { - states: HashMap, - seen: HashSet, - inflight_pieces: HashMap, - tx: HashMap>>, + stats: AggregatePeerStatsAtomic, + states: DashMap, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Serialize, PartialEq, Eq)] pub struct AggregatePeerStats { pub queued: usize, pub connecting: usize, pub live: usize, pub seen: usize, + pub dead: usize, + pub not_needed: usize, +} + +impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats { + fn from(s: &'a AggregatePeerStatsAtomic) -> Self { + let ordering = 
Ordering::Relaxed; + Self { + queued: s.queued.load(ordering) as usize, + connecting: s.connecting.load(ordering) as usize, + live: s.live.load(ordering) as usize, + seen: s.seen.load(ordering) as usize, + dead: s.dead.load(ordering) as usize, + not_needed: s.not_needed.load(ordering) as usize, + } + } } impl PeerStates { pub fn stats(&self) -> AggregatePeerStats { - let mut stats = self - .states - .values() - .fold(AggregatePeerStats::default(), |mut s, p| { - match p { - PeerState::Connecting => s.connecting += 1, - PeerState::Live(_) => s.live += 1, - PeerState::Queued => s.queued += 1, - }; - s - }); - stats.seen = self.seen.len(); - stats + AggregatePeerStats::from(&self.stats) } - pub fn add_if_not_seen( - &mut self, - addr: SocketAddr, - tx: UnboundedSender, - ) -> Option { - if self.seen.contains(&addr) { - return None; + + pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option { + use dashmap::mapref::entry::Entry; + match self.states.entry(addr) { + Entry::Occupied(_) => None, + Entry::Vacant(vac) => { + vac.insert(Default::default()); + atomic_inc(&self.stats.queued); + atomic_inc(&self.stats.seen); + Some(addr) + } } - let handle = self.add(addr, tx)?; - self.seen.insert(addr); - Some(handle) } - pub fn seen(&self) -> &HashSet { - &self.seen + pub fn with_peer(&self, addr: PeerHandle, f: impl FnOnce(&Peer) -> R) -> Option { + self.states.get(&addr).map(|e| f(e.value())) } - pub fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> { - if let PeerState::Live(ref l) = self.states.get(&handle)? { - return Some(l); - } - None + + pub fn with_peer_mut( + &self, + addr: PeerHandle, + reason: &'static str, + f: impl FnOnce(&mut Peer) -> R, + ) -> Option { + timeit(reason, || self.states.get_mut(&addr)) + .map(|e| f(TimedExistence::new(e, reason).value_mut())) } - pub fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> { - if let PeerState::Live(ref mut l) = self.states.get_mut(&handle)? 
{ - return Some(l); - } - None + pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { + self.states + .get(&addr) + .and_then(|e| match &e.value().state.get() { + PeerState::Live(l) => Some(f(l)), + _ => None, + }) } - pub fn try_get_live_mut(&mut self, handle: PeerHandle) -> anyhow::Result<&mut LivePeerState> { - self.get_live_mut(handle) - .ok_or_else(|| anyhow::anyhow!("peer dropped")) + pub fn with_live_mut( + &self, + addr: PeerHandle, + reason: &'static str, + f: impl FnOnce(&mut LivePeerState) -> R, + ) -> Option { + self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) + .flatten() } - pub fn add( - &mut self, - addr: SocketAddr, - tx: UnboundedSender, - ) -> Option { - let handle = addr; - if self.states.contains_key(&addr) { - return None; - } - self.states.insert(handle, PeerState::Queued); - self.tx.insert(handle, Arc::new(tx)); - Some(handle) - } - pub fn drop_peer(&mut self, handle: PeerHandle) -> Option { - let result = self.states.remove(&handle); - self.tx.remove(&handle); - result - } - pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { - let live = self.get_live_mut(handle)?; - let prev = live.i_am_choked; - live.i_am_choked = is_choked; - Some(prev) + + pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option> { + let prev = self.with_peer_mut(handle, "mark_peer_dead", |peer| { + peer.state.to_dead(&self.stats) + })?; + Some(prev.take_live_no_counters()) } - pub fn mark_peer_interested( - &mut self, - handle: PeerHandle, - is_interested: bool, - ) -> Option { - let live = self.get_live_mut(handle)?; - let prev = live.peer_interested; - live.peer_interested = is_interested; - Some(prev) + pub fn drop_peer(&self, handle: PeerHandle) -> Option { + let p = self.states.remove(&handle).map(|r| r.1)?; + self.stats.dec(p.state.get()); + Some(p) } - pub fn update_bitfield_from_vec( - &mut self, - handle: PeerHandle, - bitfield: Vec, - ) -> Option> { - let live = 
self.get_live_mut(handle)?; - let bitfield = BF::from_vec(bitfield); - let prev = live.bitfield.take(); - live.bitfield = Some(bitfield); - Some(prev) + pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option { + self.with_live_mut(handle, "mark_i_am_choked", |live| { + let prev = live.i_am_choked; + live.i_am_choked = is_choked; + prev + }) } - pub fn clone_tx(&self, handle: PeerHandle) -> Option>> { - Some(self.tx.get(&handle)?.clone()) + pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option { + self.with_live_mut(handle, "mark_peer_interested", |live| { + let prev = live.peer_interested; + live.peer_interested = is_interested; + prev + }) } - pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option { - self.inflight_pieces.remove(&piece) + pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec) -> Option<()> { + self.with_live_mut(handle, "update_bitfield_from_vec", |live| { + live.previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]); + live.bitfield = BF::from_vec(bitfield); + }) + } + pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result { + let rx = self + .with_peer_mut(h, "mark_peer_connecting", |peer| { + peer.state + .queued_to_connecting(&self.stats) + .context("invalid peer state") + }) + .context("peer not found in states")??; + Ok(rx) + } + + pub fn clone_tx(&self, handle: PeerHandle) -> Option { + self.with_live(handle, |live| live.tx.clone()) + } + + fn reset_peer_backoff(&self, handle: PeerHandle) { + self.with_peer_mut(handle, "reset_peer_backoff", |p| { + p.stats.backoff.reset(); + }); + } + + fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { + let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { + peer.state.to_not_needed(&self.stats) + })?; + Some(prev) } } pub struct TorrentStateLocked { - pub peers: PeerStates, + // What chunks we have and need. 
pub chunks: ChunkTracker, + + // At a moment in time, we are expecting a piece from only one peer. + // inflight_pieces stores this information. + pub inflight_pieces: HashMap, } #[derive(Default, Debug)] @@ -203,13 +275,10 @@ pub struct StatsSnapshot { pub initially_needed_bytes: u64, pub remaining_bytes: u64, pub total_bytes: u64, - pub live_peers: u32, - pub seen_peers: u32, - pub connecting_peers: u32, #[serde(skip)] pub time: Instant, - pub queued_peers: u32, - total_piece_download_ms: u64, + pub total_piece_download_ms: u64, + pub peer_stats: AggregatePeerStats, } impl StatsSnapshot { @@ -230,6 +299,7 @@ pub struct TorrentStateOptions { } pub struct TorrentState { + peers: PeerStates, info: TorrentMetaV1Info, locked: Arc>, files: Vec>>, @@ -242,12 +312,114 @@ pub struct TorrentState { stats: AtomicStats, options: TorrentStateOptions, + // Limits how many active (occupying network resources) peers there are at a moment in time. peer_semaphore: Semaphore, - peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver)>, + + // The queue for peer manager to connect to them. + peer_queue_tx: UnboundedSender, finished_notify: Notify, } +// Used during debugging to see if some locks take too long. 
+#[cfg(not(feature = "timed_existence"))] +mod timed_existence { + use std::ops::{Deref, DerefMut}; + + pub struct TimedExistence(T); + + impl TimedExistence { + #[inline(always)] + pub fn new(object: T, _reason: &'static str) -> Self { + Self(object) + } + } + + impl Deref for TimedExistence { + type Target = T; + + #[inline(always)] + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl DerefMut for TimedExistence { + #[inline(always)] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } + + #[inline(always)] + pub fn timeit(_n: impl std::fmt::Display, f: impl FnOnce() -> R) -> R { + f() + } +} + +#[cfg(feature = "timed_existence")] +mod timed_existence { + use std::ops::{Deref, DerefMut}; + use std::time::{Duration, Instant}; + use tracing::warn; + + const MAX: Duration = Duration::from_millis(1); + + // Prints if the object exists for too long. + // This is used to track long-lived locks for debugging. + pub struct TimedExistence { + object: T, + reason: &'static str, + started: Instant, + } + + impl TimedExistence { + pub fn new(object: T, reason: &'static str) -> Self { + Self { + object, + reason, + started: Instant::now(), + } + } + } + + impl Drop for TimedExistence { + fn drop(&mut self) { + let elapsed = self.started.elapsed(); + let reason = self.reason; + if elapsed > MAX { + warn!("elapsed on lock {reason:?}: {elapsed:?}") + } + } + } + + impl Deref for TimedExistence { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.object + } + } + + impl DerefMut for TimedExistence { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.object + } + } + + pub fn timeit(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R { + let now = Instant::now(); + let r = f(); + let elapsed = now.elapsed(); + if elapsed > MAX { + warn!("elapsed on \"{name:}\": {elapsed:?}") + } + r + } +} + +pub use timed_existence::{timeit, TimedExistence}; + impl TorrentState { #[allow(clippy::too_many_arguments)] pub fn new( @@ 
-264,14 +436,15 @@ impl TorrentState { options: Option, ) -> Arc { let options = options.unwrap_or_default(); - let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel(); + let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let state = Arc::new(TorrentState { info_hash, info, peer_id, + peers: Default::default(), locked: Arc::new(RwLock::new(TorrentStateLocked { - peers: Default::default(), chunks: chunk_tracker, + inflight_pieces: Default::default(), })), files, filenames, @@ -288,55 +461,80 @@ impl TorrentState { peer_queue_tx, finished_notify: Notify::new(), }); - spawn("peer adder", { - let state = state.clone(); - async move { - loop { - let (addr, out_rx) = peer_queue_rx.recv().await.unwrap(); + spawn( + span!(Level::ERROR, "peer_adder"), + state.clone().task_peer_adder(peer_queue_rx, spawner), + ); + state + } - let permit = state.peer_semaphore.acquire().await.unwrap(); - match state.locked.write().peers.states.get_mut(&addr) { - Some(s @ PeerState::Queued) => *s = PeerState::Connecting, - s => { - warn!("did not expect to see the peer in state {:?}", s); - continue; - } - }; + pub async fn task_manage_peer( + self: Arc, + addr: SocketAddr, + spawner: BlockingSpawner, + ) -> anyhow::Result<()> { + let state = self; + let rx = state.peers.mark_peer_connecting(addr)?; + + let handler = PeerHandler { + addr, + state: state.clone(), + spawner, + }; + let options = PeerConnectionOptions { + connect_timeout: state.options.peer_connect_timeout, + read_write_timeout: state.options.peer_read_write_timeout, + ..Default::default() + }; + let peer_connection = PeerConnection::new( + addr, + state.info_hash, + state.peer_id, + handler, + Some(options), + spawner, + ); - let handler = PeerHandler { - addr, - state: state.clone(), - spawner, - }; - let options = PeerConnectionOptions { - connect_timeout: state.options.peer_connect_timeout, - read_write_timeout: state.options.peer_read_write_timeout, - ..Default::default() - }; - let peer_connection = 
PeerConnection::new( - addr, - state.info_hash, - state.peer_id, - handler, - Some(options), - spawner, - ); + let res = peer_connection.manage_peer(rx).await; + let state = peer_connection.into_handler().state; + state.peer_semaphore.add_permits(1); - permit.forget(); - spawn(format!("manage_peer({addr})"), async move { - if let Err(e) = peer_connection.manage_peer(out_rx).await { - debug!("error managing peer {}: {:#}", addr, e) - }; - let state = peer_connection.into_handler().state; - state.drop_peer(addr); - state.peer_semaphore.add_permits(1); - Ok::<_, anyhow::Error>(()) - }); - } + match res { + // We disconnected the peer ourselves as we don't need it + Ok(()) => { + state.on_peer_died(addr, None); } - }); - state + Err(e) => { + debug!("error managing peer: {:#}", e); + state.on_peer_died(addr, Some(e)); + } + } + Ok::<_, anyhow::Error>(()) } + + pub async fn task_peer_adder( + self: Arc, + mut peer_queue_rx: UnboundedReceiver, + spawner: BlockingSpawner, + ) -> anyhow::Result<()> { + let state = self; + loop { + let addr = peer_queue_rx.recv().await.unwrap(); + if state.is_finished() { + debug!("ignoring peer {} as we are finished", addr); + state.peers.mark_peer_not_needed(addr); + continue; + } + + let permit = state.peer_semaphore.acquire().await.unwrap(); + permit.forget(); + spawn( + span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()), + state.clone().task_manage_peer(addr, spawner), + ); + } + } + pub fn info(&self) -> &TorrentMetaV1Info { &self.info } @@ -352,57 +550,71 @@ impl TorrentState { pub fn initially_needed(&self) -> u64 { self.needed } - pub fn lock_read(&self) -> RwLockReadGuard { - self.locked.read() + pub fn lock_read( + &self, + reason: &'static str, + ) -> TimedExistence> { + TimedExistence::new(timeit(reason, || self.locked.read()), reason) + } + pub fn lock_write( + &self, + reason: &'static str, + ) -> TimedExistence> { + TimedExistence::new(timeit(reason, || self.locked.write()), reason) } fn 
get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { - let g = self.locked.read(); - let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.iter_needed_pieces() { - if bf.get(n).map(|v| *v) == Some(true) { - // in theory it should be safe without validation, but whatever. - return self.lengths.validate_piece_index(n as u32); - } - } - None + self.peers + .with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| { + let g = self.lock_read("g(get_next_needed_piece)"); + let bf = &live.bitfield; + for n in g.chunks.iter_needed_pieces() { + if bf.get(n).map(|v| *v) == Some(true) { + // in theory it should be safe without validation, but whatever. + return self.lengths.validate_piece_index(n as u32); + } + } + None + })? } fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { - self.locked - .read() - .peers - .get_live(peer_handle) - .map(|l| l.i_am_choked) + self.peers.with_live(peer_handle, |l| l.i_am_choked) } fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { - if self.am_i_choked(peer_handle)? { - debug!("we are choked by {}, can't reserve next piece", peer_handle); - return None; - } - let mut g = self.locked.write(); - let n = { - let mut n_opt = None; - let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.iter_needed_pieces() { - if bf.get(n).map(|v| *v) == Some(true) { - n_opt = Some(n); - break; + // TODO: locking one inside the other in different order results in deadlocks. + self.peers + .with_live_mut(peer_handle, "reserve_next_needed_piece", |live| { + if live.i_am_choked { + debug!("we are choked, can't reserve next piece"); + return None; } - } + let mut g = self.lock_write("reserve_next_needed_piece"); + + let n = { + let mut n_opt = None; + let bf = &live.bitfield; + for n in g.chunks.iter_needed_pieces() { + if bf.get(n).map(|v| *v) == Some(true) { + n_opt = Some(n); + break; + } + } - self.lengths.validate_piece_index(n_opt? as u32)? 
- }; - g.peers.inflight_pieces.insert( - n, - InflightPiece { - peer: peer_handle, - started: Instant::now(), - }, - ); - g.chunks.reserve_needed_piece(n); - Some(n) + self.lengths.validate_piece_index(n_opt? as u32)? + }; + g.inflight_pieces.insert( + n, + InflightPiece { + peer: peer_handle, + started: Instant::now(), + }, + ); + g.chunks.reserve_needed_piece(n); + Some(n) + }) + .flatten() } fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { @@ -418,9 +630,8 @@ impl TorrentState { } let avg_time = self.stats.average_piece_download_time()?; - let mut g = self.locked.write(); + let mut g = self.lock_write("try_steal_old_slow_piece"); let (idx, elapsed, piece_req) = g - .peers .inflight_pieces .iter_mut() // don't steal from myself @@ -431,8 +642,8 @@ impl TorrentState { // heuristic for "too slow peer" if elapsed > avg_time * 10 { debug!( - "{} will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", - handle, idx, piece_req.peer, elapsed, avg_time + "will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", + idx, piece_req.peer, elapsed, avg_time ); piece_req.peer = handle; piece_req.started = Instant::now(); @@ -441,50 +652,151 @@ impl TorrentState { None } + // NOTE: this doesn't actually "steal" it, but only returns an id we might steal. 
fn try_steal_piece(&self, handle: PeerHandle) -> Option { let mut rng = rand::thread_rng(); use rand::seq::IteratorRandom; - let g = self.locked.read(); - let pl = g.peers.get_live(handle)?; - g.peers - .inflight_pieces - .keys() - .filter(|p| !pl.inflight_requests.iter().any(|req| req.piece == **p)) - .choose(&mut rng) - .copied() + + self.peers + .with_live(handle, |live| { + let g = self.lock_read("try_steal_piece"); + g.inflight_pieces + .keys() + .filter(|p| { + live.previously_requested_pieces + .get(p.get() as usize) + .map(|r| *r) + == Some(false) + }) + .filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p)) + .choose(&mut rng) + .copied() + }) + .flatten() } fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { - let mut g = self.locked.write(); - match g.peers.states.get_mut(&handle) { - Some(s @ &mut PeerState::Connecting) => { - *s = PeerState::Live(LivePeerState::new(Id20(h.peer_id))); - } - _ => { - warn!("peer {} was in wrong state", handle); + let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| { + p.state + .connecting_to_live(Id20(h.peer_id), &self.peers.stats) + .is_some() + }); + match result { + Some(true) => { + debug!("set peer to live") } + Some(false) => debug!("can't set peer live, it was in wrong state"), + None => debug!("can't set peer live, it disappeared"), } } - fn drop_peer(&self, handle: PeerHandle) -> bool { - let mut g = self.locked.write(); - let peer = match g.peers.drop_peer(handle) { - Some(peer) => peer, - None => return false, + fn on_peer_died(self: &Arc, handle: PeerHandle, error: Option) { + let mut pe = match self.peers.states.get_mut(&handle) { + Some(peer) => TimedExistence::new(peer, "on_peer_died"), + None => { + warn!("bug: peer not found in table. 
Forgetting it forever"); + return; + } }; - if let PeerState::Live(l) = peer { - for req in l.inflight_requests { - g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); + let prev = pe.value_mut().state.take(&self.peers.stats); + + match prev { + PeerState::Connecting(_) => {} + PeerState::Live(live) => { + let mut g = self.lock_write("mark_chunk_requests_canceled"); + for req in live.inflight_requests { + debug!( + "peer dead, marking chunk request cancelled, index={}, chunk={}", + req.piece.get(), + req.chunk + ); + g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); + } + } + PeerState::NotNeeded => { + // Restore it as std::mem::take() replaced it above. + pe.value_mut() + .state + .set(PeerState::NotNeeded, &self.peers.stats); + return; + } + s @ PeerState::Queued | s @ PeerState::Dead => { + warn!("bug: peer was in a wrong state {s:?}, ignoring it forever"); + // Prevent deadlocks. + drop(pe); + self.peers.drop_peer(handle); + return; } + }; + + if error.is_none() { + debug!("peer died without errors, not re-queueing"); + pe.value_mut() + .state + .set(PeerState::NotNeeded, &self.peers.stats); + return; + } + + if self.is_finished() { + debug!("torrent finished, not re-queueing"); + pe.value_mut() + .state + .set(PeerState::NotNeeded, &self.peers.stats); + return; + } + + pe.value_mut().state.set(PeerState::Dead, &self.peers.stats); + let backoff = pe.value_mut().stats.backoff.next_backoff(); + + // Prevent deadlocks. + drop(pe); + + if let Some(dur) = backoff { + let state = self.clone(); + spawn( + span!( + parent: None, + Level::ERROR, + "wait_for_peer", + peer = handle.to_string(), + duration = format!("{dur:?}") + ), + async move { + tokio::time::sleep(dur).await; + state + .peers + .with_peer_mut(handle, "dead_to_queued", |peer| { + match peer.state.get() { + PeerState::Dead => { + peer.state.set(PeerState::Queued, &state.peers.stats) + } + other => bail!( + "peer is in unexpected state: {}. 
Expected dead", + other.name() + ), + }; + Ok(()) + }) + .context("bug: peer disappeared")??; + state.peer_queue_tx.send(handle)?; + Ok::<_, anyhow::Error>(()) + }, + ); + } else { + debug!("dropping peer, backoff exhausted"); + self.peers.drop_peer(handle); } - true } pub fn get_uploaded(&self) -> u64 { self.stats.uploaded.load(Ordering::Relaxed) } pub fn get_downloaded(&self) -> u64 { - self.stats.downloaded_and_checked.load(Ordering::Relaxed) + self.stats.downloaded_and_checked.load(Ordering::Acquire) + } + + pub fn is_finished(&self) -> bool { + self.get_left_to_download() == 0 } pub fn get_left_to_download(&self) -> u64 { @@ -494,9 +806,8 @@ impl TorrentState { fn maybe_transmit_haves(&self, index: ValidPieceIndex) { let mut futures = Vec::new(); - let g = self.locked.read(); - for (handle, peer_state) in g.peers.states.iter() { - match peer_state { + for pe in self.peers.states.iter() { + match &pe.value().state.get() { PeerState::Live(live) => { if !live.peer_interested { continue; @@ -504,18 +815,14 @@ impl TorrentState { if live .bitfield - .as_ref() - .and_then(|b| b.get(index.get() as usize).map(|v| *v)) + .get(index.get() as usize) + .map(|v| *v) .unwrap_or(false) { continue; } - let tx = match g.peers.tx.get(handle) { - Some(tx) => tx, - None => continue, - }; - let tx = Arc::downgrade(tx); + let tx = live.tx.downgrade(); futures.push(async move { if let Some(tx) = tx.upgrade() { if tx @@ -538,7 +845,12 @@ impl TorrentState { let mut unordered: FuturesUnordered<_> = futures.into_iter().collect(); spawn( - format!("transmit_haves(piece={}, count={})", index, unordered.len()), + span!( + Level::ERROR, + "transmit_haves", + piece = index.get(), + count = unordered.len() + ), async move { while unordered.next().await.is_some() {} Ok(()) @@ -547,29 +859,17 @@ impl TorrentState { } pub fn add_peer_if_not_seen(self: &Arc, addr: SocketAddr) -> bool { - let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::(); - match 
self.locked.write().peers.add_if_not_seen(addr, out_tx) { + match self.peers.add_if_not_seen(addr) { Some(handle) => handle, None => return false, }; - match self.peer_queue_tx.send((addr, out_rx)) { - Ok(_) => {} - Err(_) => { - warn!("peer adder died, can't add peer") - } - } + self.peer_queue_tx.send(addr).unwrap(); true } - pub fn peer_stats_snapshot(&self) -> AggregatePeerStats { - self.locked.read().peers.stats() - } - pub fn stats_snapshot(&self) -> StatsSnapshot { - let g = self.locked.read(); use Ordering::*; - let peer_stats = g.peers.stats(); let downloaded = self.stats.downloaded_and_checked.load(Relaxed); let remaining = self.needed - downloaded; StatsSnapshot { @@ -579,19 +879,16 @@ impl TorrentState { fetched_bytes: self.stats.fetched_bytes.load(Relaxed), uploaded_bytes: self.stats.uploaded.load(Relaxed), total_bytes: self.have_plus_needed, - live_peers: peer_stats.live as u32, - seen_peers: g.peers.seen.len() as u32, - connecting_peers: peer_stats.connecting as u32, time: Instant::now(), initially_needed_bytes: self.needed, remaining_bytes: remaining, - queued_peers: peer_stats.queued as u32, total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), + peer_stats: self.peers.stats(), } } pub async fn wait_until_completed(&self) { - if self.get_left_to_download() == 0 { + if self.is_finished() { return; } self.finished_notify.notified().await; @@ -610,32 +907,28 @@ impl PeerConnectionHandler for PeerHandler { match message { Message::Request(request) => { self.on_download_request(self.addr, request) - .with_context(|| { - format!("error handling download request from {}", self.addr) - })?; + .context("on_download_request")?; } - Message::Bitfield(b) => self.on_bitfield(self.addr, b.clone_to_owned())?, + Message::Bitfield(b) => self + .on_bitfield(self.addr, b.clone_to_owned()) + .context("on_bitfield")?, Message::Choke => self.on_i_am_choked(self.addr), Message::Unchoke => self.on_i_am_unchoked(self.addr), Message::Interested => 
self.on_peer_interested(self.addr), - Message::Piece(piece) => { - self.on_received_piece(self.addr, piece) - .context("error in on_received_piece()")?; - } + Message::Piece(piece) => self + .on_received_piece(self.addr, piece) + .context("on_received_piece")?, Message::KeepAlive => { - debug!("keepalive received from {}", self.addr); + debug!("keepalive received"); } Message::Have(h) => self.on_have(self.addr, h), Message::NotInterested => { info!("received \"not interested\", but we don't care yet") } message => { - warn!( - "{}: received unsupported message {:?}, ignoring", - self.addr, message - ); + warn!("received unsupported message {:?}, ignoring", message); } - } + }; Ok(()) } @@ -644,10 +937,10 @@ impl PeerConnectionHandler for PeerHandler { } fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option { - let g = self.state.locked.read(); + let g = self.state.lock_read("serialize_bitfield_message_to_buf"); let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice())); let len = msg.serialize(buf, None).unwrap(); - debug!("sending to {}: {:?}, length={}", self.addr, &msg, len); + debug!("sending: {:?}, length={}", &msg, len); Some(len) } @@ -678,8 +971,8 @@ impl PeerHandler { Some(p) => p, None => { anyhow::bail!( - "{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", - peer_handle, request + "received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", + request ); } }; @@ -691,124 +984,111 @@ impl PeerHandler { Some(d) => d, None => { anyhow::bail!( - "{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.", - peer_handle, request + "received {:?}, but it is not a valid chunk request (chunk data is invalid). 
Ignoring.", + request ); } }; let tx = { - let g = self.state.locked.read(); - if !g.chunks.is_chunk_ready_to_upload(&chunk_info) { + if !self + .state + .lock_read("is_chunk_ready_to_upload") + .chunks + .is_chunk_ready_to_upload(&chunk_info) + { anyhow::bail!( "got request for a chunk that is not ready to upload. chunk {:?}", &chunk_info ); } - g.peers.clone_tx(peer_handle).ok_or_else(|| { - anyhow::anyhow!( - "peer {} died, dropping chunk that it requested", - peer_handle - ) - })? + self.state + .peers + .clone_tx(peer_handle) + .context("peer died, dropping chunk that it requested")? }; // TODO: this is not super efficient as it does copying multiple times. // Theoretically, this could be done in the sending code, so that it reads straight into // the send buffer. let request = WriterRequest::ReadChunkRequest(chunk_info); - debug!("sending to {}: {:?}", peer_handle, &request); + debug!("sending {:?}", &request); Ok::<_, anyhow::Error>(tx.send(request)?) } fn on_have(&self, handle: PeerHandle, have: u32) { - if let Some(bitfield) = self - .state - .locked - .write() - .peers - .get_live_mut(handle) - .and_then(|l| l.bitfield.as_mut()) - { - debug!("{}: updated bitfield with have={}", handle, have); - bitfield.set(have as usize, true) - } + self.state.peers.with_live_mut(handle, "on_have", |live| { + live.bitfield.set(have as usize, true); + debug!("updated bitfield with have={}", have); + }); } fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> { if bitfield.len() != self.state.lengths.piece_bitfield_bytes() { anyhow::bail!( - "dropping {} as its bitfield has unexpected size. Got {}, expected {}", - handle, + "dropping peer as its bitfield has unexpected size. 
Got {}, expected {}", bitfield.len(), self.state.lengths.piece_bitfield_bytes(), ); } self.state - .locked - .write() .peers .update_bitfield_from_vec(handle, bitfield.0); if !self.state.am_i_interested_in_peer(handle) { - let tx = self - .state - .locked - .read() - .peers - .clone_tx(handle) - .ok_or_else(|| anyhow::anyhow!("peer closed"))?; - tx.send(WriterRequest::Message(MessageOwned::Unchoke)) - .context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::NotInterested)) - .context("peer dropped")?; + let tx = self.state.peers.clone_tx(handle).context("peer dropped")?; + tx.send(WriterRequest::Message(MessageOwned::Unchoke))?; + tx.send(WriterRequest::Message(MessageOwned::NotInterested))?; + if self.state.is_finished() { + tx.send(WriterRequest::Disconnect)?; + } return Ok(()); } // Additional spawn per peer, not good. spawn( - format!("peer_chunk_requester({handle})"), + span!( + parent: None, + Level::ERROR, + "peer_chunk_requester", + peer = handle.to_string() + ), self.clone().task_peer_chunk_requester(handle), ); Ok(()) } async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { - let tx = match self.state.locked.read().peers.clone_tx(handle) { + let tx = match self.state.peers.clone_tx(handle) { Some(tx) => tx, None => return Ok(()), }; - tx.send(WriterRequest::Message(MessageOwned::Unchoke)) - .context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::Interested)) - .context("peer dropped")?; - + tx.send_many([ + WriterRequest::Message(MessageOwned::Unchoke), + WriterRequest::Message(MessageOwned::Interested), + ])?; self.requester(handle).await?; Ok::<_, anyhow::Error>(()) } fn on_i_am_choked(&self, handle: PeerHandle) { - debug!("we are choked by {}", handle); - self.state - .locked - .write() - .peers - .mark_i_am_choked(handle, true); + debug!("we are choked"); + self.state.peers.mark_i_am_choked(handle, true); } fn on_peer_interested(&self, handle: PeerHandle) { - debug!("peer {} is 
interested", handle); - self.state - .locked - .write() - .peers - .mark_peer_interested(handle, true); + debug!("peer is interested"); + self.state.peers.mark_peer_interested(handle, true); } async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { - let notify = match self.state.locked.read().peers.get_live(handle) { - Some(l) => l.have_notify.clone(), + let notify = match self + .state + .peers + .with_live(handle, |l| l.have_notify.clone()) + { + Some(notify) => notify, None => return Ok(()), }; @@ -821,7 +1101,7 @@ impl PeerHandler { loop { match self.state.am_i_choked(handle) { Some(true) => { - debug!("we are choked by {}, can't reserve next piece", handle); + debug!("we are choked, can't reserve next piece"); #[allow(unused_must_use)] { timeout(Duration::from_secs(60), notify.notified()).await; @@ -832,21 +1112,22 @@ impl PeerHandler { None => return Ok(()), } + // Try steal a pice from a very slow peer first. let next = match self.state.try_steal_old_slow_piece(handle) { Some(next) => next, None => match self.state.reserve_next_needed_piece(handle) { Some(next) => next, None => { - if self.state.get_left_to_download() == 0 { - debug!("{}: nothing left to download, closing requester", handle); + if self.state.is_finished() { + debug!("nothing left to download, closing requester"); return Ok(()); } if let Some(piece) = self.state.try_steal_piece(handle) { - debug!("{}: stole a piece {}", handle, piece); + debug!("stole a piece {}", piece); piece } else { - debug!("no pieces to request from {}", handle); + debug!("no pieces to request"); #[allow(unused_must_use)] { timeout(Duration::from_secs(60), notify.notified()).await; @@ -857,43 +1138,59 @@ impl PeerHandler { }, }; - let tx = match self.state.locked.read().peers.clone_tx(handle) { - Some(tx) => tx, - None => return Ok(()), - }; - let sem = match self.state.locked.read().peers.get_live(handle) { - Some(live) => live.requests_sem.clone(), - None => return Ok(()), - }; - for chunk in 
self.state.lengths.iter_chunk_infos(next) { - if self.state.locked.read().chunks.is_chunk_downloaded(&chunk) { - continue; - } - if !self + let (tx, sem) = + match self .state - .locked - .write() .peers - .try_get_live_mut(handle)? - .inflight_requests - .insert(InflightRequest::from(&chunk)) - { - warn!( - "{}: probably a bug, we already requested {:?}", - handle, chunk - ); - continue; - } + .with_live_mut(handle, "peer_setup_for_piece_request", |l| { + l.previously_requested_pieces.set(next.get() as usize, true); + (l.tx.clone(), l.requests_sem.clone()) + }) { + Some(res) => res, + None => return Ok(()), + }; + for chunk in self.state.lengths.iter_chunk_infos(next) { let request = Request { index: next.get(), begin: chunk.offset, length: chunk.size, }; - sem.acquire().await?.forget(); - tx.send(WriterRequest::Message(MessageOwned::Request(request))) - .context("peer dropped")?; + match self + .state + .peers + .with_live_mut(handle, "add chunk request", |live| { + live.inflight_requests.insert(InflightRequest::from(&chunk)) + }) { + Some(true) => {} + Some(false) => { + // This request was already in-flight for this peer for this chunk. + // This might happen in theory, but not very likely. + // + // Example: + // someone stole a piece from us, and then died, the piece became "needed" again, and we reserved it + // all before the piece request was processed by us. + warn!("we already requested {:?} previously", chunk); + continue; + } + // peer died + None => return Ok(()), + }; + + loop { + match timeout(Duration::from_secs(10), sem.acquire()).await { + Ok(acq) => break acq?.forget(), + Err(_) => continue, + }; + } + + if tx + .send(WriterRequest::Message(MessageOwned::Request(request))) + .is_err() + { + return Ok(()); + } } } } @@ -911,6 +1208,9 @@ impl PeerHandler { .with_context(|| format!("error opening {}", DEVNULL)) } + // Lock exclusive just in case to ensure in-flight operations finish.?? 
+ let _guard = self.state.lock_write("reopen_read_only"); + for (file, filename) in self.state.files.iter().zip(self.state.filenames.iter()) { let mut g = file.lock(); // this should close the original file @@ -924,19 +1224,19 @@ impl PeerHandler { .with_context(|| format!("error re-opening {:?} readonly", filename))?; debug!("reopened {:?} read-only", filename); } + info!("reopened all torrent files in read-only mode"); Ok(()) } fn on_i_am_unchoked(&self, handle: PeerHandle) { - debug!("we are unchoked by {}", handle); - let mut g = self.state.locked.write(); - let live = match g.peers.get_live_mut(handle) { - Some(live) => live, - None => return, - }; - live.i_am_choked = false; - live.have_notify.notify_waiters(); - live.requests_sem.add_permits(16); + debug!("we are unchoked"); + self.state + .peers + .with_live_mut(handle, "on_i_am_unchoked", |live| { + live.i_am_choked = false; + live.have_notify.notify_waiters(); + live.requests_sem.add_permits(16); + }); } fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> anyhow::Result<()> { @@ -947,63 +1247,82 @@ impl PeerHandler { ) { Some(i) => i, None => { - anyhow::bail!( - "peer {} sent us a piece that is invalid {:?}", - handle, - &piece, - ); + anyhow::bail!("peer sent us an invalid piece {:?}", &piece,); } }; - let mut g = self.state.locked.write(); - let h = g.peers.try_get_live_mut(handle)?; - h.requests_sem.add_permits(1); - self.state - .stats - .fetched_bytes - .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + .peers + .with_live_mut(handle, "inflight_requests.remove", |h| { + h.requests_sem.add_permits(1); - if !h - .inflight_requests - .remove(&InflightRequest::from(&chunk_info)) - { - anyhow::bail!( - "peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. 
Got: {:?}", handle, &h.inflight_requests, &piece, - ); - } + self.state + .stats + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); - let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) { - Some(ChunkMarkingResult::Completed) => { - debug!( - "piece={} done by {}, will write and checksum", - piece.index, handle - ); - // This will prevent others from stealing it. - g.peers - .remove_inflight_piece(chunk_info.piece_index) + if !h + .inflight_requests + .remove(&InflightRequest::from(&chunk_info)) + { + anyhow::bail!( + "peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}", + &h.inflight_requests, + &piece, + ); + } + Ok(()) + }) + .context("peer not found")??; + + let full_piece_download_time = { + let mut g = self.state.lock_write("mark_chunk_downloaded"); + + match g.inflight_pieces.get(&chunk_info.piece_index) { + Some(InflightPiece { peer, .. }) if *peer == handle => {} + Some(InflightPiece { peer, .. }) => { + debug!( + "in-flight piece {} was stolen by {}, ignoring", + chunk_info.piece_index, peer + ); + return Ok(()); + } + None => { + debug!( + "in-flight piece {} not found. it was probably completed by someone else", + chunk_info.piece_index + ); + return Ok(()); + } + }; + + match g.chunks.mark_chunk_downloaded(&piece) { + Some(ChunkMarkingResult::Completed) => { + debug!("piece={} done, will write and checksum", piece.index,); + // This will prevent others from stealing it. + { + let piece = chunk_info.piece_index; + g.inflight_pieces.remove(&piece) + } .map(|t| t.started.elapsed()) - } - Some(ChunkMarkingResult::PreviouslyCompleted) => { - // TODO: we might need to send cancellations here. 
- debug!( - "piece={} was done by someone else {}, ignoring", - piece.index, handle - ); - return Ok(()); - } - Some(ChunkMarkingResult::NotCompleted) => None, - None => { - anyhow::bail!( - "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", - handle, - piece - ); + } + Some(ChunkMarkingResult::PreviouslyCompleted) => { + // TODO: we might need to send cancellations here. + debug!("piece={} was done by someone else, ignoring", piece.index,); + return Ok(()); + } + Some(ChunkMarkingResult::NotCompleted) => None, + None => { + anyhow::bail!( + "bogus data received: {:?}, cannot map this to a chunk, dropping peer", + piece + ); + } } }; - // to prevent deadlocks. - drop(g); + // By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would + // have fallen off above in one of the defensive checks. self.spawner .spawn_block_in_place(move || { @@ -1013,10 +1332,17 @@ impl PeerHandler { // should we really do? If we unmark it, it will get requested forever... // // So let's just unwrap and abort. - self.state + match self + .state .file_ops() .write_chunk(handle, &piece, &chunk_info) - .expect("expected to be able to write to disk"); + { + Ok(()) => {} + Err(e) => { + error!("FATAL: error writing chunk to disk: {:?}", e); + panic!("{:?}", e); + } + } let full_piece_download_time = match full_piece_download_time { Some(t) => t, @@ -1035,7 +1361,9 @@ impl PeerHandler { self.state .stats .downloaded_and_checked - .fetch_add(piece_len, Ordering::Relaxed); + // This counter is used to compute "is_finished", so using + // stronger ordering. 
+ .fetch_add(piece_len, Ordering::Release); self.state .stats .have @@ -1052,32 +1380,28 @@ impl PeerHandler { full_piece_download_time.as_millis() as u64, Ordering::Relaxed, ); - self.state - .locked - .write() - .chunks - .mark_piece_downloaded(chunk_info.piece_index); + { + let mut g = self.state.lock_write("mark_piece_downloaded"); + g.chunks.mark_piece_downloaded(chunk_info.piece_index); + } - debug!( - "piece={} successfully downloaded and verified from {}", - index, handle - ); + self.state.peers.reset_peer_backoff(handle); + + debug!("piece={} successfully downloaded and verified", index); - if self.state.get_left_to_download() == 0 { + if self.state.is_finished() { + info!("torrent finished downloading"); self.state.finished_notify.notify_waiters(); + self.disconnect_all_peers_that_have_full_torrent(); self.reopen_read_only()?; } self.state.maybe_transmit_haves(chunk_info.piece_index); } false => { - warn!( - "checksum for piece={} did not validate, came from {}", - index, handle - ); + warn!("checksum for piece={} did not validate", index,); self.state - .locked - .write() + .lock_write("mark_piece_broken") .chunks .mark_piece_broken(chunk_info.piece_index); } @@ -1087,4 +1411,19 @@ impl PeerHandler { .with_context(|| format!("error processing received chunk {chunk_info:?}"))?; Ok(()) } + + fn disconnect_all_peers_that_have_full_torrent(&self) { + for mut pe in self.state.peers.states.iter_mut() { + if let PeerState::Live(l) = pe.value().state.get() { + if l.has_full_torrent(self.state.lengths.total_pieces() as usize) { + let prev = pe.value_mut().state.to_not_needed(&self.state.peers.stats); + let _ = prev + .take_live_no_counters() + .unwrap() + .tx + .send(WriterRequest::Disconnect); + } + } + } + } } diff --git a/crates/librqbit_core/Cargo.toml b/crates/librqbit_core/Cargo.toml index 92139c6b..0c10aba8 100644 --- a/crates/librqbit_core/Cargo.toml +++ b/crates/librqbit_core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-core" -version = "2.2.2" 
+version = "3.0.0" edition = "2018" description = "Important utilities used throughout librqbit useful for working with torrents." license = "Apache-2.0" @@ -21,7 +21,6 @@ hex = "0.4" anyhow = "1" url = "2" uuid = {version = "1", features = ["v4"]} -log = "0.4" parking_lot = "0.12" serde = {version = "1", features=["derive"]} buffers = {path="../buffers", package="librqbit-buffers", version = "2.2.1"} diff --git a/crates/peer_binary_protocol/Cargo.toml b/crates/peer_binary_protocol/Cargo.toml index 0beada23..32f32444 100644 --- a/crates/peer_binary_protocol/Cargo.toml +++ b/crates/peer_binary_protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-peer-protocol" -version = "2.2.2" +version = "3.0.0" edition = "2018" description = "Protocol for working with torrent peers. Used in rqbit torrent client." license = "Apache-2.0" @@ -23,6 +23,6 @@ byteorder = "1" buffers = {path="../buffers", package="librqbit-buffers", version = "2.2.1"} bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} -librqbit-core = {path="../librqbit_core", version = "2.2.2"} +librqbit-core = {path="../librqbit_core", version = "3.0.0"} bitvec = "1" anyhow = "1" \ No newline at end of file diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 0ba8a579..2535ac7b 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -1,3 +1,7 @@ +// BitTorrent peer protocol implementation: parsing, serialization etc. +// +// Can be used outside of librqbit. 
+ pub mod extended; use bincode::Options; diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index ec8c310f..eea95d77 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rqbit" -version = "2.2.2" +version = "3.0.0-beta.0" authors = ["Igor Katson "] edition = "2018" description = "A bittorrent command line client and server." @@ -13,6 +13,7 @@ readme = "README.md" [features] default = ["sha1-system", "default-tls"] +timed_existence = ["librqbit/timed_existence"] sha1-system = ["librqbit/sha1-system"] sha1-openssl = ["librqbit/sha1-openssl"] sha1-rust = ["librqbit/sha1-rust"] @@ -20,16 +21,17 @@ default-tls = ["librqbit/default-tls"] rust-tls = ["librqbit/rust-tls"] [dependencies] -librqbit = {path="../librqbit", default-features=false, version = "2.2.2"} -dht = {path="../dht", package="librqbit-dht", version="2.2.2"} +librqbit = {path="../librqbit", default-features=false, version = "3.0.0-beta.0"} +dht = {path="../dht", package="librqbit-dht", version="3.0.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} anyhow = "1" clap = {version = "4", features = ["derive", "deprecated"]} -log = "0.4" -pretty_env_logger = "0.5" +tracing = "0.1" +tracing-subscriber = {version = "0.3", features = ["env-filter"]} regex = "1" futures = "0.3" parse_duration = "2" +parking_lot = {version = "0.12", features = ["deadlock_detection"]} reqwest = "0.11" serde = {version = "1", features=["derive"]} serde_json = "1" diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4ce72e78..29addef1 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -11,9 +11,10 @@ use librqbit::{ SessionOptions, }, spawn_utils::{spawn, BlockingSpawner}, + torrent_state::timeit, }; -use log::{error, info, warn}; use size_format::SizeFormatterBinary as SF; +use tracing::{error, info, span, warn, Level}; #[derive(Debug, Clone, Copy, ValueEnum)] enum LogLevel { @@ -57,12 +58,12 @@ struct Opts { 
disable_dht_persistence: bool, /// The connect timeout, e.g. 1s, 1.5s, 100ms etc. - #[arg(long = "peer-connect-timeout", value_parser = parse_duration::parse)] - peer_connect_timeout: Option, + #[arg(long = "peer-connect-timeout", value_parser = parse_duration::parse, default_value="2s")] + peer_connect_timeout: Duration, /// The connect timeout, e.g. 1s, 1.5s, 100ms etc. - #[arg(long = "peer-read-write-timeout" , value_parser = parse_duration::parse)] - peer_read_write_timeout: Option, + #[arg(long = "peer-read-write-timeout" , value_parser = parse_duration::parse, default_value="10s")] + peer_read_write_timeout: Duration, /// How many threads to spawn for the executor. #[arg(short = 't', long)] @@ -151,13 +152,44 @@ fn init_logging(opts: &Opts) { } }; } - pretty_env_logger::init(); + + use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); +} + +fn _start_deadlock_detector_thread() { + use parking_lot::deadlock; + use std::thread; + + // Create a background thread which checks for deadlocks every 10s + thread::spawn(move || loop { + thread::sleep(Duration::from_secs(10)); + let deadlocks = deadlock::check_deadlock(); + if deadlocks.is_empty() { + continue; + } + + println!("{} deadlocks detected", deadlocks.len()); + for (i, threads) in deadlocks.iter().enumerate() { + println!("Deadlock #{}", i); + for t in threads { + println!("Thread Id {:#?}", t.thread_id()); + println!("{:#?}", t.backtrace()); + } + } + std::process::exit(42); + }); } fn main() -> anyhow::Result<()> { let opts = Opts::parse(); init_logging(&opts); + // start_deadlock_detector_thread(); let (mut rt_builder, spawner) = match opts.single_thread_runtime { true => ( @@ -197,8 +229,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> dht_config: None, peer_id: None, peer_opts: Some(PeerConnectionOptions { - connect_timeout: opts.peer_connect_timeout, - 
read_write_timeout: opts.peer_read_write_timeout, + connect_timeout: Some(opts.peer_connect_timeout), + read_write_timeout: Some(opts.peer_read_write_timeout), ..Default::default() }), }; @@ -212,8 +244,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> info!("[{}] initializing", idx); }, ManagedTorrentState::Running(handle) => { - let peer_stats = handle.torrent_state().peer_stats_snapshot(); - let stats = handle.torrent_state().stats_snapshot(); + let stats = timeit("stats_snapshot", || handle.torrent_state().stats_snapshot()); let speed = handle.speed_estimator(); let total = stats.total_bytes; let progress = stats.total_bytes - stats.remaining_bytes; @@ -223,7 +254,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> (progress as f64 / total as f64) * 100f64 }; info!( - "[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", + "[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}", idx, downloaded_pct, SF::new(progress), @@ -232,10 +263,11 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> SF::new(stats.remaining_bytes), SF::new(total), SF::new(stats.uploaded_bytes), - peer_stats.live, - peer_stats.connecting, - peer_stats.queued, - peer_stats.seen, + stats.peer_stats.live, + stats.peer_stats.connecting, + stats.peer_stats.queued, + stats.peer_stats.seen, + stats.peer_stats.dead, ); }, } @@ -257,7 +289,10 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> .await .context("error initializing rqbit session")?, ); - spawn("Stats printer", stats_printer(session.clone())); + spawn( + span!(Level::TRACE, "stats_printer"), + stats_printer(session.clone()), + ); let http_api = HttpApi::new(session); let http_api_listen_addr = 
opts.http_api_listen_addr; http_api @@ -330,11 +365,14 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> .await .context("error initializing rqbit session")?, ); - spawn("Stats printer", stats_printer(session.clone())); + spawn( + span!(Level::TRACE, "stats_printer"), + stats_printer(session.clone()), + ); let http_api = HttpApi::new(session.clone()); let http_api_listen_addr = opts.http_api_listen_addr; spawn( - "HTTP API", + span!(Level::ERROR, "http_api"), http_api.clone().make_http_api_and_run(http_api_listen_addr), ); diff --git a/crates/sha1w/src/lib.rs b/crates/sha1w/src/lib.rs index 036bed9d..116b6f26 100644 --- a/crates/sha1w/src/lib.rs +++ b/crates/sha1w/src/lib.rs @@ -1,8 +1,9 @@ -// Wrapper for sha1 libraries. -// Sha1 computation is the majority of CPU usage of this library. -// openssl seems 2-3x faster, so using it for now, but -// leaving the pure-rust impl here too. Maybe someday make them -// runtime swappable or enabled with a feature. +// Wrapper for sha1 libraries to be able to swap them easily, +// e.g. to measure performance, or change implementations depending on platform. +// +// Sha1 computation is the majority of CPU usage of librqbit. +// openssl is 2-3x faster than rust's sha1. +// system library is the best choice probably (it's the default anyway). #[cfg(feature = "sha1-openssl")] pub type Sha1 = Sha1Openssl;