diff --git a/src/main.rs b/src/main.rs index b952eda..927b720 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ use crossterm::terminal::{ use crossterm::ExecutableCommand; use directories::ProjectDirs; use flexi_logger::{FileSpec, LogSpecification, Logger, WriteMode}; -use indicatif::{ProgressBar, ProgressStyle}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use log::{error, info, trace}; use ratatui::backend::{Backend, CrosstermBackend}; use ratatui::layout::Size; @@ -132,7 +132,7 @@ fn main() -> anyhow::Result<()> { Logger::with(spec) .log_to_file(filespec) .write_mode(WriteMode::BufferAndFlush) - .format(flexi_logger::detailed_format) + .format(flexi_logger::with_thread) .start()? }; @@ -144,7 +144,7 @@ fn main() -> anyhow::Result<()> { let mut cache = { // Get config to determine repo id and open cache - let pb = new_pb("Getting restic config {spinner}"); + let pb = new_pb(" {spinner} Getting restic config"); let repo_id = restic.config()?.id; pb.finish(); @@ -271,7 +271,7 @@ fn sync_snapshots( // Figure out what snapshots we need to fetch let missing_snapshots: Vec> = { // Fetch snapshot list - let pb = new_pb("Fetching repository snapshot list {spinner}"); + let pb = new_pb(" {spinner} Fetching repository snapshot list"); let repo_snapshots = restic .snapshots()? .into_iter() @@ -287,8 +287,8 @@ fn sync_snapshots( .map(|snapshot_id| cache.get_snapshot_group(snapshot_id)) .collect::, rusqlite::Error>>()?; if groups_to_delete.len() > 0 { - eprintln!("Need to delete {} groups", groups_to_delete.len()); - let pb = new_pb("{wide_bar} [{pos}/{len}] {spinner}"); + eprintln!("Need to delete {} group(s)", groups_to_delete.len()); + let pb = new_pb(" {spinner} {wide_bar} [{pos}/{len}]"); pb.set_length(groups_to_delete.len() as u64); for group in groups_to_delete { cache.delete_group(group)?; @@ -312,23 +312,24 @@ fn sync_snapshots( n => n, }; - eprintln!("Fetching {} snapshots", total_missing_snapshots); - let missing_queue = Queue::new(missing_snapshots); // Create progress indicators - let pb = new_pb("{wide_bar} [{pos}/{len}] {msg} {spinner}"); + let mpb = MultiProgress::new(); + let pb = mpb_add(&mpb, " {spinner} {prefix} {wide_bar} [{pos}/{len}] "); + pb.set_prefix("Fetching snapshots"); pb.set_length(total_missing_snapshots as u64); - let speed = { - let pb = pb.clone(); - Speed::new(move |v| { - let mut msg = humansize::format_size_i(v, humansize::BINARY); - msg.push_str("/s"); - pb.set_message(format!("({msg:>12})")) - }) - }; + + const SHOULD_QUIT_POLL_PERIOD: Duration = Duration::from_millis(500); thread::scope(|scope| { + macro_rules! spawn { + ($name_fmt:literal, $scope:expr, $thunk:expr) => { + thread::Builder::new() + .name(format!($name_fmt)) + .spawn_scoped($scope, $thunk)? + }; + } let mut handles: Vec>> = Vec::new(); // The threads periodically poll this to see if they should @@ -340,17 +341,17 @@ fn sync_snapshots( mpsc::sync_channel::<(Box, FileTree)>(2); // Start fetching threads - for _ in 0..fetching_thread_count { + for i in 0..fetching_thread_count { let missing_queue = missing_queue.clone(); let snapshot_sender = snapshot_sender.clone(); - let speed = speed.clone(); + let mpb = &mpb; let should_quit = should_quit.clone(); - handles.push(scope.spawn(move || { + handles.push(spawn!("fetching-{i}", &scope, move || { fetching_thread_body( restic, missing_queue, + mpb, snapshot_sender, - speed, should_quit.clone(), ) .inspect_err(|_| should_quit.store(true, Ordering::SeqCst)) @@ -368,13 +369,14 @@ fn sync_snapshots( // Start grouping thread handles.push({ let should_quit = should_quit.clone(); - scope.spawn(move || { + spawn!("grouping", &scope, move || { grouping_thread_body( group_size, snapshot_receiver, group_sender, pb, should_quit.clone(), + SHOULD_QUIT_POLL_PERIOD, ) .inspect_err(|_| should_quit.store(true, Ordering::SeqCst)) .map_err(anyhow::Error::from) @@ -384,10 +386,15 @@ fn sync_snapshots( // Start DB thread handles.push({ let should_quit = should_quit.clone(); - scope.spawn(move || { - db_thread_body(cache, group_receiver, should_quit.clone()) - .inspect_err(|_| should_quit.store(true, Ordering::SeqCst)) - .map_err(anyhow::Error::from) + spawn!("db", &scope, move || { + db_thread_body( + cache, + group_receiver, + should_quit.clone(), + SHOULD_QUIT_POLL_PERIOD, + ) + .inspect_err(|_| should_quit.store(true, Ordering::SeqCst)) + .map_err(anyhow::Error::from) }) }); @@ -412,45 +419,53 @@ enum FetchingThreadError { fn fetching_thread_body( restic: &Restic, missing_queue: Queue>, + mpb: &MultiProgress, snapshot_sender: mpsc::SyncSender<(Box, FileTree)>, - mut speed: Speed, should_quit: Arc, ) -> Result<(), FetchingThreadError> { - defer! { trace!("(fetching-thread) terminated") } - trace!("(fetching-thread) started"); + defer! { trace!("terminated") } + trace!("started"); while let Some(snapshot) = missing_queue.pop() { let short_id = snapshot_short_id(&snapshot); + let pb = mpb_add(mpb, " {spinner} fetching {prefix}: starting up") + .with_prefix(short_id.clone()); let mut filetree = FileTree::new(); let files = restic.ls(&snapshot)?; - trace!("(fetching-thread) started fetching snapshot ({short_id})"); + trace!("started fetching snapshot ({short_id})"); let start = Instant::now(); for r in files { if should_quit.load(Ordering::SeqCst) { return Ok(()); } - let (file, bytes_read) = r?; - speed.inc(bytes_read); + let (file, _bytes_read) = r?; filetree .insert(&file.path, file.size) .expect("repeated entry in restic snapshot ls"); + if pb.position() == 0 { + pb.set_style(new_style( + " {spinner} fetching {prefix}: {pos} file(s)", + )); + } + pb.inc(1); } + pb.finish_and_clear(); + mpb.remove(&pb); info!( - "(fetching-thread) snapshot fetched in {}s ({short_id})", + "snapshot fetched in {}s ({short_id})", start.elapsed().as_secs_f64() ); - trace!("(fetching-thread) got snapshot, sending ({short_id})"); + trace!("got snapshot, sending ({short_id})"); if should_quit.load(Ordering::SeqCst) { return Ok(()); } let start = Instant::now(); snapshot_sender.send((snapshot.clone(), filetree)).unwrap(); info!( - "(fetching-thread) waited {}s to send snapshot ({short_id})", + "waited {}s to send snapshot ({short_id})", start.elapsed().as_secs_f64() ); - trace!("(fetching-thread) snapshot sent ({short_id})"); + trace!("snapshot sent ({short_id})"); } - speed.stop(); Ok(()) } @@ -466,65 +481,66 @@ fn grouping_thread_body( group_sender: mpsc::SyncSender, pb: ProgressBar, should_quit: Arc, + should_quit_poll_period: Duration, ) -> Result<(), GroupingThreadError> { - defer! { trace!("(grouping-thread) terminated") } - trace!("(grouping-thread) started"); + defer! { trace!("terminated") } + trace!("started"); let mut group = SnapshotGroup::new(); loop { - trace!("(grouping-thread) waiting for snapshot"); + trace!("waiting for snapshot"); if should_quit.load(Ordering::SeqCst) { return Ok(()); } let start = Instant::now(); // We wait with timeout to poll the should_quit periodically - match snapshot_receiver.recv_timeout(Duration::from_millis(500)) { + match snapshot_receiver.recv_timeout(should_quit_poll_period) { Ok((snapshot, filetree)) => { let short_id = snapshot_short_id(&snapshot); info!( - "(grouping-thread) waited {}s to get snapshot ({short_id})", + "waited {}s to get snapshot ({short_id})", start.elapsed().as_secs_f64() ); - trace!("(grouping-thread) got snapshot ({short_id})"); + trace!("got snapshot ({short_id})"); if should_quit.load(Ordering::SeqCst) { return Ok(()); } group.add_snapshot(snapshot.clone(), filetree); pb.inc(1); - trace!("(grouping-thread) added snapshot ({short_id})"); + trace!("added snapshot ({short_id})"); if group.count() == group_size { - trace!("(grouping-thread) group is full, sending"); + trace!("group is full, sending"); if should_quit.load(Ordering::SeqCst) { return Ok(()); } let start = Instant::now(); group_sender.send(group).unwrap(); info!( - "(grouping-thread) waited {}s to send group", + "waited {}s to send group", start.elapsed().as_secs_f64() ); - trace!("(grouping-thread) sent group"); + trace!("sent group"); group = SnapshotGroup::new(); } } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => { - trace!("(grouping-thread) loop done"); + trace!("loop done"); break; } } } if group.count() > 0 { - trace!("(grouping-thread) sending leftover group"); + trace!("sending leftover group"); if should_quit.load(Ordering::SeqCst) { return Ok(()); } let start = Instant::now(); group_sender.send(group).unwrap(); info!( - "(grouping-thread) waited {}s to send leftover group", + "waited {}s to send leftover group", start.elapsed().as_secs_f64() ); - trace!("(grouping-thread) sent leftover group"); + trace!("sent leftover group"); } pb.finish_with_message("Done"); Ok(()) @@ -540,23 +556,21 @@ fn db_thread_body( cache: &mut Cache, group_receiver: mpsc::Receiver, should_quit: Arc, + should_quit_poll_period: Duration, ) -> Result<(), DBThreadError> { - defer! { trace!("(db-thread) terminated") } - trace!("(db-thread) started"); + defer! { trace!("terminated") } + trace!("started"); loop { - trace!("(db-thread) waiting for group"); + trace!("waiting for group"); if should_quit.load(Ordering::SeqCst) { return Ok(()); } let start = Instant::now(); // We wait with timeout to poll the should_quit periodically - match group_receiver.recv_timeout(Duration::from_millis(500)) { + match group_receiver.recv_timeout(should_quit_poll_period) { Ok(group) => { - info!( - "(db-thread) waited {}s to get group", - start.elapsed().as_secs_f64() - ); - trace!("(db-thread) got group, saving"); + info!("waited {}s to get group", start.elapsed().as_secs_f64()); + trace!("got group, saving"); if should_quit.load(Ordering::SeqCst) { return Ok(()); } @@ -565,14 +579,14 @@ fn db_thread_body( .save_snapshot_group(group) .expect("unable to save snapshot group"); info!( - "(db-thread) waited {}s to save group", + "waited {}s to save group", start.elapsed().as_secs_f64() ); - trace!("(db-thread) group saved"); + trace!("group saved"); } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => { - trace!("(db-thread) loop done"); + trace!("loop done"); break Ok(()); } } @@ -631,70 +645,35 @@ fn render<'a>( } /// Util /////////////////////////////////////////////////////////////////////// - -/// Track the speed of something in units/sec -/// Periodically calls a callback with the current speed -/// Users are expected to call the `inc` method to add units -#[derive(Clone)] -struct Speed { - state: Arc>, -} - -struct SpeedState { - should_quit: bool, - count: usize, - previous: f64, +fn new_style(template: &str) -> ProgressStyle { + let frames = &[ + "(● )", + "( ● )", + "( ● )", + "( ● )", + "( ● )", + "( ● )", + "( ● )", + "(● )", + "(● )", + ]; + ProgressStyle::with_template(template).unwrap().tick_strings(frames) } -impl Speed { - pub fn new(mut cb: impl FnMut(f64) + Send + 'static) -> Self { - const WINDOW_MILLIS: u64 = 300; - const ALPHA: f64 = 0.3; - - let state = Arc::new(Mutex::new(SpeedState { - should_quit: false, - count: 0, - previous: 0.0, - })); - thread::spawn({ - let state = Arc::downgrade(&state); - move || { - while let Some(state) = state.upgrade() { - let value = { - let SpeedState { should_quit, count, previous } = - &mut *state.lock().unwrap(); - if *should_quit { - break; - } - let current = - *count as f64 / (WINDOW_MILLIS as f64 / 1000.0); - *count = 0; - let value = - (ALPHA * current) + ((1.0 - ALPHA) * *previous); - *previous = current; - value - }; - cb(value); - thread::sleep(Duration::from_millis(WINDOW_MILLIS)); - } - } - }); - Speed { state } - } - - pub fn inc(&self, delta: usize) { - self.state.lock().unwrap().count += delta; - } +const PB_TICK_INTERVAL: Duration = Duration::from_millis(100); - pub fn stop(&mut self) { - self.state.lock().unwrap().should_quit = true; - } +fn new_pb(template: &str) -> ProgressBar { + let pb = ProgressBar::new_spinner().with_style(new_style(template)); + pb.enable_steady_tick(PB_TICK_INTERVAL); + pb } -pub fn new_pb(style: &str) -> ProgressBar { - let pb = ProgressBar::new_spinner() - .with_style(ProgressStyle::with_template(style).unwrap()); - pb.enable_steady_tick(Duration::from_millis(500)); +// This is necessary to avoid some weird redraws that happen +// when enabling the tick thread before adding to the MultiProgress. +fn mpb_add(mpb: &MultiProgress, template: &str) -> ProgressBar { + let pb = + mpb.add(ProgressBar::new_spinner().with_style(new_style(template))); + pb.enable_steady_tick(PB_TICK_INTERVAL); pb } diff --git a/src/restic.rs b/src/restic.rs index 8244b5a..8a081f5 100644 --- a/src/restic.rs +++ b/src/restic.rs @@ -7,6 +7,8 @@ use std::process::{Child, ChildStdout, Command, ExitStatusError, Stdio}; use std::str::Utf8Error; use camino::Utf8PathBuf; +use log::info; +use scopeguard::defer; use serde::de::DeserializeOwned; use serde::Deserialize; use serde_json::Value; @@ -156,6 +158,8 @@ impl Restic { A: AsRef, { let child = self.run_command(args)?; + let id = child.id(); + defer! { info!("finished pid {}", id); } let output = child.wait_with_output().map_err(|e| Error { kind: ErrorKind::Run(RunError::Io(e)), stderr: None, @@ -195,18 +199,21 @@ impl Restic { } cmd.arg("--json"); cmd.args(args); - Ok(cmd + let child = cmd .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() - .map_err(LaunchError)?) + .map_err(LaunchError)?; + info!("running \"{cmd:?}\" (pid {})", child.id()); + Ok(child) } } struct Iter { child: Child, lines: Lines>, + finished: bool, _phantom_data: PhantomData, } @@ -216,6 +223,7 @@ impl Iter { Iter { child, lines: BufReader::new(stdout).lines(), + finished: false, _phantom_data: PhantomData::default(), } } @@ -230,6 +238,12 @@ impl Iter { Ok(_) => Err(Error { kind, stderr: Some(buf) }), } } + + fn finish(&mut self) { + if !self.finished { + info!("finished pid {}", self.child.id()); + } + } } impl Iterator for Iter { @@ -243,10 +257,14 @@ impl Iterator for Iter { (value, line.len()) }; Some(match r_value { - Err(kind) => self.read_stderr(kind), + Err(kind) => { + self.finish(); + self.read_stderr(kind) + } Ok(value) => Ok(value), }) } else { + self.finish(); match self.child.wait() { Err(e) => Some(self.read_stderr(ErrorKind::Run(RunError::Io(e)))), diff --git a/src/ui.rs b/src/ui.rs index 30870b9..75a0a46 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -453,7 +453,7 @@ impl WidgetRef for App { fn render_ref(&self, area: Rect, buf: &mut Buffer) { let (header_rect, list_rect, footer_rect) = compute_layout(area); { - // Heading + // Header let mut string = "--- ".to_string(); string.push_str( shorten_to( @@ -461,7 +461,8 @@ impl WidgetRef for App { None => "#", Some(path) => path.as_str(), }, - header_rect.width as usize - string.len(), + max(0, header_rect.width as isize - string.len() as isize) + as usize, ) .as_ref(), );