diff --git a/src/chat.rs b/src/chat.rs index c6495cbf76..054218674d 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -35,7 +35,6 @@ use crate::mimeparser::SystemMessage; use crate::param::{Param, Params}; use crate::peerstate::{Peerstate, PeerstateVerifiedStatus}; use crate::receive_imf::ReceivedMsg; -use crate::scheduler::InterruptInfo; use crate::smtp::send_msg_to_smtp; use crate::sql; use crate::stock_str; @@ -702,10 +701,7 @@ impl ChatId { context.emit_msgs_changed_without_ids(); context.set_config(Config::LastHousekeeping, None).await?; - context - .scheduler - .interrupt_inbox(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_inbox().await; if chat.is_self_talk() { let mut msg = Message::new(Viewtype::Text); @@ -2503,10 +2499,7 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - context.emit_event(EventType::LocationChanged(Some(ContactId::SELF))); } - context - .scheduler - .interrupt_smtp(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_smtp().await; } Ok(msg.id) @@ -3736,10 +3729,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .await?; curr_timestamp += 1; if create_send_msg_job(context, &mut msg).await?.is_some() { - context - .scheduler - .interrupt_smtp(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_smtp().await; } } created_chats.push(chat_id); @@ -3796,10 +3786,7 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg_id: msg.id, }); if create_send_msg_job(context, &mut msg).await?.is_some() { - context - .scheduler - .interrupt_smtp(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_smtp().await; } } Ok(()) diff --git a/src/configure.rs b/src/configure.rs index 4c1c77090b..8d128a8359 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -31,7 +31,6 @@ use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam}; use crate::message::{Message, Viewtype}; use crate::oauth2::get_oauth2_addr; use crate::provider::{Protocol, Socket, UsernamePattern}; -use crate::scheduler::InterruptInfo; use crate::smtp::Smtp; use crate::socks::Socks5Config; use crate::stock_str; @@ -481,9 +480,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { ctx.set_config_bool(Config::FetchedExistingMsgs, false) .await?; - ctx.scheduler - .interrupt_inbox(InterruptInfo::new(false)) - .await; + ctx.scheduler.interrupt_inbox().await; progress!(ctx, 940); update_device_chats_handle.await??; diff --git a/src/context.rs b/src/context.rs index 556d6afa41..1e04c618bc 100644 --- a/src/context.rs +++ b/src/context.rs @@ -23,7 +23,7 @@ use crate::key::{load_self_public_key, DcKey as _}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; -use crate::scheduler::{InterruptInfo, SchedulerState}; +use crate::scheduler::SchedulerState; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -437,11 +437,7 @@ impl Context { pub(crate) async fn schedule_resync(&self) -> Result<()> { self.resync_request.store(true, Ordering::Relaxed); - self.scheduler - .interrupt_inbox(InterruptInfo { - probe_network: false, - }) - .await; + self.scheduler.interrupt_inbox().await; Ok(()) } diff --git a/src/download.rs b/src/download.rs index 09ab716c81..5c626fc711 100644 --- a/src/download.rs +++ b/src/download.rs @@ -12,7 +12,6 @@ use crate::context::Context; use crate::imap::{Imap, ImapActionResult}; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::{MimeMessage, Part}; -use crate::scheduler::InterruptInfo; use crate::tools::time; use crate::{stock_str, EventType}; @@ -93,10 +92,7 @@ impl MsgId { .sql .execute("INSERT INTO download (msg_id) VALUES (?)", (self,)) .await?; - context - .scheduler - .interrupt_inbox(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_inbox().await; } } Ok(()) diff --git a/src/imap.rs b/src/imap.rs index 0eeddd57ac..0cb3165a8a 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -35,7 +35,6 @@ use crate::receive_imf::{ from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner, ReceivedMsg, }; use crate::scheduler::connectivity::ConnectivityStore; -use crate::scheduler::InterruptInfo; use crate::socks::Socks5Config; use crate::sql; use crate::stock_str; @@ -86,7 +85,7 @@ const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])"; #[derive(Debug)] pub struct Imap { - pub(crate) idle_interrupt_receiver: Receiver, + pub(crate) idle_interrupt_receiver: Receiver<()>, config: ImapConfig, pub(crate) session: Option, login_failed_once: bool, @@ -228,7 +227,7 @@ impl Imap { socks5_config: Option, addr: &str, provider_strict_tls: bool, - idle_interrupt_receiver: Receiver, + idle_interrupt_receiver: Receiver<()>, ) -> Result { if lp.server.is_empty() || lp.user.is_empty() || lp.password.is_empty() { bail!("Incomplete IMAP connection parameters"); @@ -261,7 +260,7 @@ impl Imap { /// Creates new disconnected IMAP client using configured parameters. pub async fn new_configured( context: &Context, - idle_interrupt_receiver: Receiver, + idle_interrupt_receiver: Receiver<()>, ) -> Result { if !context.is_configured().await? { bail!("IMAP Connect without configured params"); @@ -2290,10 +2289,7 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) (message_id,), ) .await?; - context - .scheduler - .interrupt_inbox(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_inbox().await; Ok(()) } diff --git a/src/imap/idle.rs b/src/imap/idle.rs index 782935df5a..db7debe0dd 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -8,9 +8,9 @@ use futures_lite::FutureExt; use super::session::Session; use super::Imap; use crate::config::Config; +use crate::context::Context; use crate::imap::{client::IMAP_TIMEOUT, get_uid_next, FolderMeaning}; use crate::log::LogExt; -use crate::{context::Context, scheduler::InterruptInfo}; const IDLE_TIMEOUT: Duration = Duration::from_secs(23 * 60); @@ -18,25 +18,15 @@ impl Session { pub async fn idle( mut self, context: &Context, - idle_interrupt_receiver: Receiver, + idle_interrupt_receiver: Receiver<()>, folder: &str, - ) -> Result<(Self, InterruptInfo)> { + ) -> Result { use futures::future::FutureExt; - if context.get_config_bool(Config::DisableIdle).await? { - bail!("IMAP IDLE is disabled"); - } - - if !self.can_idle() { - bail!("IMAP server does not have IDLE capability"); - } - - let mut info = Default::default(); - self.select_folder(context, Some(folder)).await?; if self.server_sent_unsolicited_exists(context)? { - return Ok((self, info)); + return Ok(self); } // Despite checking for unsolicited EXISTS above, @@ -55,16 +45,16 @@ impl Session { context, "Skipping IDLE on {folder:?} because UIDNEXT {uid_next}>{expected_uid_next} indicates there are new messages." ); - return Ok((self, info)); + return Ok(self); } } else { warn!(context, "STATUS {folder} (UIDNEXT) did not return UIDNEXT"); // Go to IDLE anyway if STATUS is broken. } - if let Ok(info) = idle_interrupt_receiver.try_recv() { - info!(context, "skip idle, got interrupt {:?}", info); - return Ok((self, info)); + if let Ok(()) = idle_interrupt_receiver.try_recv() { + info!(context, "skip idle, got interrupt"); + return Ok(self); } let mut handle = self.inner.idle(); @@ -81,17 +71,17 @@ impl Session { enum Event { IdleResponse(IdleResponse), - Interrupt(InterruptInfo), + Interrupt, } info!(context, "{folder}: Idle entering wait-on-remote state"); let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async { - let info = idle_interrupt_receiver.recv().await; + idle_interrupt_receiver.recv().await.ok(); // cancel imap idle connection properly drop(interrupt); - Ok(Event::Interrupt(info.unwrap_or_default())) + Ok(Event::Interrupt) }); match fut.await { @@ -104,9 +94,8 @@ impl Session { Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { info!(context, "{folder}: Idle wait was interrupted manually"); } - Ok(Event::Interrupt(i)) => { - info!(context, "{folder}: Idle wait was interrupted: {:?}", &i); - info = i; + Ok(Event::Interrupt) => { + info!(context, "{folder}: Idle wait was interrupted"); } Err(err) => { warn!(context, "{folder}: Idle wait errored: {err:?}"); @@ -120,7 +109,7 @@ impl Session { session.as_mut().set_read_timeout(Some(IMAP_TIMEOUT)); self.inner = session; - Ok((self, info)) + Ok(self) } } @@ -130,7 +119,7 @@ impl Imap { context: &Context, watch_folder: Option, folder_meaning: FolderMeaning, - ) -> InterruptInfo { + ) { // Idle using polling. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job (and an interrupt). @@ -141,11 +130,8 @@ impl Imap { watch_folder } else { info!(context, "IMAP-fake-IDLE: no folder, waiting for interrupt"); - return self - .idle_interrupt_receiver - .recv() - .await - .unwrap_or_default(); + self.idle_interrupt_receiver.recv().await.ok(); + return; }; info!(context, "IMAP-fake-IDLEing folder={:?}", watch_folder); @@ -155,10 +141,10 @@ impl Imap { enum Event { Tick, - Interrupt(InterruptInfo), + Interrupt, } // loop until we are interrupted or if we fetched something - let info = loop { + loop { use futures::future::FutureExt; match interval .tick() @@ -166,7 +152,7 @@ impl Imap { .race( self.idle_interrupt_receiver .recv() - .map(|probe_network| Event::Interrupt(probe_network.unwrap_or_default())), + .map(|_| Event::Interrupt), ) .await { @@ -188,7 +174,7 @@ impl Imap { .unwrap_or_default() { // we only fake-idled because network was gone during IDLE, probably - break InterruptInfo::new(false); + break; } } info!(context, "fake_idle is connected"); @@ -203,7 +189,7 @@ impl Imap { Ok(res) => { info!(context, "fetch_new_messages returned {:?}", res); if res { - break InterruptInfo::new(false); + break; } } Err(err) => { @@ -212,13 +198,12 @@ impl Imap { } } } - Event::Interrupt(info) => { - // Interrupt + Event::Interrupt => { info!(context, "Fake IDLE interrupted"); - break info; + break; } } - }; + } info!( context, @@ -229,7 +214,5 @@ impl Imap { .as_millis() as f64 / 1000., ); - - info } } diff --git a/src/imap/scan_folders.rs b/src/imap/scan_folders.rs index 8b63098b8e..d3e3a1d8b0 100644 --- a/src/imap/scan_folders.rs +++ b/src/imap/scan_folders.rs @@ -10,8 +10,8 @@ use crate::log::LogExt; use crate::{context::Context, imap::FolderMeaning}; impl Imap { - /// Returns true if folders were scanned, false if scanning was postponed. - pub(crate) async fn scan_folders(&mut self, context: &Context) -> Result { + /// Scans not watched IMAP folders for new messages. + pub(crate) async fn scan_folders(&mut self, context: &Context) -> Result<()> { // First of all, debounce to once per minute: let mut last_scan = context.last_full_folder_scan.lock().await; if let Some(last_scan) = *last_scan { @@ -21,7 +21,7 @@ impl Imap { .await?; if elapsed_secs < debounce_secs { - return Ok(false); + return Ok(()); } } info!(context, "Starting full folder scan"); @@ -94,7 +94,7 @@ impl Imap { } last_scan.replace(Instant::now()); - Ok(true) + Ok(()) } /// Returns the names of all folders on the IMAP server. diff --git a/src/message.rs b/src/message.rs index 295f38a9d0..d16c53e727 100644 --- a/src/message.rs +++ b/src/message.rs @@ -24,7 +24,6 @@ use crate::mimeparser::{parse_message_id, SystemMessage}; use crate::param::{Param, Params}; use crate::pgp::split_armored_data; use crate::reaction::get_msg_reactions; -use crate::scheduler::InterruptInfo; use crate::sql; use crate::summary::Summary; use crate::tools::{ @@ -1527,10 +1526,7 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { } // Interrupt Inbox loop to start message deletion and run housekeeping. - context - .scheduler - .interrupt_inbox(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_inbox().await; Ok(()) } @@ -1648,10 +1644,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> ) .await .context("failed to insert into smtp_mdns")?; - context - .scheduler - .interrupt_smtp(InterruptInfo::new(false)) - .await; + context.scheduler.interrupt_smtp().await; } } updated_chat_ids.insert(curr_chat_id); diff --git a/src/scheduler.rs b/src/scheduler.rs index 0d80f42360..64798ed70e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -233,17 +233,17 @@ impl SchedulerState { connectivity::maybe_network_lost(context, stores).await; } - pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { + pub(crate) async fn interrupt_inbox(&self) { let inner = self.inner.read().await; if let InnerSchedulerState::Started(ref scheduler) = *inner { - scheduler.interrupt_inbox(info); + scheduler.interrupt_inbox(); } } - pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { + pub(crate) async fn interrupt_smtp(&self) { let inner = self.inner.read().await; if let InnerSchedulerState::Started(ref scheduler) = *inner { - scheduler.interrupt_smtp(info); + scheduler.interrupt_smtp(); } } @@ -463,18 +463,15 @@ async fn inbox_loop( /// handling all the errors. In case of an error, it is logged, but not propagated upwards. If /// critical operation fails such as fetching new messages fails, connection is reset via /// `trigger_reconnect`, so a fresh one can be opened. -async fn fetch_idle( - ctx: &Context, - connection: &mut Imap, - folder_meaning: FolderMeaning, -) -> InterruptInfo { +async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: FolderMeaning) { let folder_config = match folder_meaning.to_config() { Some(c) => c, None => { error!(ctx, "Bad folder meaning: {}", folder_meaning); - return connection + connection .fake_idle(ctx, None, FolderMeaning::Unknown) .await; + return; } }; let folder = match ctx.get_config(folder_config).await { @@ -484,9 +481,10 @@ async fn fetch_idle( ctx, "Can not watch {} folder, failed to retrieve config: {:#}", folder_config, err ); - return connection + connection .fake_idle(ctx, None, FolderMeaning::Unknown) .await; + return; } }; @@ -495,9 +493,10 @@ async fn fetch_idle( } else { connection.connectivity.set_not_configured(ctx).await; info!(ctx, "Can not watch {} folder, not set", folder_config); - return connection + connection .fake_idle(ctx, None, FolderMeaning::Unknown) .await; + return; }; // connect and fake idle if unable to connect @@ -508,9 +507,10 @@ async fn fetch_idle( { warn!(ctx, "{:#}", err); connection.trigger_reconnect(ctx); - return connection + connection .fake_idle(ctx, Some(watch_folder), folder_meaning) .await; + return; } if folder_config == Config::ConfiguredInboxFolder { @@ -534,7 +534,7 @@ async fn fetch_idle( { connection.trigger_reconnect(ctx); warn!(ctx, "{:#}", err); - return InterruptInfo::new(false); + return; } // Mark expired messages for deletion. Marked messages will be deleted from the server @@ -553,30 +553,10 @@ async fn fetch_idle( // be able to scan all folders before time is up if there are many of them. if folder_config == Config::ConfiguredInboxFolder { // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages - match connection.scan_folders(ctx).await.context("scan_folders") { - Err(err) => { - // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing - // but maybe just one folder can't be selected or something - warn!(ctx, "{:#}", err); - } - Ok(true) => { - // Fetch the watched folder again in case scanning other folder moved messages - // there. - // - // In most cases this will select the watched folder and return because there are - // no new messages. We want to select the watched folder anyway before going IDLE - // there, so this does not take additional protocol round-trip. - if let Err(err) = connection - .fetch_move_delete(ctx, &watch_folder, folder_meaning) - .await - .context("fetch_move_delete after scan_folders") - { - connection.trigger_reconnect(ctx); - warn!(ctx, "{:#}", err); - return InterruptInfo::new(false); - } - } - Ok(false) => {} + if let Err(err) = connection.scan_folders(ctx).await.context("scan_folders") { + // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing + // but maybe just one folder can't be selected or something + warn!(ctx, "{:#}", err); } } @@ -591,55 +571,56 @@ async fn fetch_idle( connection.connectivity.set_connected(ctx).await; ctx.emit_event(EventType::ImapInboxIdle); - if let Some(session) = connection.session.take() { - if !session.can_idle() { - info!( - ctx, - "IMAP session does not support IDLE, going to fake idle." - ); - return connection - .fake_idle(ctx, Some(watch_folder), folder_meaning) - .await; - } + let Some(session) = connection.session.take() else { + warn!(ctx, "No IMAP session, going to fake idle."); + connection + .fake_idle(ctx, Some(watch_folder), folder_meaning) + .await; + return; + }; - if ctx - .get_config_bool(Config::DisableIdle) - .await - .context("Failed to get disable_idle config") - .log_err(ctx) - .unwrap_or_default() - { - info!(ctx, "IMAP IDLE is disabled, going to fake idle."); - return connection - .fake_idle(ctx, Some(watch_folder), folder_meaning) - .await; - } + if !session.can_idle() { + info!( + ctx, + "IMAP session does not support IDLE, going to fake idle." + ); + connection + .fake_idle(ctx, Some(watch_folder), folder_meaning) + .await; + return; + } - info!(ctx, "IMAP session supports IDLE, using it."); - match session - .idle( - ctx, - connection.idle_interrupt_receiver.clone(), - &watch_folder, - ) - .await - .context("idle") - { - Ok((session, info)) => { - connection.session = Some(session); - info - } - Err(err) => { - connection.trigger_reconnect(ctx); - warn!(ctx, "{:#}", err); - InterruptInfo::new(false) - } - } - } else { - warn!(ctx, "No IMAP session, going to fake idle."); + if ctx + .get_config_bool(Config::DisableIdle) + .await + .context("Failed to get disable_idle config") + .log_err(ctx) + .unwrap_or_default() + { + info!(ctx, "IMAP IDLE is disabled, going to fake idle."); connection .fake_idle(ctx, Some(watch_folder), folder_meaning) - .await + .await; + return; + } + + info!(ctx, "IMAP session supports IDLE, using it."); + match session + .idle( + ctx, + connection.idle_interrupt_receiver.clone(), + &watch_folder, + ) + .await + .context("idle") + { + Ok(session) => { + connection.session = Some(session); + } + Err(err) => { + connection.trigger_reconnect(ctx); + warn!(ctx, "{:#}", err); + } } } @@ -860,24 +841,24 @@ impl Scheduler { fn maybe_network(&self) { for b in self.boxes() { - b.conn_state.interrupt(InterruptInfo::new(true)); + b.conn_state.interrupt(); } - self.interrupt_smtp(InterruptInfo::new(true)); + self.interrupt_smtp(); } fn maybe_network_lost(&self) { for b in self.boxes() { - b.conn_state.interrupt(InterruptInfo::new(false)); + b.conn_state.interrupt(); } - self.interrupt_smtp(InterruptInfo::new(false)); + self.interrupt_smtp(); } - fn interrupt_inbox(&self, info: InterruptInfo) { - self.inbox.conn_state.interrupt(info); + fn interrupt_inbox(&self) { + self.inbox.conn_state.interrupt(); } - fn interrupt_smtp(&self, info: InterruptInfo) { - self.smtp.interrupt(info); + fn interrupt_smtp(&self) { + self.smtp.interrupt(); } fn interrupt_ephemeral_task(&self) { @@ -927,7 +908,7 @@ struct ConnectionState { /// Channel to interrupt the whole connection. stop_sender: Sender<()>, /// Channel to interrupt idle. - idle_interrupt_sender: Sender, + idle_interrupt_sender: Sender<()>, /// Mutex to pass connectivity info between IMAP/SMTP threads and the API connectivity: ConnectivityStore, } @@ -943,9 +924,9 @@ impl ConnectionState { Ok(()) } - fn interrupt(&self, info: InterruptInfo) { + fn interrupt(&self) { // Use try_send to avoid blocking on interrupts. - self.idle_interrupt_sender.try_send(info).ok(); + self.idle_interrupt_sender.try_send(()).ok(); } } @@ -977,8 +958,8 @@ impl SmtpConnectionState { } /// Interrupt any form of idle. - fn interrupt(&self, info: InterruptInfo) { - self.state.interrupt(info); + fn interrupt(&self) { + self.state.interrupt(); } /// Shutdown this connection completely. @@ -991,7 +972,7 @@ impl SmtpConnectionState { struct SmtpConnectionHandlers { connection: Smtp, stop_receiver: Receiver<()>, - idle_interrupt_receiver: Receiver, + idle_interrupt_receiver: Receiver<()>, } #[derive(Debug)] @@ -1022,8 +1003,8 @@ impl ImapConnectionState { } /// Interrupt any form of idle. - fn interrupt(&self, info: InterruptInfo) { - self.state.interrupt(info); + fn interrupt(&self) { + self.state.interrupt(); } /// Shutdown this connection completely. @@ -1038,14 +1019,3 @@ struct ImapConnectionHandlers { connection: Imap, stop_receiver: Receiver<()>, } - -#[derive(Default, Debug)] -pub struct InterruptInfo { - pub probe_network: bool, -} - -impl InterruptInfo { - pub fn new(probe_network: bool) -> Self { - Self { probe_network } - } -} diff --git a/src/webxdc.rs b/src/webxdc.rs index 107ae1680d..bb6b1b466a 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -37,7 +37,6 @@ use crate::mimefactory::wrapped_base64_encode; use crate::mimeparser::SystemMessage; use crate::param::Param; use crate::param::Params; -use crate::scheduler::InterruptInfo; use crate::tools::strip_rtlo_characters; use crate::tools::{create_smeared_timestamp, get_abs_path}; @@ -485,9 +484,7 @@ impl Context { DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr", (instance.id, status_update_serial, status_update_serial, descr), ).await?; - self.scheduler - .interrupt_smtp(InterruptInfo::new(false)) - .await; + self.scheduler.interrupt_smtp().await; } Ok(()) }