Add support for more than 500 snapshots in a repository #63

Merged: 1 commit, Aug 5, 2024
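For context on the change: redu stores one `entries_<snapshot_hash>` table per snapshot and, before this PR, stitched them together with a single compound query built by interspersing `UNION ALL` between per-table SELECTs. SQLite's default `SQLITE_MAX_COMPOUND_SELECT` limit caps a compound SELECT at 500 terms, which matches the 500-snapshot ceiling in the title. A minimal sketch that reproduces the failure mode with rusqlite (the table names and row contents are illustrative, not redu's schema):

```rust
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    // One tiny table per "snapshot", mimicking redu's entries_<hash> layout.
    for i in 0..501 {
        conn.execute_batch(&format!(
            "CREATE TABLE entries_{i} (size INTEGER); \
             INSERT INTO entries_{i} VALUES ({i});"
        ))?;
    }
    // 501 SELECT terms joined by UNION ALL: one more than the default
    // SQLITE_MAX_COMPOUND_SELECT limit of 500.
    let query = (0..501)
        .map(|i| format!("SELECT size FROM entries_{i}"))
        .collect::<Vec<_>>()
        .join(" UNION ALL ");
    match conn.prepare(&query) {
        Ok(_) => println!("prepared fine (this build raised the limit)"),
        Err(e) => println!("failed as expected: {e}"),
    }
    Ok(())
}
```

The fix below drops the compound query entirely: each `entries_*` table is queried on its own and the results are combined in Rust, so the snapshot count is no longer bounded by a SQLite compile-time limit.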
benches/cache.rs: 81 changes (66 additions & 15 deletions)
@@ -1,7 +1,10 @@
 use std::cell::Cell;
 
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use redu::{cache::tests::*, restic::Snapshot};
+use redu::{
+    cache::{tests::*, Migrator},
+    restic::Snapshot,
+};
 
 pub fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("merge sizetree", |b| {
@@ -12,10 +15,47 @@ pub fn criterion_benchmark(c: &mut Criterion) {
         b.iter(move || sizetree0.take().merge(black_box(sizetree1.take())));
     });
 
-    c.bench_function("create and save snapshot", |b| {
-        with_cache_open(|mut cache| {
-            let foo = Snapshot {
-                id: "foo".to_string(),
+    c.bench_function("save snapshot", |b| {
+        let foo = Snapshot {
+            id: "foo".to_string(),
+            time: mk_datetime(2024, 4, 12, 12, 00, 00),
+            parent: Some("bar".to_string()),
+            tree: "sometree".to_string(),
+            paths: vec![
+                "/home/user".to_string(),
+                "/etc".to_string(),
+                "/var".to_string(),
+            ],
+            hostname: Some("foo.com".to_string()),
+            username: Some("user".to_string()),
+            uid: Some(123),
+            gid: Some(456),
+            excludes: vec![
+                ".cache".to_string(),
+                "Cache".to_string(),
+                "/home/user/Downloads".to_string(),
+            ],
+            tags: vec!["foo_machine".to_string(), "rewrite".to_string()],
+            original_id: Some("fefwfwew".to_string()),
+            program_version: Some("restic 0.16.0".to_string()),
+        };
+        b.iter_with_setup(
+            || {
+                let tempfile = Tempfile::new();
+                let cache =
+                    Migrator::open(&tempfile.0).unwrap().migrate().unwrap();
+                (tempfile, cache, generate_sizetree(6, 12))
+            },
+            |(_tempfile, mut cache, tree)| {
+                cache.save_snapshot(&foo, tree).unwrap()
+            },
+        );
+    });
+
+    c.bench_function("save lots of small snapshots", |b| {
+        fn mk_snapshot(id: String) -> Snapshot {
+            Snapshot {
+                id,
                 time: mk_datetime(2024, 4, 12, 12, 00, 00),
                 parent: Some("bar".to_string()),
                 tree: "sometree".to_string(),
@@ -36,16 +76,27 @@ pub fn criterion_benchmark(c: &mut Criterion) {
                 tags: vec!["foo_machine".to_string(), "rewrite".to_string()],
                 original_id: Some("fefwfwew".to_string()),
                 program_version: Some("restic 0.16.0".to_string()),
-            };
-            b.iter(move || {
-                cache
-                    .save_snapshot(
-                        &foo,
-                        generate_sizetree(black_box(6), black_box(12)),
-                    )
-                    .unwrap();
-            });
-        })
-    });
+            }
+        }
+
+        b.iter_with_setup(
+            || {
+                let tempfile = Tempfile::new();
+                let cache =
+                    Migrator::open(&tempfile.0).unwrap().migrate().unwrap();
+                (tempfile, cache, generate_sizetree(1, 0))
+            },
+            |(_tempfile, mut cache, tree)| {
+                for i in 0..10_000 {
+                    cache
+                        .save_snapshot(
+                            &mk_snapshot(i.to_string()),
+                            tree.clone(),
+                        )
+                        .unwrap();
+                }
+            },
+        );
+    });
 }

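A note on the reworked benchmarks above: the old `create and save snapshot` bench saved into one long-lived cache via `with_cache_open`, so each iteration wrote snapshot `foo` into a database already modified by previous iterations. The new benches use criterion's `iter_with_setup`, which rebuilds a fresh temp-file cache before every iteration and runs that setup outside the timed region. The general shape of the pattern, with illustrative names rather than the PR's:

```rust
use criterion::{criterion_group, criterion_main, Criterion};

fn bench(c: &mut Criterion) {
    c.bench_function("sum fresh vec", |b| {
        b.iter_with_setup(
            || vec![1u64; 1024],       // untimed: build a fresh input
            |v| v.iter().sum::<u64>(), // timed: consume it by value
        );
    });
}

criterion_group!(benches, bench);
criterion_main!(benches);
```

Passing the input by value lets the timed closure mutate or drop it without affecting the next iteration, which is what `save_snapshot` needs with its fresh `Cache`.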
src/cache/mod.rs: 211 changes (110 additions & 101 deletions)
@@ -1,11 +1,15 @@
use std::{collections::HashSet, path::Path};
use std::{
cmp::{max, Reverse},
collections::{HashMap, HashSet},
path::Path,
};

use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, Utc};
use log::trace;
use rusqlite::{
functions::FunctionFlags, params, types::FromSqlError, Connection,
OptionalExtension, Row,
OptionalExtension,
};
use thiserror::Error;

@@ -117,119 +121,124 @@ impl Cache {
         Ok(path_id)
     }
 
+    fn entries_tables(
+        &self,
+    ) -> Result<impl Iterator<Item = String>, rusqlite::Error> {
+        Ok(get_tables(&self.conn)?
+            .into_iter()
+            .filter(|name| name.starts_with("entries_")))
+    }
+
     /// This returns the children files/directories of the given path.
     /// Each entry's size is the largest size of that file/directory across
     /// all snapshots.
     pub fn get_entries(
         &self,
         path_id: Option<PathId>,
     ) -> Result<Vec<Entry>, rusqlite::Error> {
-        let aux = |row: &Row| {
-            Ok(Entry {
-                path_id: PathId(row.get("path_id")?),
-                component: row.get("component")?,
-                size: row.get("size")?,
-                is_dir: row.get("is_dir")?,
-            })
-        };
         let raw_path_id = o_path_id_to_raw_u64(path_id);
-        let cte_stmt_string = get_tables(&self.conn)?
-            .into_iter()
-            .filter(|name| name.starts_with("entries_"))
-            .map(|table| {
-                format!(
-                    "SELECT \
-                         path_id, \
-                         component, \
-                         size, \
-                         is_dir \
-                     FROM \"{table}\" JOIN paths ON path_id = paths.id \
-                     WHERE parent_id = {raw_path_id}\n"
-                )
-            })
-            .intersperse(String::from(" UNION ALL "))
-            .collect::<String>();
-        if cte_stmt_string.is_empty() {
-            return Ok(vec![]);
-        }
-        let mut stmt = self.conn.prepare(&format!(
-            "WITH rich_entries AS ({cte_stmt_string}) \
-             SELECT \
-                 path_id, \
-                 component, \
-                 max(size) as size, \
-                 max(is_dir) as is_dir \
-             FROM rich_entries \
-             GROUP BY path_id \
-             ORDER BY size DESC",
-        ))?;
-        let rows = stmt.query_map([], aux)?;
-        rows.collect()
+        let mut entries: Vec<Entry> = Vec::new();
+        let mut index: HashMap<PathId, usize> = HashMap::new();
+        for table in self.entries_tables()? {
+            let stmt_str = format!(
+                "SELECT \
+                     path_id, \
+                     component, \
+                     size, \
+                     is_dir \
+                 FROM \"{table}\" JOIN paths ON path_id = paths.id \
+                 WHERE parent_id = {raw_path_id}\n",
+            );
+            let mut stmt = self.conn.prepare(&stmt_str)?;
+            let rows = stmt.query_map([], |row| {
+                Ok(Entry {
+                    path_id: PathId(row.get("path_id")?),
+                    component: row.get("component")?,
+                    size: row.get("size")?,
+                    is_dir: row.get("is_dir")?,
+                })
+            })?;
+            for row in rows {
+                let row = row?;
+                let path_id = row.path_id;
+                match index.get(&path_id) {
+                    None => {
+                        entries.push(row);
+                        index.insert(path_id, entries.len() - 1);
+                    }
+                    Some(i) => {
+                        let entry = &mut entries[*i];
+                        entry.size = max(entry.size, row.size);
+                        entry.is_dir = entry.is_dir || row.is_dir;
+                    }
+                }
+            }
+        }
+        entries.sort_by_key(|e| Reverse(e.size));
+        Ok(entries)
     }
 
     pub fn get_entry_details(
         &self,
         path_id: PathId,
-    ) -> Result<EntryDetails, Error> {
-        let aux = |row: &Row| -> Result<EntryDetails, Error> {
-            Ok(EntryDetails {
-                max_size: row.get("max_size")?,
-                max_size_snapshot_hash: row.get("max_size_snapshot_hash")?,
-                first_seen: timestamp_to_datetime(row.get("first_seen")?)?,
-                first_seen_snapshot_hash: row
-                    .get("first_seen_snapshot_hash")?,
-                last_seen: timestamp_to_datetime(row.get("last_seen")?)?,
-                last_seen_snapshot_hash: row.get("last_seen_snapshot_hash")?,
-            })
-        };
+    ) -> Result<Option<EntryDetails>, Error> {
         let raw_path_id = path_id.0;
-        let rich_entries_cte = get_tables(&self.conn)?
-            .iter()
-            .filter_map(|name| name.strip_prefix("entries_"))
-            .map(|snapshot_hash| {
-                format!(
-                    "SELECT \
-                         hash, \
-                         size, \
-                         time \
-                     FROM \"entries_{snapshot_hash}\" \
-                     JOIN paths ON path_id = paths.id \
-                     JOIN snapshots ON hash = '{snapshot_hash}' \
-                     WHERE path_id = {raw_path_id}\n"
-                )
-            })
-            .intersperse(String::from(" UNION ALL "))
-            .collect::<String>();
-        let query = format!(
-            "WITH \
-             rich_entries AS ({rich_entries_cte}), \
-             first_seen AS (
-                 SELECT hash, time
-                 FROM rich_entries
-                 ORDER BY time ASC
-                 LIMIT 1), \
-             last_seen AS (
-                 SELECT hash, time
-                 FROM rich_entries
-                 ORDER BY time DESC
-                 LIMIT 1), \
-             max_size AS (
-                 SELECT hash, size
-                 FROM rich_entries
-                 ORDER BY size DESC, time DESC
-                 LIMIT 1) \
-             SELECT \
-                 max_size.size AS max_size, \
-                 max_size.hash AS max_size_snapshot_hash, \
-                 first_seen.time AS first_seen, \
-                 first_seen.hash as first_seen_snapshot_hash, \
-                 last_seen.time AS last_seen, \
-                 last_seen.hash as last_seen_snapshot_hash \
-             FROM max_size
-             JOIN first_seen ON 1=1
-             JOIN last_seen ON 1=1"
-        );
-        self.conn.query_row_and_then(&query, [], aux)
+        let run_query =
+            |table: &str| -> Result<(String, usize, DateTime<Utc>), Error> {
+                let snapshot_hash = table.strip_prefix("entries_").unwrap();
+                let stmt_str = format!(
+                    "SELECT \
+                         hash, \
+                         size, \
+                         time \
+                     FROM \"{table}\" \
+                     JOIN paths ON path_id = paths.id \
+                     JOIN snapshots ON hash = '{snapshot_hash}' \
+                     WHERE path_id = {raw_path_id}\n"
+                );
+                let mut stmt = self.conn.prepare(&stmt_str)?;
+                let (hash, size, timestamp) = stmt.query_row([], |row| {
+                    Ok((row.get("hash")?, row.get("size")?, row.get("time")?))
+                })?;
+                let time = timestamp_to_datetime(timestamp)?;
+                Ok((hash, size, time))
+            };
+
+        let mut entries_tables = self.entries_tables()?;
+        let mut details = match entries_tables.next() {
+            None => return Ok(None),
+            Some(table) => {
+                let (hash, size, time) = run_query(&table)?;
+                EntryDetails {
+                    max_size: size,
+                    max_size_snapshot_hash: hash.clone(),
+                    first_seen: time,
+                    first_seen_snapshot_hash: hash.clone(),
+                    last_seen: time,
+                    last_seen_snapshot_hash: hash,
+                }
+            }
+        };
+        let mut max_size_time = details.first_seen; // Time of the max_size snapshot
+        for table in entries_tables {
+            let (hash, size, time) = run_query(&table)?;
+            if size > details.max_size
+                || (size == details.max_size && time > max_size_time)
+            {
+                details.max_size = size;
+                details.max_size_snapshot_hash = hash.clone();
+                max_size_time = time;
+            }
+            if time < details.first_seen {
+                details.first_seen = time;
+                details.first_seen_snapshot_hash = hash.clone();
+            }
+            if time > details.last_seen {
+                details.last_seen = time;
+                details.last_seen_snapshot_hash = hash;
+            }
+        }
+        Ok(Some(details))
     }
 
     pub fn save_snapshot(
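Restating the core of the `get_entries` rewrite outside the diff: rather than letting SQLite `GROUP BY path_id` over a giant `UNION ALL` CTE, each per-snapshot table is queried separately and the rows are folded together in Rust, using a `HashMap` from `PathId` to position for deduplication and keeping the maximum size per path. A self-contained sketch of that merge step over in-memory batches (simplified `Entry`, no SQL):

```rust
use std::{cmp::Reverse, collections::HashMap};

#[derive(Debug)]
struct Entry {
    path_id: u64,
    size: usize,
    is_dir: bool,
}

/// Merge per-snapshot batches into one Entry per path_id, keeping the
/// largest size seen anywhere and OR-ing is_dir, sorted by size descending.
fn merge_batches(batches: Vec<Vec<Entry>>) -> Vec<Entry> {
    let mut entries: Vec<Entry> = Vec::new();
    let mut index: HashMap<u64, usize> = HashMap::new(); // path_id -> position in entries
    for batch in batches {
        for row in batch {
            match index.get(&row.path_id) {
                None => {
                    index.insert(row.path_id, entries.len());
                    entries.push(row);
                }
                Some(&i) => {
                    entries[i].size = entries[i].size.max(row.size);
                    entries[i].is_dir |= row.is_dir;
                }
            }
        }
    }
    entries.sort_by_key(|e| Reverse(e.size));
    entries
}

fn main() {
    let merged = merge_batches(vec![
        vec![Entry { path_id: 1, size: 10, is_dir: false }],
        vec![
            Entry { path_id: 1, size: 25, is_dir: false },
            Entry { path_id: 2, size: 5, is_dir: true },
        ],
    ]);
    assert_eq!(merged.len(), 2);
    assert_eq!(merged[0].size, 25); // path 1 keeps its maximum across snapshots
}
```

The trade-off is memory proportional to the number of distinct children of the listed path, in exchange for never building a compound statement at all; the `ORDER BY size DESC` from the old SQL survives as the final `sort_by_key(Reverse(size))`.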
Expand Down Expand Up @@ -383,7 +392,7 @@ impl Cache {

// A PathId should never be 0.
// This is reserved for the absolute root and should match None
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct PathId(u64);

@@ -407,7 +416,7 @@ pub struct Entry {
     pub is_dir: bool,
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub struct EntryDetails {
     pub max_size: usize,
    pub max_size_snapshot_hash: String,
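The `get_entry_details` rewrite follows the same per-table strategy, folding one `(hash, size, time)` row per snapshot into a single summary, seeded from the first table and returning `Ok(None)` when no snapshot tables exist (hence the signature change to `Result<Option<EntryDetails>, Error>`, which callers now have to handle). The subtle part is the tie-break carried over from the old `ORDER BY size DESC, time DESC`: on equal sizes, the later snapshot wins `max_size_snapshot_hash`. A compact sketch of that fold, with `Details` as a simplified stand-in for `EntryDetails`:

```rust
use chrono::{DateTime, TimeZone, Utc};

#[derive(Debug)]
struct Details {
    max_size: usize,
    max_size_snapshot_hash: String,
    first_seen: DateTime<Utc>,
    last_seen: DateTime<Utc>,
}

/// Fold one (hash, size, time) row per snapshot into a summary.
/// Returns None for an empty iterator, mirroring Result<Option<EntryDetails>, _>.
fn summarize(
    mut rows: impl Iterator<Item = (String, usize, DateTime<Utc>)>,
) -> Option<Details> {
    let (hash, size, time) = rows.next()?;
    let mut d = Details {
        max_size: size,
        max_size_snapshot_hash: hash,
        first_seen: time,
        last_seen: time,
    };
    let mut max_size_time = time; // time of the snapshot that set max_size
    for (hash, size, time) in rows {
        // Strictly larger wins; on a size tie, the later snapshot wins.
        if size > d.max_size || (size == d.max_size && time > max_size_time) {
            d.max_size = size;
            d.max_size_snapshot_hash = hash;
            max_size_time = time;
        }
        d.first_seen = d.first_seen.min(time);
        d.last_seen = d.last_seen.max(time);
    }
    Some(d)
}

fn main() {
    let day = |n| Utc.with_ymd_and_hms(2024, 8, n, 0, 0, 0).unwrap();
    let d = summarize(
        vec![("a".into(), 10, day(1)), ("b".into(), 10, day(2))].into_iter(),
    )
    .unwrap();
    assert_eq!(d.max_size_snapshot_hash, "b"); // equal sizes: later time wins
}
```

Relatedly, the new `Hash` derive on `PathId` is what lets it key the `HashMap` in `get_entries`, and `Eq`/`PartialEq` on `EntryDetails` presumably exist so tests can compare whole results directly.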