feat: add stats for each field #1340
Conversation
Walkthrough

This update introduces field-level statistics collection for Parquet datasets, configurable thresholds for sync operations, and new CLI options. Several methods are refactored for async execution, and new parameters are added to key functions. Internal stream filtering and shutdown sync flows are enhanced, and auxiliary constants are introduced.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CLI
    participant Parseable
    participant ObjectStorage
    participant StatsStream
    CLI->>Parseable: ingest data / trigger sync
    Parseable->>ObjectStorage: upload_files_from_staging
    ObjectStorage->>ObjectStorage: For each Parquet file:
    alt Dataset stats enabled and not internal stream
        ObjectStorage->>ObjectStorage: calculate_field_stats
        ObjectStorage->>StatsStream: push stats as JSON logs
    end
    ObjectStorage->>ObjectStorage: Remove uploaded file
```
```mermaid
sequenceDiagram
    participant User
    participant HTTP Handler
    participant PrismHome
    participant Storage
    User->>HTTP Handler: GET /home?includeInternal=true
    HTTP Handler->>PrismHome: generate_home_response(session_key, include_internal)
    PrismHome->>Storage: fetch stream metadata
    PrismHome->>PrismHome: filter internal streams if needed
    PrismHome->>HTTP Handler: return HomeResponse
    HTTP Handler->>User: respond with filtered datasets
```
Actionable comments posted: 1
🔭 Outside diff range comments (2)
src/parseable/streams.rs (2)
`338-365`: 🛠️ Refactor suggestion — Consider improving the internal stream detection logic.

The current implementation uses a string contains check which could lead to false positives if a user-defined stream name happens to contain the internal stream identifier. Consider using the stream type instead.

```diff
- if !&self.stream_name.contains(INTERNAL_STREAM_NAME) {
+ if self.get_stream_type() != StreamType::Internal {
```

Also, while non-fatal error handling is good for resilience, ensure that the warning provides enough context for debugging issues with statistics calculation.
`466-566`: ⚠️ Potential issue — Potential memory issue with collecting all record batches.

The current implementation collects all record batches in memory before calculating statistics. For large datasets, this could lead to excessive memory usage and potential out-of-memory errors.
Consider these alternatives:
- Process statistics incrementally as batches are written
- Implement streaming statistics calculation
- Add a configuration option to disable statistics for large streams
- Sample the data instead of processing all records
Example approach for incremental processing:
```diff
+ // Instead of collecting all batches, maintain running statistics
+ let mut stats_collector = StatsCollector::new(schema.clone());
  for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
      writer.write(record)?;
-     record_batches.push(record.clone());
+     stats_collector.update(record)?;
  }
```
🧹 Nitpick comments (2)
src/parseable/streams.rs (2)
`783-822`: Good implementation with room for configuration improvements.

The statistics calculation implementation using DataFusion is well-structured. However, consider making the internal stream naming pattern configurable to avoid potential conflicts.

Consider extracting the metadata stream suffix as a constant:

```diff
+const METADATA_STREAM_SUFFIX: &str = "pmeta";
+
 async fn calculate_field_stats(
     &self,
     record_batches: Vec<RecordBatch>,
     schema: Arc<Schema>,
 ) -> Result<(), PostError> {
-    let dataset_meta = format!("{}_{INTERNAL_STREAM_NAME}", &self.stream_name);
+    let dataset_meta = format!("{}_{METADATA_STREAM_SUFFIX}", &self.stream_name);
```
`887-909`: Consider logging unsupported data types for better observability.

The current implementation returns "UNSUPPORTED" for unhandled data types without any logging. This could make it difficult to identify which data types need support.

```diff
 } else {
+    warn!("Unsupported array type for statistics: {:?}", array.data_type());
     "UNSUPPORTED".to_string()
 }
```
Also consider supporting additional common types like:
- `Date32Array`/`Date64Array`
- `Float32Array`
- `BinaryArray`/`LargeBinaryArray`
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/parseable/streams.rs
(19 hunks)
🔇 Additional comments (4)
src/parseable/streams.rs (4)
`30-37`: LGTM! Necessary imports for the new statistics functionality.

The added imports for arrow arrays, DataFusion, futures, and chrono are appropriate for implementing the field statistics feature.
`79-98`: Well-structured statistics types with appropriate fields.

The `DistinctStat`, `FieldStat`, and `DatasetStats` structs are well-designed for representing field-level statistics. The use of the `Serialize` derive macro enables easy JSON serialization for storage in the internal stream.
`1216-1217`: Test updates correctly handle the new return type.

All test cases have been properly updated to destructure the tuple return value from `convert_disk_files_to_parquet`. The tests maintain their original assertions while adapting to the API change.

Also applies to: 1297-1298, 1346-1347, 1400-1401
`847-874`: ❓ Verification inconclusive — Verify SQL identifier escaping in DataFusion.

While the implementation properly quotes field and stream names, ensure that DataFusion correctly handles special characters within these quoted identifiers to prevent any potential SQL injection issues.

Run the following script to check if DataFusion has any known issues with identifier escaping. Consider adding input validation for field names before constructing SQL queries as an additional safety measure.

🏁 Script executed:

```shell
#!/bin/bash
# Search for DataFusion SQL injection or identifier escaping documentation/issues
rg -A 5 -B 5 "identifier.*escap|sql.*injection" --type rust
```

Length of output: 61
Verify DataFusion identifier escaping

We didn't find any references in the codebase or DataFusion docs indicating a vulnerability in how quoted identifiers are handled, but it's best to manually confirm that DataFusion's SQL parser correctly escapes special characters inside double-quoted identifiers.

- Location: `src/parseable/streams.rs`, lines 847–874
- Ensure `field_name` and `stream_name` are only allowed to contain safe characters (e.g. match `^[A-Za-z_][A-Za-z0-9_]*$`) before injecting them into the SQL string
- As an extra precaution, you could centralize identifier validation or sanitization in a helper function, as sketched below
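A minimal sketch of such a helper, assuming the `regex` crate is available; the function name and error type are illustrative, not part of the PR:

```rust
use regex::Regex;

/// Returns the identifier unchanged if it only contains safe characters,
/// otherwise rejects it before it ever reaches a SQL string.
fn validate_sql_identifier(name: &str) -> Result<&str, String> {
    // Compile once in real code (e.g. with once_cell); kept inline for brevity.
    let safe = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").expect("valid regex");
    if safe.is_match(name) {
        Ok(name)
    } else {
        Err(format!("unsafe identifier rejected: {name:?}"))
    }
}
```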
Actionable comments posted: 3
🔭 Outside diff range comments (1)
src/parseable/streams.rs (1)
`461-566`: 🛠️ Refactor suggestion — Memory usage concern with record batch accumulation.

The change to accumulate all `RecordBatch`es could consume significant memory for large datasets, as all batches are held in memory simultaneously. Consider implementing a streaming approach or processing batches in chunks:

```diff
-let mut record_batches = Vec::new();
+let mut record_batches = Vec::with_capacity(1000); // or make configurable
 for (parquet_path, arrow_files) in staging_files {
     // ... existing code ...
     for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
         writer.write(record)?;
         record_batches.push(record.clone());
+
+        // Process in chunks to limit memory usage
+        if record_batches.len() >= 1000 {
+            // Process chunk here if needed
+        }
     }
 }
```
♻️ Duplicate comments (1)
src/parseable/streams.rs (1)
`911-940`: Hardcoded limit needs to be configurable.

This code segment has a hardcoded limit of 50 distinct values, which was previously flagged in past reviews but remains unaddressed. As mentioned in previous reviews, the hardcoded limit of 50 should be made configurable to allow different limits based on field type or cardinality requirements. A configuration sketch follows below.
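A minimal sketch of exposing the limit as a CLI/env option, assuming clap's derive API with the `env` feature; the flag and env-var names are illustrative (a later revision of the PR surfaces this as `PARSEABLE.options.max_field_statistics`):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct StatsOptions {
    /// Maximum number of distinct values recorded per field in dataset stats.
    #[arg(
        long = "max-field-statistics",
        env = "P_MAX_FIELD_STATISTICS",
        default_value_t = 50
    )]
    max_field_statistics: usize,
}
```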
🧹 Nitpick comments (2)
src/parseable/streams.rs (2)
`338-393`: Async conversion looks correct, but consider error handling.

The method signature change to async and the integration of field statistics calculation is well-structured. However, the error handling for `calculate_field_stats` only logs warnings, which might mask important issues. Consider whether statistics calculation failures should be more visible or reported differently.
`887-909`: Incomplete data type support in value formatting.

The `format_arrow_value` method returns "UNSUPPORTED" for many Arrow data types, which could result in incomplete statistics.

Consider extending support for additional Arrow data types:

```diff
 fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
     if array.is_null(idx) {
         return "NULL".to_string();
     }
+    // Add support for more data types
+    if let Some(arr) = array.as_any().downcast_ref::<arrow_array::Date32Array>() {
+        return arr.value(idx).to_string();
+    } else if let Some(arr) = array.as_any().downcast_ref::<arrow_array::UInt64Array>() {
+        return arr.value(idx).to_string();
+    }
     // ... existing type checks ...
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/parseable/streams.rs
(19 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (4)
src/parseable/streams.rs (4)
`30-68`: LGTM! Import additions support the new async statistics functionality.

The new imports are appropriate for the field statistics computation feature, including Arrow array types, DataFusion components, and async utilities.

`79-98`: Well-designed data structures for field statistics.

The struct definitions are clean, properly serializable, and follow Rust naming conventions. The hierarchical structure (DatasetStats -> FieldStat -> DistinctStat) appropriately models the statistical data.
`1026-1026`: Async task spawning correctly implemented.

The change to spawn async tasks that await the `flush_and_convert` method is properly implemented.
`1216-1216`: Test updates correctly handle new return type.

The test modifications properly handle the new tuple return type from `convert_disk_files_to_parquet`, accessing the schema via `.0` and validating it appropriately.

Also applies to: 1297-1298, 1346-1347, 1400-1401
Branch force-pushed from dcb4e78 to 5ed9fd3 (compare)
Actionable comments posted: 4
🧹 Nitpick comments (5)
src/handlers/http/prism_home.rs (2)
`39-46`: Loosen boolean parsing for `includeInternal`

Only the literal `"true"` (lower-case) is recognised. `"True"`, `"1"`, `"yes"` or `"false"` variants silently default to `false`, which is surprising.

```diff
-let include_internal = query_map
-    .get(HOME_QUERY_PARAM)
-    .map_or(false, |v| v == "true");
+let include_internal = query_map
+    .get(HOME_QUERY_PARAM)
+    .map(|v| matches!(v.as_str(), "true" | "True" | "TRUE" | "1" | "yes" | "YES"))
+    .unwrap_or(false);
```

Alternatively, parse with `bool::from_str` to return a clear error on invalid input, as sketched below.
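A minimal sketch of the stricter variant, assuming the handler can surface a 400-style error; the helper name and error type are illustrative:

```rust
use std::str::FromStr;

/// Parses the optional `includeInternal` query value strictly:
/// missing -> false, "true"/"false" -> parsed, anything else -> error.
fn parse_include_internal(raw: Option<&str>) -> Result<bool, String> {
    match raw {
        None => Ok(false),
        Some(v) => bool::from_str(v)
            .map_err(|_| format!("invalid boolean for includeInternal: {v:?}")),
    }
}
```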
`61-67`: Nit: variable rename increases clarity but comment is stale

The variable was renamed to `query_key` but the error message still references `"key"`. Update the constant or message to avoid confusion.

src/parseable/streams.rs (3)
`782-823`: `calculate_field_stats` can blow the allocator

All batches for the flush window are loaded into a `MemTable`; for large streams this easily reaches GBs of RAM and blocks the async executor.

Consider:
- Registering the freshly-written parquet files with DataFusion instead of a `MemTable` (see the sketch below).
- Running stats calculation in a bounded thread-pool or offloading it to a separate service.

This keeps ingestion latency predictable.
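A minimal sketch of the parquet-based alternative, assuming a `SessionContext` and the path of the file that was just written; the table name and path are illustrative:

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

/// Registers the freshly written parquet file so stats queries scan it from
/// disk instead of holding every RecordBatch in a MemTable.
async fn register_staged_parquet(ctx: &SessionContext, path: &str) -> Result<()> {
    ctx.register_parquet("staged_stats_input", path, ParquetReadOptions::default())
        .await?;
    // Stats queries can now run against "staged_stats_input" without an
    // in-memory copy of the batches.
    Ok(())
}
```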
`892-902`: `query_single_i64` misses overflow & type checks

Down-casting is safe but the value could exceed `i64`, or the column might be `UInt64`. Return a typed error instead of silently `None`, and handle unsigned arrays:

```diff
-if let Some(arr) = batch.column(0).as_any().downcast_ref::<Int64Array>() {
-    Some(arr.value(0))
-} else {
-    None
-}
+match batch.column(0).data_type() {
+    arrow_schema::DataType::Int64 => batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .map(|a| a.value(0)),
+    arrow_schema::DataType::UInt64 => batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<arrow_array::UInt64Array>()
+        .map(|a| a.value(0) as i64),
+    _ => None,
+}
```
`906-934`: Date32 values returned as epoch-days

`format_arrow_value` serialises `Date32Array` with `arr.value(idx).to_string()`, which yields a day offset from 1970-01-01, not a human date. Convert to ISO date for UI consumption:

```diff
-} else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
-    return arr.value(idx).to_string();
+} else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
+    let days = arr.value(idx);
+    return chrono::NaiveDate::from_num_days_from_ce_opt(days + 719163) // align to epoch
+        .map(|d| d.to_string())
+        .unwrap_or_else(|| days.to_string());
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/cli.rs (1 hunks)
- src/handlers/http/prism_home.rs (3 hunks)
- src/otel/traces.rs (12 hunks)
- src/parseable/streams.rs (19 hunks)
- src/prism/home/mod.rs (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/otel/traces.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
🔇 Additional comments (1)
src/parseable/streams.rs (1)
`828-851`: `futures`/`futures-util` mix may break trait resolution

`futures::stream::iter(..)` returns a type from the `futures` crate, but the imported `StreamExt` is from `futures_util`. Although often re-exported, this is version-sensitive and has bitten us before.

Safer:

```rust
use futures_util::{stream, StreamExt};
...
stream::iter(field_futures)
    .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
```
Actionable comments posted: 1
🔭 Outside diff range comments (1)
src/parseable/streams.rs (1)
`500-523`: 🛠️ Refactor suggestion — Unbounded `RecordBatch` cloning – high memory pressure risk

`record_batches.push(record.clone())` eagerly clones every batch that is written to Parquet and stores them in a `Vec`. For large ingests this can explode resident memory (each batch is still fully held by the writer while the clone is kept for stats).

```diff
- writer.write(record)?;
- // Collect record batches for finding statistics later
- record_batches.push(record.clone());
+ writer.write(record)?;
+
+ // Avoid cloning full batches – either
+ // 1. stream them into the `MemTable` immediately, or
+ // 2. compute stats directly from the already-written parquet file
+ //    via `ctx.read_parquet()`
```

Holding only row/column counts or a sampled subset is usually sufficient for field statistics. Consider a streaming approach or computing stats on Parquet to cap memory usage.
♻️ Duplicate comments (2)
src/parseable/streams.rs (2)
`861-870`: Field names interpolated into SQL – injection / syntax-break risk

The same concern raised in an earlier review still applies: `format!("...\"{field_name}\"...")` embeds unsanitised user data. Escape double quotes or, better, build the logical plan via the DataFusion API to avoid raw SQL (see the sketch below).

Also applies to: 872-879
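A minimal sketch of the DataFrame-API route, assuming a table already registered with the context and a recent DataFusion; the helper name is illustrative:

```rust
use datafusion::error::Result;
use datafusion::prelude::{col, SessionContext};

/// Counts non-null values of `field` in `table` without building a SQL string,
/// so identifiers are never interpolated into raw SQL.
async fn non_null_count(ctx: &SessionContext, table: &str, field: &str) -> Result<usize> {
    ctx.table(table)
        .await?
        .filter(col(field).is_not_null())?
        .count()
        .await
}
```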
`945-969`: `expect` on downcast can panic on schema drift

The previous review pointed this out; nothing changed – a bad type panics the whole stats job. Replace with graceful handling (`downcast_ref()?` plus `warn!`).
🧹 Nitpick comments (2)
src/parseable/streams.rs (2)
`873-879`: `distinct_count` includes NULL while `count` excludes it

`count` filters out `NULL`, but `distinct_count` does not. This makes the two numbers incomparable (a `NULL` group counts as one distinct value). Add `WHERE "{field_name}" IS NOT NULL` to the distinct query for consistency, as sketched below.
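A minimal sketch of the adjusted query builder; identifiers are assumed to be validated/escaped upstream and `limit` stands in for the configured cap:

```rust
/// Builds the top-N distinct-values query with NULLs excluded, so `count` and
/// `distinct_count` are computed over the same rows.
fn distinct_values_sql(stream_name: &str, field_name: &str, limit: usize) -> String {
    format!(
        "select count(*) as distinct_count, \"{field_name}\" \
         from \"{stream_name}\" \
         where \"{field_name}\" is not null \
         group by \"{field_name}\" \
         order by distinct_count desc \
         limit {limit}"
    )
}
```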
`907-935`: `format_arrow_value` prints `Date32` as epoch days

`Date32Array::value()` returns days since epoch, so consumers will see raw integers like `19723` instead of `2024-01-01`. Convert to an ISO date string:

```rust
else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
    let days = arr.value(idx);
    NaiveDate::from_num_days_from_ce_opt(days + 719163)
        .map(|d| d.to_string())
        .unwrap_or_else(|| days.to_string())
}
```

(Optional) consider similar human-friendly formatting for timestamps.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/parseable/streams.rs (18 hunks)
- src/utils/mod.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/utils/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
Branch force-pushed from 89f6547 to 197fe51 (compare)
Actionable comments posted: 0
♻️ Duplicate comments (3)
src/parseable/streams.rs (3)
`679-735`: Excessive cloning & accumulation of `RecordBatch` – memory blow-up risk

Every batch written to parquet is cloned and pushed into an ever-growing `record_batches` `Vec` solely so stats can be computed later. On large backfills this can explode RAM usage (the earlier review on "Unbounded cloning of `RecordBatch` inflates memory usage" is still 100% applicable).

```diff
- writer.write(record)?;
- // Collect record batches for finding statistics later
- record_batches.push(record.clone());
+ writer.write(record)?;
+ // TODO: eliminate cloning; derive stats by querying the just-written parquet or
+ // stream the batch into DataFusion immediately without keeping it in RAM.
```
`1105-1124`: Unsafe string interpolation → SQL-injection / syntax-breakage

`field_name` and `stream_name` are interpolated directly into the SQL without quoting/escaping. If either contains `"`, `\`, or reserved words, the query will fail or – once user-supplied names are possible – become an injection vector.

Consider:

```rust
// Minimal mitigation
let escaped = field_name.replace('\"', "\"\"");
format!("select count(\"{escaped}\") …")
```

Ideally switch to DataFusion's logical plan API instead of raw SQL.
:expect()
will panic on unexpected column type
downcast_ref::<Int64Array>().expect("…")
can still abort the whole flush path if the projection changes (e.g. BigInt → UInt64). The earlier panic-safety comment still applies.- .downcast_ref::<Int64Array>() - .expect("Counts should be Int64Array"); + let Some(counts) = rb + .column(0) + .as_any() + .downcast_ref::<Int64Array>() else { + warn!("Unexpected type for count column while computing stats"); + continue; + };
🧹 Nitpick comments (2)
src/parseable/streams.rs (2)
1151-1168
:Date32Array
value rendered as raw integer
format_arrow_value
printsDate32
values viaarr.value(idx).to_string()
which returns the day count since epoch (“19733”), not an ISO date. This is misleading for consumers.- } else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() { - return arr.value(idx).to_string(); + } else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() { + let days = arr.value(idx); + return chrono::NaiveDate::from_num_days_from_ce_opt(days + 719163) // 1970-01-01 offset + .map(|d| d.to_string()) + .unwrap_or_else(|| days.to_string());
`1042-1049`: `MemTable::try_new` duplicates data – consider querying parquet directly

Wrapping all batches in a `MemTable` keeps a second full copy of the data in memory, aggravating the cloning issue above. Reading the freshly-written parquet (`ctx.read_parquet`) or feeding an iterator (`MemTable::try_new_with_reader`) avoids the duplication.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/cli.rs (1 hunks)
- src/handlers/http/prism_home.rs (3 hunks)
- src/parseable/streams.rs (19 hunks)
- src/prism/home/mod.rs (3 hunks)
- src/utils/mod.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/utils/mod.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/prism/home/mod.rs
- src/cli.rs
- src/handlers/http/prism_home.rs
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (1)
src/parseable/streams.rs (1)
`1001-1017`: `flush_and_convert` ignores errors from stats ingestion

`prepare_parquet` internally logs and swallows errors from `calculate_field_stats`, so a broken stats path silently succeeds. Consider bubbling a dedicated error variant (e.g. `StatsError`) so operators can act instead of discovering silent data loss later.
Actionable comments posted: 1
♻️ Duplicate comments (6)
src/parseable/streams.rs (6)
`666-721`: Memory usage concern with collecting all RecordBatches.

The function now collects all record batches in memory, which could cause issues with large datasets. This is in addition to the memory already used during parquet writing.
Consider:
- Processing statistics in a streaming fashion as batches are written
- Adding memory limits or batch size limits
- Making statistics collection optional via configuration
`740-742`: Unbounded cloning of RecordBatch inflates memory usage.

Cloning every record batch that is written to parquet can quickly exhaust memory for large ingests.
`1050-1054`: Security vulnerability: SQL injection risk in statistics calculation.

The function registers a table with a user-provided stream name that will be used in SQL queries downstream. This creates a SQL injection vulnerability.

```shell
#!/bin/bash
# Description: Check how stream names are validated before use in SQL queries

# Search for stream name validation
rg -A 5 "stream_name.*validate|validate.*stream_name"

# Check if stream names are sanitized before SQL usage
ast-grep --pattern 'fn $_($$$ stream_name: $_, $$$) { $$$ sql($$$stream_name$$$) $$$ }'
```
`1112-1127`: Potential SQL injection via unsanitized field names.

Field names are embedded directly into SQL strings without escaping, creating injection vulnerabilities.

Apply this fix to escape field names:

```diff
+fn escape_field_name(field_name: &str) -> String {
+    field_name.replace('"', "\"\"")
+}
+
 let count = query_single_i64(
     &ctx,
     &format!(
-        "select count(\"{field_name}\") as count from \"{stream_name}\" where \"{field_name}\" is not null"
+        "select count(\"{}\") as count from \"{}\" where \"{}\" is not null",
+        escape_field_name(field_name),
+        escape_field_name(stream_name),
+        escape_field_name(field_name)
     ),
 )
 .await?;
```
`1196-1199`: Multiple issues: SQL injection and missing field name escaping.

The SQL query embeds field names without escaping, and the limit uses a global configuration value.

Apply proper escaping:

```diff
 let sql = format!(
-    "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}",
+    "select count(*) as distinct_count, \"{}\" from \"{}\" group by \"{}\" order by distinct_count desc limit {}",
+    field_name.replace('"', "\"\""),
+    stream_name.replace('"', "\"\""),
+    field_name.replace('"', "\"\""),
     PARSEABLE.options.max_field_statistics
 );
```
`1204-1208`: Panic risk with `expect` when downcasting counts column.

Using `expect()` can cause panics if the schema is unexpected. Replace with safe error handling:

```diff
-let counts = rb
-    .column(0)
-    .as_any()
-    .downcast_ref::<Int64Array>()
-    .expect("Counts should be Int64Array");
+let Some(counts) = rb
+    .column(0)
+    .as_any()
+    .downcast_ref::<Int64Array>() else {
+        warn!("Unexpected type for count column in stats query");
+        continue;
+};
```
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)
`495-558`: Consider improving error visibility for statistics calculation.

The async conversion and statistics calculation logic look good. However, errors in the spawned task are only logged as warnings, which might make debugging difficult.
Consider adding metrics or alerting for statistics calculation failures to improve observability.
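A minimal sketch of such a failure metric, assuming the `prometheus` crate is available; the metric name and registry wiring are illustrative:

```rust
use prometheus::{IntCounter, Registry};

/// Registers a counter that tracks failed field-statistics calculations so
/// failures can be alerted on instead of only appearing in warn logs.
fn register_stats_failure_counter(registry: &Registry) -> prometheus::Result<IntCounter> {
    let counter = IntCounter::new(
        "parseable_field_stats_failures_total",
        "Number of failed field statistics calculations",
    )?;
    registry.register(Box::new(counter.clone()))?;
    Ok(counter)
}
```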
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/parseable/streams.rs
(19 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
🔇 Additional comments (7)
src/parseable/streams.rs (7)
`29-49`: LGTM! The new imports are appropriate for the field statistics functionality.

`76-96`: LGTM! The constant and struct definitions are well-designed for the statistics feature. The concurrency limit of 10 is reasonable for controlling resource usage.

`1008-1031`: LGTM! Clean async conversion of the method.

`1078-1103`: LGTM! Proper concurrency control implemented. Good implementation using `buffer_unordered` with a concurrency limit, addressing the performance concern from previous reviews.

`1141-1154`: LGTM! Safe error handling implemented. Good implementation with proper error handling, avoiding panics by using safe downcasting.

`1156-1186`: LGTM! Comprehensive type handling with proper null checks and error handling for timestamps.

`1307-1308`: LGTM! Clean async conversion maintaining the same concurrency pattern.
Actionable comments posted: 1
♻️ Duplicate comments (4)
src/parseable/streams.rs (4)
`686-707`: `record_batches.push(record.clone())` duplicates data during parquet write

Inside `write_parquet_part_file` every batch is cloned and pushed into a `Vec`, again inflating memory usage proportional to the entire parquet file size.

```diff
-// Collect record batches for finding statistics later
-record_batches.push(record.clone());
+// Avoid cloning – either:
+// 1. stream statistics directly from `record`, or
+// 2. push only lightweight meta (row_count / value histogram)
```

The previous reviews already highlighted this; please fix or justify why full duplication is unavoidable.
`510-535`: Unbounded RB cloning + detached task may exhaust memory

`record_batches` clones every `RecordBatch`, stores the whole vector, and then hands it to an un-awaited background task. For large ingests this doubles RAM (already flagged earlier) and leaves the memory live until the async task finishes, which can easily happen long after the calling request completes.

```diff
-let stats_rbs = rbs.clone();
-tokio::spawn(async move {
+tokio::spawn(async move {
```

- Stream stats can be computed from the freshly written parquet (`ctx.read_parquet`) – no need to materialise batches.
- If you must keep the in-memory path, stream the batches or cap the collected rows (e.g. first N MB).
- Consider `tokio::task::spawn_blocking` / `tokio::spawn` with `JoinSet` so the caller can await or cancel on shutdown (see the sketch below).
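A minimal sketch of the `JoinSet` variant, assuming the stats work can be expressed as per-file async tasks; the task body and types are placeholders, not the PR's actual code:

```rust
use tokio::task::JoinSet;

/// Spawns one stats task per staged file and drains them on flush/shutdown,
/// so no detached task outlives the caller.
async fn run_stats_tasks(staged_files: Vec<String>) {
    let mut tasks: JoinSet<()> = JoinSet::new();
    for file in staged_files {
        tasks.spawn(async move {
            // placeholder: compute field statistics for `file` here
            let _ = file;
        });
    }
    while let Some(res) = tasks.join_next().await {
        if let Err(e) = res {
            eprintln!("stats task failed: {e}");
        }
    }
}
```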
`1113-1126`: Field name interpolation still allows SQL injection / malformed identifiers

`calculate_single_field_stats` and other helpers build SQL with `format!("… \"{field_name}\" …")` without escaping. Any double quote in `field_name` breaks the query; future external datasets could exploit this. Please escape `"` → `""` or, better, build logical plans with DataFusion's API instead of raw SQL.
`1194-1207`: `expect` may panic on unexpected schema – regressions remain

The earlier feedback to replace `expect("Counts should be Int64Array")` with graceful handling hasn't been applied. An unexpected data type will still bring the whole flush down. Replace with a safe down-cast & early-return/log warn as proposed before.
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)
`1168-1174`: `Date32Array` formatted as raw integer

`format_arrow_value` returns the raw day offset for `Date32Array`, which is not human-friendly and inconsistent with the other prettified formats. Consider converting to an ISO date:

```diff
-} else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
-    return arr.value(idx).to_string();
+} else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
+    let days = arr.value(idx);
+    return chrono::NaiveDate::from_ymd_opt(1970, 1, 1)
+        .unwrap()
+        .checked_add_signed(chrono::Duration::days(days.into()))
+        .map(|d| d.to_string())
+        .unwrap_or_else(|| days.to_string());
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/parseable/streams.rs
(19 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/parseable/streams.rs (8)
- src/handlers/http/ingest.rs (1): `ingest` (51-127)
- src/handlers/http/modal/utils/ingest_utils.rs (1): `flatten_and_push_logs` (51-93)
- src/metadata.rs (2): `schema` (146-151), `new` (95-130)
- src/parseable/mod.rs (5): `custom_partition` (701-701), `custom_partition` (872-872), `new` (136-148), `serde_json` (276-276), `serde_json` (282-282)
- src/parseable/staging/reader.rs (3): `new` (196-214), `try_new` (46-75), `try_new` (93-115)
- src/utils/time.rs (1): `new` (59-61)
- src/event/format/mod.rs (1): `new` (126-131)
- src/parseable/staging/writer.rs (1): `try_new` (57-72)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
Actionable comments posted: 1
🔭 Outside diff range comments (1)
src/parseable/streams.rs (1)
`688-707`: 🛠️ Refactor suggestion — Record-batch cloning can explode memory usage

`record_batches.push(record.clone())` stores every batch written to parquet in an in-memory `Vec`. For large ingests this can easily reach gigabytes and duplicate the data already buffered by ArrowWriter.

Minimal mitigation:

```diff
- writer.write(record)?;
- // Collect record batches for finding statistics later
- record_batches.push(record.clone());
+ writer.write(record)?;
+ if PARSEABLE.options.collect_dataset_stats {
+     record_batches.push(record.clone());
+ }
```

Long-term: compute statistics directly from the freshly-written parquet (or a streaming iterator) instead of materialising all batches.

Also applies to: 742-744
♻️ Duplicate comments (3)
src/parseable/streams.rs (3)
`29-36`: `buffer_unordered` won't compile – wrong `StreamExt` import

`buffer_unordered` is implemented for streams in the `futures`/`futures_util` crate that have the same origin as the extension trait you import. Here we import `futures_util::StreamExt`, but later create the iterator with `futures::stream::iter` (line 1100). The two crates expose different blanket impls, and the trait from `futures_util` is not in scope for the `futures::stream::Iter` type, causing a compilation error exactly as highlighted in the previous review.

```diff
-use futures_util::StreamExt;
+use futures_util::StreamExt; // keep if you still need it elsewhere
+use futures::stream::StreamExt as _; // brings the blanket-impl for `futures::stream::*`
```

Alternatively, stick to one crate:

```diff
- futures::stream::iter(field_futures)
+ futures_util::stream::iter(field_futures)
```

Either fix unblocks the build.
: Whole-datasetMemTable
duplicates data again
MemTable::try_new(schema, vec![record_batches])
loads every cloned record batch into yet another in-memory copy, doubling RAM over the previous comment.Prefer
MemTable::try_new_with_reader
or query the parquet file on disk:- let mem_table = MemTable::try_new(schema.clone(), vec![record_batches])? + let parquet_path = /* path to the parquet you just wrote */; + let mem_table = ctx.read_parquet(parquet_path, Default::default())?;This keeps memory proportional to query buffer size.
1196-1200
: Field name still interpolated directly into SQL – injection / syntax riskThe field name originates from external schemas and is placed inside the SQL string without any escaping or validation.
Past reviews already highlighted this.
At minimum escape"
→""
or, better, build the logical plan through DataFusion’s API instead of raw SQL.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/cli.rs (1 hunks)
- src/handlers/http/prism_home.rs (3 hunks)
- src/parseable/streams.rs (19 hunks)
- src/prism/home/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- src/prism/home/mod.rs
- src/cli.rs
- src/handlers/http/prism_home.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- read the record batches from arrow files in the staging directory
- run DataFusion queries to fetch count, distinct count and the count for each distinct value for all fields in the dataset
- store the results in the `<dataset>_pmeta` dataset
- UI to call the below SQL query to fetch the stats from this dataset:

```sql
SELECT
    field_name,
    field_count,
    distinct_count,
    distinct_value,
    distinct_value_count
FROM (
    SELECT
        field_stats_field_name as field_name,
        field_stats_distinct_stats_distinct_value as distinct_value,
        SUM(field_stats_count) as field_count,
        field_stats_distinct_count as distinct_count,
        SUM(field_stats_distinct_stats_count) as distinct_value_count,
        ROW_NUMBER() OVER (
            PARTITION BY field_stats_field_name
            ORDER BY SUM(field_stats_count) DESC
        ) as rn
    FROM <dataset>_pmeta
    WHERE field_stats_field_name = 'status_code'
        AND field_stats_distinct_stats_distinct_value IS NOT NULL
    GROUP BY field_stats_field_name, field_stats_distinct_stats_distinct_value, field_stats_distinct_count
) ranked
WHERE rn <= 5
ORDER BY field_name, distinct_value_count DESC;
```
Branch force-pushed from 1b237e8 to ef08564 (compare)
Actionable comments posted: 5
🔭 Outside diff range comments (1)
src/parseable/mod.rs (1)
`355-386`: Missing validation may allow invalid custom-partition values

`create_stream_if_not_exists` now exposes `custom_partition`, but unlike the higher-level `create_update_stream` it never validates the supplied value. A caller could therefore create a stream with an invalid key (e.g. multiple comma-separated fields) that later operations will reject, leading to inconsistent state between memory and object storage.

Add the same guard used elsewhere before delegating to `create_stream`:

```diff
@@
-    // For distributed deployments, if the stream not found in memory map,
+    // Validate custom-partition early to avoid persisting illegal metadata
+    if let Some(partition) = custom_partition {
+        validate_custom_partition(partition)?;
+    }
+
+    // For distributed deployments, if the stream not found in memory map,
     //check if it exists in the storage
```

This keeps the fast-path lightweight while ensuring invariants are upheld.
♻️ Duplicate comments (3)
src/sync.rs (3)
`199-211`: Same observation as above – consider a configurable `LOCAL_SYNC_THRESHOLD`

`247-256`: Startup local-sync threshold is still compile-time only

`259-270`: Startup object-store sync threshold mirrors earlier comment
🧹 Nitpick comments (7)
src/sync.rs (2)
`33-35`: Prefer alphabetic grouping or a dedicated `config` module for constant imports

Importing many top-level constants in a single curly-brace list quickly gets unwieldy as the list grows. Two lightweight nits you may want to consider:

- Keep the list alphabetically sorted to reduce merge conflicts.
- Move runtime-tunable constants into a distinct `config` or `constants` module and `use crate::config::*;` – it scales better as the constant set expands.

Neither change is blocking, just a readability tweak.
`134-147`: Make sync-duration thresholds runtime-tunable

`OBJECT_STORE_SYNC_THRESHOLD` is now hard-coded. When large back-fills or slower object stores are involved, 15 s can produce a lot of "took longer than expected" noise in the logs. Consider one of these follow-ups:

```diff
-const OBJECT_STORE_SYNC_THRESHOLD: Duration = Duration::from_secs(15);
+/// Defaults to 15 s – can be overridden via `PARSEABLE_OBJECT_SYNC_THRESHOLD` env-var.
+lazy_static! {
+    pub static ref OBJECT_STORE_SYNC_THRESHOLD: Duration =
+        std::env::var("PARSEABLE_OBJECT_SYNC_THRESHOLD")
+            .ok()
+            .and_then(|s| s.parse::<u64>().ok())
+            .map(Duration::from_secs)
+            .unwrap_or_else(|| Duration::from_secs(15));
+}
```

This keeps the default behaviour unchanged while letting operators tune thresholds without recompilation.
src/parseable/mod.rs (1)

`355-386`: Minor: update docs / signature comment

If you maintain API-level comments or public docs for `create_stream_if_not_exists`, remember to mention the new `custom_partition` argument and its semantics (single key, comma-separated list not allowed).

src/query/mod.rs (1)
`101-118`: `create_session_context` re-creates `SessionState` each call

Every call allocates a fresh `SessionState` (and thus new memory limits, feature flags, etc.). If the intent is to have one shared state, it would be cheaper and simpler to wrap `QUERY_SESSION_STATE.clone()` instead of rebuilding from scratch, then only add the schema provider when first initialising it.

src/storage/object_storage.rs (3)
`1002-1026`: Hard-coded concurrency constant

`MAX_CONCURRENT_FIELD_STATS = 10` is baked into the code. Expose it as a CLI / config option so operators can tune it for their hardware and dataset size.
`1028-1058`: Unnecessary string interpolation & quoting

Building SQL with `select count(\"{field_name}\") … from \"{stream_name}\"` is brittle and risks SQL-injection-like breakage if field names contain quotes. Prefer the logical plan API (`ctx.table(stream_name)?.aggregate(..)`) or at least pre-escape double quotes.
`1151-1183`: `select count(*) … group by` aliases the aggregate as `distinct_count`

Arrow/DataFusion allow this today, but SQL engines differ and future versions may reject the alias in `ORDER BY`. Safer: `select count(*) as cnt … order by cnt desc`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- src/cli.rs (1 hunks)
- src/handlers/http/ingest.rs (4 hunks)
- src/handlers/http/prism_home.rs (3 hunks)
- src/lib.rs (1 hunks)
- src/parseable/mod.rs (3 hunks)
- src/parseable/streams.rs (10 hunks)
- src/prism/home/mod.rs (3 hunks)
- src/query/mod.rs (4 hunks)
- src/storage/object_storage.rs (4 hunks)
- src/sync.rs (5 hunks)
- src/utils/mod.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/lib.rs
🚧 Files skipped from review as they are similar to previous changes (5)
- src/utils/mod.rs
- src/cli.rs
- src/prism/home/mod.rs
- src/handlers/http/prism_home.rs
- src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (2)
src/handlers/http/ingest.rs (1)
`99-106`: New argument aligns with updated API – good catch

All four call-sites correctly pass `None` for the newly-added `custom_partition` parameter, keeping the ingestion paths behaviour-compatible with earlier versions. No further action required.

Also applies to: 183-190, 249-256, 315-322
src/parseable/mod.rs (1)
`328-336`: Internal stream path updated consistently

The internal-stream bootstrap now also forwards the new `custom_partition` parameter (`None`), keeping the helper in sync with the public helper. 👍
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/connectors/kafka/processor.rs (2)
`54-60`: Explicit `None` aligns with new API but forfeits early custom-partition propagation

The new fourth argument matches the updated `create_stream_if_not_exists` signature, so this compiles fine. However, we already compute `custom_partition` a few lines later (66). Passing that value here—when it is `Some(_)`—would keep the catalog in sync from the moment the stream is first created and avoid a subsequent update path.

```diff
-    .create_stream_if_not_exists(
-        stream_name,
-        StreamType::UserDefined,
-        None,
-        vec![log_source_entry],
-    )
+    .create_stream_if_not_exists(
+        stream_name,
+        StreamType::UserDefined,
+        custom_partition.clone(), // or derive from the first record
+        vec![log_source_entry],
+    )
```

Worth considering unless there is a strong reason to defer the partition assignment.
`73-78`: Silently dropping invalid JSON hides data quality issues

Any payload that fails `serde_json::from_slice` is ignored without trace. This can mask format errors and make debugging harder.

```diff
-    if let Ok(value) = serde_json::from_slice::<Value>(record) {
-        json_vec.push(value);
-    }
+    match serde_json::from_slice::<Value>(record) {
+        Ok(value) => json_vec.push(value),
+        Err(e) => {
+            tracing::warn!(error = %e, "Dropping invalid JSON payload");
+            // TODO: increment metric for malformed records
+        }
+    }
```

Adding at least a warn-level log (or a metric) will surface bad data early without polluting normal logs.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/connectors/kafka/processor.rs
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)
`1072-1080`: `query_single_i64` still silently masks errors

Same concern as raised in earlier reviews: all `?` results are converted to `None`, making real failures indistinguishable from "no rows". Propagate the error with `Result<i64, DataFusionError>` and assert exactly one row to surface upstream problems (see the sketch below).
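A minimal sketch of the error-propagating variant, assuming the helper keeps its current shape; names and the exact error mapping are illustrative:

```rust
use datafusion::arrow::array::Int64Array;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;

/// Runs a single-row, single-column count query and surfaces failures instead
/// of collapsing them into `None`.
async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Result<i64> {
    let batches = ctx.sql(sql).await?.collect().await?;
    let batch = batches
        .first()
        .filter(|b| b.num_rows() > 0)
        .ok_or_else(|| DataFusionError::Execution("expected exactly one row".into()))?;
    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| DataFusionError::Execution("expected an Int64 count column".into()))?;
    Ok(col.value(0))
}
```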
🧹 Nitpick comments (2)
src/handlers/http/health_check.rs (1)

`68-72`: Consider avoiding the second full sync loop

`perform_sync_operations()` is executed twice when `collect_dataset_stats` is enabled. The first call already flushes Arrow → Parquet and uploads all objects, including the stats Parquet generated during that step, so the second run usually finds nothing new and just burns CPU / S3 requests.

A lighter alternative:

```diff
- perform_sync_operations().await;
- if PARSEABLE.options.collect_dataset_stats {
-     perform_sync_operations().await;
- }
+ perform_sync_operations().await; // flush + upload data
+ if PARSEABLE.options.collect_dataset_stats {
+     // only stats stream needs a second flush
+     perform_local_sync().await;
+     perform_object_store_sync().await;
+ }
```

Keeps the "double-flush for stats" semantics but avoids redundant work on the other streams.
src/storage/object_storage.rs (1)

`1083-1095`: Nit: log message prints data type twice

`warn!("Expected {} for {:?}, but found {:?}", …)` prints the actual type twice. One of the `{:?}` placeholders can be dropped:

```diff
-warn!("Expected {} for {:?}, but found {:?}", stringify!($ty), $arr.data_type(), $arr.data_type());
+warn!("Expected {} but found {:?}", stringify!($ty), $arr.data_type());
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/cli.rs (1 hunks)
- src/handlers/http/health_check.rs (2 hunks)
- src/parseable/mod.rs (3 hunks)
- src/storage/object_storage.rs (5 hunks)
- src/sync.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- src/sync.rs
- src/parseable/mod.rs
- src/cli.rs
🧰 Additional context used
🧠 Learnings (2)
src/handlers/http/health_check.rs (1)
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.971Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.
src/storage/object_storage.rs (1)
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.037Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (2)
src/storage/object_storage.rs (2)
`978-983`: Table name may still contain invalid SQL identifier characters

`ctx_table_name` replaces only `.` with `_`. Partitioned filenames typically include `=`, `-`, or `%`, which are not valid in unquoted identifiers: `date=2025-06-18_time=12-30-00_part-0000.parquet`. If such a file slips through, `register_parquet` succeeds, but the subsequent SQL queries (`from "{table_name}"`) will raise a syntax error.

Sanitise all non-alphanumeric characters:

```rust
let ctx_table_name = regex::Regex::new("[^A-Za-z0-9_]")
    .unwrap()
    .replace_all(&format!("{}_{}", stream_name, parquet_file_name), "_");
```

or avoid raw SQL altogether by using the DataFrame API.
`880-893`: Per-file stats calculation can become very expensive

`calculate_field_stats` scans every Parquet file individually, executing `2 + N_distinct_query` full-column scans per field. For wide tables (hundreds of columns) this introduces noticeable ingest latency.

If staging-file lifetime allows, consider:
- aggregating stats at the stream level after the loop, or
- running a single `DESCRIBE` + `COUNT(1)` + `APPROX_DISTINCT` query per field using the `approx_distinct()` UDF.

This would cut the number of required scans dramatically.
Actionable comments posted: 2
♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)
`1072-1081`: `query_single_i64` still swallows every error

All `?` are immediately followed by `.ok()?`, so:
- any SQL / DataFusion error is silenced;
- callers cannot distinguish "no-rows" from "execution failed".

This was pointed out in the previous review and is still unresolved. Please return a `Result<i64, DataFusionError>` (or custom error) and propagate it upward.
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)
`1107-1195`: `format_arrow_value` vastly improved, but still misses complex & decimal types

Arrow `Decimal*`, `Dictionary`, `LargeUtf8`, `LargeBinary`, `List`, `Struct`, etc., will all fall through to `"UNSUPPORTED"`. If these appear, the UI loses valuable information. Extending support (or at least logging the field name for easier debugging) would make the stats more future-proof.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/storage/object_storage.rs
(5 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/storage/object_storage.rs (2)
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.037Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:31-41
Timestamp: 2025-06-18T11:15:10.816Z
Learning: DataFusion's parquet reader defaults to using view types (Utf8View, BinaryView) when reading parquet files via the schema_force_view_types configuration (default: true). This means StringViewArray and BinaryViewArray downcasting is required when processing Arrow arrays from DataFusion parquet operations, even though these types are behind nightly feature flags.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-apple-darwin
🔇 Additional comments (1)
src/storage/object_storage.rs (1)
`1030-1035`: Nice use of `buffer_unordered` to cap concurrent field queries

The `filter_map(std::future::ready)` trick cleanly drops `None`s without extra boiler-plate.
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/storage/object_storage.rs (2)
`980-985`: Table alias can still contain illegal SQL identifier characters

Replacing `.` with `_` is an improvement, but `stream_name` and the ULID may still contain `-`, `/`, or other characters that are invalid in identifiers. If any such character slips through, every downstream `ctx.sql(...)` call will fail.

Consider sanitising with `char::is_ascii_alphanumeric`:

```diff
-let ctx_table_name = format!("{}_{}_{}", stream_name, parquet_file_name, random_suffix);
+let ctx_table_name = format!("{}_{}_{}", stream_name, parquet_file_name, random_suffix)
+    .chars()
+    .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
+    .collect::<String>();
```
`1014-1064`: Still two full scans per column – heavy for wide datasets

`calculate_single_field_stats` issues `COUNT, COUNT(DISTINCT)` and `GROUP BY … ORDER BY …` for every field. On a 100-column table that's 200 full passes over the file. The first query could aggregate all columns in one shot by projecting the whole set of `COUNT(DISTINCT col)` expressions, reducing I/O dramatically (see the sketch below). Only fire the second (top-N) query for columns whose distinct count is under the UI threshold.

This was pointed out earlier; the new implementation reduces 3 scans → 2, but the cost is still quadratic.
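A minimal sketch of the single-scan variant; the field list and table name are supplied by the caller, and identifiers are escaped inline as shown:

```rust
/// Builds one query that returns, for every field, its non-null count and
/// distinct count in a single scan of the table.
fn combined_counts_sql(table: &str, fields: &[&str]) -> String {
    let projections: Vec<String> = fields
        .iter()
        .map(|f| {
            let quoted = f.replace('"', "\"\"");
            format!(
                "count(\"{quoted}\") as \"{quoted}_count\", \
                 count(distinct \"{quoted}\") as \"{quoted}_distinct\""
            )
        })
        .collect();
    format!(
        "select {} from \"{}\"",
        projections.join(", "),
        table.replace('"', "\"\"")
    )
}
```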
🧹 Nitpick comments (2)
src/storage/object_storage.rs (2)
`1096-1184`: Missing handling for `LargeUtf8`/`LargeBinary` & other common Arrow types

`format_arrow_value` now covers many cases, but high-cardinality text columns read via `LargeUtf8`/`LargeBinary` will fall into the default branch and log "UNSUPPORTED". That means such values are silently replaced by the literal UNSUPPORTED in the stats output, skewing UI counts.

Add at least:

```diff
+ DataType::LargeUtf8 => try_downcast!(arrow_array::LargeStringArray, array, |arr: &arrow_array::LargeStringArray| arr.value(idx).to_string()),
+ DataType::LargeBinary => try_downcast!(arrow_array::LargeBinaryArray, array, |arr: &arrow_array::LargeBinaryArray| {
+     String::from_utf8_lossy(arr.value(idx)).to_string()
+ }),
```

(or gate them behind the same view-type feature flag). Failing gracefully is good; covering the common large types is better.
: Nit:filter_map(std::future::ready)
is concise but obscures intentThe closure
std::future::ready
simply forwards theOption
, but readers have to mentally
unpack the trait gymnastics. Replacing with an explicit inline closure improves readability
with zero performance cost.- .filter_map(std::future::ready) + .filter_map(|opt| std::future::ready(opt))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/storage/object_storage.rs
(5 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/storage/object_storage.rs (2)
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.037Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:31-41
Timestamp: 2025-06-18T11:15:10.816Z
Learning: DataFusion's parquet reader defaults to using view types (Utf8View, BinaryView) when reading parquet files via the schema_force_view_types configuration (default: true). This means StringViewArray and BinaryViewArray downcasting is required when processing Arrow arrays from DataFusion parquet operations, even though these types are behind nightly feature flags.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)
`969-1016`: Solid stats calculation orchestration with minor optimization opportunity.

The function structure is well-designed with proper error handling and async patterns. Consider a small optimization:

```diff
- Some(&"dataset_name".into()),
+ Some("dataset_name"),
```

This avoids unnecessary String allocation if the function accepts `&str`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/storage/object_storage.rs
(7 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/storage/object_storage.rs (2)
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.037Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:31-41
Timestamp: 2025-06-18T11:15:10.816Z
Learning: DataFusion's parquet reader defaults to using view types (Utf8View, BinaryView) when reading parquet files via the schema_force_view_types configuration (default: true). This means StringViewArray and BinaryViewArray downcasting is required when processing Arrow arrays from DataFusion parquet operations, even though these types are behind nightly feature flags.
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (6)
src/storage/object_storage.rs (6)
`20-82`: LGTM! Comprehensive imports for statistics functionality. All imports are appropriate and used by the new field statistics collection feature.

`944-964`: Well-designed data structures for statistics representation. The hierarchical structure cleanly models dataset → field → distinct value statistics, and the concurrency limit is appropriate.

`1021-1045`: Excellent concurrent processing implementation. Well-structured use of `buffer_unordered` with proper concurrency limits and lifetime management.

`1050-1112`: Excellent error handling and streaming implementation. Great improvement in error handling patterns – using `match` statements and graceful failure handling instead of panicking. The streaming approach properly handles large result sets.

`1114-1260`: Robust SQL generation and value formatting implementation. The CTE-based query is efficient and properly escaped. The value formatting function comprehensively handles Arrow types with safe downcasting and graceful fallbacks.

`828-828`: Well-integrated stats collection with proper safeguards. Excellent integration approach with recursion prevention, configuration control, and non-blocking error handling. The async task spawning for sync operations maintains good performance characteristics.

Also applies to: 881-917
- read the record batches from arrow files in the staging directory
- run DataFusion queries to fetch count, distinct count and the count for each distinct value for all fields in the dataset
- store the results in the `<dataset>_pmeta` dataset
- UI to call the SQL query shown above to fetch the stats from this dataset