Updates for counts API #1347

Open · wants to merge 4 commits into base: main
27 changes: 27 additions & 0 deletions src/alerts/alerts_utils.rs
@@ -339,6 +339,33 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
}
}

pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
    match &where_clause.operator {
        Some(op) => match op {
            LogicalOperator::And => {
                let mut exprs = vec![];
                for condition in &where_clause.condition_config {
                    let value = NumberOrString::from_string(condition.value.to_string());
                    match value {
                        NumberOrString::Number(val) => exprs.push(format!(
                            "{} {} {}",
                            condition.column, condition.operator, val
                        )),
                        NumberOrString::String(val) => exprs.push(format!(
                            "{} {} '{}'",
                            condition.column, condition.operator, val
                        )),
                    }
                }

                Ok(exprs.join(" AND "))
            }
            _ => Err(String::from("Invalid option 'or'")),
        },
        _ => Err(String::from("Invalid option 'null'")),
    }
}
Comment on lines +342 to +367

🛠️ Refactor suggestion

get_filter_string omits identifier quoting & OR-support – may break queries and is inconsistent with get_filter_expr

  1. Column names are emitted verbatim (condition.column).
    • This loses the double-quote casing that match_alert_operator applies and will silently fail for mixed-case columns or reserved keywords.
  2. Only the AND path is accepted; OR and single-condition (None) cases return an error, whereas the expression-based sibling get_filter_expr handles them gracefully.
  3. String values are not SQL-escaped – a single quote in user input will break the query.
 pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
-    match &where_clause.operator {
-        Some(op) => match op {
-            LogicalOperator::And => {
-                let mut exprs = vec![];
-                for condition in &where_clause.condition_config {
-                    let value = NumberOrString::from_string(condition.value.to_string());
-                    match value {
-                        NumberOrString::Number(val) => exprs.push(format!(
-                            "{} {} {}",
-                            condition.column, condition.operator, val
-                        )),
-                        NumberOrString::String(val) => exprs.push(format!(
-                            "{} {} '{}'",
-                            condition.column, condition.operator, val
-                        )),
-                    }
-                }
-                Ok(exprs.join(" AND "))
-            }
-            _ => Err(String::from("Invalid option 'or'")),
-        },
-        _ => Err(String::from("Invalid option 'null'")),
+    use LogicalOperator::*;
+
+    // helper – quote identifiers and escape single quotes
+    fn fmt_cond(c: &ConditionConfig) -> String {
+        let col = format!(r#""{}""#, c.column);          // preserve case
+        let val = match NumberOrString::from_string(c.value.clone()) {
+            NumberOrString::Number(n) => n.to_string(),
+            NumberOrString::String(s) => format!("'{}'", s.replace('\'', "''")),
+        };
+        format!("{col} {} {val}", c.operator)
+    }
+
+    let parts: Vec<String> = where_clause
+        .condition_config
+        .iter()
+        .map(fmt_cond)
+        .collect();
+
+    match where_clause.operator {
+        Some(And) | None => Ok(parts.join(" AND ")),
+        Some(Or) => Ok(parts.join(" OR ")),
     }
 }

This keeps behaviour aligned with get_filter_expr, avoids casing pitfalls and closes a basic SQL-injection surface.
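For illustration, here is a minimal std-only sketch of the escaping rule the suggestion relies on: double-quote the identifier, and double any embedded single quotes in string values. `format_condition` and its parameters are hypothetical stand-ins for the `ConditionConfig` fields, and numeric detection here uses a plain `f64` parse instead of the crate's `NumberOrString` helper:

```rust
/// Hypothetical helper mirroring the suggested fmt_cond logic.
/// `column`, `operator`, and `value` stand in for ConditionConfig fields.
fn format_condition(column: &str, operator: &str, value: &str) -> String {
    // Quote the identifier so mixed-case names and reserved words survive.
    let col = format!("\"{}\"", column);
    // Numbers are emitted bare; strings are single-quoted with embedded
    // single quotes doubled, closing the basic injection surface.
    let val = match value.parse::<f64>() {
        Ok(_) => value.to_string(),
        Err(_) => format!("'{}'", value.replace('\'', "''")),
    };
    format!("{} {} {}", col, operator, val)
}
```

A value like `O'Brien` thus becomes `'O''Brien'` rather than terminating the literal early.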



fn match_alert_operator(expr: &ConditionConfig) -> Expr {
// the form accepts value as a string
// if it can be parsed as a number, then parse it
56 changes: 48 additions & 8 deletions src/handlers/http/query.rs
@@ -18,6 +18,7 @@

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use crate::utils::arrow::record_batches_to_json;
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
@@ -31,8 +32,9 @@ use futures::stream::once;
use futures::{future, Stream, StreamExt};
use futures_util::Future;
use http::StatusCode;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
@@ -44,7 +46,7 @@ use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::error::ExecuteError;
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{execute, CountsRequest, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::QueryResponse;
@@ -193,6 +195,7 @@ async fn handle_count_query(
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
conditions: None,
};
let count_records = counts_req.get_bin_density().await?;
let count = count_records[0].count;
@@ -358,22 +361,59 @@ fn create_batch_processor(
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
}
}

pub async fn get_counts(
req: HttpRequest,
counts_request: Json<CountsRequest>,
) -> Result<impl Responder, QueryError> {
let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

let body = counts_request.into_inner();

// does user have access to table?
user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?;
user_auth_for_datasets(&permissions, &[body.stream.clone()])?;

// if the user has given a sql query (counts call with filters applied), then use this flow
// this could include filters or group by
if body.conditions.is_some() {
let sql = body.get_df_sql().await?;

let query_request = Query {
query: sql,
start_time: body.start_time,
end_time: body.end_time,
send_null: true,
fields: true,
streaming: false,
filter_tags: None,
};

let records = counts_request.get_bin_density().await?;
let (records, _) = get_records_and_fields(&query_request, &req).await?;

Ok(web::Json(CountsResponse {
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
records,
}))
if let Some(records) = records {
let json_records = record_batches_to_json(&records)?;
let records = json_records.into_iter().map(Value::Object).collect_vec();

let res = json!({
"fields": vec!["start_time", "endTime", "count"],
"records": records,
});

return Ok(web::Json(res));
} else {
return Err(QueryError::CustomError(
"No data returned for counts SQL".into(),
));
}
}

let records = body.get_bin_density().await?;
let res = json!({
"fields": vec!["start_time", "endTime", "count"],
"records": records,
});
Ok(web::Json(res))
}

pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), EventError> {
1 change: 1 addition & 0 deletions src/prism/logstream/mod.rs
@@ -349,6 +349,7 @@ impl PrismDatasetRequest {
start_time: "1h".to_owned(),
end_time: "now".to_owned(),
num_bins: 10,
conditions: None,
};

let records = count_request.get_bin_density().await?;
43 changes: 42 additions & 1 deletion src/query/mod.rs
@@ -47,6 +47,8 @@ use tokio::runtime::Runtime;
use self::error::ExecuteError;
use self::stream_schema_provider::GlobalSchemaProvider;
pub use self::stream_schema_provider::PartialTimeFilter;
use crate::alerts::alerts_utils::get_filter_string;
use crate::alerts::Conditions;
use crate::catalog::column::{Int64Type, TypedStatistics};
use crate::catalog::manifest::Manifest;
use crate::catalog::snapshot::Snapshot;
@@ -297,7 +299,7 @@ impl Query {
}

/// Record of counts for a given time bin.
#[derive(Debug, Serialize, Clone)]
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CountsRecord {
/// Start time of the bin
pub start_time: String,
@@ -312,6 +314,15 @@ struct TimeBounds {
end: DateTime<Utc>,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct CountConditions {
    /// Optional conditions for filters
    pub conditions: Option<Conditions>,
    /// GroupBy columns
    pub group_by: Option<Vec<String>>,
}

/// Request for counts, received from API/SQL query.
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
@@ -324,6 +335,8 @@
pub end_time: String,
/// Number of bins to divide the time range into
pub num_bins: u64,
/// Conditions
pub conditions: Option<CountConditions>,
}

impl CountsRequest {
@@ -429,6 +442,34 @@ impl CountsRequest {

bounds
}

    /// This function will get executed only if self.conditions is some
    pub async fn get_df_sql(&self) -> Result<String, QueryError> {
        let count_conditions = self.conditions.as_ref().unwrap();

        let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;

        let dur = time_range.end.signed_duration_since(time_range.start);

        let date_bin = if dur.num_minutes() <= 60 * 10 {
            // date_bin 1 minute
            format!("CAST(DATE_BIN('1 minute', {}.p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 minute' as end_time", self.stream)
        } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
            // date_bin 1 hour
            String::from("CAST(DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 hour' as end_time")
        } else {
            // date_bin 1 day
            String::from("CAST(DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 day' as end_time")
        };

        let query = if let Some(conditions) = &count_conditions.conditions {
            let f = get_filter_string(conditions).map_err(QueryError::CustomError)?;
            format!("SELECT {date_bin}, COUNT(*) as count FROM {} WHERE {} GROUP BY end_time,start_time ORDER BY end_time", self.stream, f)
        } else {
            format!("SELECT {date_bin}, COUNT(*) as count FROM {} GROUP BY end_time,start_time ORDER BY end_time", self.stream)
        };
        Ok(query)
    }
Comment on lines +447 to +472

🛠️ Refactor suggestion
get_df_sql ignores group_by, mixes table-qualified and unqualified columns, and hard-codes epoch

Issues:

  1. Only the 1-minute path prefixes {}.p_timestamp with the stream name; other branches rely on bare p_timestamp, risking ambiguous-column errors when joins are added later.
  2. GROUP BY end_time,start_time is hard-coded, so even if group_by were used the query would be semantically wrong.
  3. The anchor TIMESTAMP '2025-01-01T00:00:00.000Z' is arbitrary. Consider TIMESTAMP '1970-01-01 00:00:00+00' or derive it once.
  4. self.conditions.as_ref().unwrap() will panic on a malformed request instead of bubbling an error.
-        let date_bin = if dur.num_minutes() <= 60 * 10 {
-            // date_bin 1 minute
-            format!("CAST(DATE_BIN('1 minute', {}.p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 minute' as end_time", self.stream)
+        let ts_col = r#""p_timestamp""#;
+        let anchor = "TIMESTAMP '1970-01-01T00:00:00Z'";
+        let date_bin = if dur.num_minutes() <= 600 {
+            format!(
+                "CAST(DATE_BIN('1 minute', {ts_col}, {anchor}) AS TEXT)  AS start_time, \
+                 DATE_BIN('1 minute', {ts_col}, {anchor}) + INTERVAL '1 minute' AS end_time"
+            )
         } else if dur.num_minutes() < 60 * 240 {
             // date_bin 1 hour
-            String::from("CAST(DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 hour' as end_time")
+            format!(
+                "CAST(DATE_BIN('1 hour',  {ts_col}, {anchor}) AS TEXT)  AS start_time, \
+                 DATE_BIN('1 hour',  {ts_col}, {anchor}) + INTERVAL '1 hour'  AS end_time"
+            )
         } else {
             // date_bin 1 day
-            String::from("CAST(DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 day' as end_time")
+            format!(
+                "CAST(DATE_BIN('1 day',   {ts_col}, {anchor}) AS TEXT)  AS start_time, \
+                 DATE_BIN('1 day',   {ts_col}, {anchor}) + INTERVAL '1 day'   AS end_time"
+            )
         };
 
-        let query = if let Some(conditions) = &count_conditions.conditions {
-            let f = get_filter_string(conditions).map_err(QueryError::CustomError)?;
-            format!("SELECT {date_bin}, COUNT(*) as count FROM {} WHERE {} GROUP BY end_time,start_time ORDER BY end_time",self.stream,f)
-        } else {
-            format!("SELECT {date_bin}, COUNT(*) as count FROM {} GROUP BY end_time,start_time ORDER BY end_time",self.stream)
-        };
+        let base = format!(
+            "SELECT {date_bin}, COUNT(*) AS count FROM \"{}\"",
+            self.stream
+        );
+
+        let base = if let Some(cond) = &count_conditions.conditions {
+            let f = get_filter_string(cond).map_err(QueryError::CustomError)?;
+            format!("{base} WHERE {f}")
+        } else {
+            base
+        };
+
+        // add GROUP BY clauses
+        let mut group_cols = vec!["end_time", "start_time"];
+        if let Some(extra) = &count_conditions.group_by {
+            group_cols.extend(extra.iter().map(|c| c.as_str()));
+        }
+        let group_by = group_cols.join(",");
+        let query = format!("{base} GROUP BY {group_by} ORDER BY end_time");
+
         Ok(query)
     }

This resolves the panic path, honours additional group_by columns, and avoids ambiguous identifiers.
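The query-assembly part of the suggestion can be condensed into a small pure function. `build_counts_query` is a hypothetical helper (not code from the PR) sketching how the dynamic GROUP BY list would be built, assuming the extra `group_by` columns are appended after the two bin columns:

```rust
/// Hypothetical sketch of the suggested query assembly.
/// `date_bin` is the precomputed start_time/end_time SELECT fragment.
fn build_counts_query(
    stream: &str,
    date_bin: &str,
    filter: Option<&str>,
    group_by: &[&str],
) -> String {
    // Quote the stream name so mixed-case table identifiers survive.
    let mut query = format!("SELECT {date_bin}, COUNT(*) AS count FROM \"{stream}\"");
    if let Some(f) = filter {
        query.push_str(&format!(" WHERE {f}"));
    }
    // Always group by the bin columns, then any user-supplied columns.
    let mut cols = vec!["end_time", "start_time"];
    cols.extend_from_slice(group_by);
    query.push_str(&format!(" GROUP BY {} ORDER BY end_time", cols.join(",")));
    query
}
```

Keeping the assembly in one place means the AND/OR/None filter decision stays in `get_filter_string` and the SQL shape stays testable in isolation.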

Committable suggestion skipped: line range outside the PR's diff.
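The bin-width thresholds in `get_df_sql` (minute bins up to 10 hours, hour bins under 240 hours, day bins beyond) could also be isolated into a tiny helper for clarity; this sketch mirrors the PR's cut-offs but is not code from the PR:

```rust
/// Hypothetical helper: pick a DATE_BIN interval from the range duration.
/// Thresholds mirror the PR: <= 600 min -> minute, < 14400 min -> hour, else day.
fn bin_interval(duration_minutes: i64) -> &'static str {
    if duration_minutes <= 60 * 10 {
        "1 minute"
    } else if duration_minutes < 60 * 240 {
        "1 hour"
    } else {
        "1 day"
    }
}
```

Factoring the choice out would also let the minute/hour/day branches share one `format!` template instead of three near-identical SQL strings.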


}

/// Response for the counts API