Updates for counts API #1347
Conversation
counts API will also work with applied filters
Walkthrough

This change introduces support for SQL-style conditional filtering and grouping in counts queries. It adds a mechanism to convert structured filter conditions into SQL WHERE clauses, updates the counts request structure to accept optional conditions, and modifies the counts query handler to branch between SQL-based and legacy logic based on the presence of conditions.
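As a sketch of what the new path accepts, a filtered counts request could look like the following. The JSON key names are assumptions based on the Rust field names and are not confirmed by this diff.

```rust
// Hypothetical /counts body exercising the new optional `conditions` block.
// Omitting `conditions` entirely keeps the legacy bin-density behaviour.
use serde_json::{json, Value};

fn example_counts_body() -> Value {
    json!({
        "stream": "app_logs",      // invented stream name
        "startTime": "now-1h",     // human-readable range, parsed by TimeRange::parse_human_time
        "endTime": "now",
        "conditions": {
            "conditions": {
                "operator": "and",
                "conditionConfig": [
                    { "column": "status", "operator": "=", "value": "500" }
                ]
            },
            "groupBy": ["method"]
        }
    })
}
```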
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant HTTP Handler
    participant Query Module
    participant Alerts Utils

    Client->>HTTP Handler: POST /counts (CountsRequest)
    HTTP Handler->>Query Module: Check for conditions in CountsRequest
    alt Conditions present
        Query Module->>Query Module: get_df_sql()
        Query Module->>Alerts Utils: get_filter_string(Conditions)
        Alerts Utils-->>Query Module: SQL WHERE clause (or error)
        Query Module-->>HTTP Handler: SQL query string
        HTTP Handler->>Query Module: get_records_and_fields(Query)
        Query Module-->>HTTP Handler: Records (Arrow batches)
        HTTP Handler->>HTTP Handler: Convert to JSON
        HTTP Handler-->>Client: JSON results
    else No conditions
        HTTP Handler->>Query Module: get_bin_density()
        Query Module-->>HTTP Handler: Bin density results
        HTTP Handler-->>Client: JSON results
    end
```
Actionable comments posted: 3
🧹 Nitpick comments (1)

src/prism/logstream/mod.rs (1)

352-353: Minor: `conditions: None` repetition

Since most internal callers will want "no conditions", consider deriving `Default` for `CountsRequest` with `conditions` defaulting to `None` so this noise can be eliminated.
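A minimal sketch of the resulting call-site, assuming `Default` is derived and the remaining `CountsRequest` fields are as guessed here:

```rust
// Internal construction without the repeated `conditions: None`:
let req = CountsRequest {
    stream: "app_logs".into(),      // invented values
    start_time: "now-10m".into(),
    end_time: "now".into(),
    ..Default::default()            // conditions falls back to None
};
```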
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)

- src/alerts/alerts_utils.rs (1 hunks)
- src/handlers/http/query.rs (5 hunks)
- src/prism/logstream/mod.rs (1 hunks)
- src/query/mod.rs (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (2)

src/query/mod.rs (2)

317-324: `group_by` field is defined but never used

`CountConditions` exposes a `group_by` vector, yet no call-site (including `get_df_sql`) consumes it. Either wire it into the SQL generation or drop the field to avoid a misleading public API. Would you like guidance on integrating dynamic `GROUP BY` support?

338-340: `CountsRequest` now requires manual initialisation of `conditions` everywhere

Be sure every `CountsRequest` instantiation (CLI, tests, etc.) sets `conditions: None` to avoid breaking changes. Consider `#[serde(default)]` on the new field to maintain backward compatibility with existing payloads.
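A hedged sketch of that suggestion; the pre-existing fields are assumptions about the struct's current shape, and only `conditions` is new in this PR:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct CountsRequest {
    pub stream: String,
    pub start_time: String,
    pub end_time: String,
    /// New in this PR. `#[serde(default)]` lets payloads that omit the key
    /// deserialize to `None`, so existing clients keep working.
    #[serde(default)]
    pub conditions: Option<CountConditions>,
}
```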
```rust
pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
    match &where_clause.operator {
        Some(op) => match op {
            LogicalOperator::And => {
                let mut exprs = vec![];
                for condition in &where_clause.condition_config {
                    let value = NumberOrString::from_string(condition.value.to_string());
                    match value {
                        NumberOrString::Number(val) => exprs.push(format!(
                            "{} {} {}",
                            condition.column, condition.operator, val
                        )),
                        NumberOrString::String(val) => exprs.push(format!(
                            "{} {} '{}'",
                            condition.column, condition.operator, val
                        )),
                    }
                }
                Ok(exprs.join(" AND "))
            }
            _ => Err(String::from("Invalid option 'or'")),
        },
        _ => Err(String::from("Invalid option 'null'")),
    }
}
```
🛠️ Refactor suggestion

`get_filter_string` omits identifier quoting & OR support – may break queries and is inconsistent with `get_filter_expr`

- Column names are emitted verbatim (`condition.column`). This loses the double-quote casing that `match_alert_operator` applies and will silently fail for mixed-case columns or reserved keywords.
- Only the `AND` path is accepted; `OR` and single-condition (`None`) cases return an error, whereas the expression-based sibling `get_filter_expr` handles them gracefully.
- String values are not SQL-escaped – a single quote in user input will break the query.
```diff
 pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
-    match &where_clause.operator {
-        Some(op) => match op {
-            LogicalOperator::And => {
-                let mut exprs = vec![];
-                for condition in &where_clause.condition_config {
-                    let value = NumberOrString::from_string(condition.value.to_string());
-                    match value {
-                        NumberOrString::Number(val) => exprs.push(format!(
-                            "{} {} {}",
-                            condition.column, condition.operator, val
-                        )),
-                        NumberOrString::String(val) => exprs.push(format!(
-                            "{} {} '{}'",
-                            condition.column, condition.operator, val
-                        )),
-                    }
-                }
-                Ok(exprs.join(" AND "))
-            }
-            _ => Err(String::from("Invalid option 'or'")),
-        },
-        _ => Err(String::from("Invalid option 'null'")),
+    use LogicalOperator::*;
+
+    // helper – quote identifiers and escape single quotes
+    fn fmt_cond(c: &ConditionConfig) -> String {
+        let col = format!(r#""{}""#, c.column); // preserve case
+        let val = match NumberOrString::from_string(c.value.clone()) {
+            NumberOrString::Number(n) => n.to_string(),
+            NumberOrString::String(s) => format!("'{}'", s.replace('\'', "''")),
+        };
+        format!("{col} {} {val}", c.operator)
+    }
+
+    let parts: Vec<String> = where_clause
+        .condition_config
+        .iter()
+        .map(fmt_cond)
+        .collect();
+
+    match where_clause.operator {
+        Some(And) | None => Ok(parts.join(" AND ")),
+        Some(Or) => Ok(parts.join(" OR ")),
     }
 }
```
This keeps behaviour aligned with `get_filter_expr`, avoids casing pitfalls, and closes a basic SQL-injection surface.
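A quick check of the escaping behaviour, assuming `ConditionConfig`'s fields are plain strings and that `fmt_cond` is hoisted to module scope for testability (both assumptions of this sketch):

```rust
#[test]
fn quotes_identifier_and_escapes_value() {
    let cond = ConditionConfig {
        column: "user_name".into(),
        operator: "=".into(),
        value: "O'Brien".into(),
    };
    // The embedded quote is doubled, so it can no longer terminate the literal early.
    assert_eq!(fmt_cond(&cond), r#""user_name" = 'O''Brien'"#);
}
```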
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
    use LogicalOperator::*;

    // helper – quote identifiers and escape single quotes
    fn fmt_cond(c: &ConditionConfig) -> String {
        let col = format!(r#""{}""#, c.column); // preserve case
        let val = match NumberOrString::from_string(c.value.clone()) {
            NumberOrString::Number(n) => n.to_string(),
            NumberOrString::String(s) => format!("'{}'", s.replace('\'', "''")),
        };
        format!("{col} {} {val}", c.operator)
    }

    let parts: Vec<String> = where_clause
        .condition_config
        .iter()
        .map(fmt_cond)
        .collect();

    match where_clause.operator {
        Some(And) | None => Ok(parts.join(" AND ")),
        Some(Or) => Ok(parts.join(" OR ")),
    }
}
```
🤖 Prompt for AI Agents
In src/alerts/alerts_utils.rs lines 342 to 367, update get_filter_string to
quote column identifiers consistently with get_filter_expr by applying double
quotes around column names, add support for the OR logical operator and handle
the None case for single conditions instead of returning errors, and ensure
string values are properly SQL-escaped to prevent injection and syntax errors by
escaping single quotes within string literals. Adjust the logic to build the
filter string accordingly for AND, OR, and single conditions.
```rust
pub async fn get_df_sql(&self) -> Result<String, QueryError> {
    let count_conditions = self.conditions.as_ref().unwrap();

    let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;

    let dur = time_range.end.signed_duration_since(time_range.start);

    let date_bin = if dur.num_minutes() <= 60 * 10 {
        // date_bin 1 minute
        format!("CAST(DATE_BIN('1 minute', {}.p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 minute' as end_time", self.stream)
    } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
        // date_bin 1 hour
        String::from("CAST(DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 hour' as end_time")
    } else {
        // date_bin 1 day
        String::from("CAST(DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 day' as end_time")
    };

    let query = if let Some(conditions) = &count_conditions.conditions {
        let f = get_filter_string(conditions).map_err(QueryError::CustomError)?;
        format!("SELECT {date_bin}, COUNT(*) as count FROM {} WHERE {} GROUP BY end_time,start_time ORDER BY end_time", self.stream, f)
    } else {
        format!("SELECT {date_bin}, COUNT(*) as count FROM {} GROUP BY end_time,start_time ORDER BY end_time", self.stream)
    };
    Ok(query)
}
```
🛠️ Refactor suggestion

`get_df_sql` ignores `group_by`, mixes table-qualified and unqualified columns, and hard-codes the epoch

Issues:

- Only the 1-minute path prefixes `p_timestamp` with the stream name; the other branches rely on bare `p_timestamp`, risking ambiguous-column errors when joins are added later.
- `GROUP BY end_time,start_time` is hard-coded, so even if `group_by` were used the query would be semantically wrong.
- The anchor `TIMESTAMP '2025-01-01T00:00:00.000Z'` is arbitrary. Consider `TIMESTAMP '1970-01-01 00:00:00+00'` or derive it once.
- `self.conditions.as_ref().unwrap()` will panic on a malformed request instead of bubbling up an error.
```diff
-    let date_bin = if dur.num_minutes() <= 60 * 10 {
-        // date_bin 1 minute
-        format!("CAST(DATE_BIN('1 minute', {}.p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 minute' as end_time", self.stream)
+    let ts_col = r#""p_timestamp""#;
+    let anchor = "TIMESTAMP '1970-01-01T00:00:00Z'";
+    let date_bin = if dur.num_minutes() <= 600 {
+        format!(
+            "CAST(DATE_BIN('1 minute', {ts_col}, {anchor}) AS TEXT) AS start_time, \
+             DATE_BIN('1 minute', {ts_col}, {anchor}) + INTERVAL '1 minute' AS end_time"
+        )
     } else if dur.num_minutes() < 60 * 240 {
         // date_bin 1 hour
-        String::from("CAST(DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 hour' as end_time")
+        format!(
+            "CAST(DATE_BIN('1 hour', {ts_col}, {anchor}) AS TEXT) AS start_time, \
+             DATE_BIN('1 hour', {ts_col}, {anchor}) + INTERVAL '1 hour' AS end_time"
+        )
     } else {
         // date_bin 1 day
-        String::from("CAST(DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 day' as end_time")
+        format!(
+            "CAST(DATE_BIN('1 day', {ts_col}, {anchor}) AS TEXT) AS start_time, \
+             DATE_BIN('1 day', {ts_col}, {anchor}) + INTERVAL '1 day' AS end_time"
+        )
     };
-    let query = if let Some(conditions) = &count_conditions.conditions {
-        let f = get_filter_string(conditions).map_err(QueryError::CustomError)?;
-        format!("SELECT {date_bin}, COUNT(*) as count FROM {} WHERE {} GROUP BY end_time,start_time ORDER BY end_time",self.stream,f)
-    } else {
-        format!("SELECT {date_bin}, COUNT(*) as count FROM {} GROUP BY end_time,start_time ORDER BY end_time",self.stream)
-    };
+    let base = format!(
+        "SELECT {date_bin}, COUNT(*) AS count FROM \"{}\"",
+        self.stream
+    );
+
+    let base = if let Some(cond) = &count_conditions.conditions {
+        let f = get_filter_string(cond).map_err(QueryError::CustomError)?;
+        format!("{base} WHERE {f}")
+    } else {
+        base
+    };
+
+    // add GROUP BY clauses
+    let mut group_cols = vec!["end_time", "start_time"];
+    if let Some(extra) = &count_conditions.group_by {
+        group_cols.extend(extra.iter().map(|c| c.as_str()));
+    }
+    let group_by = group_cols.join(",");
+    let query = format!("{base} GROUP BY {group_by} ORDER BY end_time");
+
     Ok(query)
 }
```
This resolves the panic path, honours additional `group_by` columns, and avoids ambiguous identifiers.
Committable suggestion skipped: line range outside the PR's diff.
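For illustration only: with a 30-minute window, a hypothetical `"status" = '500'` filter, and `group_by = ["method"]`, the patched builder would emit a statement roughly like the one sketched in the comment below (stream name invented):

```rust
// Approximate SQL produced by the suggested code for the example above:
//
//   SELECT CAST(DATE_BIN('1 minute', "p_timestamp", TIMESTAMP '1970-01-01T00:00:00Z') AS TEXT) AS start_time,
//          DATE_BIN('1 minute', "p_timestamp", TIMESTAMP '1970-01-01T00:00:00Z') + INTERVAL '1 minute' AS end_time,
//          COUNT(*) AS count
//   FROM "app_logs"
//   WHERE "status" = '500'
//   GROUP BY end_time,start_time,method
//   ORDER BY end_time
```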
🤖 Prompt for AI Agents
In src/query/mod.rs lines 447 to 472, fix get_df_sql to consistently prefix
p_timestamp with the stream name in all date_bin branches to avoid ambiguous
columns, replace the hard-coded anchor timestamp with a constant or a derived
value like '1970-01-01 00:00:00+00', modify the GROUP BY clause to include
dynamic group_by columns instead of hard-coded end_time and start_time, and
replace the unwrap on self.conditions with proper error handling to prevent
panics on malformed requests.
```diff
+    let body = counts_request.into_inner();
+
     // does user have access to table?
-    user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?;
+    user_auth_for_datasets(&permissions, &[body.stream.clone()])?;
+
+    // if the user has given a sql query (counts call with filters applied), then use this flow
+    // this could include filters or group by
+    if body.conditions.is_some() {
+        let sql = body.get_df_sql().await?;
+
+        let query_request = Query {
+            query: sql,
+            start_time: body.start_time,
+            end_time: body.end_time,
+            send_null: true,
+            fields: true,
+            streaming: false,
+            filter_tags: None,
+        };
+
-    let records = counts_request.get_bin_density().await?;
-
-    Ok(web::Json(CountsResponse {
-        fields: vec!["start_time".into(), "end_time".into(), "count".into()],
-        records,
-    }))
+        let (records, _) = get_records_and_fields(&query_request, &req).await?;
+
+        if let Some(records) = records {
+            let json_records = record_batches_to_json(&records)?;
+            let records = json_records.into_iter().map(Value::Object).collect_vec();
+
+            let res = json!({
+                "fields": vec!["start_time", "end_time", "count"],
+                "records": records,
+            });
+
+            return Ok(web::Json(res));
+        } else {
+            return Err(QueryError::CustomError(
+                "No data returned for counts SQL".into(),
+            ));
+        }
+    }
+
+    let records = body.get_bin_density().await?;
+    let res = json!({
+        "fields": vec!["start_time", "end_time", "count"],
+        "records": records,
+    });
+    Ok(web::Json(res))
 }
```
🛠️ Refactor suggestion

Static `fields` vector will be wrong for grouped COUNT queries

When `group_by` is present, the result schema will be `[group_by..., start_time, end_time, count]`. Returning a hard-coded `["start_time", "end_time", "count"]` misleads clients and drops data.

Refactor to take the header from the returned `RecordBatch`:
```diff
-    let res = json!({
-        "fields": vec!["start_time", "end_time", "count"],
-        "records": records,
-    });
+    let fields = records
+        .first()
+        .map(|map| map.keys().cloned().collect_vec())
+        .unwrap_or_default();
+
+    let res = json!({
+        "fields": fields,
+        "records": records,
+    });
```
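Under that change, a grouped result might then serialize along these lines (values and the `method` group column are invented for illustration):

```rust
use serde_json::{json, Value};

// Hypothetical grouped response once `fields` is derived from the records:
fn example_grouped_response() -> Value {
    json!({
        "fields": ["start_time", "end_time", "count", "method"],
        "records": [
            {
                "start_time": "2025-01-01T00:00:00",
                "end_time": "2025-01-01T00:01:00",
                "count": 42,
                "method": "GET"
            }
        ]
    })
}
```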
🤖 Prompt for AI Agents
In src/handlers/http/query.rs around lines 372 to 417, the response uses a
hard-coded fields vector ["start_time", "end_time", "count"], which is incorrect
when group_by is present because the result schema includes group_by columns
before these fields. To fix this, extract the actual schema or header from the
returned RecordBatch or query result and use it to dynamically build the fields
array in the JSON response, ensuring all group_by columns and standard fields
are included accurately.
Fixes #XXXX.