diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index d99d62c7d..b4803e124 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -29,12 +29,12 @@ use actix_web::{ use http::StatusCode; use once_cell::sync::Lazy; use tokio::{sync::Mutex, task::JoinSet}; -use tracing::{error, info, warn}; +use tracing::{error, info}; -use crate::parseable::PARSEABLE; +use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams}; // Create a global variable to store signal status -static SIGNAL_RECEIVED: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(false))); +pub static SIGNAL_RECEIVED: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(false))); pub async fn liveness() -> HttpResponse { HttpResponse::new(StatusCode::OK) @@ -60,28 +60,33 @@ pub async fn shutdown() { let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; *shutdown_flag = true; - let mut joinset = JoinSet::new(); + //sleep for 5 secs to allow any ongoing requests to finish + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let mut local_sync_joinset = JoinSet::new(); // Sync staging - PARSEABLE.streams.flush_and_convert(&mut joinset, true); + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, false, true); - while let Some(res) = joinset.join_next().await { + while let Some(res) = local_sync_joinset.join_next().await { match res { Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), - Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"), + Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"), Err(err) => error!("Failed to join async task: {err}"), } } - if let Err(e) = PARSEABLE - .storage - .get_object_store() - .upload_files_from_staging() - .await - { - warn!("Failed to sync local data with object store. {:?}", e); - } else { - info!("Successfully synced all data to S3."); + // Sync object store + let mut object_store_joinset = JoinSet::new(); + sync_all_streams(&mut object_store_joinset); + + while let Some(res) = object_store_joinset.join_next().await { + match res { + Ok(Ok(_)) => info!("Successfully synced all data to S3."), + Ok(Err(err)) => error!("Failed to sync local data with object store. 
{err:?}"), + Err(err) => error!("Failed to join async task: {err}"), + } } } diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index e9751ee13..8a8e1d1b1 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -31,6 +31,7 @@ use tokio::sync::oneshot; use tokio::sync::OnceCell; use crate::handlers::http::modal::NodeType; +use crate::sync::sync_start; use crate::{ analytics, handlers::{ @@ -114,6 +115,13 @@ impl ParseableServer for IngestServer { migration::run_migration(&PARSEABLE).await?; + // local sync on init + let startup_sync_handle = tokio::spawn(async { + if let Err(e) = sync_start().await { + tracing::warn!("local sync on server start failed: {e}"); + } + }); + // Run sync on a background thread let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); @@ -124,7 +132,9 @@ impl ParseableServer for IngestServer { let result = self.start(shutdown_rx, prometheus.clone(), None).await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - + if let Err(join_err) = startup_sync_handle.await { + tracing::warn!("startup sync task panicked: {join_err}"); + } result } } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index b138a292c..b43fa68a9 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -27,6 +27,7 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; +use crate::sync::sync_start; use crate::{analytics, migration, storage, sync}; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; @@ -126,6 +127,13 @@ impl ParseableServer for QueryServer { if init_cluster_metrics_schedular().is_ok() { info!("Cluster metrics scheduler started successfully"); } + + // local sync on init + let startup_sync_handle = tokio::spawn(async { + if let Err(e) = sync_start().await { + tracing::warn!("local sync on server start failed: {e}"); + } + }); if let Some(hot_tier_manager) = HotTierManager::global() { hot_tier_manager.put_internal_stream_hot_tier().await?; hot_tier_manager.download_from_s3()?; @@ -142,7 +150,9 @@ impl ParseableServer for QueryServer { .await?; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - + if let Err(join_err) = startup_sync_handle.await { + tracing::warn!("startup sync task panicked: {join_err}"); + } Ok(result) } } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 467712646..d22e5de02 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -33,6 +33,7 @@ use crate::metrics; use crate::migration; use crate::storage; use crate::sync; +use crate::sync::sync_start; use actix_web::web; use actix_web::web::resource; @@ -122,6 +123,13 @@ impl ParseableServer for Server { storage::retention::load_retention_from_global(); + // local sync on init + let startup_sync_handle = tokio::spawn(async { + if let Err(e) = sync_start().await { + tracing::warn!("local sync on server start failed: {e}"); + } + }); + if let Some(hot_tier_manager) = HotTierManager::global() { hot_tier_manager.download_from_s3()?; }; @@ -142,7 +150,9 @@ impl ParseableServer for Server { .await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - + if let Err(join_err) = startup_sync_handle.await { + 
tracing::warn!("startup sync task panicked: {join_err}"); + } return result; } } diff --git a/src/otel/traces.rs b/src/otel/traces.rs index d62efdab3..12c6e350f 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -336,7 +336,6 @@ fn flatten_span_record(span_record: &Span) -> Vec> { span_records_json } - #[cfg(test)] mod tests { use super::*; @@ -360,13 +359,21 @@ mod tests { KeyValue { key: "service.name".to_string(), value: Some(AnyValue { - value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("test-service".to_string())), + value: Some( + opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue( + "test-service".to_string(), + ), + ), }), }, KeyValue { key: "http.method".to_string(), value: Some(AnyValue { - value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("GET".to_string())), + value: Some( + opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue( + "GET".to_string(), + ), + ), }), }, ] @@ -398,7 +405,8 @@ mod tests { assert_eq!( result.get("span_status_description").unwrap(), &Value::String(expected_description.to_string()), - "Status description should match expected value for code {}", code + "Status description should match expected value for code {}", + code ); assert_eq!( result.get("span_status_message").unwrap(), @@ -432,7 +440,8 @@ mod tests { assert_eq!( result.get("span_kind_description").unwrap(), &Value::String(expected_description.to_string()), - "Span kind description should match expected value for kind {}", kind + "Span kind description should match expected value for kind {}", + kind ); } } @@ -459,7 +468,8 @@ mod tests { assert_eq!( result.get("span_flags_description").unwrap(), &Value::String(expected_description.to_string()), - "Span flags description should match expected value for flags {}", flags + "Span flags description should match expected value for flags {}", + flags ); } } @@ -488,7 +498,10 @@ mod tests { // Check first event let first_event = &result[0]; - assert!(first_event.contains_key("event_time_unix_nano"), "Should contain timestamp"); + assert!( + first_event.contains_key("event_time_unix_nano"), + "Should contain timestamp" + ); assert_eq!( first_event.get("event_name").unwrap(), &Value::String("request.start".to_string()), @@ -499,7 +512,10 @@ mod tests { &Value::Number(2.into()), "Dropped attributes count should be preserved" ); - assert!(first_event.contains_key("service.name"), "Should contain flattened attributes"); + assert!( + first_event.contains_key("service.name"), + "Should contain flattened attributes" + ); // Check second event let second_event = &result[1]; @@ -518,16 +534,14 @@ mod tests { #[test] fn test_flatten_links_structure() { // Test that links are properly flattened with all expected fields - let links = vec![ - Link { - trace_id: sample_trace_id(), - span_id: sample_span_id(), - trace_state: "state1".to_string(), - attributes: sample_attributes(), - dropped_attributes_count: 1, - flags: 0, - }, - ]; + let links = vec![Link { + trace_id: sample_trace_id(), + span_id: sample_span_id(), + trace_state: "state1".to_string(), + attributes: sample_attributes(), + dropped_attributes_count: 1, + flags: 0, + }]; let result = flatten_links(&links); @@ -549,7 +563,10 @@ mod tests { &Value::Number(1.into()), "Dropped attributes count should be preserved" ); - assert!(link.contains_key("service.name"), "Should contain flattened attributes"); + assert!( + link.contains_key("service.name"), + "Should contain flattened attributes" + ); } #[test] 
@@ -611,12 +628,30 @@ mod tests { &Value::String("SPAN_KIND_SERVER".to_string()), "All records should contain span kind description" ); - assert!(record.contains_key("span_trace_id"), "Should contain trace ID"); - assert!(record.contains_key("span_span_id"), "Should contain span ID"); - assert!(record.contains_key("span_start_time_unix_nano"), "Should contain start time"); - assert!(record.contains_key("span_end_time_unix_nano"), "Should contain end time"); - assert!(record.contains_key("service.name"), "Should contain span attributes"); - assert!(record.contains_key("span_status_code"), "Should contain status"); + assert!( + record.contains_key("span_trace_id"), + "Should contain trace ID" + ); + assert!( + record.contains_key("span_span_id"), + "Should contain span ID" + ); + assert!( + record.contains_key("span_start_time_unix_nano"), + "Should contain start time" + ); + assert!( + record.contains_key("span_end_time_unix_nano"), + "Should contain end time" + ); + assert!( + record.contains_key("service.name"), + "Should contain span attributes" + ); + assert!( + record.contains_key("span_status_code"), + "Should contain status" + ); } // One record should be an event, one should be a link @@ -650,7 +685,11 @@ mod tests { let result = flatten_span_record(&span); - assert_eq!(result.len(), 1, "Should have exactly one record for span without events/links"); + assert_eq!( + result.len(), + 1, + "Should have exactly one record for span without events/links" + ); let record = &result[0]; assert_eq!( @@ -658,9 +697,18 @@ mod tests { &Value::String("simple-span".to_string()), "Should contain span name" ); - assert!(!record.contains_key("event_name"), "Should not contain event fields"); - assert!(!record.contains_key("link_trace_id"), "Should not contain link fields"); - assert!(!record.contains_key("span_status_code"), "Should not contain status when none provided"); + assert!( + !record.contains_key("event_name"), + "Should not contain event fields" + ); + assert!( + !record.contains_key("link_trace_id"), + "Should not contain link fields" + ); + assert!( + !record.contains_key("span_status_code"), + "Should not contain status when none provided" + ); } #[test] @@ -705,10 +753,16 @@ mod tests { assert_eq!(hex_span_id, "12345678", "Span ID should be lowercase hex"); } if let Some(Value::String(hex_parent_span_id)) = record.get("span_parent_span_id") { - assert_eq!(hex_parent_span_id, "87654321", "Parent span ID should be lowercase hex"); + assert_eq!( + hex_parent_span_id, "87654321", + "Parent span ID should be lowercase hex" + ); } if let Some(Value::String(link_trace_id)) = record.get("link_trace_id") { - assert_eq!(link_trace_id, "ffabcdef", "Link trace ID should be lowercase hex"); + assert_eq!( + link_trace_id, "ffabcdef", + "Link trace ID should be lowercase hex" + ); } } } @@ -823,15 +877,36 @@ mod tests { fn test_known_field_list_completeness() { // Test that the OTEL_TRACES_KNOWN_FIELD_LIST contains all expected fields let expected_fields = [ - "scope_name", "scope_version", "scope_schema_url", "scope_dropped_attributes_count", - "resource_schema_url", "resource_dropped_attributes_count", - "span_trace_id", "span_span_id", "span_name", "span_parent_span_id", "name", - "span_kind", "span_kind_description", "span_start_time_unix_nano", "span_end_time_unix_nano", - "event_name", "event_time_unix_nano", "event_dropped_attributes_count", - "link_span_id", "link_trace_id", "link_dropped_attributes_count", - "span_dropped_events_count", "span_dropped_links_count", 
"span_dropped_attributes_count", - "span_trace_state", "span_flags", "span_flags_description", - "span_status_code", "span_status_description", "span_status_message", + "scope_name", + "scope_version", + "scope_schema_url", + "scope_dropped_attributes_count", + "resource_schema_url", + "resource_dropped_attributes_count", + "span_trace_id", + "span_span_id", + "span_name", + "span_parent_span_id", + "name", + "span_kind", + "span_kind_description", + "span_start_time_unix_nano", + "span_end_time_unix_nano", + "event_name", + "event_time_unix_nano", + "event_dropped_attributes_count", + "link_span_id", + "link_trace_id", + "link_dropped_attributes_count", + "span_dropped_events_count", + "span_dropped_links_count", + "span_dropped_attributes_count", + "span_trace_state", + "span_flags", + "span_flags_description", + "span_status_code", + "span_status_description", + "span_status_message", ]; assert_eq!( diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 372bfa885..777070e75 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader}; use arrow_schema::Schema; use byteorder::{LittleEndian, ReadBytesExt}; use itertools::kmerge_by; -use tracing::{error, warn}; +use tracing::error; use crate::{ event::DEFAULT_TIMESTAMP_KEY, @@ -48,18 +48,26 @@ impl MergedRecordReader { for file in files { //remove empty files before reading - if file.metadata().unwrap().len() == 0 { - error!("Invalid file detected, removing it: {:?}", file); - remove_file(file).unwrap(); - } else { - let Ok(reader) = - StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) - else { - error!("Invalid file detected, ignoring it: {:?}", file); + match file.metadata() { + Err(err) => { + error!("Error when trying to read file: {file:?}; error = {err}"); continue; - }; - - readers.push(reader); + } + Ok(metadata) if metadata.len() == 0 => { + error!("Empty file detected, removing it: {:?}", file); + remove_file(file).unwrap(); + continue; + } + Ok(_) => { + let Ok(reader) = + StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) + else { + error!("Invalid file detected, ignoring it: {:?}", file); + continue; + }; + + readers.push(reader); + } } } @@ -85,20 +93,22 @@ impl MergedReverseRecordReader { pub fn try_new(file_paths: &[PathBuf]) -> Self { let mut readers = Vec::with_capacity(file_paths.len()); for path in file_paths { - let Ok(file) = File::open(path) else { - warn!("Error when trying to read file: {path:?}"); - continue; - }; - - let reader = match get_reverse_reader(file) { - Ok(r) => r, + match File::open(path) { Err(err) => { - error!("Invalid file detected, ignoring it: {path:?}; error = {err}"); + error!("Error when trying to read file: {path:?}; error = {err}"); continue; } - }; - - readers.push(reader); + Ok(file) => { + let reader = match get_reverse_reader(file) { + Ok(r) => r, + Err(err) => { + error!("Invalid file detected, ignoring it: {path:?}; error = {err}"); + continue; + } + }; + readers.push(reader); + } + } } Self { readers } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index c67c60043..2ed089145 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -19,10 +19,9 @@ use std::{ collections::{HashMap, HashSet}, - fs::{remove_file, write, File, OpenOptions}, + fs::{self, remove_file, write, File, OpenOptions}, num::NonZeroU32, path::{Path, PathBuf}, - process, sync::{Arc, Mutex, RwLock}, 
time::{Instant, SystemTime, UNIX_EPOCH}, }; @@ -39,7 +38,6 @@ use parquet::{ format::SortingColumn, schema::types::ColumnPath, }; -use rand::distributions::DistString; use relative_path::RelativePathBuf; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; @@ -50,6 +48,7 @@ use crate::{ format::{LogSource, LogSourceEntry}, DEFAULT_TIMESTAMP_KEY, }, + handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META}, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, @@ -67,15 +66,23 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; +const INPROCESS_DIR_PREFIX: &str = "processing_"; + /// Returns the filename for parquet if provided arrows file path is valid as per our expectation -fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { +fn arrow_path_to_parquet( + stream_staging_path: &Path, + path: &Path, + random_string: &str, +) -> Option { let filename = path.file_stem()?.to_str()?; let (_, front) = filename.split_once('.')?; - assert!(front.contains('.'), "contains the delim `.`"); + if !front.contains('.') { + warn!("Skipping unexpected arrow file without `.`: {}", filename); + return None; + } let filename_with_random_number = format!("{front}.{random_string}.parquet"); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random_number); - + let mut parquet_path = stream_staging_path.to_owned(); + parquet_path.push(filename_with_random_number); Some(parquet_path) } @@ -114,7 +121,7 @@ impl Stream { let data_path = options.local_stream_data_path(&stream_name); Arc::new(Self { - stream_name, + stream_name: stream_name.clone(), metadata: RwLock::new(metadata), data_path, options, @@ -132,7 +139,16 @@ impl Stream { custom_partition_values: &HashMap, stream_type: StreamType, ) -> Result<(), StagingError> { - let mut guard = self.writer.lock().unwrap(); + let mut guard = match self.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while ingesting data for stream {}", + self.stream_name + ); + poisoned.into_inner() + } + }; if self.options.mode != Mode::Query || stream_type == StreamType::Internal { let filename = self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); @@ -191,17 +207,52 @@ impl Stream { return vec![]; }; - let paths = dir - .flatten() + dir.flatten() .map(|file| file.path()) .filter(|file| { file.extension() .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) }) - .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) - .collect(); + .filter_map(|f| { + let modified = f.metadata().ok().and_then(|m| m.modified().ok()); + modified.map(|modified_time| (f, modified_time)) + }) + .sorted_by_key(|(_, modified_time)| *modified_time) + .map(|(f, _)| f) + .collect() + } + + pub fn inprocess_arrow_files(&self) -> Vec { + let Ok(dir) = self.data_path.read_dir() else { + return vec![]; + }; - paths + //iterate through all the inprocess_ directories and collect all arrow files + dir.filter_map(|entry| { + let path = entry.ok()?.path(); + if path.is_dir() + && path + .file_name()? + .to_str()? 
+ .starts_with(INPROCESS_DIR_PREFIX) + { + Some(path) + } else { + None + } + }) + .flat_map(|dir| { + fs::read_dir(dir) + .ok() + .into_iter() + .flatten() + .filter_map(|entry| entry.ok().map(|file| file.path())) + }) + .filter(|file| { + file.extension() + .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) + }) + .collect::>() } /// Groups arrow files which are to be included in one parquet @@ -212,44 +263,150 @@ impl Stream { pub fn arrow_files_grouped_exclude_time( &self, exclude: SystemTime, + group_minute: u128, + init_signal: bool, shutdown_signal: bool, ) -> HashMap> { - let mut grouped_arrow_file: HashMap> = HashMap::new(); - let mut arrow_files = self.arrow_files(); + let random_string = self.get_node_id_string(); + let inprocess_dir = Self::inprocess_folder(&self.data_path, group_minute); + + let arrow_files = self.fetch_arrow_files_for_conversion(exclude, shutdown_signal); + if !arrow_files.is_empty() { + if let Err(e) = fs::create_dir_all(&inprocess_dir) { + error!("Failed to create inprocess directory: {e}"); + return HashMap::new(); + } + + self.move_arrow_files(arrow_files, &inprocess_dir); + } + if init_signal { + // Group from all inprocess folders + return self.group_inprocess_arrow_files(&random_string); + } + + self.group_single_inprocess_arrow_files(&inprocess_dir, &random_string) + } + + /// Groups arrow files only from the specified inprocess folder + fn group_single_inprocess_arrow_files( + &self, + inprocess_dir: &Path, + random_string: &str, + ) -> HashMap> { + let mut grouped: HashMap> = HashMap::new(); + let Ok(dir) = fs::read_dir(inprocess_dir) else { + return grouped; + }; + for entry in dir.flatten() { + let path = entry.path(); + if path + .extension() + .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) + { + if let Some(parquet_path) = + arrow_path_to_parquet(&self.data_path, &path, random_string) + { + grouped.entry(parquet_path).or_default().push(path); + } else { + warn!("Unexpected arrow file: {}", path.display()); + } + } + } + grouped + } + + /// Returns the node id string for file naming. + fn get_node_id_string(&self) -> String { + match self.options.mode { + Mode::Query => QUERIER_META + .get() + .map(|querier_metadata| querier_metadata.get_node_id()) + .expect("Querier metadata should be set"), + Mode::Ingest => INGESTOR_META + .get() + .map(|ingestor_metadata| ingestor_metadata.get_node_id()) + .expect("Ingestor metadata should be set"), + _ => "000000000000000".to_string(), + } + } + + /// Returns a mapping for inprocess arrow files (init_signal=true). + fn group_inprocess_arrow_files(&self, random_string: &str) -> HashMap> { + let mut grouped: HashMap> = HashMap::new(); + for inprocess_file in self.inprocess_arrow_files() { + if let Some(parquet_path) = + arrow_path_to_parquet(&self.data_path, &inprocess_file, random_string) + { + grouped + .entry(parquet_path) + .or_default() + .push(inprocess_file); + } else { + warn!("Unexpected arrow file: {}", inprocess_file.display()); + } + } + grouped + } - // if the shutdown signal is false i.e. normal condition - // don't keep the ones for the current minute + /// Returns arrow files for conversion, filtering by time and removing invalid files. 
+ fn fetch_arrow_files_for_conversion( + &self, + exclude: SystemTime, + shutdown_signal: bool, + ) -> Vec { + let mut arrow_files = self.arrow_files(); if !shutdown_signal { arrow_files.retain(|path| { let creation = path .metadata() - .expect("Arrow file should exist on disk") - .created() - .expect("Creation time should be accessible"); + .ok() + .and_then(|meta| meta.created().or_else(|_| meta.modified()).ok()) + .expect("Arrow file should have a valid creation or modified time"); + // Compare if creation time is actually from previous minute minute_from_system_time(creation) < minute_from_system_time(exclude) }); } + arrow_files + } - let random_string = - rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); + /// Moves eligible arrow files to the inprocess folder and groups them by parquet path. + fn move_arrow_files(&self, arrow_files: Vec, inprocess_dir: &Path) { for arrow_file_path in arrow_files { - if arrow_file_path.metadata().unwrap().len() == 0 { - error!( - "Invalid arrow file {:?} detected for stream {}, removing it", - &arrow_file_path, self.stream_name - ); - remove_file(&arrow_file_path).unwrap(); - } else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) { - grouped_arrow_file - .entry(key) - .or_default() - .push(arrow_file_path); - } else { - warn!("Unexpected arrows file: {}", arrow_file_path.display()); + match arrow_file_path.metadata() { + Ok(meta) if meta.len() == 0 => { + error!( + "Invalid arrow file {:?} detected for stream {}, removing it", + &arrow_file_path, self.stream_name + ); + remove_file(&arrow_file_path).expect("File should be removed"); + } + Ok(_) => { + let new_path = inprocess_dir.join( + arrow_file_path + .file_name() + .expect("Arrow file should have a name"), + ); + if let Err(e) = fs::rename(&arrow_file_path, &new_path) { + error!( + "Failed to rename arrow file to inprocess directory: {} -> {}: {e}", + arrow_file_path.display(), + new_path.display() + ); + } + } + Err(e) => { + warn!( + "Could not get metadata for arrow file {}: {e}", + arrow_file_path.display() + ); + } } } - grouped_arrow_file + } + + fn inprocess_folder(base: &Path, minute: u128) -> PathBuf { + base.join(format!("{INPROCESS_DIR_PREFIX}{minute}")) } pub fn parquet_files(&self) -> Vec { @@ -306,7 +463,11 @@ impl Stream { } /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal` - pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> { + pub fn prepare_parquet( + &self, + init_signal: bool, + shutdown_signal: bool, + ) -> Result<(), StagingError> { info!( "Starting arrow_conversion job for stream- {}", self.stream_name @@ -321,6 +482,7 @@ impl Stream { .convert_disk_files_to_parquet( time_partition.as_ref(), custom_partition.as_ref(), + init_signal, shutdown_signal, ) .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?; @@ -339,11 +501,6 @@ impl Stream { let staging_schemas = self.get_schemas_if_present(); if let Some(mut staging_schemas) = staging_schemas { - warn!( - "Found {} schemas in staging for stream- {}", - staging_schemas.len(), - self.stream_name - ); staging_schemas.push(schema); schema = Schema::try_merge(staging_schemas)?; } @@ -367,7 +524,16 @@ impl Stream { } pub fn flush(&self, forced: bool) { - let mut writer = self.writer.lock().unwrap(); + let mut writer = match self.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while flushing data for stream 
{}", + self.stream_name + ); + poisoned.into_inner() + } + }; // Flush memory writer.mem.clear(); // Drop schema -> disk writer mapping, triggers flush to disk @@ -421,106 +587,85 @@ impl Stream { props.set_sorting_columns(Some(sorting_column_vec)).build() } - /// This function reads arrow files, groups their schemas - /// - /// converts them into parquet files and returns a merged schema - pub fn convert_disk_files_to_parquet( - &self, - time_partition: Option<&String>, - custom_partition: Option<&String>, - shutdown_signal: bool, - ) -> Result, StagingError> { - let mut schemas = Vec::new(); - - let now = SystemTime::now(); - let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal); - if staging_files.is_empty() { - metrics::STAGING_FILES - .with_label_values(&[&self.stream_name]) - .set(0); - metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, "arrows"]) - .set(0); - metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, "parquet"]) - .set(0); - } + fn reset_staging_metrics(&self) { + metrics::STAGING_FILES + .with_label_values(&[&self.stream_name]) + .set(0); + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, "arrows"]) + .set(0); + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, "parquet"]) + .set(0); + } - //find sum of arrow files in staging directory for a stream + fn update_staging_metrics(&self, staging_files: &HashMap>) { let total_arrow_files = staging_files.values().map(|v| v.len()).sum::(); metrics::STAGING_FILES .with_label_values(&[&self.stream_name]) .set(total_arrow_files as i64); - //find sum of file sizes of all arrow files in staging_files let total_arrow_files_size = staging_files .values() .map(|v| { v.iter() - .map(|file| file.metadata().unwrap().len()) + .filter_map(|file| file.metadata().ok().map(|meta| meta.len())) .sum::() }) .sum::(); metrics::STORAGE_SIZE .with_label_values(&["staging", &self.stream_name, "arrows"]) .set(total_arrow_files_size as i64); + } + + /// This function reads arrow files, groups their schemas + /// + /// converts them into parquet files and returns a merged schema + pub fn convert_disk_files_to_parquet( + &self, + time_partition: Option<&String>, + custom_partition: Option<&String>, + init_signal: bool, + shutdown_signal: bool, + ) -> Result, StagingError> { + let mut schemas = Vec::new(); + + let now = SystemTime::now(); + let group_minute = minute_from_system_time(now) - 1; + let staging_files = + self.arrow_files_grouped_exclude_time(now, group_minute, init_signal, shutdown_signal); + if staging_files.is_empty() { + self.reset_staging_metrics(); + return Ok(None); + } + + self.update_staging_metrics(&staging_files); - // warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, arrow_files) in staging_files { let record_reader = MergedReverseRecordReader::try_new(&arrow_files); if record_reader.readers.is_empty() { continue; } let merged_schema = record_reader.merged_schema(); - let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition); schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); - let mut part_path = parquet_path.to_owned(); - part_path.set_extension("part"); - let mut part_file = OpenOptions::new() - .create(true) - .append(true) - .open(&part_path) - .map_err(|_| StagingError::Create)?; - let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?; - for ref record in record_reader.merged_iter(schema, 
time_partition.cloned()) { - writer.write(record)?; + + let part_path = parquet_path.with_extension("part"); + if !self.write_parquet_part_file( + &part_path, + record_reader, + &schema, + &props, + time_partition, + )? { + continue; } - writer.close()?; - if part_file.metadata().expect("File was just created").len() - < parquet::file::FOOTER_SIZE as u64 - { - error!( - "Invalid parquet file {part_path:?} detected for stream {}, removing it", - &self.stream_name - ); - remove_file(part_path).unwrap(); + if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) { + error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"); } else { - trace!("Parquet file successfully constructed"); - if let Err(e) = std::fs::rename(&part_path, &parquet_path) { - error!( - "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}" - ); - } - - for file in arrow_files { - let file_size = match file.metadata() { - Ok(meta) => meta.len(), - Err(err) => { - warn!("File ({}) not found; Error = {err}", file.display()); - continue; - } - }; - if remove_file(&file).is_err() { - error!("Failed to delete file. Unstable state"); - process::abort() - } - metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION]) - .sub(file_size as i64); - } + self.cleanup_arrow_files_and_dir(&arrow_files); } } @@ -531,6 +676,94 @@ impl Stream { Ok(Some(Schema::try_merge(schemas).unwrap())) } + fn write_parquet_part_file( + &self, + part_path: &Path, + record_reader: MergedReverseRecordReader, + schema: &Arc, + props: &WriterProperties, + time_partition: Option<&String>, + ) -> Result { + let mut part_file = OpenOptions::new() + .create(true) + .append(true) + .open(part_path) + .map_err(|_| StagingError::Create)?; + let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?; + for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { + writer.write(record)?; + } + writer.close()?; + + if part_file.metadata().expect("File was just created").len() + < parquet::file::FOOTER_SIZE as u64 + { + error!( + "Invalid parquet file {part_path:?} detected for stream {}, removing it", + &self.stream_name + ); + remove_file(part_path).expect("File should be removable if it is invalid"); + return Ok(false); + } + trace!("Parquet file successfully constructed"); + Ok(true) + } + + fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> { + std::fs::rename(part_path, parquet_path) + } + + fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) { + for (i, file) in arrow_files.iter().enumerate() { + match file.metadata() { + Ok(meta) => { + let file_size = meta.len(); + match remove_file(file) { + Ok(_) => { + metrics::STORAGE_SIZE + .with_label_values(&[ + "staging", + &self.stream_name, + ARROW_FILE_EXTENSION, + ]) + .sub(file_size as i64); + } + Err(e) => { + warn!("Failed to delete file {}: {e}", file.display()); + } + } + } + Err(err) => { + warn!("File ({}) not found; Error = {err}", file.display()); + } + } + + // After deleting the last file, try to remove the inprocess directory if empty + if i == arrow_files.len() - 1 { + if let Some(parent_dir) = file.parent() { + match fs::read_dir(parent_dir) { + Ok(mut entries) => { + if entries.next().is_none() { + if let Err(err) = fs::remove_dir(parent_dir) { + warn!( + "Failed to remove inprocess directory {}: {err}", + parent_dir.display() + ); + } + } + } + Err(err) => { + warn!( + "Failed to read 
inprocess directory {}: {err}", + parent_dir.display() + ); + } + } + } + } + } + } + pub fn updated_schema(&self, current_schema: Schema) -> Schema { let staging_files = self.arrow_files(); let record_reader = MergedRecordReader::try_new(&staging_files).unwrap(); @@ -725,7 +958,11 @@ impl Stream { } /// First flushes arrows onto disk and then converts the arrow into parquet - pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> { + pub fn flush_and_convert( + &self, + init_signal: bool, + shutdown_signal: bool, + ) -> Result<(), StagingError> { let start_flush = Instant::now(); self.flush(shutdown_signal); trace!( @@ -735,7 +972,7 @@ impl Stream { ); let start_convert = Instant::now(); - self.prepare_parquet(shutdown_signal)?; + self.prepare_parquet(init_signal, shutdown_signal)?; trace!( "Converting arrows to parquet on stream ({}) took: {}s", self.stream_name, @@ -820,6 +1057,7 @@ impl Streams { pub fn flush_and_convert( &self, joinset: &mut JoinSet>, + init_signal: bool, shutdown_signal: bool, ) { let streams: Vec> = self @@ -829,7 +1067,7 @@ impl Streams { .map(Arc::clone) .collect(); for stream in streams { - joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) }); + joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal) }); } } } @@ -1018,7 +1256,7 @@ mod tests { LogStreamMetadata::default(), None, ) - .convert_disk_files_to_parquet(None, None, false)?; + .convert_disk_files_to_parquet(None, None, false, false)?; assert!(result.is_none()); // Verify metrics were set to 0 let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get(); @@ -1097,7 +1335,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, true) + .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); assert!(result.is_some()); @@ -1146,7 +1384,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, true) + .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); assert!(result.is_some()); @@ -1200,7 +1438,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, false) + .convert_disk_files_to_parquet(None, None, false, false) .unwrap(); assert!(result.is_some()); @@ -1228,7 +1466,7 @@ mod tests { let file_path = create_test_file(&temp_dir, filename); let random_string = "random123"; - let result = arrow_path_to_parquet(&file_path, random_string); + let result = arrow_path_to_parquet(&file_path, &file_path, random_string); assert!(result.is_some()); let parquet_path = result.unwrap(); @@ -1253,7 +1491,7 @@ mod tests { let random_string = "random456"; - let result = arrow_path_to_parquet(&file_path, random_string); + let result = arrow_path_to_parquet(&file_path, &file_path, random_string); assert!(result.is_some()); let parquet_path = result.unwrap(); diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 49da6b8fd..00e262631 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -38,6 +38,7 @@ use object_store::ObjectMeta; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; +use 
tokio::task::JoinSet; use tracing::info; use tracing::{error, warn}; use ulid::Ulid; @@ -798,79 +799,75 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(correlation_bytes) } - async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> { + async fn upload_files_from_staging(&self, stream_name: &str) -> Result<(), ObjectStorageError> { if !PARSEABLE.options.staging_dir().exists() { return Ok(()); } + info!("Starting object_store_sync for stream- {stream_name}"); + + let stream = PARSEABLE.get_or_create_stream(stream_name); + let custom_partition = stream.get_custom_partition(); + for path in stream.parquet_files() { + let filename = path + .file_name() + .expect("only parquet files are returned by iterator") + .to_str() + .expect("filename is valid string"); + + let mut file_suffix = str::replacen(filename, ".", "/", 3); + + let custom_partition_clone = custom_partition.clone(); + if custom_partition_clone.is_some() { + let custom_partition_fields = custom_partition_clone.unwrap(); + let custom_partition_list = + custom_partition_fields.split(',').collect::>(); + file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + } - // get all streams - for stream_name in PARSEABLE.streams.list() { - info!("Starting object_store_sync for stream- {stream_name}"); - - let stream = PARSEABLE.get_or_create_stream(&stream_name); - let custom_partition = stream.get_custom_partition(); - for path in stream.parquet_files() { - let filename = path - .file_name() - .expect("only parquet files are returned by iterator") - .to_str() - .expect("filename is valid string"); - - let mut file_date_part = filename.split('.').collect::>()[0]; - file_date_part = file_date_part.split('=').collect::>()[1]; - let compressed_size = path.metadata().map_or(0, |meta| meta.len()); - STORAGE_SIZE - .with_label_values(&["data", &stream_name, "parquet"]) - .add(compressed_size as i64); - EVENTS_STORAGE_SIZE_DATE - .with_label_values(&["data", &stream_name, "parquet", file_date_part]) - .add(compressed_size as i64); - LIFETIME_EVENTS_STORAGE_SIZE - .with_label_values(&["data", &stream_name, "parquet"]) - .add(compressed_size as i64); - let mut file_suffix = str::replacen(filename, ".", "/", 3); - - let custom_partition_clone = custom_partition.clone(); - if custom_partition_clone.is_some() { - let custom_partition_fields = custom_partition_clone.unwrap(); - let custom_partition_list = - custom_partition_fields.split(',').collect::>(); - file_suffix = - str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); - } - - let stream_relative_path = format!("{stream_name}/{file_suffix}"); - - // Try uploading the file, handle potential errors without breaking the loop - // if let Err(e) = self.upload_multipart(key, path) - if let Err(e) = self - .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) - .await - { - error!("Failed to upload file {filename:?}: {e}"); - continue; // Skip to the next file - } + let stream_relative_path = format!("{stream_name}/{file_suffix}"); - let absolute_path = self - .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) - .to_string(); - let store = PARSEABLE.storage().get_object_store(); - let manifest = - catalog::create_from_parquet_file(absolute_path.clone(), &path).unwrap(); - catalog::update_snapshot(store, &stream_name, manifest).await?; + // Try uploading the file, handle potential errors without breaking the loop + // if let Err(e) = self.upload_multipart(key, path) + if let Err(e) = self + 
.upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) + .await + { + error!("Failed to upload file {filename:?}: {e}"); + continue; // Skip to the next file + } + let mut file_date_part = filename.split('.').collect::>()[0]; + file_date_part = file_date_part.split('=').collect::>()[1]; + let compressed_size = path.metadata().map_or(0, |meta| meta.len()); + STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(compressed_size as i64); + EVENTS_STORAGE_SIZE_DATE + .with_label_values(&["data", stream_name, "parquet", file_date_part]) + .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(compressed_size as i64); + + let absolute_path = self + .absolute_url( + RelativePath::from_path(&stream_relative_path).expect("valid relative path"), + ) + .to_string(); + let store = PARSEABLE.storage().get_object_store(); + let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)?; + catalog::update_snapshot(store, stream_name, manifest).await?; - if let Err(e) = remove_file(path) { - warn!("Failed to remove staged file: {e}"); - } + if let Err(e) = remove_file(path) { + warn!("Failed to remove staged file: {e}"); } + } - for path in stream.schema_files() { - let file = File::open(&path)?; - let schema: Schema = serde_json::from_reader(file)?; - commit_schema_to_storage(&stream_name, schema).await?; - if let Err(e) = remove_file(path) { - warn!("Failed to remove staged file: {e}"); - } + for path in stream.schema_files() { + let file = File::open(&path)?; + let schema: Schema = serde_json::from_reader(file)?; + commit_schema_to_storage(stream_name, schema).await?; + if let Err(e) = remove_file(path) { + warn!("Failed to remove staged file: {e}"); } } @@ -878,6 +875,26 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } } +pub fn sync_all_streams(joinset: &mut JoinSet>) { + let object_store = PARSEABLE.storage.get_object_store(); + for stream_name in PARSEABLE.streams.list() { + let object_store = object_store.clone(); + joinset.spawn(async move { + let start = Instant::now(); + info!("Starting object_store_sync for stream- {stream_name}"); + let result = object_store.upload_files_from_staging(&stream_name).await; + if let Err(ref e) = result { + error!("Failed to upload files from staging for stream {stream_name}: {e}"); + } else { + info!( + "Completed object_store_sync for stream- {stream_name} in {} ms", + start.elapsed().as_millis() + ); + } + result + }); + } +} pub async fn commit_schema_to_storage( stream_name: &str, schema: Schema, diff --git a/src/sync.rs b/src/sync.rs index 6fa568331..3f6fd5922 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,6 +17,7 @@ */ use chrono::{TimeDelta, Timelike}; +use futures::FutureExt; use std::collections::HashMap; use std::future::Future; use std::panic::AssertUnwindSafe; @@ -28,6 +29,7 @@ use tracing::{error, info, trace, warn}; use crate::alerts::{alerts_utils, AlertTask}; use crate::parseable::PARSEABLE; +use crate::storage::object_storage::sync_all_streams; use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; // Calculates the instant that is the start of the next minute @@ -75,7 +77,7 @@ where /// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every /// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds. 
-#[tokio::main(flavor = "multi_thread", worker_threads = 2)] +#[tokio::main(flavor = "multi_thread")] pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> { let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync(); let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = @@ -83,7 +85,6 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> loop { select! { _ = &mut cancel_rx => { - // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { @@ -95,12 +96,9 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> return Ok(()); }, _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) }, _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again if let Err(e) = remote_sync_handler.await { error!("Error joining remote_sync_handler: {e:?}"); } @@ -119,51 +117,57 @@ pub fn object_store_sync() -> ( let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); let handle = task::spawn(async move { - let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { - let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL); + info!("Object store sync task started"); + let mut inbox_rx = inbox_rx; - let mut inbox_rx = AssertUnwindSafe(inbox_rx); + let result = tokio::spawn(async move { + let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL); loop { select! { _ = sync_interval.tick() => { trace!("Syncing Parquets to Object Store... "); - if let Err(e) = monitor_task_duration( - "object_store_sync", + + // Monitor the duration of sync_all_streams execution + monitor_task_duration( + "object_store_sync_all_streams", Duration::from_secs(15), || async { - PARSEABLE - .storage - .get_object_store() - .upload_files_from_staging().await - }, - ) - .await - { - warn!("failed to upload local data with object store. 
{e:?}"); - } + let mut joinset = JoinSet::new(); + sync_all_streams(&mut joinset); + + // Wait for all spawned tasks to complete + while let Some(res) = joinset.join_next().await { + log_join_result(res, "object store sync"); + } + } + ).await; }, - res = &mut inbox_rx => {match res{ - Ok(_) => break, - Err(_) => { - warn!("Inbox channel closed unexpectedly"); - break; - }} + res = &mut inbox_rx => { + match res { + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + } + } } } } - })); + }); - match result { - Ok(future) => { - future.await; + match AssertUnwindSafe(result).catch_unwind().await { + Ok(join_result) => { + if let Err(join_err) = join_result { + error!("Panic in object store sync task: {join_err:?}"); + } } Err(panic_error) => { error!("Panic in object store sync task: {panic_error:?}"); - let _ = outbox_tx.send(()); } } + let _ = outbox_tx.send(()); info!("Object store sync task ended"); }); @@ -183,38 +187,45 @@ pub fn local_sync() -> ( info!("Local sync task started"); let mut inbox_rx = inbox_rx; - let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let result = tokio::spawn(async move { let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL); - let mut joinset = JoinSet::new(); loop { select! { - // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds _ = sync_interval.tick() => { - PARSEABLE.streams.flush_and_convert(&mut joinset, false) + // Monitor the duration of flush_and_convert execution + monitor_task_duration( + "local_sync_flush_and_convert", + Duration::from_secs(15), + || async { + let mut joinset = JoinSet::new(); + PARSEABLE.streams.flush_and_convert(&mut joinset, false, false); + + // Wait for all spawned tasks to complete + while let Some(res) = joinset.join_next().await { + log_join_result(res, "flush and convert"); + } + } + ).await; }, - // Joins and logs errors in spawned tasks - Some(res) = joinset.join_next(), if !joinset.is_empty() => { + res = &mut inbox_rx => { match res { - Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), - Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. 
{err:?}"), - Err(err) => error!("Issue joining flush+conversion task: {err}"), + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + } } } - res = &mut inbox_rx => {match res{ - Ok(_) => break, - Err(_) => { - warn!("Inbox channel closed unexpectedly"); - break; - }} - } } } - })); + }); - match result { - Ok(future) => { - future.await; + match AssertUnwindSafe(result).catch_unwind().await { + Ok(join_result) => { + if let Err(join_err) = join_result { + error!("Panic in local sync task: {join_err:?}"); + } } Err(panic_error) => { error!("Panic in local sync task: {panic_error:?}"); @@ -228,6 +239,48 @@ pub fn local_sync() -> ( (handle, outbox_rx, inbox_tx) } +// local and object store sync at the start of the server +pub async fn sync_start() -> anyhow::Result<()> { + // Monitor local sync duration at startup + monitor_task_duration("startup_local_sync", Duration::from_secs(15), || async { + let mut local_sync_joinset = JoinSet::new(); + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, true, false); + while let Some(res) = local_sync_joinset.join_next().await { + log_join_result(res, "flush and convert"); + } + }) + .await; + + // Monitor object store sync duration at startup + monitor_task_duration( + "startup_object_store_sync", + Duration::from_secs(15), + || async { + let mut object_store_joinset = JoinSet::new(); + sync_all_streams(&mut object_store_joinset); + while let Some(res) = object_store_joinset.join_next().await { + log_join_result(res, "object store sync"); + } + }, + ) + .await; + + Ok(()) +} + +fn log_join_result(res: Result, tokio::task::JoinError>, context: &str) +where + E: std::fmt::Debug, +{ + match res { + Ok(Ok(_)) => info!("Successfully completed {context}."), + Ok(Err(err)) => warn!("Failed to complete {context}. {err:?}"), + Err(err) => error!("Issue joining {context} task: {err}"), + } +} + /// A separate runtime for running all alert tasks #[tokio::main(flavor = "multi_thread")] pub async fn alert_runtime(mut rx: mpsc::Receiver) -> Result<(), anyhow::Error> {