Commit 5de56e5

feat: add stats for each field (#1340)
- Read the record batches from arrow files in the staging directory.
- Run DataFusion queries to fetch the count, the distinct count, and the count for each distinct value of every field in the dataset.
- Store the results in the `<dataset>_pmeta` dataset.
- The UI calls the SQL query below to fetch the stats from this dataset:

```sql
SELECT
    field_name,
    field_count,
    distinct_count,
    distinct_value,
    distinct_value_count
FROM (
    SELECT
        field_stats_field_name AS field_name,
        field_stats_distinct_stats_distinct_value AS distinct_value,
        SUM(field_stats_count) AS field_count,
        field_stats_distinct_count AS distinct_count,
        SUM(field_stats_distinct_stats_count) AS distinct_value_count,
        ROW_NUMBER() OVER (
            PARTITION BY field_stats_field_name
            ORDER BY SUM(field_stats_count) DESC
        ) AS rn
    FROM <dataset>_pmeta
    WHERE field_stats_field_name = 'status_code'
        AND field_stats_distinct_stats_distinct_value IS NOT NULL
    GROUP BY
        field_stats_field_name,
        field_stats_distinct_stats_distinct_value,
        field_stats_distinct_count
) ranked
WHERE rn <= 5
ORDER BY field_name, distinct_value_count DESC;
```
1 parent 97e5d42 commit 5de56e5
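To make the flow above concrete, here is a minimal, self-contained DataFusion sketch of the kind of per-field aggregation this commit describes. The table name `staging`, the single `status_code` column, and the sample values are made up for illustration; the actual implementation reads record batches from arrow files in the staging directory and writes the results into `<dataset>_pmeta`.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // Stand-in for a record batch read from an arrow file in staging
    let schema = Arc::new(Schema::new(vec![Field::new(
        "status_code",
        DataType::Utf8,
        false,
    )]));
    let column: ArrayRef = Arc::new(StringArray::from(vec!["200", "200", "500"]));
    let batch = RecordBatch::try_new(schema, vec![column])?;

    let ctx = SessionContext::new();
    ctx.register_batch("staging", batch)?;

    // Per-value counts for one field, ordered by frequency
    let df = ctx
        .sql(
            "SELECT status_code AS distinct_value, \
                    COUNT(*) AS distinct_value_count \
             FROM staging \
             GROUP BY status_code \
             ORDER BY distinct_value_count DESC",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```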

File tree

12 files changed: +1054 −65 lines


src/cli.rs

Lines changed: 35 additions & 0 deletions
```diff
@@ -388,6 +388,41 @@ pub struct Options {
         help = "Maximum level of flattening allowed for events"
     )]
     pub event_flatten_level: usize,
+
+    // maximum limit to store the statistics for a field
+    #[arg(
+        long,
+        env = "P_MAX_FIELD_STATISTICS",
+        default_value = "50",
+        help = "Maximum number of field statistics to store"
+    )]
+    pub max_field_statistics: usize,
+
+    // collect dataset stats
+    #[arg(
+        long,
+        env = "P_COLLECT_DATASET_STATS",
+        default_value = "false",
+        help = "Enable/Disable collecting dataset stats"
+    )]
+    pub collect_dataset_stats: bool,
+
+    // the duration during which local sync should be completed
+    #[arg(
+        long,
+        env = "P_LOCAL_SYNC_THRESHOLD",
+        default_value = "30",
+        help = "Local sync threshold in seconds"
+    )]
+    pub local_sync_threshold: u64,
+    // the duration during which object store sync should be completed
+    #[arg(
+        long,
+        env = "P_OBJECT_STORE_SYNC_THRESHOLD",
+        default_value = "15",
+        help = "Object store sync threshold in seconds"
+    )]
+    pub object_store_sync_threshold: u64,
 }
 
 #[derive(Parser, Debug)]
```
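For reference, a trimmed, hypothetical stand-in for these new fields, showing how clap resolves an explicit flag over the environment variable over the default value. It assumes the clap `derive` and `env` features; the struct name is illustrative and this is not the real `Options` type.

```rust
use clap::Parser;

/// Illustrative subset of the new options (not the actual Options struct).
#[derive(Parser, Debug)]
struct StatsOptions {
    #[arg(
        long,
        env = "P_MAX_FIELD_STATISTICS",
        default_value = "50",
        help = "Maximum number of field statistics to store"
    )]
    max_field_statistics: usize,

    #[arg(
        long,
        env = "P_COLLECT_DATASET_STATS",
        default_value = "false",
        help = "Enable/Disable collecting dataset stats"
    )]
    collect_dataset_stats: bool,
}

fn main() {
    // e.g. `P_COLLECT_DATASET_STATS=true ./app --max-field-statistics 100`
    let opts = StatsOptions::parse();
    println!("{opts:?}");
}
```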

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -55,6 +55,7 @@ impl ParseableSinkProcessor {
             .create_stream_if_not_exists(
                 stream_name,
                 StreamType::UserDefined,
+                None,
                 vec![log_source_entry],
             )
             .await?;
```

src/handlers/http/health_check.rs

Lines changed: 28 additions & 3 deletions
```diff
@@ -56,12 +56,35 @@ pub async fn check_shutdown_middleware(
 
 // This function is called when the server is shutting down
 pub async fn shutdown() {
-    // Set the shutdown flag to true
-    let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
-    *shutdown_flag = true;
+    // Set shutdown flag to true
+    set_shutdown_flag().await;
 
     //sleep for 5 secs to allow any ongoing requests to finish
     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+
+    // Perform sync operations
+    perform_sync_operations().await;
+
+    // If collect_dataset_stats is enabled, perform sync operations
+    // This is to ensure that all stats data is synced before the server shuts down
+    if PARSEABLE.options.collect_dataset_stats {
+        perform_sync_operations().await;
+    }
+}
+
+async fn set_shutdown_flag() {
+    let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
+    *shutdown_flag = true;
+}
+
+async fn perform_sync_operations() {
+    // Perform local sync
+    perform_local_sync().await;
+    // Perform object store sync
+    perform_object_store_sync().await;
+}
+
+async fn perform_local_sync() {
     let mut local_sync_joinset = JoinSet::new();
 
     // Sync staging
@@ -76,7 +99,9 @@ pub async fn shutdown() {
             Err(err) => error!("Failed to join async task: {err}"),
         }
     }
+}
 
+async fn perform_object_store_sync() {
     // Sync object store
     let mut object_store_joinset = JoinSet::new();
     sync_all_streams(&mut object_store_joinset);
```
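The refactor above splits shutdown into small helpers that all follow the same tokio `JoinSet` pattern: spawn the sync tasks, then drain the set and log any join failures. A minimal standalone sketch of that pattern (the task bodies are placeholders, not the real sync functions):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut sync_joinset = JoinSet::new();

    // Placeholders for the staging / object store sync tasks
    sync_joinset.spawn(async { /* convert staging arrows to parquet */ });
    sync_joinset.spawn(async { /* upload parquet to the object store */ });

    // Drain the set, logging tasks that failed to join
    while let Some(res) = sync_joinset.join_next().await {
        if let Err(err) = res {
            eprintln!("Failed to join async task: {err}");
        }
    }
}
```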

src/handlers/http/ingest.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -100,6 +100,7 @@ pub async fn ingest(
         .create_stream_if_not_exists(
             &stream_name,
             StreamType::UserDefined,
+            None,
             vec![log_source_entry.clone()],
         )
         .await?;
@@ -183,6 +184,7 @@ pub async fn handle_otel_logs_ingestion(
         .create_stream_if_not_exists(
             &stream_name,
             StreamType::UserDefined,
+            None,
             vec![log_source_entry.clone()],
         )
         .await?;
@@ -248,6 +250,7 @@ pub async fn handle_otel_metrics_ingestion(
         .create_stream_if_not_exists(
             &stream_name,
             StreamType::UserDefined,
+            None,
             vec![log_source_entry.clone()],
         )
         .await?;
@@ -313,6 +316,7 @@ pub async fn handle_otel_traces_ingestion(
         .create_stream_if_not_exists(
             &stream_name,
             StreamType::UserDefined,
+            None,
             vec![log_source_entry.clone()],
         )
         .await?;
```

src/handlers/http/prism_home.rs

Lines changed: 9 additions & 4 deletions
```diff
@@ -26,7 +26,7 @@ use crate::{
 };
 
 const HOME_SEARCH_QUERY_PARAM: &str = "key";
-
+pub const HOME_QUERY_PARAM: &str = "includeInternal";
 /// Fetches the data to populate Prism's home
 ///
 ///
@@ -36,8 +36,12 @@ const HOME_SEARCH_QUERY_PARAM: &str = "key";
 pub async fn home_api(req: HttpRequest) -> Result<impl Responder, PrismHomeError> {
     let key = extract_session_key_from_req(&req)
         .map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
+    let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
+        .map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?;
 
-    let res = generate_home_response(&key).await?;
+    let include_internal = query_map.get(HOME_QUERY_PARAM).is_some_and(|v| v == "true");
+
+    let res = generate_home_response(&key, include_internal).await?;
 
     Ok(web::Json(res))
 }
@@ -52,11 +56,12 @@ pub async fn home_search(req: HttpRequest) -> Result<impl Responder, PrismHomeEr
         return Ok(web::Json(serde_json::json!({})));
     }
 
-    let query_value = query_map
+    let query_key = query_map
         .get(HOME_SEARCH_QUERY_PARAM)
         .ok_or_else(|| PrismHomeError::InvalidQueryParameter(HOME_SEARCH_QUERY_PARAM.to_string()))?
         .to_lowercase();
-    let res = generate_home_search_response(&key, &query_value).await?;
+
+    let res = generate_home_search_response(&key, &query_key).await?;
     let json_res = serde_json::to_value(res)
         .map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
 
```
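A small standalone sketch of how the new `includeInternal` flag is read, using the same actix-web query helper as the handler above; the query string and request path mentioned in the comment are illustrative.

```rust
use std::collections::HashMap;

use actix_web::web;

fn main() {
    // Equivalent to a request such as `GET /home?includeInternal=true`
    let query_map =
        web::Query::<HashMap<String, String>>::from_query("includeInternal=true").unwrap();

    let include_internal = query_map
        .get("includeInternal")
        .is_some_and(|v| v == "true");

    assert!(include_internal);
}
```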

src/parseable/mod.rs

Lines changed: 8 additions & 1 deletion
```diff
@@ -330,6 +330,7 @@ impl Parseable {
             .create_stream_if_not_exists(
                 INTERNAL_STREAM_NAME,
                 StreamType::Internal,
+                None,
                 vec![log_source_entry],
             )
             .await
@@ -354,12 +355,18 @@
         &self,
         stream_name: &str,
         stream_type: StreamType,
+        custom_partition: Option<&String>,
         log_source: Vec<LogSourceEntry>,
     ) -> Result<bool, PostError> {
         if self.streams.contains(stream_name) {
             return Ok(true);
         }
 
+        // validate custom partition if provided
+        if let Some(partition) = custom_partition {
+            validate_custom_partition(partition)?;
+        }
+
        // For distributed deployments, if the stream not found in memory map,
        //check if it exists in the storage
        //create stream and schema from storage
@@ -375,7 +382,7 @@
                 stream_name.to_string(),
                 "",
                 None,
-                None,
+                custom_partition,
                 false,
                 Arc::new(Schema::empty()),
                 stream_type,
```

src/parseable/streams.rs

Lines changed: 8 additions & 11 deletions
```diff
@@ -29,7 +29,7 @@ use std::{
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
 use chrono::{NaiveDateTime, Timelike, Utc};
-use derive_more::{Deref, DerefMut};
+use derive_more::derive::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
     arrow::ArrowWriter,
@@ -478,15 +478,12 @@
 
         // read arrow files on disk
         // convert them to parquet
-        let schema = self
-            .convert_disk_files_to_parquet(
-                time_partition.as_ref(),
-                custom_partition.as_ref(),
-                init_signal,
-                shutdown_signal,
-            )
-            .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;
-
+        let schema = self.convert_disk_files_to_parquet(
+            time_partition.as_ref(),
+            custom_partition.as_ref(),
+            init_signal,
+            shutdown_signal,
+        )?;
         // check if there is already a schema file in staging pertaining to this stream
         // if yes, then merge them and save
 
@@ -640,7 +637,6 @@
         }
 
         self.update_staging_metrics(&staging_files);
-
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
@@ -972,6 +968,7 @@ impl Stream {
         );
 
         let start_convert = Instant::now();
+
         self.prepare_parquet(init_signal, shutdown_signal)?;
         trace!(
             "Converting arrows to parquet on stream ({}) took: {}s",
```

src/prism/home/mod.rs

Lines changed: 13 additions & 2 deletions
```diff
@@ -33,7 +33,7 @@ use crate::{
     handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
     parseable::PARSEABLE,
     rbac::{map::SessionKey, role::Action, Users},
-    storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
+    storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
     users::{dashboards::DASHBOARDS, filters::FILTERS},
 };
 
@@ -88,7 +88,10 @@
     resources: Vec<Resource>,
 }
 
-pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
+pub async fn generate_home_response(
+    key: &SessionKey,
+    include_internal: bool,
+) -> Result<HomeResponse, PrismHomeError> {
     // Execute these operations concurrently
     let (stream_titles_result, alerts_info_result) =
         tokio::join!(get_stream_titles(key), get_alerts_info());
@@ -120,6 +123,14 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
     for result in stream_metadata_results {
         match result {
             Ok((stream, metadata, dataset_type)) => {
+                // Skip internal streams if the flag is false
+                if !include_internal
+                    && metadata
+                        .iter()
+                        .all(|m| m.stream_type == StreamType::Internal)
+                {
+                    continue;
+                }
                 stream_wise_stream_json.insert(stream.clone(), metadata);
                 datasets.push(DataSet {
                     title: stream,
```
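The skip condition added above boils down to: hide a stream unless `includeInternal` is set or at least one of its metadata entries is non-internal. A self-contained sketch with stand-in types (not the real `ObjectStoreFormat`/`StreamType` definitions):

```rust
#[derive(PartialEq)]
enum StreamType {
    UserDefined,
    Internal,
}

struct Meta {
    stream_type: StreamType,
}

// Mirrors the condition in generate_home_response
fn should_skip(include_internal: bool, metadata: &[Meta]) -> bool {
    !include_internal && metadata.iter().all(|m| m.stream_type == StreamType::Internal)
}

fn main() {
    let internal_only = [Meta { stream_type: StreamType::Internal }];
    let mixed = [
        Meta { stream_type: StreamType::Internal },
        Meta { stream_type: StreamType::UserDefined },
    ];

    assert!(should_skip(false, &internal_only));
    assert!(!should_skip(false, &mixed));
    assert!(!should_skip(true, &internal_only));
}
```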

src/query/mod.rs

Lines changed: 25 additions & 19 deletions
```diff
@@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion::error::DataFusionError;
 use datafusion::execution::disk_manager::DiskManagerConfig;
-use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
+use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
 use datafusion::logical_expr::expr::Alias;
 use datafusion::logical_expr::{
     Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
@@ -61,6 +61,9 @@ use crate::utils::time::TimeRange;
 pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
 
+pub static QUERY_SESSION_STATE: Lazy<SessionState> =
+    Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));
+
 /// Dedicated multi-threaded runtime to run all queries on
 pub static QUERY_RUNTIME: Lazy<Runtime> =
     Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
@@ -96,10 +99,28 @@ pub struct Query {
 impl Query {
     // create session context for this query
     pub fn create_session_context(storage: Arc<dyn ObjectStorageProvider>) -> SessionContext {
+        let state = Self::create_session_state(storage.clone());
+
+        let schema_provider = Arc::new(GlobalSchemaProvider {
+            storage: storage.get_object_store(),
+        });
+        state
+            .catalog_list()
+            .catalog(&state.config_options().catalog.default_catalog)
+            .expect("default catalog is provided by datafusion")
+            .register_schema(
+                &state.config_options().catalog.default_schema,
+                schema_provider,
+            )
+            .unwrap();
+
+        SessionContext::new_with_state(state)
+    }
+
+    fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
         let runtime_config = storage
             .get_datafusion_runtime()
             .with_disk_manager(DiskManagerConfig::NewOs);
-
         let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size {
             Some(size) => (size, 1.),
             None => {
@@ -142,26 +163,11 @@ impl Query {
             .parquet
             .schema_force_view_types = true;
 
-        let state = SessionStateBuilder::new()
+        SessionStateBuilder::new()
             .with_default_features()
             .with_config(config)
             .with_runtime_env(runtime)
-            .build();
-
-        let schema_provider = Arc::new(GlobalSchemaProvider {
-            storage: storage.get_object_store(),
-        });
-        state
-            .catalog_list()
-            .catalog(&state.config_options().catalog.default_catalog)
-            .expect("default catalog is provided by datafusion")
-            .register_schema(
-                &state.config_options().catalog.default_schema,
-                schema_provider,
-            )
-            .unwrap();
-
-        SessionContext::new_with_state(state)
+            .build()
     }
 
     /// this function returns the result of the query
```
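The refactor above separates building the `SessionState` (config, runtime, catalog registration) from wrapping it in a `SessionContext`, so the state can also back the new `QUERY_SESSION_STATE`. A minimal DataFusion sketch of that split, with the Parseable-specific schema provider and runtime configuration omitted:

```rust
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Build the state once...
    let config = SessionConfig::new().with_information_schema(true);
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_config(config)
        .build();

    // ...then wrap it in a context (schema providers would be registered
    // against the state's default catalog before this step).
    let ctx = SessionContext::new_with_state(state);
    println!(
        "default catalog: {}",
        ctx.state().config_options().catalog.default_catalog
    );
}
```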
