fix: imex::import_backup: Unpack all blobs before importing a db (#4307)
This way we can't end up with an account that has missing blobs if there's not enough disk space.

Also delete the already unpacked files if not all files were unpacked successfully (see the sketch below).
Still, there are some minor problems remaining:
- If a db wasn't imported successfully, unpacked blobs aren't deleted because we don't know at which
  step the import failed and whether the db will reference the blobs after a restart.
- If `delete_and_reset_all_device_msgs()` fails, the whole `import_backup()` fails as well, but after a
  restart `delete_and_reset_all_device_msgs()` isn't retried. Errors from it should probably be
  ignored entirely.
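
A minimal sketch of the all-or-nothing unpacking this commit introduces, assuming a flattened
`(name, bytes)` entry list and a hypothetical `unpack_all_or_nothing` helper rather than the real
`async_tar`/imex API:

```rust
use std::path::{Path, PathBuf};

use anyhow::{Context as _, Result};
use tokio::fs;

/// Write every blob before anything else can depend on it; if one write fails
/// (e.g. the disk is full), remove the blobs written so far and return the error,
/// so no half-populated blob directory is left behind.
async fn unpack_all_or_nothing(blobdir: &Path, entries: &[(String, Vec<u8>)]) -> Result<()> {
    let mut written: Vec<PathBuf> = Vec::new();
    let mut res: Result<()> = Ok(());
    for (name, bytes) in entries {
        let path = blobdir.join(name);
        if let Err(e) = fs::write(&path, bytes).await {
            res = Err(e).context(format!("Failed to unpack {name}"));
            break;
        }
        written.push(path);
    }
    if res.is_err() {
        // Roll back: a failed unpack must not leave stray blobs around.
        for path in written {
            fs::remove_file(&path).await.ok();
        }
    }
    res
}
```

The `blobs` vector and the cleanup loop in the diff below follow the same shape, except that the
real code streams tar entries and also handles the database file, migrations and progress events.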
iequidoo committed Jul 22, 2024
1 parent c596ee0 commit 1b1d21c
Showing 2 changed files with 69 additions and 40 deletions.
108 changes: 68 additions & 40 deletions src/imex.rs
@@ -269,7 +269,9 @@ async fn import_backup(
context.get_dbfile().display()
);

import_backup_stream(context, backup_file, file_size, passphrase).await?;
import_backup_stream(context, backup_file, file_size, passphrase)
.await
.0?;
Ok(())
}

@@ -291,63 +293,89 @@ pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
backup_file: R,
file_size: u64,
passphrase: String,
) -> Result<()> {
) -> (Result<()>,) {
let mut archive = Archive::new(backup_file);

let mut entries = archive.entries()?;

let mut entries = match archive.entries() {
Ok(entries) => entries,
Err(e) => return (Err(e).context("Failed to get archive entries"),),
};
let mut blobs = Vec::new();
// We already emitted ImexProgress(10) above
let mut last_progress = 10;
const PROGRESS_MIGRATIONS: u64 = 999;
let mut total_size = 0;
while let Some(mut f) = entries
.try_next()
.await
.context("Failed to get next entry")?
{
total_size += f.header().entry_size()?;
let mut res: Result<()> = loop {
let mut f = match entries.try_next().await {
Ok(Some(f)) => f,
Ok(None) => break Ok(()),
Err(e) => break Err(e).context("Failed to get next entry"),
};
total_size += match f.header().entry_size() {
Ok(size) => size,
Err(e) => break Err(e).context("Failed to get entry size"),
};
let progress = std::cmp::min(
1000 * total_size.checked_div(file_size).unwrap_or_default(),
999,
PROGRESS_MIGRATIONS - 1,
);
if progress > last_progress {
context.emit_event(EventType::ImexProgress(progress as usize));
last_progress = progress;
}

if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
// async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file.
f.unpack_in(context.get_blobdir())
.await
.context("Failed to unpack database")?;
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
context
.sql
.import(&unpacked_database, passphrase.clone())
.await
.context("cannot import unpacked database")?;
fs::remove_file(unpacked_database)
.await
.context("cannot remove unpacked database")?;
} else {
// async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards.
f.unpack_in(context.get_blobdir())
.await
.context("Failed to unpack blob")?;
let from_path = context.get_blobdir().join(f.path()?);
if from_path.is_file() {
if let Some(name) = from_path.file_name() {
fs::rename(&from_path, context.get_blobdir().join(name)).await?;
} else {
warn!(context, "No file name");
let path = match f.path() {
Ok(path) => path.to_path_buf(),
Err(e) => break Err(e).context("Failed to get entry path"),
};
if let Err(e) = f.unpack_in(context.get_blobdir()).await {
break Err(e).context("Failed to unpack file");
}
if path.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
continue;
}
// async_tar unpacked to $BLOBDIR/BLOBS_BACKUP_NAME/, so we move the file afterwards.
let from_path = context.get_blobdir().join(&path);
if from_path.is_file() {
if let Some(name) = from_path.file_name() {
let to_path = context.get_blobdir().join(name);
if let Err(e) = fs::rename(&from_path, &to_path).await {
blobs.push(from_path);
break Err(e).context("Failed to move file to blobdir");
}
blobs.push(to_path);
} else {
warn!(context, "No file name");
}
}
};
if res.is_err() {
for blob in blobs {
fs::remove_file(&blob).await.log_err(context).ok();
}
}

context.sql.run_migrations(context).await?;
delete_and_reset_all_device_msgs(context).await?;

Ok(())
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
if res.is_ok() {
res = context
.sql
.import(&unpacked_database, passphrase.clone())
.await
.context("cannot import unpacked database");
}
fs::remove_file(unpacked_database)
.await
.context("cannot remove unpacked database")
.log_err(context)
.ok();
if res.is_ok() {
context.emit_event(EventType::ImexProgress(PROGRESS_MIGRATIONS as usize));
res = context.sql.run_migrations(context).await;
}
if res.is_ok() {
res = delete_and_reset_all_device_msgs(context).await;
}
(res,)
}

/*******************************************************************************
1 change: 1 addition & 0 deletions src/imex/transfer.rs
@@ -328,6 +328,7 @@ pub async fn get_backup2(
let file_size = u64::from_be_bytes(file_size_buf);
import_backup_stream(context, recv_stream, file_size, passphrase)
.await
.0
.context("Failed to import backup from QUIC stream")?;
info!(context, "Finished importing backup from the stream.");
context.emit_event(EventType::ImexProgress(1000));
