Skip to content

Commit fe684f8

Browse files
handle panic under async
1 parent d453c7d commit fe684f8

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

src/sync.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818

1919
use chrono::{TimeDelta, Timelike};
20+
use futures::FutureExt;
2021
use std::collections::HashMap;
2122
use std::future::Future;
2223
use std::panic::AssertUnwindSafe;
@@ -119,7 +120,7 @@ pub fn object_store_sync() -> (
119120
info!("Object store sync task started");
120121
let mut inbox_rx = inbox_rx;
121122

122-
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
123+
let result = tokio::spawn(async move {
123124
let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);
124125

125126
loop {
@@ -153,11 +154,13 @@ pub fn object_store_sync() -> (
153154
}
154155
}
155156
}
156-
}));
157+
});
157158

158-
match result {
159-
Ok(future) => {
160-
future.await;
159+
match AssertUnwindSafe(result).catch_unwind().await {
160+
Ok(join_result) => {
161+
if let Err(join_err) = join_result {
162+
error!("Panic in object store sync task: {join_err:?}");
163+
}
161164
}
162165
Err(panic_error) => {
163166
error!("Panic in object store sync task: {panic_error:?}");
@@ -184,12 +187,11 @@ pub fn local_sync() -> (
184187
info!("Local sync task started");
185188
let mut inbox_rx = inbox_rx;
186189

187-
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
190+
let result = tokio::spawn(async move {
188191
let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
189192

190193
loop {
191194
select! {
192-
// Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
193195
_ = sync_interval.tick() => {
194196
// Monitor the duration of flush_and_convert execution
195197
monitor_task_duration(
@@ -217,11 +219,13 @@ pub fn local_sync() -> (
217219
}
218220
}
219221
}
220-
}));
222+
});
221223

222-
match result {
223-
Ok(future) => {
224-
future.await;
224+
match AssertUnwindSafe(result).catch_unwind().await {
225+
Ok(join_result) => {
226+
if let Err(join_err) = join_result {
227+
error!("Panic in local sync task: {join_err:?}");
228+
}
225229
}
226230
Err(panic_error) => {
227231
error!("Panic in local sync task: {panic_error:?}");

0 commit comments

Comments
 (0)