Skip to content

Commit

Permalink
fix: imex::import_backup: Unpack all blobs before importing a db (#4307)
Browse files Browse the repository at this point in the history
This way we can't get an account with missing blobs if there's not enough disk space.

Also delete already-unpacked files if not all files were unpacked successfully. Still, some
minor problems remain:
- If a db wasn't imported successfully, unpacked blobs aren't deleted because we don't know at which
  step the import failed and whether the db will reference the blobs after a restart.
- If `delete_and_reset_all_device_msgs()` fails, the whole `import_backup()` also fails, but after a
  restart `delete_and_reset_all_device_msgs()` isn't retried. Probably errors from it should be
  ignored entirely, but let's postpone changing this for now.
  • Loading branch information
iequidoo committed Dec 6, 2023
1 parent d574ee4 commit d8ef3c5
Showing 1 changed file with 59 additions and 28 deletions.
87 changes: 59 additions & 28 deletions src/imex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ async fn import_backup(
"cannot import backup, IO is running"
);

const PROGRESS_MIGRATIONS: u64 = 990;
let backup_file = File::open(backup_to_import).await?;
let file_size = backup_file.metadata().await?.len();
info!(
Expand All @@ -434,48 +435,78 @@ async fn import_backup(
let mut archive = Archive::new(backup_file);

let mut entries = archive.entries()?;
let mut blobs = Vec::new();
let mut last_progress = 0;
while let Some(file) = entries.next().await {
let f = &mut file?;
let mut res: Result<()> = loop {
let Some(file) = entries.next().await else {
break Ok(());
};
let mut f = match file {
Ok(f) => f,
Err(e) => break Err(e.into()),
};

let current_pos = f.raw_file_position();
let progress = 1000 * current_pos / file_size;
if progress != last_progress && progress > 10 && progress < 1000 {
let progress = PROGRESS_MIGRATIONS * current_pos / file_size;
if progress != last_progress && progress > 10 && progress < PROGRESS_MIGRATIONS {
// We already emitted ImexProgress(10) above
context.emit_event(EventType::ImexProgress(progress as usize));
last_progress = progress;
}

if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
// async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file.
f.unpack_in(context.get_blobdir()).await?;
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
context
.sql
.import(&unpacked_database, passphrase.clone())
.await
.context("cannot import unpacked database")?;
fs::remove_file(unpacked_database)
.await
.context("cannot remove unpacked database")?;
} else {
// async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards.
f.unpack_in(context.get_blobdir()).await?;
let from_path = context.get_blobdir().join(f.path()?);
if from_path.is_file() {
if let Some(name) = from_path.file_name() {
fs::rename(&from_path, context.get_blobdir().join(name)).await?;
} else {
warn!(context, "No file name");
if let Err(e) = f.unpack_in(context.get_blobdir()).await {
break Err(e.into());
}
let path = match f.path() {
Ok(path) => path,
Err(e) => break Err(e.into()),
};
if path.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
continue;
}
// async_tar unpacked to $BLOBDIR/BLOBS_BACKUP_NAME/, so we move the file afterwards.
let from_path = context.get_blobdir().join(path);
if from_path.is_file() {
if let Some(name) = from_path.file_name() {
let to_path = context.get_blobdir().join(name);
if let Err(e) = fs::rename(&from_path, &to_path).await {
blobs.push(from_path);
break Err(e.into());
}
blobs.push(to_path);
} else {
warn!(context, "No file name");
}
}
};
if res.is_err() {
for blob in blobs {
fs::remove_file(&blob).await.log_err(context).ok();
}
}

context.sql.run_migrations(context).await?;
delete_and_reset_all_device_msgs(context).await?;
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
if res.is_ok() {
res = context
.sql
.import(&unpacked_database, passphrase.clone())
.await
.context("cannot import unpacked database");
}
fs::remove_file(unpacked_database)
.await
.context("cannot remove unpacked database")
.log_err(context)
.ok();

Ok(())
context.emit_event(EventType::ImexProgress(PROGRESS_MIGRATIONS as usize));
if res.is_ok() {
res = context.sql.run_migrations(context).await;
}
if res.is_ok() {
res = delete_and_reset_all_device_msgs(context).await;
}
res
}

/*******************************************************************************
Expand Down

0 comments on commit d8ef3c5

Please sign in to comment.