diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index 68bc6910e..8865c83ff 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -16,6 +16,8 @@ * */ +use std::fmt::Display; + use arrow_array::{Float64Array, Int64Array, RecordBatch}; use datafusion::{ functions_aggregate::{ @@ -339,46 +341,97 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr { } } +pub fn get_filter_string(where_clause: &Conditions) -> Result { + match &where_clause.operator { + Some(op) => match op { + LogicalOperator::And => { + let mut exprs = vec![]; + for condition in &where_clause.condition_config { + if let Some(value) = &condition.value { + // ad-hoc error check in case value is some and operator is either `is null` or `is not null` + if condition.operator.eq(&WhereConfigOperator::IsNull) + || condition.operator.eq(&WhereConfigOperator::IsNotNull) + { + return Err("value must be null when operator is either `is null` or `is not null`" + .into()); + } + let value = NumberOrString::from_string(value.to_owned()); + match value { + NumberOrString::Number(val) => exprs.push(format!( + "\"{}\" {} {}", + condition.column, condition.operator, val + )), + NumberOrString::String(val) => exprs.push(format!( + "\"{}\" {} '{}'", + condition.column, + condition.operator, + val.replace('\'', "''") + )), + } + } else { + exprs.push(format!("\"{}\" {}", condition.column, condition.operator)) + } + } + + Ok(exprs.join(" AND ")) + } + _ => Err(String::from("Invalid option 'or', only 'and' is supported")), + }, + _ => Err(String::from( + "Invalid option 'null', only 'and' is supported", + )), + } +} + fn match_alert_operator(expr: &ConditionConfig) -> Expr { // the form accepts value as a string // if it can be parsed as a number, then parse it // else keep it as a string - let value = NumberOrString::from_string(expr.value.clone()); - - // for maintaining column case - let column = format!(r#""{}""#, expr.column); - match expr.operator { - WhereConfigOperator::Equal => col(column).eq(lit(value)), - WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)), - WhereConfigOperator::LessThan => col(column).lt(lit(value)), - WhereConfigOperator::GreaterThan => col(column).gt(lit(value)), - WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)), - WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)), - WhereConfigOperator::IsNull => col(column).is_null(), - WhereConfigOperator::IsNotNull => col(column).is_not_null(), - WhereConfigOperator::ILike => col(column).ilike(lit(&expr.value)), - WhereConfigOperator::Contains => col(column).like(lit(&expr.value)), - WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new( - Box::new(col(column)), - Operator::RegexIMatch, - Box::new(lit(format!("^{}", expr.value))), - )), - WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new( - Box::new(col(column)), - Operator::RegexIMatch, - Box::new(lit(format!("{}$", expr.value))), - )), - WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(&expr.value)), - WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new( - Box::new(col(column)), - Operator::RegexNotIMatch, - Box::new(lit(format!("^{}", expr.value))), - )), - WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new( - Box::new(col(column)), - Operator::RegexNotIMatch, - Box::new(lit(format!("{}$", expr.value))), - )), + if let Some(value) = &expr.value { + let value = NumberOrString::from_string(value.clone()); + + // for maintaining column case + let column = format!(r#""{}""#, expr.column); + match expr.operator { + WhereConfigOperator::Equal => col(column).eq(lit(value)), + WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)), + WhereConfigOperator::LessThan => col(column).lt(lit(value)), + WhereConfigOperator::GreaterThan => col(column).gt(lit(value)), + WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)), + WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)), + WhereConfigOperator::ILike => col(column).ilike(lit(value)), + WhereConfigOperator::Contains => col(column).like(lit(value)), + WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new( + Box::new(col(column)), + Operator::RegexIMatch, + Box::new(lit(format!("^{}", value))), + )), + WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new( + Box::new(col(column)), + Operator::RegexIMatch, + Box::new(lit(format!("{}$", value))), + )), + WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(value)), + WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new( + Box::new(col(column)), + Operator::RegexNotIMatch, + Box::new(lit(format!("^{}", value))), + )), + WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new( + Box::new(col(column)), + Operator::RegexNotIMatch, + Box::new(lit(format!("{}$", value))), + )), + _ => unreachable!("value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation") + } + } else { + // for maintaining column case + let column = format!(r#""{}""#, expr.column); + match expr.operator { + WhereConfigOperator::IsNull => col(column).is_null(), + WhereConfigOperator::IsNotNull => col(column).is_not_null(), + _ => unreachable!("value must be null for `is null` and `is not null`. Should've been caught in validation") + } } } @@ -417,3 +470,12 @@ impl NumberOrString { } } } + +impl Display for NumberOrString { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NumberOrString::Number(v) => write!(f, "{v}"), + NumberOrString::String(v) => write!(f, "{v}"), + } + } +} diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index e9324e96b..af1af9387 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -220,7 +220,7 @@ impl Display for AlertOperator { } } -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub enum WhereConfigOperator { #[serde(rename = "=")] @@ -326,7 +326,7 @@ pub struct FilterConfig { pub struct ConditionConfig { pub column: String, pub operator: WhereConfigOperator, - pub value: String, + pub value: Option, } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] @@ -343,20 +343,28 @@ impl Conditions { LogicalOperator::And | LogicalOperator::Or => { let expr1 = &self.condition_config[0]; let expr2 = &self.condition_config[1]; - format!( - "[{} {} {} {op} {} {} {}]", - expr1.column, - expr1.operator, - expr1.value, - expr2.column, - expr2.operator, - expr2.value - ) + let expr1_msg = if let Some(val) = &expr1.value { + format!("{} {} {}", expr1.column, expr1.operator, val) + } else { + format!("{} {}", expr1.column, expr1.operator) + }; + + let expr2_msg = if let Some(val) = &expr2.value { + format!("{} {} {}", expr2.column, expr2.operator, val) + } else { + format!("{} {}", expr2.column, expr2.operator) + }; + + format!("[{} {op} {}]", expr1_msg, expr2_msg) } }, None => { let expr = &self.condition_config[0]; - format!("[{} {} {}]", expr.column, expr.operator, expr.value) + if let Some(val) = &expr.value { + format!("{} {} {}", expr.column, expr.operator, val) + } else { + format!("{} {}", expr.column, expr.operator) + } } } } @@ -645,6 +653,27 @@ impl AlertConfig { } } } + + // validate that the value should be None in case of `is null` and `is not null` + for condition in config.condition_config.iter() { + let needs_no_value = matches!( + condition.operator, + WhereConfigOperator::IsNull | WhereConfigOperator::IsNotNull + ); + + if needs_no_value && condition.value.is_some() { + return Err(AlertError::CustomError( + "value must be null when operator is either `is null` or `is not null`" + .into(), + )); + } + if !needs_no_value && condition.value.as_ref().is_none_or(|v| v.trim().is_empty()) { + return Err(AlertError::CustomError( + "value must not be null when operator is neither `is null` nor `is not null`" + .into(), + )); + } + } Ok(()) } diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 9d151349b..a817546d0 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -18,6 +18,7 @@ use crate::event::error::EventError; use crate::handlers::http::fetch_schema; +use crate::utils::arrow::record_batches_to_json; use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder}; @@ -31,8 +32,9 @@ use futures::stream::once; use futures::{future, Stream, StreamExt}; use futures_util::Future; use http::StatusCode; +use itertools::Itertools; use serde::{Deserialize, Serialize}; -use serde_json::json; +use serde_json::{json, Value}; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -44,7 +46,7 @@ use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::error::ExecuteError; -use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery}; +use crate::query::{execute, CountsRequest, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -193,6 +195,7 @@ async fn handle_count_query( start_time: query_request.start_time.clone(), end_time: query_request.end_time.clone(), num_bins: 1, + conditions: None, }; let count_records = counts_req.get_bin_density().await?; let count = count_records[0].count; @@ -358,6 +361,7 @@ fn create_batch_processor( Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), } } + pub async fn get_counts( req: HttpRequest, counts_request: Json, @@ -365,15 +369,51 @@ pub async fn get_counts( let creds = extract_session_key_from_req(&req)?; let permissions = Users.get_permissions(&creds); + let body = counts_request.into_inner(); + // does user have access to table? - user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?; + user_auth_for_datasets(&permissions, &[body.stream.clone()])?; + + // if the user has given a sql query (counts call with filters applied), then use this flow + // this could include filters or group by + if body.conditions.is_some() { + let sql = body.get_df_sql().await?; + + let query_request = Query { + query: sql, + start_time: body.start_time, + end_time: body.end_time, + send_null: true, + fields: true, + streaming: false, + filter_tags: None, + }; - let records = counts_request.get_bin_density().await?; + let (records, _) = get_records_and_fields(&query_request, &req).await?; - Ok(web::Json(CountsResponse { - fields: vec!["start_time".into(), "end_time".into(), "count".into()], - records, - })) + if let Some(records) = records { + let json_records = record_batches_to_json(&records)?; + let records = json_records.into_iter().map(Value::Object).collect_vec(); + + let res = json!({ + "fields": vec!["start_time", "endTime", "count"], + "records": records, + }); + + return Ok(web::Json(res)); + } else { + return Err(QueryError::CustomError( + "No data returned for counts SQL".into(), + )); + } + } + + let records = body.get_bin_density().await?; + let res = json!({ + "fields": vec!["start_time", "endTime", "count"], + "records": records, + }); + Ok(web::Json(res)) } pub async fn update_schema_when_distributed(tables: &Vec) -> Result<(), EventError> { diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 4f409e892..7b3e0ee49 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -349,6 +349,7 @@ impl PrismDatasetRequest { start_time: "1h".to_owned(), end_time: "now".to_owned(), num_bins: 10, + conditions: None, }; let records = count_request.get_bin_density().await?; diff --git a/src/query/mod.rs b/src/query/mod.rs index e9c5632e7..304239585 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -47,6 +47,8 @@ use tokio::runtime::Runtime; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; pub use self::stream_schema_provider::PartialTimeFilter; +use crate::alerts::alerts_utils::get_filter_string; +use crate::alerts::Conditions; use crate::catalog::column::{Int64Type, TypedStatistics}; use crate::catalog::manifest::Manifest; use crate::catalog::snapshot::Snapshot; @@ -297,7 +299,7 @@ impl Query { } /// Record of counts for a given time bin. -#[derive(Debug, Serialize, Clone)] +#[derive(Debug, Serialize, Clone, Deserialize)] pub struct CountsRecord { /// Start time of the bin pub start_time: String, @@ -312,6 +314,15 @@ struct TimeBounds { end: DateTime, } +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct CountConditions { + /// Optional conditions for filters + pub conditions: Option, + /// GroupBy columns + pub group_by: Option>, +} + /// Request for counts, received from API/SQL query. #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -324,6 +335,8 @@ pub struct CountsRequest { pub end_time: String, /// Number of bins to divide the time range into pub num_bins: u64, + /// Conditions + pub conditions: Option, } impl CountsRequest { @@ -429,6 +442,35 @@ impl CountsRequest { bounds } + + /// This function will get executed only if self.conditions is some + pub async fn get_df_sql(&self) -> Result { + // unwrap because we have asserted that it is some + let count_conditions = self.conditions.as_ref().unwrap(); + + let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; + + let dur = time_range.end.signed_duration_since(time_range.start); + + let date_bin = if dur.num_minutes() <= 60 * 10 { + // date_bin 1 minute + format!("CAST(DATE_BIN('1 minute', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time", self.stream) + } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 { + // date_bin 1 hour + format!("CAST(DATE_BIN('1 hour', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", self.stream) + } else { + // date_bin 1 day + format!("CAST(DATE_BIN('1 day', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", self.stream) + }; + + let query = if let Some(conditions) = &count_conditions.conditions { + let f = get_filter_string(conditions).map_err(QueryError::CustomError)?; + format!("SELECT {date_bin}, COUNT(*) as count FROM \"{}\" WHERE {} GROUP BY end_time,start_time ORDER BY end_time",self.stream,f) + } else { + format!("SELECT {date_bin}, COUNT(*) as count FROM \"{}\" GROUP BY end_time,start_time ORDER BY end_time",self.stream) + }; + Ok(query) + } } /// Response for the counts API