Skip to content

Commit 4fedec5

Browse files
refactor
1 parent de65af8 commit 4fedec5

File tree

1 file changed

+35
-31
lines changed

1 file changed

+35
-31
lines changed

src/storage/object_storage.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,22 +1041,26 @@ async fn calculate_single_field_stats(
10411041
stream_name: &str,
10421042
field_name: &str,
10431043
) -> Option<FieldStat> {
1044-
let count = query_single_i64(
1045-
&ctx,
1046-
&format!("select count(\"{field_name}\") as count from \"{stream_name}\""),
1047-
)
1048-
.await?;
1049-
if count == 0 {
1044+
// Use the escaped field name in the SQL query to avoid issues with special characters
1045+
let escaped_field_name = field_name.replace('"', "\"\"");
1046+
let escaped_stream_name = stream_name.replace('"', "\"\"");
1047+
let count_distinct_sql = format!(
1048+
"select COUNT(\"{escaped_field_name}\") as count, COUNT(DISTINCT \"{escaped_field_name}\") as distinct_count from \"{escaped_stream_name}\" ");
1049+
1050+
let df = ctx.sql(&count_distinct_sql).await.ok()?;
1051+
let batches = df.collect().await.ok()?;
1052+
let batch = batches.first()?;
1053+
if batch.num_rows() == 0 {
10501054
return None;
10511055
}
1056+
let count_array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
1057+
let distinct_count_array = batch.column(1).as_any().downcast_ref::<Int64Array>()?;
10521058

1053-
let distinct_count = query_single_i64(
1054-
&ctx,
1055-
&format!(
1056-
"select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\""
1057-
),
1058-
)
1059-
.await?;
1059+
let count = count_array.value(0);
1060+
let distinct_count = distinct_count_array.value(0);
1061+
if distinct_count == 0 {
1062+
return None;
1063+
}
10601064
let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await;
10611065
Some(FieldStat {
10621066
field_name: field_name.to_string(),
@@ -1066,21 +1070,6 @@ async fn calculate_single_field_stats(
10661070
})
10671071
}
10681072

1069-
/// Queries a single integer value from the DataFusion context.
1070-
/// Returns `None` if the query fails or returns no rows.
1071-
/// This is used for fetching record count for a field and distinct count.
1072-
async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
1073-
let df = ctx.sql(sql).await.ok()?;
1074-
let batches = df.collect().await.ok()?;
1075-
let batch = batches.first()?;
1076-
if batch.num_rows() == 0 {
1077-
return None;
1078-
}
1079-
let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
1080-
1081-
Some(array.value(0))
1082-
}
1083-
10841073
macro_rules! try_downcast {
10851074
($ty:ty, $arr:expr, $body:expr) => {
10861075
if let Some(arr) = $arr.as_any().downcast_ref::<$ty>() {
@@ -1202,15 +1191,30 @@ async fn query_distinct_stats(
12021191
stream_name: &str,
12031192
field_name: &str,
12041193
) -> Vec<DistinctStat> {
1194+
let escaped_field_name = field_name.replace('"', "\"\"");
1195+
let escaped_stream_name = stream_name.replace('"', "\"\"");
1196+
12051197
let sql = format!(
1206-
"select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}",
1198+
"select count(*) as distinct_count, \"{escaped_field_name}\" from \"{escaped_stream_name}\" group by \"{escaped_field_name}\" order by distinct_count desc limit {}",
12071199
PARSEABLE.options.max_field_statistics
12081200
);
12091201
let mut distinct_stats = Vec::new();
12101202
if let Ok(df) = ctx.sql(&sql).await {
1211-
let mut stream = df.execute_stream().await.expect("Failed to execute stream");
1203+
let mut stream = match df.execute_stream().await {
1204+
Ok(stream) => stream,
1205+
Err(e) => {
1206+
warn!("Failed to execute distinct stats query: {e}");
1207+
return distinct_stats; // Return empty if query fails
1208+
}
1209+
};
12121210
while let Some(batch_result) = stream.next().await {
1213-
let rb = batch_result.expect("Failed to execute stream");
1211+
let rb = match batch_result {
1212+
Ok(batch) => batch,
1213+
Err(e) => {
1214+
warn!("Failed to fetch batch in distinct stats query: {e}");
1215+
continue; // Skip this batch if there's an error
1216+
}
1217+
};
12141218
let Some(counts) = rb.column(0).as_any().downcast_ref::<Int64Array>() else {
12151219
warn!("Unexpected type for count column in stats query");
12161220
continue;

0 commit comments

Comments
 (0)