feat: Pause IO for BackupProvider #4182

Merged
merged 2 commits into from
Mar 20, 2023
8 changes: 1 addition & 7 deletions deltachat-ffi/deltachat.h
@@ -2644,10 +2644,6 @@ void dc_str_unref (char* str);
 /**
  * Creates an object for sending a backup to another device.
  *
- * Before calling this function IO must be stopped using dc_accounts_stop_io()
- * or dc_stop_io() so that no changes to the blobs or database are happening.
- * IO should only be restarted once dc_backup_provider_wait() has returned.
- *
  * The backup is sent to through a peer-to-peer channel which is bootstrapped
  * by a QR-code. The backup contains the entire state of the account
  * including credentials. This can be used to setup a new device.
@@ -2708,9 +2704,7 @@ char* dc_backup_provider_get_qr_svg (const dc_backup_provider_t* backup_provider
 /**
  * Waits for the sending to finish.
  *
- * This is a blocking call and should only be called once. Once this function
- * returns IO can be started again using dc_accounts_start_io() or
- * dc_start_io().
+ * This is a blocking call and should only be called once.
  *
  * @memberof dc_backup_provider_t
  * @param backup_provider The backup provider object as created by
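Note: the Rust entry point backing this header lives in src/imex/transfer.rs below. A rough sketch of the lifecycle the comments describe — create the provider, show its QR code, block until the transfer finishes — assuming a constructor named `BackupProvider::prepare`, a `wait()` method, and a hypothetical `show_qr_to_user` helper; with this change the caller no longer stops and restarts IO around these calls:

```rust
use crate::context::Context;               // assumed module paths
use crate::imex::transfer::BackupProvider;

// Illustrative only; `prepare`, `wait` and `show_qr_to_user` are assumptions.
async fn send_backup_to_second_device(context: &Context) -> anyhow::Result<()> {
    // No dc_stop_io()/dc_accounts_stop_io() needed anymore: the provider pauses IO itself.
    let provider = BackupProvider::prepare(context).await?;
    show_qr_to_user(provider.qr()); // render the bootstrap QR code on this device
    provider.wait().await?;         // blocks until the other device has received the backup
    Ok(())
}
```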
2 changes: 1 addition & 1 deletion src/imex.rs
@@ -91,7 +91,7 @@ pub async fn imex(
     let cancel = context.alloc_ongoing().await?;
 
     let res = {
-        let mut guard = context.scheduler.pause(context).await;
+        let mut guard = context.scheduler.pause(context.clone()).await;
         let res = imex_inner(context, what, path, passphrase)
             .race(async {
                 cancel.recv().await.ok();
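The only functional change here is that `SchedulerState::pause()` now takes the `Context` by value instead of by reference (see src/scheduler.rs below), so the call site passes a clone; cloning a `Context` only bumps a reference count. A minimal sketch of the resulting pattern, with `do_paused_work` as a placeholder for `imex_inner` and its cancel race:

```rust
// Sketch of the pause-guard pattern; do_paused_work is a placeholder.
async fn with_io_paused(context: &Context) -> anyhow::Result<()> {
    // The guard now owns its own (cheaply cloned) Context handle.
    let mut guard = context.scheduler.pause(context.clone()).await;
    let res = do_paused_work(context).await;
    guard.resume().await; // restart IO once the work is done
    res
}
```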
22 changes: 13 additions & 9 deletions src/imex/transfer.rs
@@ -91,6 +91,7 @@ impl BackupProvider {
 
         // Acquire global "ongoing" mutex.
         let cancel_token = context.alloc_ongoing().await?;
+        let mut paused_guard = context.scheduler.pause(context.clone()).await;
         let context_dir = context
             .get_blobdir()
             .parent()
@@ -118,15 +119,19 @@
             Ok((provider, ticket)) => (provider, ticket),
             Err(err) => {
                 context.free_ongoing().await;
+                paused_guard.resume().await;
                 return Err(err);
             }
         };
-        let handle = tokio::spawn(Self::watch_provider(
-            context.clone(),
-            provider,
-            cancel_token,
-            dbfile,
-        ));
+        let handle = {
+            let context = context.clone();
+            tokio::spawn(async move {
+                let res = Self::watch_provider(&context, provider, cancel_token, dbfile).await;
+                context.free_ongoing().await;
+                paused_guard.resume().await;
+                res
+            })
+        };
         let slf = Self { handle, ticket };
         let qr = slf.qr();
         *context.export_provider.lock().expect("poisoned lock") = Some(qr);
@@ -181,7 +186,7 @@
     /// The *cancel_token* is the handle for the ongoing process mutex, when this completes
     /// we must cancel this operation.
     async fn watch_provider(
-        context: Context,
+        context: &Context,
         mut provider: Provider,
         cancel_token: Receiver<()>,
         _dbfile: TempPathGuard,
@@ -262,7 +267,6 @@ impl BackupProvider {
                 context.emit_event(SendProgress::Failed.into())
             }
         }
-        context.free_ongoing().await;
         res
     }

@@ -373,7 +377,7 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
         !context.is_configured().await?,
         "Cannot import backups to accounts in use."
     );
-    let mut guard = context.scheduler.pause(context).await;
+    let mut guard = context.scheduler.pause(context.clone()).await;
 
     // Acquire global "ongoing" mutex.
     let cancel_token = context.alloc_ongoing().await?;
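As the doc comment on `watch_provider` above says, the `cancel_token` handed out by `alloc_ongoing()` represents the global "ongoing" process: when it yields, the transfer must be aborted. A rough sketch of that contract in isolation, using `tokio::select!` for illustration and a placeholder `serve_backup` future in place of driving the iroh `Provider`:

```rust
// Illustrative sketch of the ongoing/cancel contract; serve_backup is a placeholder.
async fn transfer_until_done_or_canceled(context: &Context) -> anyhow::Result<()> {
    let cancel_token = context.alloc_ongoing().await?; // also acts as the cancel channel
    let res = tokio::select! {
        res = serve_backup(context) => res, // the transfer finished (or failed) on its own
        _ = cancel_token.recv() => Err(anyhow::anyhow!("backup transfer canceled")),
    };
    context.free_ongoing().await; // always release the global "ongoing" mutex afterwards
    res
}
```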
12 changes: 6 additions & 6 deletions src/scheduler.rs
@@ -95,10 +95,10 @@ impl SchedulerState {
     /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
     /// resume will do the right thing and restore the scheduler to the state requested by
     /// the last call.
-    pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> {
+    pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
         let mut inner = self.inner.write().await;
         inner.paused = true;
-        Self::do_stop(inner, context).await;
+        Self::do_stop(inner, &context).await;
         IoPausedGuard {
             context,
             done: false,
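The doc comment above pins down what happens if `SchedulerState::start` or `SchedulerState::stop` is called while IO is paused: the request is recorded, and `resume()` restores the most recently requested state. A hedged illustration from the caller's side (assuming `start` takes the context the same way `pause` now does, which this diff does not show):

```rust
// Illustration of the documented pause/start/resume interplay; start's signature is assumed.
async fn pause_then_start_example(context: &Context) {
    let mut guard = context.scheduler.pause(context.clone()).await; // IO stops here
    let _ = context.scheduler.start(context.clone()).await; // recorded, but IO stays stopped while paused
    guard.resume().await; // only now does IO start, honoring the last start/stop request
}
```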
@@ -195,12 +195,12 @@ struct InnerSchedulerState {
 }
 
 #[derive(Debug)]
-pub(crate) struct IoPausedGuard<'a> {
-    context: &'a Context,
+pub(crate) struct IoPausedGuard {
+    context: Context,
     done: bool,
 }
 
-impl<'a> IoPausedGuard<'a> {
+impl IoPausedGuard {
     pub(crate) async fn resume(&mut self) {
         self.done = true;
         let mut inner = self.context.scheduler.inner.write().await;
@@ -211,7 +211,7 @@ impl<'a> IoPausedGuard<'a> {
     }
 }
 
-impl<'a> Drop for IoPausedGuard<'a> {
+impl Drop for IoPausedGuard {
     fn drop(&mut self) {
         if self.done {
             return;
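Dropping the lifetime parameter means `IoPausedGuard` owns its `Context`, so it can be moved into spawned tasks (as src/imex/transfer.rs now does) instead of being pinned to a borrow. The truncated `Drop` impl above is the safety net for a guard that is never explicitly resumed; the sketch below only shows how a caller relies on it, not what the truncated body does:

```rust
// Sketch: explicit resume() on the happy path, Drop as the fallback on early return.
// fallible_step is a placeholder.
async fn guarded_operation(context: &Context) -> anyhow::Result<()> {
    let mut guard = context.scheduler.pause(context.clone()).await;
    fallible_step(context).await?; // on error, `guard` is dropped here and its Drop fallback runs
    guard.resume().await;          // on success, IO is resumed explicitly
    Ok(())
}
```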