
Commit 616eabc

feat: Make the IoPausedGuard a simple sender (#4184)
This replaces the mechanism by which the IoPausedGuard makes sure the IO scheduler is resumed: it is now a true drop guard that sends a single message on drop. The guard no longer has to hold on to anything, such as the context, which makes it a lot easier to use. The trade-off is that a long-running task is spawned when the guard is created; this task needs to receive the message from the drop guard in order for the scheduler to resume.
1 parent 89b32e0 commit 616eabc
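
For readers who want the mechanism in isolation, the sketch below shows the general drop-guard-plus-oneshot pattern the commit message describes. It is a minimal, self-contained illustration rather than the delta-chat API: the names `PausedGuard` and `pause()`, and the `println!` standing in for the real resume logic, are hypothetical.

```rust
use tokio::sync::oneshot;

/// Illustrative drop guard: sends a single message on drop so a background
/// task knows it may resume whatever was paused.
struct PausedGuard {
    sender: Option<oneshot::Sender<()>>,
}

impl Drop for PausedGuard {
    fn drop(&mut self) {
        if let Some(sender) = self.sender.take() {
            // Fails only if the receiver is already gone; nothing to do then.
            sender.send(()).ok();
        }
    }
}

/// "Pauses" and returns a guard plus a handle to the long-running resume task.
fn pause() -> (PausedGuard, tokio::task::JoinHandle<()>) {
    let (tx, rx) = oneshot::channel();
    let handle = tokio::spawn(async move {
        // Waits until the guard is dropped (or its sender otherwise closed).
        rx.await.ok();
        println!("guard dropped, resuming scheduler");
    });
    (PausedGuard { sender: Some(tx) }, handle)
}

#[tokio::main]
async fn main() {
    let (guard, resume_task) = pause();
    println!("scheduler is paused, doing exclusive work");
    drop(guard);            // triggers the resume task
    resume_task.await.ok(); // wait so the resume message is actually printed
}
```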

File tree

3 files changed: +33, -37 lines changed

  src/imex.rs
  src/imex/transfer.rs
  src/scheduler.rs

src/imex.rs

Lines changed: 3 additions & 5 deletions

@@ -91,15 +91,13 @@ pub async fn imex(
     let cancel = context.alloc_ongoing().await?;
 
     let res = {
-        let mut guard = context.scheduler.pause(context.clone()).await;
-        let res = imex_inner(context, what, path, passphrase)
+        let _guard = context.scheduler.pause(context.clone()).await;
+        imex_inner(context, what, path, passphrase)
             .race(async {
                 cancel.recv().await.ok();
                 Err(format_err!("canceled"))
             })
-            .await;
-        guard.resume().await;
-        res
+            .await
     };
     context.free_ongoing().await;
 
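
One subtlety in the hunk above is the `_guard` binding: a name beginning with an underscore silences the unused-variable warning but still keeps the guard alive until the end of the block, whereas `let _ = ...` would drop it immediately and resume the scheduler at once. A tiny standalone sketch of that difference, using a hypothetical `Guard` type:

```rust
struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        println!("resumed");
    }
}

fn main() {
    {
        // `_guard` is a real binding: the value lives until the end of
        // this block, so "resumed" is printed only when the block ends.
        let _guard = Guard;
        println!("work while paused");
    } // "resumed" printed here

    {
        // `_` is not a binding: the value is dropped immediately,
        // so "resumed" is printed before the work even starts.
        let _ = Guard;
        println!("work that is no longer protected");
    }
}
```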

src/imex/transfer.rs

Lines changed: 5 additions & 5 deletions

@@ -91,7 +91,7 @@ impl BackupProvider {
 
         // Acquire global "ongoing" mutex.
         let cancel_token = context.alloc_ongoing().await?;
-        let mut paused_guard = context.scheduler.pause(context.clone()).await;
+        let paused_guard = context.scheduler.pause(context.clone()).await;
         let context_dir = context
             .get_blobdir()
             .parent()
@@ -119,7 +119,6 @@
             Ok((provider, ticket)) => (provider, ticket),
             Err(err) => {
                 context.free_ongoing().await;
-                paused_guard.resume().await;
                 return Err(err);
             }
         };
@@ -128,7 +127,9 @@
         tokio::spawn(async move {
            let res = Self::watch_provider(&context, provider, cancel_token).await;
            context.free_ongoing().await;
-            paused_guard.resume().await;
+
+            // Explicit drop to move the guards into this future
+            drop(paused_guard);
             drop(dbfile);
             res
         })
@@ -369,7 +370,7 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
         !context.is_configured().await?,
         "Cannot import backups to accounts in use."
     );
-    let mut guard = context.scheduler.pause(context.clone()).await;
+    let _guard = context.scheduler.pause(context.clone()).await;
 
     // Acquire global "ongoing" mutex.
     let cancel_token = context.alloc_ongoing().await?;
@@ -381,7 +382,6 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
         }
         _ = cancel_token.recv() => Err(format_err!("cancelled")),
     };
-    guard.resume().await;
     res
 }
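
The `// Explicit drop to move the guards into this future` comment above relies on a Rust capture rule: an `async move` block only captures the values its body actually mentions, so naming `paused_guard` inside the block (here via `drop()`) moves it into the spawned task and keeps the scheduler paused until that task finishes. A minimal sketch of the behavior, with a hypothetical `PauseGuard` type standing in for the real guards:

```rust
use tokio::time::{sleep, Duration};

struct PauseGuard;

impl Drop for PauseGuard {
    fn drop(&mut self) {
        println!("scheduler resumed");
    }
}

#[tokio::main]
async fn main() {
    let guard = PauseGuard;

    let handle = tokio::spawn(async move {
        sleep(Duration::from_millis(10)).await;
        println!("background work done");
        // Mentioning the guard (here via an explicit drop) forces the
        // `async move` block to capture it, so it stays alive until this
        // point instead of being dropped when the spawning function returns.
        drop(guard);
    });

    handle.await.unwrap();
}
```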

src/scheduler.rs

Lines changed: 25 additions & 27 deletions

@@ -5,7 +5,7 @@ use anyhow::{bail, Context as _, Result};
 use async_channel::{self as channel, Receiver, Sender};
 use futures::future::try_join_all;
 use futures_lite::FutureExt;
-use tokio::sync::{RwLock, RwLockWriteGuard};
+use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
 use tokio::task;
 
 use self::connectivity::ConnectivityStore;
@@ -89,20 +89,28 @@ impl SchedulerState {
 
     /// Pauses the IO scheduler.
     ///
-    /// If it is currently running the scheduler will be stopped. When
-    /// [`IoPausedGuard::resume`] is called the scheduler is started again.
+    /// If it is currently running the scheduler will be stopped. When the
+    /// [`IoPausedGuard`] is dropped the scheduler is started again.
     ///
     /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
     /// resume will do the right thing and restore the scheduler to the state requested by
     /// the last call.
     pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
-        let mut inner = self.inner.write().await;
-        inner.paused = true;
-        Self::do_stop(inner, &context).await;
-        IoPausedGuard {
-            context,
-            done: false,
+        {
+            let mut inner = self.inner.write().await;
+            inner.paused = true;
+            Self::do_stop(inner, &context).await;
         }
+        let (tx, rx) = oneshot::channel();
+        tokio::spawn(async move {
+            rx.await.ok();
+            let mut inner = context.scheduler.inner.write().await;
+            inner.paused = false;
+            if inner.started && inner.scheduler.is_none() {
+                SchedulerState::do_start(inner, context.clone()).await;
+            }
+        });
+        IoPausedGuard { sender: Some(tx) }
     }
 
     /// Restarts the scheduler, only if it is running.
@@ -194,31 +202,21 @@ struct InnerSchedulerState {
     paused: bool,
 }
 
+/// Guard to make sure the IO Scheduler is resumed.
+///
+/// Returned by [`SchedulerState::pause`]. To resume the IO scheduler simply drop this
+/// guard.
 #[derive(Debug)]
 pub(crate) struct IoPausedGuard {
-    context: Context,
-    done: bool,
-}
-
-impl IoPausedGuard {
-    pub(crate) async fn resume(&mut self) {
-        self.done = true;
-        let mut inner = self.context.scheduler.inner.write().await;
-        inner.paused = false;
-        if inner.started && inner.scheduler.is_none() {
-            SchedulerState::do_start(inner, self.context.clone()).await;
-        }
-    }
+    sender: Option<oneshot::Sender<()>>,
 }
 
 impl Drop for IoPausedGuard {
     fn drop(&mut self) {
-        if self.done {
-            return;
+        if let Some(sender) = self.sender.take() {
+            // Can only fail if receiver is dropped, but then we're already resumed.
+            sender.send(()).ok();
         }
-
-        // Async .resume() should be called manually due to lack of async drop.
-        error!(self.context, "Pause guard dropped without resuming.");
     }
 }
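
The new `pause` implementation leans on how `tokio::sync::oneshot` behaves: `rx.await` completes either with `Ok(())` after an explicit `send` or with `Err(RecvError)` once the sender is dropped, so the spawned resume task wakes up in both cases and `.ok()` discards the distinction. A small standalone demonstration under that assumption:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Case 1: the sender sends an explicit message (what the Drop impl does).
    let (tx, rx) = oneshot::channel::<()>();
    let waiter = tokio::spawn(async move {
        // `Ok(())` if a message arrives, `Err(RecvError)` if the sender is
        // dropped without sending; `.ok()` treats both as "time to resume".
        rx.await.ok();
        "resumed"
    });
    tx.send(()).ok();
    assert_eq!(waiter.await.unwrap(), "resumed");

    // Case 2: the sender is simply dropped; the receiver still wakes up.
    let (tx, rx) = oneshot::channel::<()>();
    let waiter = tokio::spawn(async move {
        rx.await.ok();
        "resumed"
    });
    drop(tx);
    assert_eq!(waiter.await.unwrap(), "resumed");
}
```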
