diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5538e67672..1fb0f71eb7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,10 @@
 - Support non-persistent configuration with DELTACHAT_* env
 - Print deltachat-repl errors with causes. #4166
 - Increase MSRV to 1.64. #4167
+- Core takes care of stopping and re-starting IO itself where needed,
+  e.g. during backup creation. It is no longer necessary to call
+  dc_stop_io(). dc_start_io() can now be called at any time without
+  harm. #4138

 ### Fixes
 - Fix segmentation fault if `dc_context_unref()` is called during
diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h
index 9208579c15..086f2cd985 100644
--- a/deltachat-ffi/deltachat.h
+++ b/deltachat-ffi/deltachat.h
@@ -2100,8 +2100,7 @@ dc_contact_t* dc_get_contact (dc_context_t* context, uint32_t co
 /**
  * Import/export things.
- * During backup import/export IO must not be started,
- * if needed stop IO using dc_accounts_stop_io() or dc_stop_io() first.
+ *
  * What to do is defined by the _what_ parameter which may be one of the following:
  *
  * - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1`
diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs
index 0f466d1d4b..cc0ad3b01a 100644
--- a/deltachat-jsonrpc/src/api/mod.rs
+++ b/deltachat-jsonrpc/src/api/mod.rs
@@ -1325,16 +1325,13 @@ impl CommandApi {
         passphrase: Option<String>,
     ) -> Result<()> {
         let ctx = self.get_context(account_id).await?;
-        ctx.stop_io().await;
-        let result = imex::imex(
+        imex::imex(
             &ctx,
             imex::ImexMode::ExportBackup,
             destination.as_ref(),
             passphrase,
         )
-        .await;
-        ctx.start_io().await;
-        result
+        .await
     }

     async fn import_backup(
diff --git a/src/accounts.rs b/src/accounts.rs
index e1620526d1..a379b44e1b 100644
--- a/src/accounts.rs
+++ b/src/accounts.rs
@@ -271,14 +271,14 @@ impl Accounts {
     /// Notifies all accounts that the network may have become available.
     pub async fn maybe_network(&self) {
         for account in self.accounts.values() {
-            account.maybe_network().await;
+            account.scheduler.maybe_network().await;
         }
     }

     /// Notifies all accounts that the network connection may have been lost.
     pub async fn maybe_network_lost(&self) {
         for account in self.accounts.values() {
-            account.maybe_network_lost().await;
+            account.scheduler.maybe_network_lost(account).await;
         }
     }
diff --git a/src/chat.rs b/src/chat.rs
index 0442dea3ca..9d49ec5529 100644
--- a/src/chat.rs
+++ b/src/chat.rs
@@ -639,7 +639,10 @@ impl ChatId {
         context.emit_msgs_changed_without_ids();
         context.set_config(Config::LastHousekeeping, None).await?;
-        context.interrupt_inbox(InterruptInfo::new(false)).await;
+        context
+            .scheduler
+            .interrupt_inbox(InterruptInfo::new(false))
+            .await;

         if chat.is_self_talk() {
             let mut msg = Message::new(Viewtype::Text);
@@ -1667,7 +1670,7 @@ impl Chat {
             maybe_set_logging_xdc(context, msg, self.id).await?;
         }

-        context.interrupt_ephemeral_task().await;
+        context.scheduler.interrupt_ephemeral_task().await;
         Ok(msg.id)
     }
 }
@@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
             context.emit_event(EventType::LocationChanged(Some(ContactId::SELF)));
         }

-        context.interrupt_smtp(InterruptInfo::new(false)).await;
+        context
+            .scheduler
+            .interrupt_smtp(InterruptInfo::new(false))
+            .await;
     }

     Ok(msg.id)
@@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
             .await?;
             curr_timestamp += 1;
             if create_send_msg_job(context, new_msg_id).await?.is_some() {
-                context.interrupt_smtp(InterruptInfo::new(false)).await;
+                context
+                    .scheduler
+                    .interrupt_smtp(InterruptInfo::new(false))
+                    .await;
             }
         }
         created_chats.push(chat_id);
@@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
                 msg_id: msg.id,
             });
             if create_send_msg_job(context, msg.id).await?.is_some() {
-                context.interrupt_smtp(InterruptInfo::new(false)).await;
+                context
+                    .scheduler
+                    .interrupt_smtp(InterruptInfo::new(false))
+                    .await;
             }
         }
     }
diff --git a/src/config.rs b/src/config.rs
index 76dca83a2f..035caf3143 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -443,7 +443,7 @@ impl Context {
             Config::DeleteDeviceAfter => {
                 let ret = self.sql.set_raw_config(key.as_ref(), value).await;
                 // Interrupt ephemeral loop to delete old messages immediately.
-                self.interrupt_ephemeral_task().await;
+                self.scheduler.interrupt_ephemeral_task().await;
                 ret?
             }
             Config::Displayname => {
diff --git a/src/configure.rs b/src/configure.rs
index 48b91a3f72..eb69ad25cd 100644
--- a/src/configure.rs
+++ b/src/configure.rs
@@ -59,7 +59,7 @@ impl Context {
     /// Configures this account with the currently set parameters.
     pub async fn configure(&self) -> Result<()> {
         ensure!(
-            self.scheduler.read().await.is_none(),
+            !self.scheduler.is_running().await,
             "cannot configure, already running"
         );
         ensure!(
@@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
     ctx.set_config_bool(Config::FetchedExistingMsgs, false)
         .await?;

-    ctx.interrupt_inbox(InterruptInfo::new(false)).await;
+    ctx.scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;

     progress!(ctx, 940);
     update_device_chats_handle.await??;
diff --git a/src/contact.rs b/src/contact.rs
index 1d00a80f00..07430d3082 100644
--- a/src/contact.rs
+++ b/src/contact.rs
@@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen(
         > 0
         && timestamp > time() - SEEN_RECENTLY_SECONDS
     {
-        context.interrupt_recently_seen(contact_id, timestamp).await;
+        context
+            .scheduler
+            .interrupt_recently_seen(contact_id, timestamp)
+            .await;
     }
     Ok(())
 }
diff --git a/src/context.rs b/src/context.rs
index 13479a7eac..682d39f9c7 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey};
 use crate::login_param::LoginParam;
 use crate::message::{self, MessageState, MsgId};
 use crate::quota::QuotaInfo;
-use crate::scheduler::Scheduler;
+use crate::scheduler::SchedulerState;
 use crate::sql::Sql;
 use crate::stock_str::StockStrings;
 use crate::timesmearing::SmearedTimestamp;
@@ -201,7 +201,7 @@ pub struct InnerContext {
     pub(crate) translated_stockstrings: StockStrings,
     pub(crate) events: Events,

-    pub(crate) scheduler: RwLock<Option<Scheduler>>,
+    pub(crate) scheduler: SchedulerState,
     pub(crate) ratelimit: RwLock<Ratelimit>,

     /// Recently loaded quota information, if any.
@@ -370,7 +370,7 @@ impl Context {
             wrong_pw_warning_mutex: Mutex::new(()),
             translated_stockstrings: stockstrings,
             events,
-            scheduler: RwLock::new(None),
+            scheduler: SchedulerState::new(),
             ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds.
             quota: RwLock::new(None),
             quota_update_request: AtomicBool::new(false),
@@ -395,42 +395,23 @@ impl Context {
             warn!(self, "can not start io on a context that is not configured");
             return;
         }
-
-        info!(self, "starting IO");
-        let mut lock = self.inner.scheduler.write().await;
-        if lock.is_none() {
-            match Scheduler::start(self.clone()).await {
-                Err(err) => error!(self, "Failed to start IO: {:#}", err),
-                Ok(scheduler) => *lock = Some(scheduler),
-            }
-        }
+        self.scheduler.start(self.clone()).await;
     }

     /// Stops the IO scheduler.
     pub async fn stop_io(&self) {
-        // Sending an event wakes up event pollers (get_next_event)
-        // so the caller of stop_io() can arrange for proper termination.
-        // For this, the caller needs to instruct the event poller
-        // to terminate on receiving the next event and then call stop_io()
-        // which will emit the below event(s)
-        info!(self, "stopping IO");
-        if let Some(debug_logging) = self.debug_logging.read().await.as_ref() {
-            debug_logging.loop_handle.abort();
-        }
-        if let Some(scheduler) = self.inner.scheduler.write().await.take() {
-            scheduler.stop(self).await;
-        }
+        self.scheduler.stop(self).await;
     }

     /// Restarts the IO scheduler if it was running before
     /// when it is not running this is an no-op
     pub async fn restart_io_if_running(&self) {
-        info!(self, "restarting IO");
-        let is_running = { self.inner.scheduler.read().await.is_some() };
-        if is_running {
-            self.stop_io().await;
-            self.start_io().await;
-        }
+        self.scheduler.restart(self).await;
+    }
+
+    /// Indicate that the network likely has come back.
+    pub async fn maybe_network(&self) {
+        self.scheduler.maybe_network().await;
     }

     /// Returns a reference to the underlying SQL instance.
diff --git a/src/ephemeral.rs b/src/ephemeral.rs
index f7dadf5d8d..75fb1c1272 100644
--- a/src/ephemeral.rs
+++ b/src/ephemeral.rs
@@ -317,7 +317,7 @@ impl MsgId {
                 paramsv![ephemeral_timestamp, ephemeral_timestamp, self],
             )
             .await?;
-            context.interrupt_ephemeral_task().await;
+            context.scheduler.interrupt_ephemeral_task().await;
         }
         Ok(())
     }
@@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids(
     )
     .await?;
     if count > 0 {
-        context.interrupt_ephemeral_task().await;
+        context.scheduler.interrupt_ephemeral_task().await;
     }
     Ok(())
 }
diff --git a/src/imap.rs b/src/imap.rs
index 91f6ae9ccc..6e3437fc0d 100644
--- a/src/imap.rs
+++ b/src/imap.rs
@@ -475,7 +475,7 @@ impl Imap {
             // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are
             // fetched while the per-chat ephemeral timers start as soon as the messages are marked
             // as noticed.
-            context.interrupt_ephemeral_task().await;
+            context.scheduler.interrupt_ephemeral_task().await;
         }

         let session = self
@@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str)
         paramsv![message_id],
     )
     .await?;
-    context.interrupt_inbox(InterruptInfo::new(false)).await;
+    context
+        .scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;

     Ok(())
 }
diff --git a/src/imex.rs b/src/imex.rs
index 7ca3bb15c4..eeffbe2311 100644
--- a/src/imex.rs
+++ b/src/imex.rs
@@ -86,13 +86,17 @@ pub async fn imex(
 ) -> Result<()> {
     let cancel = context.alloc_ongoing().await?;

-    let res = imex_inner(context, what, path, passphrase)
-        .race(async {
-            cancel.recv().await.ok();
-            Err(format_err!("canceled"))
-        })
-        .await;
-
+    let res = {
+        let mut guard = context.scheduler.pause(context).await;
+        let res = imex_inner(context, what, path, passphrase)
+            .race(async {
+                cancel.recv().await.ok();
+                Err(format_err!("canceled"))
+            })
+            .await;
+        guard.resume().await;
+        res
+    };
     context.free_ongoing().await;

     if let Err(err) = res.as_ref() {
@@ -413,7 +417,7 @@ async fn import_backup(
         "Cannot import backups to accounts in use."
     );
     ensure!(
-        context.scheduler.read().await.is_none(),
+        !context.scheduler.is_running().await,
         "cannot import backup, IO is running"
     );
@@ -523,7 +527,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
     sql::housekeeping(context).await.ok_or_log(context);

     ensure!(
-        context.scheduler.read().await.is_none(),
+        !context.scheduler.is_running().await,
         "cannot export backup, IO is running"
     );
diff --git a/src/job.rs b/src/job.rs
index 25f3814a21..029272d177 100644
--- a/src/job.rs
+++ b/src/job.rs
@@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 {
 pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
     context.resync_request.store(true, Ordering::Relaxed);
     context
+        .scheduler
         .interrupt_inbox(InterruptInfo {
             probe_network: false,
         })
@@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
     job.save(context).await.context("failed to save job")?;

     info!(context, "interrupt: imap");
-    context.interrupt_inbox(InterruptInfo::new(false)).await;
+    context
+        .scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;
     Ok(())
 }
diff --git a/src/location.rs b/src/location.rs
index 52315585ba..375c40662c 100644
--- a/src/location.rs
+++ b/src/location.rs
@@ -267,7 +267,7 @@ pub async fn send_locations_to_chat(
     }
     context.emit_event(EventType::ChatModified(chat_id));
     if 0 != seconds {
-        context.interrupt_location().await;
+        context.scheduler.interrupt_location().await;
     }
     Ok(())
 }
diff --git a/src/message.rs b/src/message.rs
index b287e6c572..415618f08f 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
     }

     // Interrupt Inbox loop to start message deletion and run housekeeping.
-    context.interrupt_inbox(InterruptInfo::new(false)).await;
+    context
+        .scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;

     Ok(())
 }
@@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
                 )
                 .await
                 .context("failed to insert into smtp_mdns")?;
-                context.interrupt_smtp(InterruptInfo::new(false)).await;
+                context
+                    .scheduler
+                    .interrupt_smtp(InterruptInfo::new(false))
+                    .await;
             }
         }
         updated_chat_ids.insert(curr_chat_id);
diff --git a/src/quota.rs b/src/quota.rs
index b4a2be2256..f192fd1d79 100644
--- a/src/quota.rs
+++ b/src/quota.rs
@@ -115,7 +115,9 @@ impl Context {
         let requested = self.quota_update_request.swap(true, Ordering::Relaxed);
         if !requested {
             // Quota update was not requested before.
-            self.interrupt_inbox(InterruptInfo::new(false)).await;
+            self.scheduler
+                .interrupt_inbox(InterruptInfo::new(false))
+                .await;
         }
         Ok(())
     }
diff --git a/src/scheduler.rs b/src/scheduler.rs
index daa5429405..d5571f8356 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result};
 use async_channel::{self as channel, Receiver, Sender};
 use futures::future::try_join_all;
 use futures_lite::FutureExt;
+use tokio::sync::{RwLock, RwLockWriteGuard};
 use tokio::task;

 use self::connectivity::ConnectivityStore;
@@ -23,79 +24,229 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings};

 pub(crate) mod connectivity;

-#[derive(Debug)]
-struct SchedBox {
-    meaning: FolderMeaning,
-    conn_state: ImapConnectionState,
-    handle: task::JoinHandle<()>,
+/// State of the IO scheduler, as stored on the [`Context`].
+///
+/// The IO scheduler can be stopped or started, but core can also pause it. After being
+/// paused, the IO scheduler is restarted only if it was running before the pause or if
+/// [`Context::start_io`] was called while it was paused.
+#[derive(Debug, Default)]
+pub(crate) struct SchedulerState {
+    inner: RwLock<InnerSchedulerState>,
 }

-/// Job and connection scheduler.
-#[derive(Debug)]
-pub(crate) struct Scheduler {
-    inbox: SchedBox,
-    /// Optional boxes -- mvbox, sentbox.
-    oboxes: Vec<SchedBox>,
-    smtp: SmtpConnectionState,
-    smtp_handle: task::JoinHandle<()>,
-    ephemeral_handle: task::JoinHandle<()>,
-    ephemeral_interrupt_send: Sender<()>,
-    location_handle: task::JoinHandle<()>,
-    location_interrupt_send: Sender<()>,
+impl SchedulerState {
+    pub(crate) fn new() -> Self {
+        Default::default()
+    }

-    recently_seen_loop: RecentlySeenLoop,
-}
+    /// Whether the scheduler is currently running.
+    pub(crate) async fn is_running(&self) -> bool {
+        let inner = self.inner.read().await;
+        inner.scheduler.is_some()
+    }

-impl Context {
-    /// Indicate that the network likely has come back.
-    pub async fn maybe_network(&self) {
-        let lock = self.scheduler.read().await;
-        if let Some(scheduler) = &*lock {
-            scheduler.maybe_network();
+    /// Starts the scheduler if it is not yet started.
+    pub(crate) async fn start(&self, context: Context) {
+        let mut inner = self.inner.write().await;
+        inner.started = true;
+        if inner.scheduler.is_none() && !inner.paused {
+            Self::do_start(inner, context).await;
         }
-        connectivity::idle_interrupted(lock).await;
     }

-    /// Indicate that the network likely is lost.
-    pub async fn maybe_network_lost(&self) {
-        let lock = self.scheduler.read().await;
-        if let Some(scheduler) = &*lock {
-            scheduler.maybe_network_lost();
+    /// Starts the scheduler if it is not yet started.
+    async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
+        info!(context, "starting IO");
+        let ctx = context.clone();
+        match Scheduler::start(context).await {
+            Ok(scheduler) => inner.scheduler = Some(scheduler),
+            Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
+        }
+    }
+
+    /// Stops the scheduler if it is currently running.
+    pub(crate) async fn stop(&self, context: &Context) {
+        let mut inner = self.inner.write().await;
+        inner.started = false;
+        Self::do_stop(inner, context).await;
+    }
+
+    /// Stops the scheduler if it is currently running.
+    async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) {
+        // Sending an event wakes up event pollers (get_next_event)
+        // so the caller of stop_io() can arrange for proper termination.
+        // For this, the caller needs to instruct the event poller
+        // to terminate on receiving the next event and then call stop_io()
+        // which will emit the below event(s)
+        info!(context, "stopping IO");
+        if let Some(debug_logging) = context.debug_logging.read().await.as_ref() {
+            debug_logging.loop_handle.abort();
+        }
+        if let Some(scheduler) = inner.scheduler.take() {
+            scheduler.stop(context).await;
+        }
+    }
+
+    /// Pauses the IO scheduler.
+    ///
+    /// If it is currently running, the scheduler will be stopped. When
+    /// [`IoPausedGuard::resume`] is called the scheduler is started again.
+    ///
+    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called,
+    /// resume will do the right thing and restore the scheduler to the state requested by
+    /// the last call.
+    pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> {
+        let mut inner = self.inner.write().await;
+        inner.paused = true;
+        Self::do_stop(inner, context).await;
+        IoPausedGuard {
+            context,
+            done: false,
+        }
+    }
+
+    /// Restarts the scheduler, but only if it is currently running.
+    pub(crate) async fn restart(&self, context: &Context) {
+        info!(context, "restarting IO");
+        if self.is_running().await {
+            self.stop(context).await;
+            self.start(context.clone()).await;
         }
-        connectivity::maybe_network_lost(self, lock).await;
+    }
+
+    /// Indicate that the network likely has come back.
+    pub(crate) async fn maybe_network(&self) {
+        let inner = self.inner.read().await;
+        let (inbox, oboxes) = match inner.scheduler {
+            Some(ref scheduler) => {
+                scheduler.maybe_network();
+                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
+                let oboxes = scheduler
+                    .oboxes
+                    .iter()
+                    .map(|b| b.conn_state.state.connectivity.clone())
+                    .collect::<Vec<_>>();
+                (inbox, oboxes)
+            }
+            None => return,
+        };
+        drop(inner);
+        connectivity::idle_interrupted(inbox, oboxes).await;
+    }
+
+    /// Indicate that the network likely is lost.
+    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
+        let inner = self.inner.read().await;
+        let stores = match inner.scheduler {
+            Some(ref scheduler) => {
+                scheduler.maybe_network_lost();
+                scheduler
+                    .boxes()
+                    .map(|b| b.conn_state.state.connectivity.clone())
+                    .collect()
+            }
+            None => return,
+        };
+        drop(inner);
+        connectivity::maybe_network_lost(context, stores).await;
     }

     pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
             scheduler.interrupt_inbox(info);
         }
     }

     pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
             scheduler.interrupt_smtp(info);
         }
     }

     pub(crate) async fn interrupt_ephemeral_task(&self) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
             scheduler.interrupt_ephemeral_task();
         }
     }

     pub(crate) async fn interrupt_location(&self) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
             scheduler.interrupt_location();
         }
     }

     pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
             scheduler.interrupt_recently_seen(contact_id, timestamp);
         }
     }
 }

+#[derive(Debug, Default)]
+struct InnerSchedulerState {
+    scheduler: Option<Scheduler>,
+    started: bool,
+    paused: bool,
+}
+
+#[derive(Debug)]
+pub(crate) struct IoPausedGuard<'a> {
+    context: &'a Context,
+    done: bool,
+}
+
+impl<'a> IoPausedGuard<'a> {
+    pub(crate) async fn resume(&mut self) {
+        self.done = true;
+        let mut inner = self.context.scheduler.inner.write().await;
+        inner.paused = false;
+        if inner.started && inner.scheduler.is_none() {
+            SchedulerState::do_start(inner, self.context.clone()).await;
+        }
+    }
+}
+
+impl<'a> Drop for IoPausedGuard<'a> {
+    fn drop(&mut self) {
+        if self.done {
+            return;
+        }
+
+        // Async .resume() should be called manually due to lack of async drop.
+        error!(self.context, "Pause guard dropped without resuming.");
+    }
+}
+
+#[derive(Debug)]
+struct SchedBox {
+    meaning: FolderMeaning,
+    conn_state: ImapConnectionState,
+
+    /// IMAP loop task handle.
+    handle: task::JoinHandle<()>,
+}
+
+/// Job and connection scheduler.
+#[derive(Debug)]
+pub(crate) struct Scheduler {
+    inbox: SchedBox,
+    /// Optional boxes -- mvbox, sentbox.
+    oboxes: Vec<SchedBox>,
+    smtp: SmtpConnectionState,
+    smtp_handle: task::JoinHandle<()>,
+    ephemeral_handle: task::JoinHandle<()>,
+    ephemeral_interrupt_send: Sender<()>,
+    location_handle: task::JoinHandle<()>,
+    location_interrupt_send: Sender<()>,
+
+    recently_seen_loop: RecentlySeenLoop,
+}
+
 async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
     use futures::future::FutureExt;
diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs
index 6ede2074f0..91edcad614 100644
--- a/src/scheduler/connectivity.rs
+++ b/src/scheduler/connectivity.rs
@@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc};

 use anyhow::{anyhow, Result};
 use humansize::{format_size, BINARY};
-use tokio::sync::{Mutex, RwLockReadGuard};
+use tokio::sync::Mutex;

 use crate::events::EventType;
 use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning};
@@ -12,7 +12,7 @@ use crate::quota::{
 };
 use crate::tools::time;
 use crate::{context::Context, log::LogExt};
-use crate::{scheduler::Scheduler, stock_str, tools};
+use crate::{stock_str, tools};

 #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)]
 pub enum Connectivity {
@@ -156,19 +156,7 @@ impl ConnectivityStore {
 /// Set all folder states to InterruptingIdle in case they were `Connected` before.
 /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()`
 /// returns false immediately after `dc_maybe_network()`.
-pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Scheduler>>) {
-    let (inbox, oboxes) = match &*scheduler {
-        Some(Scheduler { inbox, oboxes, .. }) => (
-            inbox.conn_state.state.connectivity.clone(),
-            oboxes
-                .iter()
-                .map(|b| b.conn_state.state.connectivity.clone())
-                .collect::<Vec<_>>(),
-        ),
-        None => return,
-    };
-    drop(scheduler);
-
+pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) {
     let mut connectivity_lock = inbox.0.lock().await;
     // For the inbox, we also have to set the connectivity to InterruptingIdle if it was
     // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity()
@@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option
-pub(crate) async fn maybe_network_lost(
-    context: &Context,
-    scheduler: RwLockReadGuard<'_, Option<Scheduler>>,
-) {
-    let stores: Vec<_> = match &*scheduler {
-        Some(sched) => sched
-            .boxes()
-            .map(|b| b.conn_state.state.connectivity.clone())
-            .collect(),
-        None => return,
-    };
-    drop(scheduler);
-
+pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec<ConnectivityStore>) {
     for store in &stores {
         let mut connectivity_lock = store.0.lock().await;
         if !matches!(
@@ -249,9 +225,9 @@ impl Context {
     ///
     /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
     pub async fn get_connectivity(&self) -> Connectivity {
-        let lock = self.scheduler.read().await;
-        let stores: Vec<_> = match &*lock {
-            Some(sched) => sched
+        let lock = self.scheduler.inner.read().await;
+        let stores: Vec<_> = match lock.scheduler {
+            Some(ref sched) => sched
                 .boxes()
                 .map(|b| b.conn_state.state.connectivity.clone())
                 .collect(),
@@ -332,9 +308,9 @@ impl Context {
         // Get the states from the RwLock
         // =============================================================================================

-        let lock = self.scheduler.read().await;
-        let (folders_states, smtp) = match &*lock {
-            Some(sched) => (
+        let lock = self.scheduler.inner.read().await;
+        let (folders_states, smtp) = match lock.scheduler {
+            Some(ref sched) => (
                 sched
                     .boxes()
                     .map(|b| (b.meaning, b.conn_state.state.connectivity.clone()))
@@ -503,9 +479,9 @@ impl Context {

     /// Returns true if all background work is done.
     pub async fn all_work_done(&self) -> bool {
-        let lock = self.scheduler.read().await;
-        let stores: Vec<_> = match &*lock {
-            Some(sched) => sched
+        let lock = self.scheduler.inner.read().await;
+        let stores: Vec<_> = match lock.scheduler {
+            Some(ref sched) => sched
                 .boxes()
                 .map(|b| &b.conn_state.state)
                 .chain(once(&sched.smtp.state))
diff --git a/src/webxdc.rs b/src/webxdc.rs
index bfaaa87413..d5873df554 100644
--- a/src/webxdc.rs
+++ b/src/webxdc.rs
@@ -421,7 +421,9 @@ impl Context {
                 DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr",
                 paramsv![instance.id, status_update_serial, status_update_serial, descr],
             ).await?;
-            self.interrupt_smtp(InterruptInfo::new(false)).await;
+            self.scheduler
+                .interrupt_smtp(InterruptInfo::new(false))
+                .await;
         }
         Ok(())
     }
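Usage note: the src/imex.rs hunk above shows the intended calling pattern for the new pause guard. Below is a minimal sketch of a hypothetical in-crate caller, assuming only the APIs introduced in this diff (`SchedulerState::pause()` and `IoPausedGuard::resume()`); the function name `export_something` and its body are illustrative, not part of the patch.

use anyhow::Result;

use crate::context::Context;

// Hypothetical helper: run work that must not race with IMAP/SMTP IO.
async fn export_something(context: &Context) -> Result<()> {
    // Stops IO if it is currently running; the `started` flag on the
    // scheduler state records whether IO should come back afterwards.
    let mut guard = context.scheduler.pause(context).await;

    // ... do the exclusive work here, e.g. write a backup file ...

    // Restarts IO only if it was running before the pause or if start_io()
    // was called in the meantime. resume() must be called explicitly:
    // dropping the guard without resuming only logs an error, because
    // there is no async Drop to restart IO automatically.
    guard.resume().await;
    Ok(())
}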