diff --git a/src/cli.rs b/src/cli.rs index 26cab2e95..52b6a05ca 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -388,6 +388,41 @@ pub struct Options { help = "Maximum level of flattening allowed for events" )] pub event_flatten_level: usize, + + // maximum limit to store the statistics for a field + #[arg( + long, + env = "P_MAX_FIELD_STATISTICS", + default_value = "50", + help = "Maximum number of field statistics to store" + )] + pub max_field_statistics: usize, + + // collect dataset stats + #[arg( + long, + env = "P_COLLECT_DATASET_STATS", + default_value = "false", + help = "Enable/Disable collecting dataset stats" + )] + pub collect_dataset_stats: bool, + + // the duration during which local sync should be completed + #[arg( + long, + env = "P_LOCAL_SYNC_THRESHOLD", + default_value = "30", + help = "Local sync threshold in seconds" + )] + pub local_sync_threshold: u64, + // the duration during which object store sync should be completed + #[arg( + long, + env = "P_OBJECT_STORE_SYNC_THRESHOLD", + default_value = "15", + help = "Object store sync threshold in seconds" + )] + pub object_store_sync_threshold: u64, } #[derive(Parser, Debug)] diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 09d826b46..5b01725ed 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -55,6 +55,7 @@ impl ParseableSinkProcessor { .create_stream_if_not_exists( stream_name, StreamType::UserDefined, + None, vec![log_source_entry], ) .await?; diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index b4803e124..82825b415 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -56,12 +56,35 @@ pub async fn check_shutdown_middleware( // This function is called when the server is shutting down pub async fn shutdown() { - // Set the shutdown flag to true - let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; - *shutdown_flag = true; + // Set shutdown flag to true + set_shutdown_flag().await; //sleep for 5 secs to allow any ongoing requests to finish tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + // Perform sync operations + perform_sync_operations().await; + + // If collect_dataset_stats is enabled, perform sync operations + // This is to ensure that all stats data is synced before the server shuts down + if PARSEABLE.options.collect_dataset_stats { + perform_sync_operations().await; + } +} + +async fn set_shutdown_flag() { + let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; + *shutdown_flag = true; +} + +async fn perform_sync_operations() { + // Perform local sync + perform_local_sync().await; + // Perform object store sync + perform_object_store_sync().await; +} + +async fn perform_local_sync() { let mut local_sync_joinset = JoinSet::new(); // Sync staging @@ -76,7 +99,9 @@ pub async fn shutdown() { Err(err) => error!("Failed to join async task: {err}"), } } +} +async fn perform_object_store_sync() { // Sync object store let mut object_store_joinset = JoinSet::new(); sync_all_streams(&mut object_store_joinset); diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index c23eb6659..34b832f6a 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -100,6 +100,7 @@ pub async fn ingest( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; @@ -183,6 +184,7 @@ pub async fn handle_otel_logs_ingestion( .create_stream_if_not_exists( &stream_name, 
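// The `None` threaded through these create_stream_if_not_exists calls is the new
// `custom_partition` argument (see the src/parseable/mod.rs hunk below): HTTP and
// OTEL ingestion create streams without a custom partition, while the internal
// "pstats" stream passes Some(&"dataset_name".into()).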
StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; @@ -248,6 +250,7 @@ pub async fn handle_otel_metrics_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; @@ -313,6 +316,7 @@ pub async fn handle_otel_traces_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; diff --git a/src/handlers/http/prism_home.rs b/src/handlers/http/prism_home.rs index 10ff0f04c..e83063685 100644 --- a/src/handlers/http/prism_home.rs +++ b/src/handlers/http/prism_home.rs @@ -26,7 +26,7 @@ use crate::{ }; const HOME_SEARCH_QUERY_PARAM: &str = "key"; - +pub const HOME_QUERY_PARAM: &str = "includeInternal"; /// Fetches the data to populate Prism's home /// /// @@ -36,8 +36,12 @@ const HOME_SEARCH_QUERY_PARAM: &str = "key"; pub async fn home_api(req: HttpRequest) -> Result { let key = extract_session_key_from_req(&req) .map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; + let query_map = web::Query::>::from_query(req.query_string()) + .map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?; - let res = generate_home_response(&key).await?; + let include_internal = query_map.get(HOME_QUERY_PARAM).is_some_and(|v| v == "true"); + + let res = generate_home_response(&key, include_internal).await?; Ok(web::Json(res)) } @@ -52,11 +56,12 @@ pub async fn home_search(req: HttpRequest) -> Result, log_source: Vec, ) -> Result { if self.streams.contains(stream_name) { return Ok(true); } + // validate custom partition if provided + if let Some(partition) = custom_partition { + validate_custom_partition(partition)?; + } + // For distributed deployments, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage @@ -375,7 +382,7 @@ impl Parseable { stream_name.to_string(), "", None, - None, + custom_partition, false, Arc::new(Schema::empty()), stream_type, diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 2ed089145..28cceefcd 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -29,7 +29,7 @@ use std::{ use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; -use derive_more::{Deref, DerefMut}; +use derive_more::derive::{Deref, DerefMut}; use itertools::Itertools; use parquet::{ arrow::ArrowWriter, @@ -478,15 +478,12 @@ impl Stream { // read arrow files on disk // convert them to parquet - let schema = self - .convert_disk_files_to_parquet( - time_partition.as_ref(), - custom_partition.as_ref(), - init_signal, - shutdown_signal, - ) - .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?; - + let schema = self.convert_disk_files_to_parquet( + time_partition.as_ref(), + custom_partition.as_ref(), + init_signal, + shutdown_signal, + )?; // check if there is already a schema file in staging pertaining to this stream // if yes, then merge them and save @@ -640,7 +637,6 @@ impl Stream { } self.update_staging_metrics(&staging_files); - for (parquet_path, arrow_files) in staging_files { let record_reader = MergedReverseRecordReader::try_new(&arrow_files); if record_reader.readers.is_empty() { @@ -972,6 +968,7 @@ impl Stream { ); let start_convert = Instant::now(); + self.prepare_parquet(init_signal, shutdown_signal)?; trace!( "Converting arrows to parquet on stream ({}) took: {}s", diff --git a/src/prism/home/mod.rs 
b/src/prism/home/mod.rs index 656e50cae..397fed2c1 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -33,7 +33,7 @@ use crate::{ handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError}, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, - storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, + storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY}, users::{dashboards::DASHBOARDS, filters::FILTERS}, }; @@ -88,7 +88,10 @@ pub struct HomeSearchResponse { resources: Vec, } -pub async fn generate_home_response(key: &SessionKey) -> Result { +pub async fn generate_home_response( + key: &SessionKey, + include_internal: bool, +) -> Result { // Execute these operations concurrently let (stream_titles_result, alerts_info_result) = tokio::join!(get_stream_titles(key), get_alerts_info()); @@ -120,6 +123,14 @@ pub async fn generate_home_response(key: &SessionKey) -> Result { + // Skip internal streams if the flag is false + if !include_internal + && metadata + .iter() + .all(|m| m.stream_type == StreamType::Internal) + { + continue; + } stream_wise_stream_json.insert(stream.clone(), metadata); datasets.push(DataSet { title: stream, diff --git a/src/query/mod.rs b/src/query/mod.rs index e9c5632e7..c9410eb2a 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::DiskManagerConfig; -use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder}; +use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder}; use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::{ Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, @@ -61,6 +61,9 @@ use crate::utils::time::TimeRange; pub static QUERY_SESSION: Lazy = Lazy::new(|| Query::create_session_context(PARSEABLE.storage())); +pub static QUERY_SESSION_STATE: Lazy = + Lazy::new(|| Query::create_session_state(PARSEABLE.storage())); + /// Dedicated multi-threaded runtime to run all queries on pub static QUERY_RUNTIME: Lazy = Lazy::new(|| Runtime::new().expect("Runtime should be constructible")); @@ -96,10 +99,28 @@ pub struct Query { impl Query { // create session context for this query pub fn create_session_context(storage: Arc) -> SessionContext { + let state = Self::create_session_state(storage.clone()); + + let schema_provider = Arc::new(GlobalSchemaProvider { + storage: storage.get_object_store(), + }); + state + .catalog_list() + .catalog(&state.config_options().catalog.default_catalog) + .expect("default catalog is provided by datafusion") + .register_schema( + &state.config_options().catalog.default_schema, + schema_provider, + ) + .unwrap(); + + SessionContext::new_with_state(state) + } + + fn create_session_state(storage: Arc) -> SessionState { let runtime_config = storage .get_datafusion_runtime() .with_disk_manager(DiskManagerConfig::NewOs); - let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size { Some(size) => (size, 1.), None => { @@ -142,26 +163,11 @@ impl Query { .parquet .schema_force_view_types = true; - let state = SessionStateBuilder::new() + SessionStateBuilder::new() .with_default_features() .with_config(config) .with_runtime_env(runtime) - .build(); - - let schema_provider = 
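// QUERY_SESSION_STATE exists so that the field-statistics path (calculate_field_stats
// further down in object_storage.rs) can build short-lived SessionContexts that share
// the main query session's memory pool, runtime and parquet settings, while the global
// schema provider is still registered only once, inside create_session_context.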
Arc::new(GlobalSchemaProvider { - storage: storage.get_object_store(), - }); - state - .catalog_list() - .catalog(&state.config_options().catalog.default_catalog) - .expect("default catalog is provided by datafusion") - .register_schema( - &state.config_options().catalog.default_schema, - schema_provider, - ) - .unwrap(); - - SessionContext::new_with_state(state) + .build() } /// this function returns the result of the query diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 00e262631..c4a2ab5d2 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -16,28 +16,44 @@ * */ -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::HashSet; -use std::fmt::Debug; -use std::fs::{remove_file, File}; -use std::num::NonZeroU32; -use std::path::Path; -use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; - use actix_web_prometheus::PrometheusMetrics; +use arrow_array::Array; +use arrow_array::BinaryArray; +use arrow_array::BinaryViewArray; +use arrow_array::BooleanArray; +use arrow_array::Date32Array; +use arrow_array::Float64Array; +use arrow_array::Int64Array; +use arrow_array::StringArray; +use arrow_array::StringViewArray; +use arrow_array::TimestampMillisecondArray; +use arrow_schema::DataType; use arrow_schema::Schema; +use arrow_schema::TimeUnit; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; +use datafusion::prelude::ParquetReadOptions; +use datafusion::prelude::SessionContext; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; +use futures::StreamExt; use object_store::buffered::BufReader; use object_store::ObjectMeta; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; +use serde::Serialize; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::HashSet; +use std::fmt::Debug; +use std::fs::{remove_file, File}; +use std::num::NonZeroU32; +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use tokio::task; use tokio::task::JoinSet; use tracing::info; use tracing::{error, warn}; @@ -48,8 +64,10 @@ use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; use crate::event::format::LogSourceEntry; +use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::handlers::http::modal::utils::ingest_utils::flatten_and_push_logs; use crate::handlers::http::users::CORRELATION_DIR; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::storage::StorageMetrics; @@ -57,7 +75,10 @@ use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STO use crate::option::Mode; use crate::parseable::LogStream; use crate::parseable::PARSEABLE; +use crate::query::QUERY_SESSION_STATE; use crate::stats::FullStats; +use crate::storage::StreamType; +use crate::utils::DATASET_STATS_STREAM_NAME; use super::{ retention::Retention, ObjectStorageError, ObjectStoreFormat, StorageMetadata, @@ -804,9 +825,10 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { return Ok(()); } info!("Starting object_store_sync for stream- {stream_name}"); - + let mut stats_calculated = false; let stream = 
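// `stats_calculated` records whether any parquet file in this pass produced field
// statistics; it gates the extra flush of the "pstats" stream spawned at the end of
// this sync, so freshly written stats are converted without waiting for the next cycle.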
PARSEABLE.get_or_create_stream(stream_name); let custom_partition = stream.get_custom_partition(); + let schema = stream.get_schema(); for path in stream.parquet_files() { let filename = path .file_name() @@ -827,7 +849,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let stream_relative_path = format!("{stream_name}/{file_suffix}"); // Try uploading the file, handle potential errors without breaking the loop - // if let Err(e) = self.upload_multipart(key, path) if let Err(e) = self .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) .await @@ -857,6 +878,19 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)?; catalog::update_snapshot(store, stream_name, manifest).await?; + // If stats collection is enabled, calculate field stats + if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats { + let max_field_statistics = PARSEABLE.options.max_field_statistics; + match calculate_field_stats(stream_name, &path, &schema, max_field_statistics).await + { + Ok(stats) if stats => stats_calculated = true, + Err(err) => warn!( + "Error calculating field stats for stream {}: {}", + stream_name, err + ), + _ => {} + } + } if let Err(e) = remove_file(path) { warn!("Failed to remove staged file: {e}"); } @@ -871,6 +905,17 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } } + if stats_calculated { + // perform local sync for the `pstats` dataset + task::spawn(async move { + if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) { + if let Err(err) = stats_stream.flush_and_convert(false, false) { + error!("Failed in local sync for dataset stats stream: {err}"); + } + } + }); + } + Ok(()) } } @@ -895,6 +940,325 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { }); } } + +const MAX_CONCURRENT_FIELD_STATS: usize = 10; + +#[derive(Serialize, Debug)] +struct DistinctStat { + distinct_value: String, + count: i64, +} + +#[derive(Serialize, Debug)] +struct FieldStat { + field_name: String, + count: i64, + distinct_count: i64, + distinct_stats: Vec, +} + +#[derive(Serialize, Debug)] +struct DatasetStats { + dataset_name: String, + field_stats: Vec, +} + +/// Calculates field statistics for the stream and pushes them to the internal stats dataset. +/// This function creates a new internal stream for stats if it doesn't exist. 
+/// It collects statistics for each field in the stream +async fn calculate_field_stats( + stream_name: &str, + parquet_path: &Path, + schema: &Schema, + max_field_statistics: usize, +) -> Result { + let field_stats = { + let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone()); + let table_name = Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().expect("valid path"), + ParquetReadOptions::default(), + ) + .await + .map_err(|e| PostError::Invalid(e.into()))?; + + collect_all_field_stats(&table_name, &ctx, schema, max_field_statistics).await + }; + let mut stats_calculated = false; + let stats = DatasetStats { + dataset_name: stream_name.to_string(), + field_stats, + }; + if stats.field_stats.is_empty() { + return Ok(stats_calculated); + } + stats_calculated = true; + let stats_value = + serde_json::to_value(&stats).map_err(|e| ObjectStorageError::Invalid(e.into()))?; + let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); + PARSEABLE + .create_stream_if_not_exists( + DATASET_STATS_STREAM_NAME, + StreamType::Internal, + Some(&"dataset_name".into()), + vec![log_source_entry], + ) + .await?; + flatten_and_push_logs( + stats_value, + DATASET_STATS_STREAM_NAME, + &LogSource::Json, + &HashMap::new(), + ) + .await?; + Ok(stats_calculated) +} + +/// Collects statistics for all fields in the stream. +/// Returns a vector of `FieldStat` for each field with non-zero count. +/// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently. +async fn collect_all_field_stats( + stream_name: &str, + ctx: &SessionContext, + schema: &Schema, + max_field_statistics: usize, +) -> Vec { + // Collect field names into an owned Vec to avoid lifetime issues + let field_names: Vec = schema + .fields() + .iter() + .map(|field| field.name().clone()) + .collect(); + let field_futures = field_names.into_iter().map(|field_name| { + let ctx = ctx.clone(); + async move { + calculate_single_field_stats(ctx, stream_name, &field_name, max_field_statistics).await + } + }); + + futures::stream::iter(field_futures) + .buffer_unordered(MAX_CONCURRENT_FIELD_STATS) + .filter_map(std::future::ready) + .collect::>() + .await +} + +/// This function is used to fetch distinct values and their counts for a field in the stream. +/// Returns a vector of `DistinctStat` containing distinct values and their counts. +/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. 
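// For illustration only (hypothetical stream and field names): with the default limit
// of 50, the combined query built by get_stats_sql below has this shape, so the total
// count, distinct count and top distinct values are fetched in a single scan:
//
//   WITH field_groups AS (
//       SELECT "status" as field_value, COUNT(*) as value_count
//       FROM "frontend_logs"
//       GROUP BY "status"
//   ),
//   field_summary AS (
//       SELECT field_value, value_count,
//              SUM(value_count) OVER () as total_count,
//              COUNT(*) OVER () as distinct_count,
//              ROW_NUMBER() OVER (ORDER BY value_count DESC) as rn
//       FROM field_groups
//   )
//   SELECT total_count, distinct_count, field_value, value_count
//   FROM field_summary
//   WHERE rn <= 50
//   ORDER BY value_count DESC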
+async fn calculate_single_field_stats( + ctx: SessionContext, + stream_name: &str, + field_name: &str, + max_field_statistics: usize, +) -> Option { + let mut total_count = 0; + let mut distinct_count = 0; + let mut distinct_stats = Vec::new(); + + let combined_sql = get_stats_sql(stream_name, field_name, max_field_statistics); + match ctx.sql(&combined_sql).await { + Ok(df) => { + let mut stream = match df.execute_stream().await { + Ok(stream) => stream, + Err(e) => { + warn!("Failed to execute distinct stats query: {e}"); + return None; // Return empty if query fails + } + }; + while let Some(batch_result) = stream.next().await { + let rb = match batch_result { + Ok(batch) => batch, + Err(e) => { + warn!("Failed to fetch batch in distinct stats query: {e}"); + continue; // Skip this batch if there's an error + } + }; + let total_count_array = rb.column(0).as_any().downcast_ref::()?; + let distinct_count_array = rb.column(1).as_any().downcast_ref::()?; + + total_count = total_count_array.value(0); + distinct_count = distinct_count_array.value(0); + if distinct_count == 0 { + return None; + } + + let field_value_array = rb.column(2).as_ref(); + let value_count_array = rb.column(3).as_any().downcast_ref::()?; + + for i in 0..rb.num_rows() { + let value = format_arrow_value(field_value_array, i); + let count = value_count_array.value(i); + + distinct_stats.push(DistinctStat { + distinct_value: value, + count, + }); + } + } + } + Err(e) => { + warn!("Failed to execute distinct stats query for field: {field_name}, error: {e}"); + return None; + } + } + Some(FieldStat { + field_name: field_name.to_string(), + count: total_count, + distinct_count, + distinct_stats, + }) +} + +fn get_stats_sql(stream_name: &str, field_name: &str, max_field_statistics: usize) -> String { + let escaped_field_name = field_name.replace('"', "\"\""); + let escaped_stream_name = stream_name.replace('"', "\"\""); + + format!( + r#" + WITH field_groups AS ( + SELECT + "{escaped_field_name}" as field_value, + COUNT(*) as value_count + FROM "{escaped_stream_name}" + GROUP BY "{escaped_field_name}" + ), + field_summary AS ( + SELECT + field_value, + value_count, + SUM(value_count) OVER () as total_count, + COUNT(*) OVER () as distinct_count, + ROW_NUMBER() OVER (ORDER BY value_count DESC) as rn + FROM field_groups + ) + SELECT + total_count, + distinct_count, + field_value, + value_count + FROM field_summary + WHERE rn <= {} + ORDER BY value_count DESC + "#, + max_field_statistics + ) +} + +macro_rules! try_downcast { + ($ty:ty, $arr:expr, $body:expr) => { + if let Some(arr) = $arr.as_any().downcast_ref::<$ty>() { + $body(arr) + } else { + warn!( + "Expected {} for {:?}, but found {:?}", + stringify!($ty), + $arr.data_type(), + $arr.data_type() + ); + "UNSUPPORTED".to_string() + } + }; +} + +/// Function to format an Arrow value at a given index into a string. +/// Handles null values and different data types by downcasting the array to the appropriate type. 
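// Example (using the test data further down): a Timestamp(Millisecond) value of
// 1640995200000 is rendered as "2022-01-01 00:00:00 UTC", nulls become "NULL",
// and any type without an explicit arm falls back to "UNSUPPORTED".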
+fn format_arrow_value(array: &dyn Array, idx: usize) -> String { + if array.is_null(idx) { + return "NULL".to_string(); + } + + match array.data_type() { + DataType::Utf8 => try_downcast!(StringArray, array, |arr: &StringArray| arr + .value(idx) + .to_string()), + DataType::Utf8View => try_downcast!(StringViewArray, array, |arr: &StringViewArray| arr + .value(idx) + .to_string()), + DataType::Binary => try_downcast!(BinaryArray, array, |arr: &BinaryArray| { + String::from_utf8_lossy(arr.value(idx)).to_string() + }), + DataType::BinaryView => try_downcast!(BinaryViewArray, array, |arr: &BinaryViewArray| { + String::from_utf8_lossy(arr.value(idx)).to_string() + }), + DataType::Int64 => try_downcast!(Int64Array, array, |arr: &Int64Array| arr + .value(idx) + .to_string()), + DataType::Int32 => try_downcast!( + arrow_array::Int32Array, + array, + |arr: &arrow_array::Int32Array| arr.value(idx).to_string() + ), + DataType::Int16 => try_downcast!( + arrow_array::Int16Array, + array, + |arr: &arrow_array::Int16Array| arr.value(idx).to_string() + ), + DataType::Int8 => try_downcast!( + arrow_array::Int8Array, + array, + |arr: &arrow_array::Int8Array| arr.value(idx).to_string() + ), + DataType::UInt64 => try_downcast!( + arrow_array::UInt64Array, + array, + |arr: &arrow_array::UInt64Array| arr.value(idx).to_string() + ), + DataType::UInt32 => try_downcast!( + arrow_array::UInt32Array, + array, + |arr: &arrow_array::UInt32Array| arr.value(idx).to_string() + ), + DataType::UInt16 => try_downcast!( + arrow_array::UInt16Array, + array, + |arr: &arrow_array::UInt16Array| arr.value(idx).to_string() + ), + DataType::UInt8 => try_downcast!( + arrow_array::UInt8Array, + array, + |arr: &arrow_array::UInt8Array| arr.value(idx).to_string() + ), + DataType::Float64 => try_downcast!(Float64Array, array, |arr: &Float64Array| arr + .value(idx) + .to_string()), + DataType::Float32 => try_downcast!( + arrow_array::Float32Array, + array, + |arr: &arrow_array::Float32Array| arr.value(idx).to_string() + ), + DataType::Timestamp(TimeUnit::Millisecond, _) => try_downcast!( + TimestampMillisecondArray, + array, + |arr: &TimestampMillisecondArray| { + let timestamp = arr.value(idx); + chrono::DateTime::from_timestamp_millis(timestamp) + .map(|dt| dt.to_string()) + .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) + } + ), + DataType::Date32 => try_downcast!(Date32Array, array, |arr: &Date32Array| arr + .value(idx) + .to_string()), + DataType::Boolean => try_downcast!(BooleanArray, array, |arr: &BooleanArray| if arr + .value(idx) + { + "true".to_string() + } else { + "false".to_string() + }), + DataType::Null => "NULL".to_string(), + _ => { + warn!( + "Unsupported array type for statistics: {:?}", + array.data_type() + ); + "UNSUPPORTED".to_string() + } + } +} + pub async fn commit_schema_to_storage( stream_name: &str, schema: Schema, @@ -988,3 +1352,531 @@ pub fn manifest_path(prefix: &str) -> RelativePathBuf { _ => RelativePathBuf::from_iter([prefix, MANIFEST_FILE]), } } +#[cfg(test)] +mod tests { + use std::{fs::OpenOptions, sync::Arc}; + + use arrow_array::{ + BooleanArray, Float64Array, Int64Array, RecordBatch, StringArray, TimestampMillisecondArray, + }; + use arrow_schema::{DataType, Field, Schema, TimeUnit}; + use datafusion::prelude::{ParquetReadOptions, SessionContext}; + use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; + use temp_dir::TempDir; + use ulid::Ulid; + + use crate::storage::object_storage::calculate_single_field_stats; + + async fn create_test_parquet_with_data() -> (TempDir, 
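// The TempDir is returned alongside the path (and held as `_temp_dir` by the tests)
// so the directory is not dropped and the parquet file stays on disk for the
// duration of each test.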
std::path::PathBuf) { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("test_data.parquet"); + let schema = Arc::new(create_test_schema()); + + // Create test data with various patterns + let id_array = Int64Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let name_array = StringArray::from(vec![ + Some("Alice"), + Some("Bob"), + Some("Alice"), + Some("Charlie"), + Some("Alice"), + Some("Bob"), + Some("David"), + None, + Some("Eve"), + Some("Frank"), + ]); + let score_array = Float64Array::from(vec![ + Some(95.5), + Some(87.2), + Some(95.5), + Some(78.9), + Some(92.1), + Some(88.8), + Some(91.0), + None, + Some(89.5), + Some(94.2), + ]); + let active_array = BooleanArray::from(vec![ + Some(true), + Some(false), + Some(true), + Some(true), + Some(true), + Some(false), + Some(true), + None, // null value + Some(false), + Some(true), + ]); + let timestamp_array = TimestampMillisecondArray::from(vec![ + Some(1640995200000), + Some(1640995260000), + Some(1640995200000), + Some(1640995320000), + Some(1640995380000), + Some(1640995440000), + Some(1640995500000), + None, + Some(1640995560000), + Some(1640995620000), + ]); + // Field with single value (all same) + let single_value_array = StringArray::from(vec![ + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + ]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(id_array), + Arc::new(name_array), + Arc::new(score_array), + Arc::new(active_array), + Arc::new(timestamp_array), + Arc::new(single_value_array), + ], + ) + .unwrap(); + let props = WriterProperties::new(); + let mut parquet_file = OpenOptions::new() + .create(true) + .append(true) + .open(file_path.clone()) + .unwrap(); + let mut writer = + ArrowWriter::try_new(&mut parquet_file, schema.clone(), Some(props.clone())).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + (temp_dir, file_path) + } + + fn create_test_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Float64, true), + Field::new("active", DataType::Boolean, true), + Field::new( + "created_at", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("single_value", DataType::Utf8, true), + ]) + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_multiple_values() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let random_suffix = Ulid::new().to_string(); + let ctx = SessionContext::new(); + ctx.register_parquet( + &random_suffix, + parquet_path.to_str().expect("valid path"), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test name field with multiple distinct values and different frequencies + let result = calculate_single_field_stats(ctx.clone(), &random_suffix, "name", 50).await; + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "name"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 7); + assert_eq!(stats.distinct_stats.len(), 7); + + // Verify ordering by count (descending) + assert!(stats.distinct_stats[0].count >= stats.distinct_stats[1].count); + assert!(stats.distinct_stats[1].count >= stats.distinct_stats[2].count); + + // Verify specific counts + let alice_stat = stats + .distinct_stats + .iter() + .find(|s| s.distinct_value == 
"Alice"); + assert!(alice_stat.is_some()); + assert_eq!(alice_stat.unwrap().count, 3); + + let bob_stat = stats + .distinct_stats + .iter() + .find(|s| s.distinct_value == "Bob"); + assert!(bob_stat.is_some()); + assert_eq!(bob_stat.unwrap().count, 2); + + let charlie_stat = stats + .distinct_stats + .iter() + .find(|s| s.distinct_value == "Charlie"); + assert!(charlie_stat.is_some()); + assert_eq!(charlie_stat.unwrap().count, 1); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_numeric_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test score field (Float64) + let result = calculate_single_field_stats(ctx.clone(), &table_name, "score", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "score"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 9); + + // Verify that 95.5 appears twice (should be first due to highest count) + let highest_count_stat = &stats.distinct_stats[0]; + assert_eq!(highest_count_stat.distinct_value, "95.5"); + assert_eq!(highest_count_stat.count, 2); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_boolean_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test active field (Boolean) + let result = calculate_single_field_stats(ctx.clone(), &table_name, "active", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "active"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 3); + assert_eq!(stats.distinct_stats.len(), 3); + + assert_eq!(stats.distinct_stats[0].distinct_value, "true"); + assert_eq!(stats.distinct_stats[0].count, 6); + assert_eq!(stats.distinct_stats[1].distinct_value, "false"); + assert_eq!(stats.distinct_stats[1].count, 3); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_timestamp_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test created_at field (Timestamp) + let result = calculate_single_field_stats(ctx.clone(), &table_name, "created_at", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "created_at"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 9); + + // Verify that the duplicate timestamp appears twice + let duplicate_timestamp = &stats.distinct_stats[0]; + assert_eq!(duplicate_timestamp.count, 2); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_single_value_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test 
field with single distinct value + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "single_value", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "single_value"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 1); + assert_eq!(stats.distinct_stats.len(), 1); + assert_eq!(stats.distinct_stats[0].distinct_value, "constant"); + assert_eq!(stats.distinct_stats[0].count, 10); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_nonexistent_table() { + let ctx = SessionContext::new(); + + // Test with non-existent table + let result = + calculate_single_field_stats(ctx.clone(), "non_existent_table", "field", 50).await; + + // Should return None due to SQL execution failure + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_nonexistent_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test with non-existent field + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "non_existent_field", 50).await; + + // Should return None due to SQL execution failure + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_special_characters() { + // Create a schema with field names containing special characters + let schema = Arc::new(Schema::new(vec![ + Field::new("field with spaces", DataType::Utf8, true), + Field::new("field\"with\"quotes", DataType::Utf8, true), + Field::new("field'with'apostrophes", DataType::Utf8, true), + ])); + + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("special_chars.parquet"); + + use parquet::arrow::AsyncArrowWriter; + use tokio::fs::File; + + let space_array = StringArray::from(vec![Some("value1"), Some("value2"), Some("value1")]); + let quote_array = StringArray::from(vec![Some("quote1"), Some("quote2"), Some("quote1")]); + let apostrophe_array = StringArray::from(vec![Some("apos1"), Some("apos2"), Some("apos1")]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(space_array), + Arc::new(quote_array), + Arc::new(apostrophe_array), + ], + ) + .unwrap(); + + let file = File::create(&file_path).await.unwrap(); + let mut writer = AsyncArrowWriter::try_new(file, schema, None).unwrap(); + writer.write(&batch).await.unwrap(); + writer.close().await.unwrap(); + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test field with spaces + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "field with spaces", 50).await; + assert!(result.is_some()); + let stats = result.unwrap(); + assert_eq!(stats.field_name, "field with spaces"); + assert_eq!(stats.count, 3); + assert_eq!(stats.distinct_count, 2); + + // Test field with quotes + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "field\"with\"quotes", 50).await; + assert!(result.is_some()); + let stats = result.unwrap(); + assert_eq!(stats.field_name, "field\"with\"quotes"); + assert_eq!(stats.count, 3); + assert_eq!(stats.distinct_count, 2); + } + + #[tokio::test] + async fn 
test_calculate_single_field_stats_empty_table() { + // Create empty table + let schema = Arc::new(create_test_schema()); + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("empty_data.parquet"); + + use parquet::arrow::AsyncArrowWriter; + use tokio::fs::File; + + let file = File::create(&file_path).await.unwrap(); + let mut writer = AsyncArrowWriter::try_new(file, schema.clone(), None).unwrap(); + + // Create empty batch + let empty_batch = RecordBatch::new_empty(schema.clone()); + writer.write(&empty_batch).await.unwrap(); + writer.close().await.unwrap(); + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let result = calculate_single_field_stats(ctx.clone(), &table_name, "name", 50).await; + assert!(result.unwrap().distinct_stats.is_empty()); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_streaming_behavior() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test that the function handles streaming properly by checking + // that all data is collected correctly across multiple batches + let result = calculate_single_field_stats(ctx.clone(), &table_name, "name", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + // Verify that the streaming collected all the data + let total_distinct_count: i64 = stats.distinct_stats.iter().map(|s| s.count).sum(); + assert_eq!(total_distinct_count, stats.count); + + // Verify that distinct_stats are properly ordered by count + for i in 1..stats.distinct_stats.len() { + assert!(stats.distinct_stats[i - 1].count >= stats.distinct_stats[i].count); + } + } + + #[tokio::test] + async fn test_calculate_single_field_stats_large_dataset() { + // Create a larger dataset to test streaming behavior + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("category", DataType::Utf8, true), + ])); + + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("large_data.parquet"); + + use parquet::arrow::AsyncArrowWriter; + use tokio::fs::File; + + // Create 1000 rows with 10 distinct categories + let ids: Vec = (0..1000).collect(); + let categories: Vec> = (0..1000) + .map(|i| { + Some(match i % 10 { + 0 => "cat_0", + 1 => "cat_1", + 2 => "cat_2", + 3 => "cat_3", + 4 => "cat_4", + 5 => "cat_5", + 6 => "cat_6", + 7 => "cat_7", + 8 => "cat_8", + _ => "cat_9", + }) + }) + .collect(); + + let id_array = Int64Array::from(ids); + let category_array = StringArray::from(categories); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(id_array), Arc::new(category_array)], + ) + .unwrap(); + + let file = File::create(&file_path).await.unwrap(); + let mut writer = AsyncArrowWriter::try_new(file, schema, None).unwrap(); + writer.write(&batch).await.unwrap(); + writer.close().await.unwrap(); + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let result = calculate_single_field_stats(ctx.clone(), &table_name, "category", 50).await; + + 
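// 10 distinct categories with an even 100-per-category split fit comfortably under
// the limit of 50 passed above, so every category should appear in distinct_stats.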
assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.count, 1000); + assert_eq!(stats.distinct_count, 10); + assert_eq!(stats.distinct_stats.len(), 10); + + // Each category should appear 100 times + for distinct_stat in &stats.distinct_stats { + assert_eq!(distinct_stat.count, 100); + } + } +} diff --git a/src/sync.rs b/src/sync.rs index 3f6fd5922..c6ae55c9e 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -131,7 +131,7 @@ pub fn object_store_sync() -> ( // Monitor the duration of sync_all_streams execution monitor_task_duration( "object_store_sync_all_streams", - Duration::from_secs(15), + Duration::from_secs(PARSEABLE.options.object_store_sync_threshold), || async { let mut joinset = JoinSet::new(); sync_all_streams(&mut joinset); @@ -196,7 +196,7 @@ pub fn local_sync() -> ( // Monitor the duration of flush_and_convert execution monitor_task_duration( "local_sync_flush_and_convert", - Duration::from_secs(15), + Duration::from_secs(PARSEABLE.options.local_sync_threshold), || async { let mut joinset = JoinSet::new(); PARSEABLE.streams.flush_and_convert(&mut joinset, false, false); @@ -242,21 +242,25 @@ pub fn local_sync() -> ( // local and object store sync at the start of the server pub async fn sync_start() -> anyhow::Result<()> { // Monitor local sync duration at startup - monitor_task_duration("startup_local_sync", Duration::from_secs(15), || async { - let mut local_sync_joinset = JoinSet::new(); - PARSEABLE - .streams - .flush_and_convert(&mut local_sync_joinset, true, false); - while let Some(res) = local_sync_joinset.join_next().await { - log_join_result(res, "flush and convert"); - } - }) + monitor_task_duration( + "startup_local_sync", + Duration::from_secs(PARSEABLE.options.local_sync_threshold), + || async { + let mut local_sync_joinset = JoinSet::new(); + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, true, false); + while let Some(res) = local_sync_joinset.join_next().await { + log_join_result(res, "flush and convert"); + } + }, + ) .await; // Monitor object store sync duration at startup monitor_task_duration( "startup_object_store_sync", - Duration::from_secs(15), + Duration::from_secs(PARSEABLE.options.object_store_sync_threshold), || async { let mut object_store_joinset = JoinSet::new(); sync_all_streams(&mut object_store_joinset); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 010e9f594..4cd5cac3a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -37,6 +37,8 @@ use datafusion::common::tree_node::TreeNode; use regex::Regex; use sha2::{Digest, Sha256}; +pub const DATASET_STATS_STREAM_NAME: &str = "pstats"; + pub fn get_node_id() -> String { let now = Utc::now().to_rfc3339(); let id = get_hash(&now).to_string().split_at(15).0.to_string();
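For reference, a minimal sketch of the record shape that calculate_field_stats serializes and ingests into the internal "pstats" dataset named by DATASET_STATS_STREAM_NAME above. The struct definitions mirror DistinctStat/FieldStat/DatasetStats from src/storage/object_storage.rs; the stream and field names, the counts, and the serde/serde_json dependencies are illustrative assumptions, not part of the patch.

use serde::Serialize;

#[derive(Serialize)]
struct DistinctStat {
    distinct_value: String,
    count: i64,
}

#[derive(Serialize)]
struct FieldStat {
    field_name: String,
    count: i64,
    distinct_count: i64,
    distinct_stats: Vec<DistinctStat>,
}

#[derive(Serialize)]
struct DatasetStats {
    dataset_name: String,
    field_stats: Vec<FieldStat>,
}

fn main() {
    // Hypothetical dataset "frontend_logs" with a single summarized field "status".
    let stats = DatasetStats {
        dataset_name: "frontend_logs".to_string(),
        field_stats: vec![FieldStat {
            field_name: "status".to_string(),
            count: 1000,
            distinct_count: 3,
            distinct_stats: vec![
                DistinctStat { distinct_value: "200".to_string(), count: 900 },
                DistinctStat { distinct_value: "404".to_string(), count: 70 },
                DistinctStat { distinct_value: "500".to_string(), count: 30 },
            ],
        }],
    };
    // One such object per synced parquet file is flattened and pushed to the
    // "pstats" stream, which is created with custom partition "dataset_name".
    println!("{}", serde_json::to_string_pretty(&stats).expect("serializable"));
}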