Skip to content

Allow pausing IO scheduler from inside core #4138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 19, 2023
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
- Support non-persistent configuration with DELTACHAT_* env
- Print deltachat-repl errors with causes. #4166
- Increase MSRV to 1.64. #4167
- Core takes care of stopping and re-starting IO itself where needed,
  e.g. during backup creation. It is no longer necessary to call
  dc_stop_io(). dc_start_io() can now be called at any time without
  harm. #4138

### Fixes
- Fix segmentation fault if `dc_context_unref()` is called during
Expand Down
3 changes: 1 addition & 2 deletions deltachat-ffi/deltachat.h
Original file line number Diff line number Diff line change
Expand Up @@ -2100,8 +2100,7 @@ dc_contact_t* dc_get_contact (dc_context_t* context, uint32_t co

/**
* Import/export things.
* During backup import/export IO must not be started,
* if needed stop IO using dc_accounts_stop_io() or dc_stop_io() first.
*
* What to do is defined by the _what_ parameter which may be one of the following:
*
* - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1`
Expand Down
7 changes: 2 additions & 5 deletions deltachat-jsonrpc/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,16 +1325,13 @@ impl CommandApi {
passphrase: Option<String>,
) -> Result<()> {
let ctx = self.get_context(account_id).await?;
ctx.stop_io().await;
let result = imex::imex(
imex::imex(
&ctx,
imex::ImexMode::ExportBackup,
destination.as_ref(),
passphrase,
)
.await;
ctx.start_io().await;
result
.await
}

async fn import_backup(
Expand Down
4 changes: 2 additions & 2 deletions src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,14 @@ impl Accounts {
/// Notifies all accounts that the network may have become available.
pub async fn maybe_network(&self) {
for account in self.accounts.values() {
account.maybe_network().await;
account.scheduler.maybe_network().await;
}
}

/// Notifies all accounts that the network connection may have been lost.
pub async fn maybe_network_lost(&self) {
for account in self.accounts.values() {
account.maybe_network_lost().await;
account.scheduler.maybe_network_lost(account).await;
}
}

Expand Down
22 changes: 17 additions & 5 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,10 @@ impl ChatId {
context.emit_msgs_changed_without_ids();

context.set_config(Config::LastHousekeeping, None).await?;
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;

if chat.is_self_talk() {
let mut msg = Message::new(Viewtype::Text);
Expand Down Expand Up @@ -1667,7 +1670,7 @@ impl Chat {

maybe_set_logging_xdc(context, msg, self.id).await?;
}
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
Ok(msg.id)
}
}
Expand Down Expand Up @@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
context.emit_event(EventType::LocationChanged(Some(ContactId::SELF)));
}

context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}

Ok(msg.id)
Expand Down Expand Up @@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
.await?;
curr_timestamp += 1;
if create_send_msg_job(context, new_msg_id).await?.is_some() {
context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
}
created_chats.push(chat_id);
Expand Down Expand Up @@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
msg_id: msg.id,
});
if create_send_msg_job(context, msg.id).await?.is_some() {
context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl Context {
Config::DeleteDeviceAfter => {
let ret = self.sql.set_raw_config(key.as_ref(), value).await;
// Interrupt ephemeral loop to delete old messages immediately.
self.interrupt_ephemeral_task().await;
self.scheduler.interrupt_ephemeral_task().await;
ret?
}
Config::Displayname => {
Expand Down
6 changes: 4 additions & 2 deletions src/configure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Context {
/// Configures this account with the currently set parameters.
pub async fn configure(&self) -> Result<()> {
ensure!(
self.scheduler.read().await.is_none(),
!self.scheduler.is_running().await,
"cannot configure, already running"
);
ensure!(
Expand Down Expand Up @@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {

ctx.set_config_bool(Config::FetchedExistingMsgs, false)
.await?;
ctx.interrupt_inbox(InterruptInfo::new(false)).await;
ctx.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;

progress!(ctx, 940);
update_device_chats_handle.await??;
Expand Down
5 changes: 4 additions & 1 deletion src/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen(
> 0
&& timestamp > time() - SEEN_RECENTLY_SECONDS
{
context.interrupt_recently_seen(contact_id, timestamp).await;
context
.scheduler
.interrupt_recently_seen(contact_id, timestamp)
.await;
}
Ok(())
}
Expand Down
41 changes: 11 additions & 30 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey};
use crate::login_param::LoginParam;
use crate::message::{self, MessageState, MsgId};
use crate::quota::QuotaInfo;
use crate::scheduler::Scheduler;
use crate::scheduler::SchedulerState;
use crate::sql::Sql;
use crate::stock_str::StockStrings;
use crate::timesmearing::SmearedTimestamp;
Expand Down Expand Up @@ -201,7 +201,7 @@ pub struct InnerContext {
pub(crate) translated_stockstrings: StockStrings,
pub(crate) events: Events,

pub(crate) scheduler: RwLock<Option<Scheduler>>,
pub(crate) scheduler: SchedulerState,
pub(crate) ratelimit: RwLock<Ratelimit>,

/// Recently loaded quota information, if any.
Expand Down Expand Up @@ -370,7 +370,7 @@ impl Context {
wrong_pw_warning_mutex: Mutex::new(()),
translated_stockstrings: stockstrings,
events,
scheduler: RwLock::new(None),
scheduler: SchedulerState::new(),
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds.
quota: RwLock::new(None),
quota_update_request: AtomicBool::new(false),
Expand All @@ -395,42 +395,23 @@ impl Context {
warn!(self, "can not start io on a context that is not configured");
return;
}

info!(self, "starting IO");
let mut lock = self.inner.scheduler.write().await;
if lock.is_none() {
match Scheduler::start(self.clone()).await {
Err(err) => error!(self, "Failed to start IO: {:#}", err),
Ok(scheduler) => *lock = Some(scheduler),
}
}
self.scheduler.start(self.clone()).await;
}

/// Stops the IO scheduler.
pub async fn stop_io(&self) {
// Sending an event wakes up event pollers (get_next_event)
// so the caller of stop_io() can arrange for proper termination.
// For this, the caller needs to instruct the event poller
// to terminate on receiving the next event and then call stop_io()
// which will emit the below event(s)
info!(self, "stopping IO");
if let Some(debug_logging) = self.debug_logging.read().await.as_ref() {
debug_logging.loop_handle.abort();
}
if let Some(scheduler) = self.inner.scheduler.write().await.take() {
scheduler.stop(self).await;
}
self.scheduler.stop(self).await;
}

    /// Restarts the IO scheduler if it was running before;
    /// when it is not running, this is a no-op.
pub async fn restart_io_if_running(&self) {
info!(self, "restarting IO");
let is_running = { self.inner.scheduler.read().await.is_some() };
if is_running {
self.stop_io().await;
self.start_io().await;
}
self.scheduler.restart(self).await;
}

/// Indicate that the network likely has come back.
pub async fn maybe_network(&self) {
self.scheduler.maybe_network().await;
}

/// Returns a reference to the underlying SQL instance.
Expand Down
4 changes: 2 additions & 2 deletions src/ephemeral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl MsgId {
paramsv![ephemeral_timestamp, ephemeral_timestamp, self],
)
.await?;
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
}
Ok(())
}
Expand Down Expand Up @@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids(
)
.await?;
if count > 0 {
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
}
Ok(())
}
Expand Down
7 changes: 5 additions & 2 deletions src/imap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ impl Imap {
// Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are
// fetched while the per-chat ephemeral timers start as soon as the messages are marked
// as noticed.
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
}

let session = self
Expand Down Expand Up @@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str)
paramsv![message_id],
)
.await?;
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;

Ok(())
}
Expand Down
22 changes: 13 additions & 9 deletions src/imex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ pub async fn imex(
) -> Result<()> {
let cancel = context.alloc_ongoing().await?;

let res = imex_inner(context, what, path, passphrase)
.race(async {
cancel.recv().await.ok();
Err(format_err!("canceled"))
})
.await;

let res = {
let mut guard = context.scheduler.pause(context).await;
let res = imex_inner(context, what, path, passphrase)
.race(async {
cancel.recv().await.ok();
Err(format_err!("canceled"))
})
.await;
guard.resume().await;
res
};
context.free_ongoing().await;

if let Err(err) = res.as_ref() {
Expand Down Expand Up @@ -413,7 +417,7 @@ async fn import_backup(
"Cannot import backups to accounts in use."
);
ensure!(
context.scheduler.read().await.is_none(),
!context.scheduler.is_running().await,
"cannot import backup, IO is running"
);

Expand Down Expand Up @@ -523,7 +527,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
sql::housekeeping(context).await.ok_or_log(context);

ensure!(
context.scheduler.read().await.is_none(),
!context.scheduler.is_running().await,
"cannot export backup, IO is running"
);

Expand Down
6 changes: 5 additions & 1 deletion src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 {
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
context.resync_request.store(true, Ordering::Relaxed);
context
.scheduler
.interrupt_inbox(InterruptInfo {
probe_network: false,
})
Expand All @@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
job.save(context).await.context("failed to save job")?;

info!(context, "interrupt: imap");
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub async fn send_locations_to_chat(
}
context.emit_event(EventType::ChatModified(chat_id));
if 0 != seconds {
context.interrupt_location().await;
context.scheduler.interrupt_location().await;
}
Ok(())
}
Expand Down
10 changes: 8 additions & 2 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
}

// Interrupt Inbox loop to start message deletion and run housekeeping.
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
Ok(())
}

Expand Down Expand Up @@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
)
.await
.context("failed to insert into smtp_mdns")?;
context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
}
updated_chat_ids.insert(curr_chat_id);
Expand Down
4 changes: 3 additions & 1 deletion src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ impl Context {
let requested = self.quota_update_request.swap(true, Ordering::Relaxed);
if !requested {
// Quota update was not requested before.
self.interrupt_inbox(InterruptInfo::new(false)).await;
self.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
}
Ok(())
}
Expand Down
Loading