From 1d42e4743fac7f1e38ca3a2c6f601809c489840f Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 14:20:42 +0100 Subject: [PATCH 01/12] Allow pausing IO scheduler from inside core To handle backups the UIs have to make sure they stop the IO scheduler and also don't accidentally restart it while working on it. Since they have to call start_io from a bunch of locations, this can be a bit difficult to manage. This introduces a mechanism for the core to pause IO for some time, which is used by the imex function. It interacts well with other calls to dc_start_io() and dc_stop_io(), making sure that when IO is resumed the scheduler ends up running or stopped according to the most recent of those calls. This was a little more invasive than hoped due to the scheduler. The additional abstraction of the scheduler on the context seems like a nice improvement though. --- src/accounts.rs | 4 +- src/chat.rs | 22 +++- src/config.rs | 2 +- src/configure.rs | 6 +- src/contact.rs | 5 +- src/context.rs | 51 ++++---- src/ephemeral.rs | 4 +- src/imap.rs | 7 +- src/imex.rs | 22 ++-- src/job.rs | 6 +- src/location.rs | 2 +- src/message.rs | 10 +- src/quota.rs | 4 +- src/scheduler.rs | 227 ++++++++++++++++++++++++++++------ src/scheduler/connectivity.rs | 50 ++------ src/webxdc.rs | 4 +- 16 files changed, 292 insertions(+), 134 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index e1620526d1..a379b44e1b 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -271,14 +271,14 @@ impl Accounts { /// Notifies all accounts that the network may have become available. pub async fn maybe_network(&self) { for account in self.accounts.values() { - account.maybe_network().await; + account.scheduler.maybe_network().await; } } /// Notifies all accounts that the network connection may have been lost.
pub async fn maybe_network_lost(&self) { for account in self.accounts.values() { - account.maybe_network_lost().await; + account.scheduler.maybe_network_lost(account).await; } } diff --git a/src/chat.rs b/src/chat.rs index 0442dea3ca..9d49ec5529 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -639,7 +639,10 @@ impl ChatId { context.emit_msgs_changed_without_ids(); context.set_config(Config::LastHousekeeping, None).await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; if chat.is_self_talk() { let mut msg = Message::new(Viewtype::Text); @@ -1667,7 +1670,7 @@ impl Chat { maybe_set_logging_xdc(context, msg, self.id).await?; } - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; Ok(msg.id) } } @@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - context.emit_event(EventType::LocationChanged(Some(ContactId::SELF))); } - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(msg.id) @@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .await?; curr_timestamp += 1; if create_send_msg_job(context, new_msg_id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } created_chats.push(chat_id); @@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg_id: msg.id, }); if create_send_msg_job(context, msg.id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } } diff --git a/src/config.rs b/src/config.rs index 76dca83a2f..035caf3143 100644 --- a/src/config.rs +++ b/src/config.rs @@ -443,7 +443,7 @@ impl Context { Config::DeleteDeviceAfter => { let ret = self.sql.set_raw_config(key.as_ref(), value).await; // Interrupt ephemeral loop to delete old messages immediately. - self.interrupt_ephemeral_task().await; + self.scheduler.interrupt_ephemeral_task().await; ret? } Config::Displayname => { diff --git a/src/configure.rs b/src/configure.rs index 48b91a3f72..eb69ad25cd 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -59,7 +59,7 @@ impl Context { /// Configures this account with the currently set parameters. 
pub async fn configure(&self) -> Result<()> { ensure!( - self.scheduler.read().await.is_none(), + !self.scheduler.is_running().await, "cannot configure, already running" ); ensure!( @@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { ctx.set_config_bool(Config::FetchedExistingMsgs, false) .await?; - ctx.interrupt_inbox(InterruptInfo::new(false)).await; + ctx.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; progress!(ctx, 940); update_device_chats_handle.await??; diff --git a/src/contact.rs b/src/contact.rs index 1d00a80f00..07430d3082 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen( > 0 && timestamp > time() - SEEN_RECENTLY_SECONDS { - context.interrupt_recently_seen(contact_id, timestamp).await; + context + .scheduler + .interrupt_recently_seen(contact_id, timestamp) + .await; } Ok(()) } diff --git a/src/context.rs b/src/context.rs index 13479a7eac..9c2eb33e2a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; -use crate::scheduler::Scheduler; +use crate::scheduler::{IoPausedGuard, SchedulerState}; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -201,7 +201,7 @@ pub struct InnerContext { pub(crate) translated_stockstrings: StockStrings, pub(crate) events: Events, - pub(crate) scheduler: RwLock>, + pub(crate) scheduler: SchedulerState, pub(crate) ratelimit: RwLock, /// Recently loaded quota information, if any. @@ -370,7 +370,7 @@ impl Context { wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: stockstrings, events, - scheduler: RwLock::new(None), + scheduler: SchedulerState::new(), ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds. quota: RwLock::new(None), quota_update_request: AtomicBool::new(false), @@ -395,42 +395,33 @@ impl Context { warn!(self, "can not start io on a context that is not configured"); return; } - - info!(self, "starting IO"); - let mut lock = self.inner.scheduler.write().await; - if lock.is_none() { - match Scheduler::start(self.clone()).await { - Err(err) => error!(self, "Failed to start IO: {:#}", err), - Ok(scheduler) => *lock = Some(scheduler), - } - } + self.scheduler.start(self.clone()).await; } /// Stops the IO scheduler. pub async fn stop_io(&self) { - // Sending an event wakes up event pollers (get_next_event) - // so the caller of stop_io() can arrange for proper termination. - // For this, the caller needs to instruct the event poller - // to terminate on receiving the next event and then call stop_io() - // which will emit the below event(s) - info!(self, "stopping IO"); - if let Some(debug_logging) = self.debug_logging.read().await.as_ref() { - debug_logging.loop_handle.abort(); - } - if let Some(scheduler) = self.inner.scheduler.write().await.take() { - scheduler.stop(self).await; - } + self.scheduler.stop(self).await; } /// Restarts the IO scheduler if it was running before /// when it is not running this is an no-op pub async fn restart_io_if_running(&self) { - info!(self, "restarting IO"); - let is_running = { self.inner.scheduler.read().await.is_some() }; - if is_running { - self.stop_io().await; - self.start_io().await; - } + self.scheduler.restart(self).await; + } + + /// Pauses the IO scheduler. 
+ /// + /// This temporarily pauses the IO scheduler and will make sure calls to + /// [`Context::start_io`] are no-ops while being paused. + /// + /// It is recommended to call [`IoPausedGuard::resume`] rather than simply dropping it. + pub(crate) async fn pause_io(&self) -> IoPausedGuard<'_> { + self.scheduler.pause(self).await + } + + /// Indicate that the network likely has come back. + pub async fn maybe_network(&self) { + self.scheduler.maybe_network().await; } /// Returns a reference to the underlying SQL instance. diff --git a/src/ephemeral.rs b/src/ephemeral.rs index f7dadf5d8d..75fb1c1272 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -317,7 +317,7 @@ impl MsgId { paramsv![ephemeral_timestamp, ephemeral_timestamp, self], ) .await?; - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } @@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids( ) .await?; if count > 0 { - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } diff --git a/src/imap.rs b/src/imap.rs index 91f6ae9ccc..6e3437fc0d 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -475,7 +475,7 @@ impl Imap { // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are // fetched while the per-chat ephemeral timers start as soon as the messages are marked // as noticed. - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } let session = self @@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) paramsv![message_id], ) .await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/imex.rs b/src/imex.rs index 7ca3bb15c4..af02ba1d84 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -86,13 +86,17 @@ pub async fn imex( ) -> Result<()> { let cancel = context.alloc_ongoing().await?; - let res = imex_inner(context, what, path, passphrase) - .race(async { - cancel.recv().await.ok(); - Err(format_err!("canceled")) - }) - .await; - + let res = { + let mut guard = context.pause_io().await; + let res = imex_inner(context, what, path, passphrase) + .race(async { + cancel.recv().await.ok(); + Err(format_err!("canceled")) + }) + .await; + guard.resume().await; + res + }; context.free_ongoing().await; if let Err(err) = res.as_ref() { @@ -413,7 +417,7 @@ async fn import_backup( "Cannot import backups to accounts in use." 
); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot import backup, IO is running" ); @@ -523,7 +527,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res sql::housekeeping(context).await.ok_or_log(context); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot export backup, IO is running" ); diff --git a/src/job.rs b/src/job.rs index 25f3814a21..029272d177 100644 --- a/src/job.rs +++ b/src/job.rs @@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 { pub(crate) async fn schedule_resync(context: &Context) -> Result<()> { context.resync_request.store(true, Ordering::Relaxed); context + .scheduler .interrupt_inbox(InterruptInfo { probe_network: false, }) @@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { job.save(context).await.context("failed to save job")?; info!(context, "interrupt: imap"); - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/location.rs b/src/location.rs index 52315585ba..375c40662c 100644 --- a/src/location.rs +++ b/src/location.rs @@ -267,7 +267,7 @@ pub async fn send_locations_to_chat( } context.emit_event(EventType::ChatModified(chat_id)); if 0 != seconds { - context.interrupt_location().await; + context.scheduler.interrupt_location().await; } Ok(()) } diff --git a/src/message.rs b/src/message.rs index b287e6c572..415618f08f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { } // Interrupt Inbox loop to start message deletion and run housekeeping. - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } @@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> ) .await .context("failed to insert into smtp_mdns")?; - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } updated_chat_ids.insert(curr_chat_id); diff --git a/src/quota.rs b/src/quota.rs index b4a2be2256..f192fd1d79 100644 --- a/src/quota.rs +++ b/src/quota.rs @@ -115,7 +115,9 @@ impl Context { let requested = self.quota_update_request.swap(true, Ordering::Relaxed); if !requested { // Quota update was not requested before. - self.interrupt_inbox(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; } Ok(()) } diff --git a/src/scheduler.rs b/src/scheduler.rs index daa5429405..3970200b6e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result}; use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; +use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::task; use self::connectivity::ConnectivityStore; @@ -23,79 +24,231 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings}; pub(crate) mod connectivity; -#[derive(Debug)] -struct SchedBox { - meaning: FolderMeaning, - conn_state: ImapConnectionState, - handle: task::JoinHandle<()>, +/// State of the IO scheduler, as stored on the [`Context`]. +/// +/// The IO scheduler can be stopped or started, but core can also pause it. 
After pausing +/// the IO scheduler will be restarted only if it was running before paused or +/// [`Context::start_io`] was called in the meantime while it was paused. +#[derive(Debug, Default)] +pub(crate) struct SchedulerState { + inner: RwLock, } -/// Job and connection scheduler. -#[derive(Debug)] -pub(crate) struct Scheduler { - inbox: SchedBox, - /// Optional boxes -- mvbox, sentbox. - oboxes: Vec, - smtp: SmtpConnectionState, - smtp_handle: task::JoinHandle<()>, - ephemeral_handle: task::JoinHandle<()>, - ephemeral_interrupt_send: Sender<()>, - location_handle: task::JoinHandle<()>, - location_interrupt_send: Sender<()>, +impl SchedulerState { + pub(crate) fn new() -> Self { + Default::default() + } - recently_seen_loop: RecentlySeenLoop, -} + /// Whether the scheduler is currently running. + pub(crate) async fn is_running(&self) -> bool { + let inner = self.inner.read().await; + inner.scheduler.is_some() + } -impl Context { - /// Indicate that the network likely has come back. - pub async fn maybe_network(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network(); + /// Starts the scheduler if it is not yet started. + pub(crate) async fn start(&self, context: Context) { + let mut inner = self.inner.write().await; + inner.started = true; + if inner.scheduler.is_none() { + Self::do_start(inner, context).await; } - connectivity::idle_interrupted(lock).await; } - /// Indicate that the network likely is lost. - pub async fn maybe_network_lost(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network_lost(); + /// Starts the scheduler if it is not yet started. + async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { + info!(context, "starting IO"); + let ctx = context.clone(); + match Scheduler::start(context).await { + Ok(scheduler) => inner.scheduler = Some(scheduler), + Err(err) => error!(&ctx, "Failed to start IO: {:#}", err), } - connectivity::maybe_network_lost(self, lock).await; + } + + /// Stops the scheduler if it is currently running. + pub(crate) async fn stop(&self, context: &Context) { + let mut inner = self.inner.write().await; + inner.started = false; + Self::do_stop(inner, context).await; + } + + /// Stops the scheduler if it is currently running. + async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) { + // Sending an event wakes up event pollers (get_next_event) + // so the caller of stop_io() can arrange for proper termination. + // For this, the caller needs to instruct the event poller + // to terminate on receiving the next event and then call stop_io() + // which will emit the below event(s) + info!(context, "stopping IO"); + if let Some(debug_logging) = context.debug_logging.read().await.as_ref() { + debug_logging.loop_handle.abort(); + } + if let Some(scheduler) = inner.scheduler.take() { + scheduler.stop(context).await; + } + } + + /// Pauses the scheduler. + /// + /// If it is currently running the scheduler will be stopped. When + /// [`IoPausedGuard::resume`] is called the scheduler is started again. If in the + /// meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called resume will + /// do the right thing. 
+ pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> { + let mut inner = self.inner.write().await; + inner.paused = true; + Self::do_stop(inner, context).await; + IoPausedGuard { + context, + done: false, + } + } + + /// Restarts the scheduler, only if it is running. + pub(crate) async fn restart(&self, context: &Context) { + info!(context, "restarting IO"); + if self.is_running().await { + self.stop(context).await; + self.start(context.clone()).await; + } + } + + /// Indicate that the network likely has come back. + pub(crate) async fn maybe_network(&self) { + let inner = self.inner.read().await; + let (inbox, oboxes) = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network(); + let inbox = scheduler.inbox.conn_state.state.connectivity.clone(); + let oboxes = scheduler + .oboxes + .iter() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect::>(); + (inbox, oboxes) + } + None => return, + }; + drop(inner); + // TODO: maybe this called code should move into scheduler.maybe_network() instead? + connectivity::idle_interrupted(inbox, oboxes).await; + } + + /// Indicate that the network likely is lost. + pub(crate) async fn maybe_network_lost(&self, context: &Context) { + let inner = self.inner.read().await; + let stores = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network_lost(); + scheduler + .boxes() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect() + } + None => return, + }; + drop(inner); + // TODO; maybe this called code should move into scheduler.maybe_network_lost() + // instead? + connectivity::maybe_network_lost(context, stores).await; } pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { scheduler.interrupt_inbox(info); } } pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { scheduler.interrupt_smtp(info); } } pub(crate) async fn interrupt_ephemeral_task(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { scheduler.interrupt_ephemeral_task(); } } pub(crate) async fn interrupt_location(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { scheduler.interrupt_location(); } } pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { - if let Some(scheduler) = &*self.scheduler.read().await { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { scheduler.interrupt_recently_seen(contact_id, timestamp); } } } +#[derive(Debug, Default)] +struct InnerSchedulerState { + scheduler: Option, + started: bool, + paused: bool, +} + +#[derive(Debug)] +pub(crate) struct IoPausedGuard<'a> { + context: &'a Context, + done: bool, +} + +impl<'a> IoPausedGuard<'a> { + pub(crate) async fn resume(&mut self) { + self.done = true; + let inner = self.context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, self.context.clone()).await; + } + } +} + +impl<'a> Drop for IoPausedGuard<'a> { + fn drop(&mut self) { + if self.done { + return; + } + let context = 
self.context.clone(); + tokio::spawn(async move { + let inner = context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, context.clone()).await; + } + }); + } +} + +#[derive(Debug)] +struct SchedBox { + meaning: FolderMeaning, + conn_state: ImapConnectionState, + handle: task::JoinHandle<()>, +} + +/// Job and connection scheduler. +#[derive(Debug)] +pub(crate) struct Scheduler { + inbox: SchedBox, + /// Optional boxes -- mvbox, sentbox. + oboxes: Vec, + smtp: SmtpConnectionState, + smtp_handle: task::JoinHandle<()>, + ephemeral_handle: task::JoinHandle<()>, + ephemeral_interrupt_send: Sender<()>, + location_handle: task::JoinHandle<()>, + location_interrupt_send: Sender<()>, + + recently_seen_loop: RecentlySeenLoop, +} + async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { use futures::future::FutureExt; diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs index 6ede2074f0..91edcad614 100644 --- a/src/scheduler/connectivity.rs +++ b/src/scheduler/connectivity.rs @@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc}; use anyhow::{anyhow, Result}; use humansize::{format_size, BINARY}; -use tokio::sync::{Mutex, RwLockReadGuard}; +use tokio::sync::Mutex; use crate::events::EventType; use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning}; @@ -12,7 +12,7 @@ use crate::quota::{ }; use crate::tools::time; use crate::{context::Context, log::LogExt}; -use crate::{scheduler::Scheduler, stock_str, tools}; +use crate::{stock_str, tools}; #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)] pub enum Connectivity { @@ -156,19 +156,7 @@ impl ConnectivityStore { /// Set all folder states to InterruptingIdle in case they were `Connected` before. /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()` /// returns false immediately after `dc_maybe_network()`. -pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>) { - let (inbox, oboxes) = match &*scheduler { - Some(Scheduler { inbox, oboxes, .. }) => ( - inbox.conn_state.state.connectivity.clone(), - oboxes - .iter() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect::>(), - ), - None => return, - }; - drop(scheduler); - +pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec) { let mut connectivity_lock = inbox.0.lock().await; // For the inbox, we also have to set the connectivity to InterruptingIdle if it was // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity() @@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>, -) { - let stores: Vec<_> = match &*scheduler { - Some(sched) => sched - .boxes() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect(), - None => return, - }; - drop(scheduler); - +pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec) { for store in &stores { let mut connectivity_lock = store.0.lock().await; if !matches!( @@ -249,9 +225,9 @@ impl Context { /// /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted. 
pub async fn get_connectivity(&self) -> Connectivity { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| b.conn_state.state.connectivity.clone()) .collect(), @@ -332,9 +308,9 @@ impl Context { // Get the states from the RwLock // ============================================================================================= - let lock = self.scheduler.read().await; - let (folders_states, smtp) = match &*lock { - Some(sched) => ( + let lock = self.scheduler.inner.read().await; + let (folders_states, smtp) = match lock.scheduler { + Some(ref sched) => ( sched .boxes() .map(|b| (b.meaning, b.conn_state.state.connectivity.clone())) @@ -503,9 +479,9 @@ impl Context { /// Returns true if all background work is done. pub async fn all_work_done(&self) -> bool { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| &b.conn_state.state) .chain(once(&sched.smtp.state)) diff --git a/src/webxdc.rs b/src/webxdc.rs index bfaaa87413..d5873df554 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -421,7 +421,9 @@ impl Context { DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr", paramsv![instance.id, status_update_serial, status_update_serial, descr], ).await?; - self.interrupt_smtp(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(()) } From 097113f01e8b041dd829e7c2b3488e6c389583d2 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 14:39:40 +0100 Subject: [PATCH 02/12] fixup paused flag use --- src/scheduler.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index 3970200b6e..5c2db37ea1 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -49,7 +49,7 @@ impl SchedulerState { pub(crate) async fn start(&self, context: Context) { let mut inner = self.inner.write().await; inner.started = true; - if inner.scheduler.is_none() { + if inner.scheduler.is_none() && !inner.paused { Self::do_start(inner, context).await; } } @@ -204,7 +204,8 @@ pub(crate) struct IoPausedGuard<'a> { impl<'a> IoPausedGuard<'a> { pub(crate) async fn resume(&mut self) { self.done = true; - let inner = self.context.scheduler.inner.write().await; + let mut inner = self.context.scheduler.inner.write().await; + inner.paused = false; if inner.started && inner.scheduler.is_none() { SchedulerState::do_start(inner, self.context.clone()).await; } @@ -218,7 +219,8 @@ impl<'a> Drop for IoPausedGuard<'a> { } let context = self.context.clone(); tokio::spawn(async move { - let inner = context.scheduler.inner.write().await; + let mut inner = context.scheduler.inner.write().await; + inner.paused = false; if inner.started && inner.scheduler.is_none() { SchedulerState::do_start(inner, context.clone()).await; } From 32a7e5ed824ccf724c674b99b3674e8c057031a2 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 16:17:14 +0100 Subject: [PATCH 03/12] Remove requirement for stopping io for imex --- deltachat-ffi/deltachat.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 9208579c15..086f2cd985 100644 --- a/deltachat-ffi/deltachat.h +++ 
b/deltachat-ffi/deltachat.h @@ -2100,8 +2100,7 @@ dc_contact_t* dc_get_contact (dc_context_t* context, uint32_t co /** * Import/export things. - * During backup import/export IO must not be started, - * if needed stop IO using dc_accounts_stop_io() or dc_stop_io() first. + * * What to do is defined by the _what_ parameter which may be one of the following: * * - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1` From 52fa58a3ce8c7695bc033881ebfc9ad6b73365e1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 16:24:24 +0100 Subject: [PATCH 04/12] No need for jsonrpc to do this manually --- deltachat-jsonrpc/src/api/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index 0f466d1d4b..b47ab6a5c6 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -1325,7 +1325,6 @@ impl CommandApi { passphrase: Option, ) -> Result<()> { let ctx = self.get_context(account_id).await?; - ctx.stop_io().await; let result = imex::imex( &ctx, imex::ImexMode::ExportBackup, @@ -1333,7 +1332,6 @@ impl CommandApi { passphrase, ) .await; - ctx.start_io().await; result } From 2c3b2b8c2dc97ed5ed1a8e69b3ffdf497af4a335 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 9 Mar 2023 15:44:17 +0100 Subject: [PATCH 05/12] move pause to only exist on Scheduler --- src/context.rs | 12 +----------- src/imex.rs | 2 +- src/scheduler.rs | 10 ++++++---- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/context.rs b/src/context.rs index 9c2eb33e2a..682d39f9c7 100644 --- a/src/context.rs +++ b/src/context.rs @@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; -use crate::scheduler::{IoPausedGuard, SchedulerState}; +use crate::scheduler::SchedulerState; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -409,16 +409,6 @@ impl Context { self.scheduler.restart(self).await; } - /// Pauses the IO scheduler. - /// - /// This temporarily pauses the IO scheduler and will make sure calls to - /// [`Context::start_io`] are no-ops while being paused. - /// - /// It is recommended to call [`IoPausedGuard::resume`] rather than simply dropping it. - pub(crate) async fn pause_io(&self) -> IoPausedGuard<'_> { - self.scheduler.pause(self).await - } - /// Indicate that the network likely has come back. pub async fn maybe_network(&self) { self.scheduler.maybe_network().await; diff --git a/src/imex.rs b/src/imex.rs index af02ba1d84..eeffbe2311 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -87,7 +87,7 @@ pub async fn imex( let cancel = context.alloc_ongoing().await?; let res = { - let mut guard = context.pause_io().await; + let mut guard = context.scheduler.pause(context).await; let res = imex_inner(context, what, path, passphrase) .race(async { cancel.recv().await.ok(); diff --git a/src/scheduler.rs b/src/scheduler.rs index 5c2db37ea1..9c9600d3ad 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -87,12 +87,14 @@ impl SchedulerState { } } - /// Pauses the scheduler. + /// Pauses the IO scheduler. /// /// If it is currently running the scheduler will be stopped. When - /// [`IoPausedGuard::resume`] is called the scheduler is started again. If in the - /// meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called resume will - /// do the right thing. 
+ /// [`IoPausedGuard::resume`] is called the scheduler is started again. + /// + /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called + /// resume will do the right thing and restore the scheduler to the state requested by + /// the last call. pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> { let mut inner = self.inner.write().await; inner.paused = true; From 0079cd47660bbccefcca1f361e53d8581380fc36 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 9 Mar 2023 15:58:58 +0100 Subject: [PATCH 06/12] Add changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5538e67672..0eb34661a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ - Support non-persistent configuration with DELTACHAT_* env - Print deltachat-repl errors with causes. #4166 - Increase MSRV to 1.64. #4167 +- Core takes of stopping and re-starting IO itself where needed, + e.g. during backup creation. It is no longer needed to call + dc_stop_io(). dc_start_io() can now be called at any time without + harm. #4138 ### Fixes - Fix segmentation fault if `dc_context_unref()` is called during From 4bf38c0e2966d02ef5c087b2a76e5aed29d3c765 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sat, 11 Mar 2023 12:32:33 +0100 Subject: [PATCH 07/12] clippy --- deltachat-jsonrpc/src/api/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index b47ab6a5c6..cc0ad3b01a 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -1325,14 +1325,13 @@ impl CommandApi { passphrase: Option, ) -> Result<()> { let ctx = self.get_context(account_id).await?; - let result = imex::imex( + imex::imex( &ctx, imex::ImexMode::ExportBackup, destination.as_ref(), passphrase, ) - .await; - result + .await } async fn import_backup( From a2e7d914a0fb31c26891a669463a4834c2b2e168 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 09:37:09 +0000 Subject: [PATCH 08/12] Changelog fixup --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0eb34661a3..1fb0f71eb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ - Support non-persistent configuration with DELTACHAT_* env - Print deltachat-repl errors with causes. #4166 - Increase MSRV to 1.64. #4167 -- Core takes of stopping and re-starting IO itself where needed, +- Core takes care of stopping and re-starting IO itself where needed, e.g. during backup creation. It is no longer needed to call dc_stop_io(). dc_start_io() can now be called at any time without harm. #4138 From 81418d8ee55630984cb2ae6ceec5e504e1c5fc33 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:13:59 +0000 Subject: [PATCH 09/12] Log error on pause guard drop without resuming instead of working around I checked that tests still pass even if error! is replaced with panic! 
--- src/scheduler.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index 9c9600d3ad..e1638fd090 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -219,14 +219,9 @@ impl<'a> Drop for IoPausedGuard<'a> { if self.done { return; } - let context = self.context.clone(); - tokio::spawn(async move { - let mut inner = context.scheduler.inner.write().await; - inner.paused = false; - if inner.started && inner.scheduler.is_none() { - SchedulerState::do_start(inner, context.clone()).await; - } - }); + + // Async .resume() should be called manually due to lack of async drop. + error!(self.context, "Pause guard dropped without resuming."); } } From 3177f9967deb0c0ff0ee913a84a2d8ca64fc9285 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:16:27 +0000 Subject: [PATCH 10/12] Add a comment around IMAP loop task handle --- src/scheduler.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/scheduler.rs b/src/scheduler.rs index e1638fd090..41f0054343 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -229,6 +229,8 @@ impl<'a> Drop for IoPausedGuard<'a> { struct SchedBox { meaning: FolderMeaning, conn_state: ImapConnectionState, + + /// IMAP loop task handle. handle: task::JoinHandle<()>, } From 17de3d323642e0ae2d126bca88b6ff4e146abb16 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:17:18 +0000 Subject: [PATCH 11/12] Remove TODOs --- src/scheduler.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index 41f0054343..b57cda3713 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -131,7 +131,6 @@ impl SchedulerState { None => return, }; drop(inner); - // TODO: maybe this called code should move into scheduler.maybe_network() instead? connectivity::idle_interrupted(inbox, oboxes).await; } @@ -149,8 +148,6 @@ impl SchedulerState { None => return, }; drop(inner); - // TODO; maybe this called code should move into scheduler.maybe_network_lost() - // instead? connectivity::maybe_network_lost(context, stores).await; } From e39429c2e35a794b3f78cdc0ee69df310203f79e Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:18:49 +0000 Subject: [PATCH 12/12] rustfmt --- src/scheduler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index b57cda3713..d5571f8356 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -216,7 +216,7 @@ impl<'a> Drop for IoPausedGuard<'a> { if self.done { return; } - + // Async .resume() should be called manually due to lack of async drop. error!(self.context, "Pause guard dropped without resuming."); }
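
The intended use of the new pause API inside core looks roughly like the sketch below. It mirrors the final imex() code from patches 01 and 05; with_paused_io and long_running_task are illustrative placeholders, not functions added by these patches, and since SchedulerState::pause() is pub(crate) the pattern is only available inside core. UIs keep calling dc_start_io()/dc_stop_io() as before.

    // Minimal sketch, assuming a helper inside the core crate;
    // `long_running_task` stands in for work that needs IO to stay off.
    async fn with_paused_io(context: &Context) -> anyhow::Result<()> {
        // Stops the scheduler; while paused, Context::start_io() does not restart it.
        let mut guard = context.scheduler.pause(context).await;
        let res = long_running_task(context).await; // e.g. backup import/export
        // Restarts the scheduler if it was running before, or if start_io() was
        // requested while paused; dropping the guard without resuming only logs an error.
        guard.resume().await;
        res
    }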