feat: add stats for each field #1340


Merged · 20 commits · Jun 19, 2025
35 changes: 35 additions & 0 deletions src/cli.rs
@@ -388,6 +388,41 @@ pub struct Options {
help = "Maximum level of flattening allowed for events"
)]
pub event_flatten_level: usize,

// maximum limit to store the statistics for a field
#[arg(
long,
env = "P_MAX_FIELD_STATISTICS",
default_value = "50",
help = "Maximum number of field statistics to store"
)]
pub max_field_statistics: usize,

// collect dataset stats
#[arg(
long,
env = "P_COLLECT_DATASET_STATS",
default_value = "false",
help = "Enable/Disable collecting dataset stats"
)]
pub collect_dataset_stats: bool,

// the duration during which local sync should be completed
#[arg(
long,
env = "P_LOCAL_SYNC_THRESHOLD",
default_value = "30",
help = "Local sync threshold in seconds"
)]
pub local_sync_threshold: u64,
// the duration during which object store sync should be completed
#[arg(
long,
env = "P_OBJECT_STORE_SYNC_THRESHOLD",
default_value = "15",
help = "Object store sync threshold in seconds"
)]
pub object_store_sync_threshold: u64,
}

#[derive(Parser, Debug)]
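
The four new options follow the same clap pattern as the rest of `Options`: a long flag, an environment-variable fallback, and a default. The standalone sketch below (a hypothetical `DemoOptions` struct, not the project's code) shows that pattern in isolation; values resolve from the CLI flag first, then the environment variable, then the default.

```rust
// Hypothetical standalone demo of the flag pattern used above; requires the
// `clap` crate with the `derive` and `env` features enabled.
use clap::Parser;

#[derive(Parser, Debug)]
struct DemoOptions {
    /// Maximum number of field statistics to store
    #[arg(long, env = "P_MAX_FIELD_STATISTICS", default_value = "50")]
    max_field_statistics: usize,

    /// Enable/Disable collecting dataset stats
    #[arg(long, env = "P_COLLECT_DATASET_STATS", default_value = "false")]
    collect_dataset_stats: bool,

    /// Local sync threshold in seconds
    #[arg(long, env = "P_LOCAL_SYNC_THRESHOLD", default_value = "30")]
    local_sync_threshold: u64,

    /// Object store sync threshold in seconds
    #[arg(long, env = "P_OBJECT_STORE_SYNC_THRESHOLD", default_value = "15")]
    object_store_sync_threshold: u64,
}

fn main() {
    // CLI flags override environment variables, which override the defaults,
    // e.g. `P_LOCAL_SYNC_THRESHOLD=60 ./demo --max-field-statistics 100`.
    let opts = DemoOptions::parse();
    println!("{opts:?}");
}
```
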
1 change: 1 addition & 0 deletions src/connectors/kafka/processor.rs
@@ -55,6 +55,7 @@ impl ParseableSinkProcessor {
.create_stream_if_not_exists(
stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry],
)
.await?;
31 changes: 28 additions & 3 deletions src/handlers/http/health_check.rs
@@ -56,12 +56,35 @@ pub async fn check_shutdown_middleware(

// This function is called when the server is shutting down
pub async fn shutdown() {
// Set the shutdown flag to true
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;
// Set shutdown flag to true
set_shutdown_flag().await;

//sleep for 5 secs to allow any ongoing requests to finish
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

// Perform sync operations
perform_sync_operations().await;

// If collect_dataset_stats is enabled, perform sync operations
// This is to ensure that all stats data is synced before the server shuts down
if PARSEABLE.options.collect_dataset_stats {
perform_sync_operations().await;
}
}

async fn set_shutdown_flag() {
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;
}

async fn perform_sync_operations() {
// Perform local sync
perform_local_sync().await;
// Perform object store sync
perform_object_store_sync().await;
}

async fn perform_local_sync() {
let mut local_sync_joinset = JoinSet::new();

// Sync staging
@@ -76,7 +99,9 @@ pub async fn shutdown() {
Err(err) => error!("Failed to join async task: {err}"),
}
}
}

async fn perform_object_store_sync() {
// Sync object store
let mut object_store_joinset = JoinSet::new();
sync_all_streams(&mut object_store_joinset);
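
The shutdown path above now sets the flag, waits five seconds for in-flight requests, syncs staging and object storage, and repeats the sync pass when `collect_dataset_stats` is enabled so stats written during shutdown also reach storage. Below is a minimal sketch of the `JoinSet` fan-out/join pattern used by `perform_local_sync` and `perform_object_store_sync`; the spawned closure is a stand-in for the real per-stream flush work.

```rust
// Sketch of the JoinSet pattern: spawn one task per stream, then join them
// all and log failures instead of aborting shutdown. Requires the `tokio`
// crate with the runtime and macros features.
use tokio::task::JoinSet;

async fn run_sync_tasks() {
    let mut joinset = JoinSet::new();
    for stream_id in 0..3u32 {
        joinset.spawn(async move {
            // placeholder for flushing one stream
            Ok::<u32, std::io::Error>(stream_id)
        });
    }
    while let Some(joined) = joinset.join_next().await {
        match joined {
            Ok(Ok(id)) => println!("stream {id} synced"),
            Ok(Err(err)) => eprintln!("sync failed: {err}"),
            Err(err) => eprintln!("failed to join async task: {err}"),
        }
    }
}

#[tokio::main]
async fn main() {
    run_sync_tasks().await;
}
```
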
4 changes: 4 additions & 0 deletions src/handlers/http/ingest.rs
@@ -100,6 +100,7 @@ pub async fn ingest(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
@@ -183,6 +184,7 @@ pub async fn handle_otel_logs_ingestion(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
@@ -248,6 +250,7 @@ pub async fn handle_otel_metrics_ingestion(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
@@ -313,6 +316,7 @@ pub async fn handle_otel_traces_ingestion(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
13 changes: 9 additions & 4 deletions src/handlers/http/prism_home.rs
@@ -26,7 +26,7 @@ use crate::{
};

const HOME_SEARCH_QUERY_PARAM: &str = "key";

pub const HOME_QUERY_PARAM: &str = "includeInternal";
/// Fetches the data to populate Prism's home
///
///
@@ -36,8 +36,12 @@ const HOME_SEARCH_QUERY_PARAM: &str = "key";
pub async fn home_api(req: HttpRequest) -> Result<impl Responder, PrismHomeError> {
let key = extract_session_key_from_req(&req)
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
.map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?;

let res = generate_home_response(&key).await?;
let include_internal = query_map.get(HOME_QUERY_PARAM).is_some_and(|v| v == "true");

let res = generate_home_response(&key, include_internal).await?;

Ok(web::Json(res))
}
@@ -52,11 +56,12 @@ pub async fn home_search(req: HttpRequest) -> Result<impl Responder, PrismHomeEr
return Ok(web::Json(serde_json::json!({})));
}

let query_value = query_map
let query_key = query_map
.get(HOME_SEARCH_QUERY_PARAM)
.ok_or_else(|| PrismHomeError::InvalidQueryParameter(HOME_SEARCH_QUERY_PARAM.to_string()))?
.to_lowercase();
let res = generate_home_search_response(&key, &query_value).await?;

let res = generate_home_search_response(&key, &query_key).await?;
let json_res = serde_json::to_value(res)
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;

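
The home endpoint now reads an optional `includeInternal` query parameter; anything other than the literal `true` keeps internal streams hidden. The self-contained actix-web sketch below mirrors that parsing (hypothetical `/home` route and handler name; the real handler maps a parse failure to `PrismHomeError::InvalidQueryParameter` rather than returning a bare 400).

```rust
// Hypothetical actix-web handler mirroring the query-string handling above.
use std::collections::HashMap;

use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};

const HOME_QUERY_PARAM: &str = "includeInternal";

async fn home(req: HttpRequest) -> HttpResponse {
    // Parse the raw query string into a map; reject malformed input.
    let Ok(query_map) =
        web::Query::<HashMap<String, String>>::from_query(req.query_string())
    else {
        return HttpResponse::BadRequest().body("invalid query parameter");
    };
    // Only the literal "true" opts into internal streams.
    let include_internal = query_map
        .get(HOME_QUERY_PARAM)
        .is_some_and(|v| v == "true");
    HttpResponse::Ok().body(format!("includeInternal = {include_internal}"))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/home", web::get().to(home)))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```
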
9 changes: 8 additions & 1 deletion src/parseable/mod.rs
@@ -330,6 +330,7 @@ impl Parseable {
.create_stream_if_not_exists(
INTERNAL_STREAM_NAME,
StreamType::Internal,
None,
vec![log_source_entry],
)
.await
@@ -354,12 +355,18 @@ impl Parseable {
&self,
stream_name: &str,
stream_type: StreamType,
custom_partition: Option<&String>,
log_source: Vec<LogSourceEntry>,
) -> Result<bool, PostError> {
if self.streams.contains(stream_name) {
return Ok(true);
}

// validate custom partition if provided
if let Some(partition) = custom_partition {
validate_custom_partition(partition)?;
}

// For distributed deployments, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
@@ -375,7 +382,7 @@
stream_name.to_string(),
"",
None,
None,
custom_partition,
false,
Arc::new(Schema::empty()),
stream_type,
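
`create_stream_if_not_exists` now accepts an optional custom partition and validates it before creating the stream, which is why the Kafka processor and the HTTP ingest handlers simply pass `None`. The snippet below is a hypothetical stand-in for that validation step, only to illustrate the kind of checks involved; the real rules live in `validate_custom_partition` and may differ.

```rust
// Hypothetical validator: the real validate_custom_partition may enforce
// different limits and character rules.
fn validate_custom_partition_sketch(partition: &str) -> Result<(), String> {
    let fields: Vec<&str> = partition.split(',').map(str::trim).collect();
    // Illustrative limit on the number of partition fields.
    if fields.len() > 1 {
        return Err("only one custom partition field is allowed".to_string());
    }
    if fields.iter().any(|f| f.is_empty()) {
        return Err("custom partition field name cannot be empty".to_string());
    }
    Ok(())
}

fn main() {
    assert!(validate_custom_partition_sketch("device_id").is_ok());
    assert!(validate_custom_partition_sketch("a,b").is_err());
    assert!(validate_custom_partition_sketch(" ").is_err());
    println!("custom partition sketch: all assertions passed");
}
```
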
19 changes: 8 additions & 11 deletions src/parseable/streams.rs
@@ -29,7 +29,7 @@ use std::{
use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
use derive_more::{Deref, DerefMut};
use derive_more::derive::{Deref, DerefMut};
use itertools::Itertools;
use parquet::{
arrow::ArrowWriter,
@@ -478,15 +478,12 @@ impl Stream {

// read arrow files on disk
// convert them to parquet
let schema = self
.convert_disk_files_to_parquet(
time_partition.as_ref(),
custom_partition.as_ref(),
init_signal,
shutdown_signal,
)
.inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;

let schema = self.convert_disk_files_to_parquet(
time_partition.as_ref(),
custom_partition.as_ref(),
init_signal,
shutdown_signal,
)?;
// check if there is already a schema file in staging pertaining to this stream
// if yes, then merge them and save

@@ -640,7 +637,6 @@
}

self.update_staging_metrics(&staging_files);

for (parquet_path, arrow_files) in staging_files {
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
if record_reader.readers.is_empty() {
@@ -972,6 +968,7 @@ impl Stream {
);

let start_convert = Instant::now();

self.prepare_parquet(init_signal, shutdown_signal)?;
trace!(
"Converting arrows to parquet on stream ({}) took: {}s",
15 changes: 13 additions & 2 deletions src/prism/home/mod.rs
@@ -33,7 +33,7 @@ use crate::{
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
parseable::PARSEABLE,
rbac::{map::SessionKey, role::Action, Users},
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
users::{dashboards::DASHBOARDS, filters::FILTERS},
};

@@ -88,7 +88,10 @@ pub struct HomeSearchResponse {
resources: Vec<Resource>,
}

pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
pub async fn generate_home_response(
key: &SessionKey,
include_internal: bool,
) -> Result<HomeResponse, PrismHomeError> {
// Execute these operations concurrently
let (stream_titles_result, alerts_info_result) =
tokio::join!(get_stream_titles(key), get_alerts_info());
Expand Down Expand Up @@ -120,6 +123,14 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
for result in stream_metadata_results {
match result {
Ok((stream, metadata, dataset_type)) => {
// Skip internal streams if the flag is false
if !include_internal
&& metadata
.iter()
.all(|m| m.stream_type == StreamType::Internal)
{
continue;
}
stream_wise_stream_json.insert(stream.clone(), metadata);
datasets.push(DataSet {
title: stream,
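
`generate_home_response` now threads the `include_internal` flag through from the handler and drops a stream only when internal datasets were not requested and every metadata entry marks it as internal. A toy version of that check, with stand-in `StreamType` and metadata types, is shown below.

```rust
// Stand-in types for illustration; the real StreamType and per-stream
// metadata come from the storage module.
#[derive(PartialEq)]
enum StreamType {
    UserDefined,
    Internal,
}

struct StreamMeta {
    stream_type: StreamType,
}

// Mirrors the skip condition used in the loop above.
fn skip_stream(include_internal: bool, metadata: &[StreamMeta]) -> bool {
    !include_internal
        && metadata
            .iter()
            .all(|m| m.stream_type == StreamType::Internal)
}

fn main() {
    let internal = [StreamMeta { stream_type: StreamType::Internal }];
    let user = [StreamMeta { stream_type: StreamType::UserDefined }];
    assert!(skip_stream(false, &internal)); // hidden by default
    assert!(!skip_stream(true, &internal)); // shown when requested
    assert!(!skip_stream(false, &user)); // user-defined streams always shown
    println!("internal-stream filter sketch: all assertions passed");
}
```
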
44 changes: 25 additions & 19 deletions src/query/mod.rs
@@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
@@ -61,6 +61,9 @@ use crate::utils::time::TimeRange;
pub static QUERY_SESSION: Lazy<SessionContext> =
Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));

pub static QUERY_SESSION_STATE: Lazy<SessionState> =
Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));

/// Dedicated multi-threaded runtime to run all queries on
pub static QUERY_RUNTIME: Lazy<Runtime> =
Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
@@ -96,10 +99,28 @@ pub struct Query {
impl Query {
// create session context for this query
pub fn create_session_context(storage: Arc<dyn ObjectStorageProvider>) -> SessionContext {
let state = Self::create_session_state(storage.clone());

let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
});
state
.catalog_list()
.catalog(&state.config_options().catalog.default_catalog)
.expect("default catalog is provided by datafusion")
.register_schema(
&state.config_options().catalog.default_schema,
schema_provider,
)
.unwrap();

SessionContext::new_with_state(state)
}

fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
let runtime_config = storage
.get_datafusion_runtime()
.with_disk_manager(DiskManagerConfig::NewOs);

let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size {
Some(size) => (size, 1.),
None => {
@@ -142,26 +163,11 @@ impl Query {
.parquet
.schema_force_view_types = true;

let state = SessionStateBuilder::new()
SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(runtime)
.build();

let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
});
state
.catalog_list()
.catalog(&state.config_options().catalog.default_catalog)
.expect("default catalog is provided by datafusion")
.register_schema(
&state.config_options().catalog.default_schema,
schema_provider,
)
.unwrap();

SessionContext::new_with_state(state)
.build()
}

/// this function returns the result of the query
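
The session-construction refactor pulls state building out of `create_session_context` into `create_session_state` and exposes the result through the new `QUERY_SESSION_STATE` static, presumably so other code paths in this PR can reuse the same DataFusion configuration. The sketch below shows the shape of that refactor with stand-in types; the real statics wrap DataFusion's `SessionState` and `SessionContext`.

```rust
// Stand-in types to illustrate the refactor shape; requires the `once_cell`
// crate, as in the original code.
use once_cell::sync::Lazy;

#[derive(Clone, Debug)]
struct SessionState {
    target_partitions: usize,
}

#[derive(Debug)]
struct SessionContext {
    state: SessionState,
}

// One place to build the state...
fn create_session_state() -> SessionState {
    let partitions = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    SessionState { target_partitions: partitions }
}

// ...and the context wraps a state built the same way
// (schema registration elided here).
fn create_session_context() -> SessionContext {
    SessionContext { state: create_session_state() }
}

static QUERY_SESSION_STATE: Lazy<SessionState> = Lazy::new(create_session_state);
static QUERY_SESSION: Lazy<SessionContext> = Lazy::new(create_session_context);

fn main() {
    println!("shared state: {:?}", *QUERY_SESSION_STATE);
    println!("query context: {:?}", *QUERY_SESSION);
}
```
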