Skip to content

fix: multiple fixes around system stability under load #1346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use actix_web::{
use http::StatusCode;
use once_cell::sync::Lazy;
use tokio::{sync::Mutex, task::JoinSet};
use tracing::{error, info, warn};
use tracing::{error, info};

use crate::parseable::PARSEABLE;
use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams};

// Create a global variable to store signal status
static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));
pub static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));

pub async fn liveness() -> HttpResponse {
HttpResponse::new(StatusCode::OK)
Expand All @@ -60,28 +60,33 @@ pub async fn shutdown() {
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;

let mut joinset = JoinSet::new();
//sleep for 5 secs to allow any ongoing requests to finish
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let mut local_sync_joinset = JoinSet::new();

// Sync staging
PARSEABLE.streams.flush_and_convert(&mut joinset, true);
PARSEABLE
.streams
.flush_and_convert(&mut local_sync_joinset, false, true);

while let Some(res) = joinset.join_next().await {
while let Some(res) = local_sync_joinset.join_next().await {
match res {
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"),
Err(err) => error!("Failed to join async task: {err}"),
}
}

if let Err(e) = PARSEABLE
.storage
.get_object_store()
.upload_files_from_staging()
.await
{
warn!("Failed to sync local data with object store. {:?}", e);
} else {
info!("Successfully synced all data to S3.");
// Sync object store
let mut object_store_joinset = JoinSet::new();
sync_all_streams(&mut object_store_joinset);

while let Some(res) = object_store_joinset.join_next().await {
match res {
Ok(Ok(_)) => info!("Successfully synced all data to S3."),
Ok(Err(err)) => error!("Failed to sync local data with object store. {err:?}"),
Err(err) => error!("Failed to join async task: {err}"),
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio::sync::oneshot;
use tokio::sync::OnceCell;

use crate::handlers::http::modal::NodeType;
use crate::sync::sync_start;
use crate::{
analytics,
handlers::{
Expand Down Expand Up @@ -114,6 +115,13 @@ impl ParseableServer for IngestServer {

migration::run_migration(&PARSEABLE).await?;

// local sync on init
let startup_sync_handle = tokio::spawn(async {
if let Err(e) = sync_start().await {
tracing::warn!("local sync on server start failed: {e}");
}
});

// Run sync on a background thread
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));
Expand All @@ -124,7 +132,9 @@ impl ParseableServer for IngestServer {
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

if let Err(join_err) = startup_sync_handle.await {
tracing::warn!("startup sync task panicked: {join_err}");
}
result
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
use crate::handlers::http::{rbac, role};
use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
use crate::sync::sync_start;
use crate::{analytics, migration, storage, sync};
use actix_web::web::{resource, ServiceConfig};
use actix_web::{web, Scope};
Expand Down Expand Up @@ -126,6 +127,13 @@ impl ParseableServer for QueryServer {
if init_cluster_metrics_schedular().is_ok() {
info!("Cluster metrics scheduler started successfully");
}

// local sync on init
let startup_sync_handle = tokio::spawn(async {
if let Err(e) = sync_start().await {
tracing::warn!("local sync on server start failed: {e}");
}
});
if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.put_internal_stream_hot_tier().await?;
hot_tier_manager.download_from_s3()?;
Expand All @@ -142,7 +150,9 @@ impl ParseableServer for QueryServer {
.await?;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

if let Err(join_err) = startup_sync_handle.await {
tracing::warn!("startup sync task panicked: {join_err}");
}
Ok(result)
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::metrics;
use crate::migration;
use crate::storage;
use crate::sync;
use crate::sync::sync_start;

use actix_web::web;
use actix_web::web::resource;
Expand Down Expand Up @@ -122,6 +123,13 @@ impl ParseableServer for Server {

storage::retention::load_retention_from_global();

// local sync on init
let startup_sync_handle = tokio::spawn(async {
if let Err(e) = sync_start().await {
tracing::warn!("local sync on server start failed: {e}");
}
});

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.download_from_s3()?;
};
Expand All @@ -142,7 +150,9 @@ impl ParseableServer for Server {
.await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

if let Err(join_err) = startup_sync_handle.await {
tracing::warn!("startup sync task panicked: {join_err}");
}
return result;
}
}
Expand Down
Loading
Loading