refactor: continue reorganizing/streamlining ingestion flow #1190


Closed
wants to merge 7 commits into from

Conversation

de-sh
Contributor

@de-sh de-sh commented Feb 16, 2025

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Introduced enhanced asynchronous log ingestion that supports multiple log sources, improving schema management and error feedback.
    • Added new methods for processing incoming JSON logs, enhancing log processing and schema management.
  • Refactor

    • Streamlined event processing and record handling for more direct operations.
    • Improved query error reporting with clearer messages tailored to missing streams and schema updates.
    • Updated stream management to handle multiple custom partitions for more flexible operations.
  • Chores

    • Removed legacy ingestion utilities and outdated modules to maintain a leaner, more maintainable codebase.


Devdutt Shenoi added 5 commits February 16, 2025 11:38

Contributor

coderabbitai bot commented Feb 16, 2025

Walkthrough

This update streamlines event processing and log ingestion across multiple modules by refactoring method signatures and internal flows. The Kafka connector’s event creation is merged into inline processing, and the Event struct now explicitly requires a stream reference. HTTP ingestion handlers now retrieve stream objects via a centralized PARSEABLE module and use new stream methods for log processing. Redundant modules and functions have been removed, and enhanced error handling has been introduced for schema commitments and Arrow-related operations. Additionally, stream metadata handling has been updated to support multiple custom partitions.
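
To make the new shape concrete, here is a minimal, self-contained Rust sketch of the handler flow described above. The Parseable, Stream, and error types below are simplified stand-ins for illustration only, not the real Parseable definitions; only the call shape (look the stream up once via get_stream, then call methods on it) mirrors the refactor.

use std::collections::HashMap;

// Simplified stand-ins for the real Parseable types.
struct Stream { name: String }
struct Parseable { streams: HashMap<String, Stream> }

#[derive(Debug)]
enum IngestError { StreamNotFound(String) }

impl Stream {
    // Stand-in for the new `flatten_and_push_logs` method: flatten, then push.
    fn flatten_and_push_logs(&self, raw: &str) -> Result<(), IngestError> {
        println!("pushing {} bytes into stream {}", raw.len(), self.name);
        Ok(())
    }
}

impl Parseable {
    // Stand-in for `PARSEABLE.get_stream(stream_name)`.
    fn get_stream(&self, name: &str) -> Result<&Stream, IngestError> {
        self.streams
            .get(name)
            .ok_or_else(|| IngestError::StreamNotFound(name.to_string()))
    }
}

// Handler shape after the refactor: resolve the stream once, then delegate to it.
fn ingest(parseable: &Parseable, stream_name: &str, body: &str) -> Result<(), IngestError> {
    let stream = parseable.get_stream(stream_name)?;
    stream.flatten_and_push_logs(body)
}

fn main() {
    let mut streams = HashMap::new();
    streams.insert("app-logs".to_string(), Stream { name: "app-logs".to_string() });
    let parseable = Parseable { streams };
    ingest(&parseable, "app-logs", r#"{"level":"info"}"#).unwrap();
}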

Changes

File(s) Change Summary
src/connectors/kafka/processor.rs Renamed build_event_from_chunk to process_event_from_chunk; inlined event processing (now returns Ok(())); updated Processor trait to call the new method.
src/event/mod.rs Updated process and process_unchecked methods to accept a stream parameter; removed the commit_schema function and adjusted stream management.
src/handlers/http/ingest.rs Refactored log ingestion to retrieve stream objects via PARSEABLE.get_stream; updated calls from direct function invocations to stream method calls.
src/handlers/http/query.rs Enhanced error handling by adding StreamNotFound and Arrow error variants; updated schema commit logic to utilize stream objects.
src/parseable/mod.rs Revised export list to include Stream; updated stream metadata handling for multiple custom partitions.
src/parseable/streams.rs Added asynchronous methods flatten_and_push_logs and push_logs; updated commit_schema and changed partition extraction methods.
src/lib.rs Added new module: mod kinesis;.
src/handlers/http/mod.rs Removed the mod kinesis; declaration.
src/handlers/http/modal/utils/ingest_utils.rs File deleted; removed multiple utility functions for log/event processing including log flattening and record batch creation.
src/handlers/http/modal/utils/mod.rs Removed the public declaration for the ingest_utils module.

Sequence Diagram(s)

sequenceDiagram
  participant Client as HTTP Request
  participant Ingest as Ingest Handler
  participant Parseable as PARSEABLE
  participant Stream as Stream Object
  participant Event as Event Processor

  Client->>Ingest: Send log data & stream name
  Ingest->>Parseable: get_stream(stream_name)
  Parseable-->>Ingest: Return Stream object
  Ingest->>Stream: flatten_and_push_logs(data, logSource)
  alt Event requires further processing
    Ingest->>Event: event.process(&Stream)
  end
  Stream-->>Ingest: Acknowledge processing
  Ingest-->>Client: Return success response
sequenceDiagram
  participant Kafka as Kafka Broker
  participant Consumer as Kafka Consumer
  participant Processor as ParseableSinkProcessor

  Kafka->>Consumer: Deliver record batch
  Consumer->>Processor: process_event_from_chunk(records)
  Processor->>Processor: Process event inline (invoke process method)
  Processor-->>Consumer: Return Ok(())

Possibly related PRs

  • refactor: process ain't async #1180: Both PRs refactor the event-processing logic, modifying the process method of ParseableSinkProcessor and how process is handled on the Event struct.

Suggested reviewers

  • nikhilsinhaparseable
  • nitisht

Poem

Hop, hop, hop—I'm off to code away,
New streams and flows brighten my day.
Logs and events now dance in the light,
Clean and swift, they make my heart bright.
With every refactor, bugs flee in delight,
A bunny’s cheer for a change that feels right! 🐇


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 920bcae and 431f252.

📒 Files selected for processing (4)
  • src/connectors/kafka/processor.rs (4 hunks)
  • src/event/mod.rs (4 hunks)
  • src/handlers/http/ingest.rs (21 hunks)
  • src/parseable/streams.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (7)
src/event/mod.rs (3)

20-28: LGTM! Import changes look good.

The imports have been reorganized and updated to match the new functionality.


47-85: LGTM! The process method has been refactored to use the stream parameter directly.

The changes improve encapsulation by having the stream object passed in rather than retrieved from a global state. This is a good practice as it makes the code more testable and the dependencies more explicit.
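
As a rough illustration of the testability point (simplified stand-in types, not the real Event and Stream), a method that takes &Stream can be exercised with a locally constructed stream instead of a global registry:

// Simplified stand-ins; the real Event and Stream carry far more state.
struct Stream { name: String }
struct Event { payload: String }

impl Event {
    // After the refactor the dependency is explicit in the signature:
    // callers (and tests) hand in whichever stream they want.
    fn process(&self, stream: &Stream) -> Result<(), String> {
        if self.payload.is_empty() {
            return Err(format!("empty event for stream {}", stream.name));
        }
        // ... commit schema and push the record batch via `stream` ...
        Ok(())
    }
}

fn main() {
    // No global registry needed: construct a stream locally and pass it in.
    let stream = Stream { name: "test".into() };
    let event = Event { payload: "hello".into() };
    assert!(event.process(&stream).is_ok());
    assert!(Event { payload: String::new() }.process(&stream).is_err());
}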


87-99: LGTM! The process_unchecked method has been similarly refactored.

The changes are consistent with the process method, maintaining a uniform approach to stream handling.

src/connectors/kafka/processor.rs (1)

45-87: LGTM! The Kafka processor has been refactored to process events directly.

The changes improve the code by:

  1. Removing unnecessary intermediate event object creation
  2. Directly processing events using the stream object
  3. Better error handling with early stream existence check
src/parseable/streams.rs (3)

119-158: LGTM! The flatten_and_push_logs method provides a clean interface for different log sources.

The method handles various log sources (Kinesis, OtelLogs, OtelTraces, OtelMetrics) with appropriate flattening logic for each type.
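
The routing has roughly this shape; the sketch below uses placeholder flatteners and a simplified LogSource enum rather than the real Kinesis/OTEL helpers:

use serde_json::Value;

// Variants named after the sources listed above; simplified for illustration.
enum LogSource { Kinesis, OtelLogs, OtelTraces, OtelMetrics, Json }

fn flatten_and_push_logs(json: Value, source: &LogSource) -> Result<(), String> {
    // Each source gets its own flattening step before the common push path.
    let flattened = match source {
        LogSource::Kinesis => flatten_kinesis(json),
        LogSource::OtelLogs => flatten_otel_logs(json),
        LogSource::OtelTraces => flatten_otel_traces(json),
        LogSource::OtelMetrics => flatten_otel_metrics(json),
        LogSource::Json => json,
    };
    push_logs(flattened)
}

// Placeholder flatteners: the real ones unwrap the source-specific envelopes.
fn flatten_kinesis(v: Value) -> Value { v }
fn flatten_otel_logs(v: Value) -> Value { v }
fn flatten_otel_traces(v: Value) -> Value { v }
fn flatten_otel_metrics(v: Value) -> Value { v }

fn push_logs(v: Value) -> Result<(), String> {
    println!("pushing {v}");
    Ok(())
}

fn main() {
    let sources = [
        LogSource::Kinesis,
        LogSource::OtelLogs,
        LogSource::OtelTraces,
        LogSource::OtelMetrics,
        LogSource::Json,
    ];
    for source in &sources {
        flatten_and_push_logs(serde_json::json!({"msg": "hi"}), source).unwrap();
    }
}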


776-790: LGTM! The commit_schema method has been improved.

The method now properly merges schemas and handles the update atomically.
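
The atomic-update idea can be sketched with a plain HashMap standing in for the Arrow schema: merge into a copy, then install the result while holding a single write lock.

use std::collections::HashMap;
use std::sync::RwLock;

// Field name -> type name; a stand-in for the real Arrow schema map.
type SchemaMap = HashMap<String, String>;

struct StreamMetadata { schema: RwLock<SchemaMap> }

impl StreamMetadata {
    // Merge the incoming fields into the current schema and install the result
    // while holding one write lock, so readers never observe a partial update.
    fn commit_schema(&self, incoming: SchemaMap) {
        let mut guard = self.schema.write().expect("schema lock poisoned");
        let mut merged = guard.clone();
        merged.extend(incoming);
        *guard = merged;
    }
}

fn main() {
    let meta = StreamMetadata { schema: RwLock::new(SchemaMap::new()) };
    meta.commit_schema(HashMap::from([("level".to_string(), "Utf8".to_string())]));
    assert_eq!(meta.schema.read().unwrap().len(), 1);
}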


895-902: LGTM! The get_parsed_timestamp function provides good error handling.

The function properly handles missing fields and parsing errors with appropriate error types.
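
A hedged sketch of the same pattern, with illustrative error variants rather than the real PostError ones: fetch the configured field, report a missing key explicitly, and surface parse failures instead of unwrapping.

use chrono::NaiveDateTime;
use serde_json::{json, Value};

#[derive(Debug)]
enum TimestampError {
    MissingField(String),
    Unparseable(String),
}

fn get_parsed_timestamp(value: &Value, field: &str) -> Result<NaiveDateTime, TimestampError> {
    // Missing keys and non-string values are reported, not unwrapped.
    let raw = value
        .get(field)
        .and_then(Value::as_str)
        .ok_or_else(|| TimestampError::MissingField(field.to_string()))?;
    // RFC 3339 input such as "2025-05-15T15:30:00Z" is normalised to naive UTC.
    chrono::DateTime::parse_from_rfc3339(raw)
        .map(|dt| dt.naive_utc())
        .map_err(|e| TimestampError::Unparseable(e.to_string()))
}

fn main() {
    assert!(get_parsed_timestamp(&json!({"ts": "2025-05-15T15:30:00Z"}), "ts").is_ok());
    let missing = get_parsed_timestamp(&json!({}), "ts");
    assert!(matches!(missing, Err(TimestampError::MissingField(_))));
}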



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (5)
src/parseable/streams.rs (4)

115-136: Consider preserving the original log source for Kinesis.
Currently, Kinesis logs are flattened and then passed to push_logs using LogSource::default(). If you need to preserve the specific Kinesis source for downstream processing or reporting, consider reusing the original log source instead. Otherwise, the current approach is fine if the new source is intended to be generic.


138-201: Guard against potential panic from unwrap().
The call serde_json::to_vec(&value).unwrap() at line 166 may panic if serialization fails unexpectedly. Although unlikely, consider replacing .unwrap() with .expect("...") or handling the error to avoid a possible runtime panic.


753-768: Simplify schema replacement to improve readability.
The current code uses std::mem::replace(map, updated_schema). A simpler pattern might be *map = updated_schema; or using map.clear() then inserting, which can be more readable and explicit.
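
For comparison, the two forms are equivalent when the old value is discarded; the assignment simply reads more clearly (plain HashMap shown, not the real schema type).

use std::collections::HashMap;

type SchemaMap = HashMap<String, String>;

// What the PR currently does inside commit_schema.
fn install_with_replace(map: &mut SchemaMap, updated_schema: SchemaMap) {
    let _old = std::mem::replace(map, updated_schema);
}

// The suggested, more readable equivalent: assign through the reference.
fn install_with_assign(map: &mut SchemaMap, updated_schema: SchemaMap) {
    *map = updated_schema;
}

fn main() {
    let updated = SchemaMap::from([("level".to_string(), "Utf8".to_string())]);
    let mut a = SchemaMap::new();
    let mut b = SchemaMap::new();
    install_with_replace(&mut a, updated.clone());
    install_with_assign(&mut b, updated);
    assert_eq!(a, b);
}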


873-881: Ensure that date formats match expectations.
The current code supports parsing an RFC 3339-style string as a DateTime<Utc> and returns an error otherwise. For broader date format coverage, consider utilizing more flexible parsing or fallback strategies.
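
If broader coverage is ever needed, one possible fallback chain looks like this (illustrative only; the code in this PR intentionally sticks to RFC 3339):

use chrono::{DateTime, NaiveDateTime};

// Try RFC 3339 first, then a couple of common naive layouts.
fn parse_timestamp_lenient(raw: &str) -> Option<NaiveDateTime> {
    if let Ok(dt) = DateTime::parse_from_rfc3339(raw) {
        return Some(dt.naive_utc());
    }
    for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"] {
        if let Ok(dt) = NaiveDateTime::parse_from_str(raw, fmt) {
            return Some(dt);
        }
    }
    None
}

fn main() {
    assert!(parse_timestamp_lenient("2025-05-15T15:30:00Z").is_some());
    assert!(parse_timestamp_lenient("2025-05-15 15:30:00").is_some());
    assert!(parse_timestamp_lenient("not a date").is_none());
}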

src/event/mod.rs (1)

49-66: Pass &Stream for event processing to unify schema logic.
This approach centralizes schema commitment (commit_schema) and pushing records (push) in the Stream, improving encapsulation. As a minor optimization, consider consolidating consecutive lock acquisitions in a single block if it becomes a performance bottleneck. Otherwise, this looks good.

Also applies to: 87-98

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 523ecc7 and b7dd383.

📒 Files selected for processing (10)
  • src/connectors/kafka/processor.rs (4 hunks)
  • src/event/mod.rs (4 hunks)
  • src/handlers/http/ingest.rs (21 hunks)
  • src/handlers/http/mod.rs (0 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (0 hunks)
  • src/handlers/http/modal/utils/mod.rs (0 hunks)
  • src/handlers/http/query.rs (4 hunks)
  • src/lib.rs (1 hunks)
  • src/parseable/mod.rs (1 hunks)
  • src/parseable/streams.rs (5 hunks)
💤 Files with no reviewable changes (3)
  • src/handlers/http/modal/utils/mod.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (13)
src/parseable/streams.rs (1)

31-32: No issues with the updated imports.
All newly introduced or modified imports appear consistent with their usages.

Also applies to: 44-44, 49-53, 61-64

src/lib.rs (1)

32-32: Newly added kinesis module looks fine.
The addition of mod kinesis; aligns well with the usage in other files.

src/event/mod.rs (1)

20-30: Updated imports look consistent.
No detectable issues found.

src/connectors/kafka/processor.rs (2)

45-88: LGTM! The refactoring improves encapsulation and streamlines event processing.

The changes effectively:

  1. Rename the method to better reflect its purpose
  2. Remove intermediate event object construction
  3. Process events directly using the stream object
  4. Return a simple success/failure indicator

This aligns well with the PR's objective of streamlining the ingestion flow.


111-112: LGTM! Updated process method to use the new event processing flow.

The change correctly updates the process method to use the new process_event_from_chunk method.

src/handlers/http/query.rs (3)

22-22: LGTM! Enhanced error handling with specific error types.

Added imports for ArrowError and StreamNotFound to support more granular error handling.

Also applies to: 42-42


177-179: LGTM! Improved schema update flow using stream object.

The change correctly uses the stream object to commit schema changes, aligning with the PR's objective of improving encapsulation.


323-326: LGTM! Added specific error variants for better error handling.

Added StreamNotFound and Arrow error variants to QueryError enum for more precise error reporting.

src/handlers/http/ingest.rs (4)

72-75: LGTM! Improved encapsulation using stream object for log processing.

The change correctly retrieves the stream object and uses it for log processing, aligning with the PR's objective of improving encapsulation.


83-101: LGTM! Enhanced internal stream processing using stream object.

The changes correctly:

  1. Retrieve the stream object
  2. Use the stream's schema
  3. Process events using the stream object

128-134: LGTM! Standardized OTEL ingestion using stream objects.

The changes consistently use stream objects across all OTEL ingestion handlers (logs, metrics, traces), improving code consistency and maintainability.

Also applies to: 164-170, 197-203, 244-248, 257-269


397-399: LGTM! Updated test cases to use json::Event struct.

The changes correctly update test cases to use the json::Event struct, maintaining consistency with the new event processing approach.

Also applies to: 425-427, 457-459, 489-491, 507-509, 548-550, 645-647, 694-696, 819-823

src/parseable/mod.rs (1)

31-31: LGTM! Added Stream type to public exports.

The change correctly exposes the Stream type, supporting the PR's objective of using stream objects for event processing.

Contributor


⚠️ Potential issue

Address test naming and correctness issues.

  1. Misspelled test method name: The word “partition” is spelled “parition” in the function names at lines 896, 905, and 913.
  2. Missing key test not triggered: In time_parition_not_in_json, the code still uses "timestamp", which exists in the JSON, so it never tests for a truly missing key.
    fn time_parition_not_in_json() {
        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
    -   let parsed = get_parsed_timestamp(&json, "timestamp");
    +   let parsed = get_parsed_timestamp(&json, "non_existent_key");
        matches!(parsed, Err(PostError::MissingTimePartition(_)));
    }
  3. Unparseable date test not triggered: In time_parition_not_parseable_as_datetime, the date string "2025-05-15T15:30:00Z" is actually valid.
    fn time_parition_not_parseable_as_datetime() {
        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
    -   let parsed = get_parsed_timestamp(&json, "timestamp");
    +   let json = json!({"timestamp": "invalid date"});
    +   let parsed = get_parsed_timestamp(&json, "timestamp");
        matches!(parsed, Err(PostError::SerdeError(_)));
    }

Also applies to: 896-902, 904-910, 912-918

Comment on lines +853 to +871
fn get_custom_partition_values(
    json: &Value,
    custom_partition_list: &[&str],
) -> HashMap<String, String> {
    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
    for custom_partition_field in custom_partition_list {
        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
        let custom_partition_value = match custom_partition_value {
            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
            Value::String(s) => s,
            _ => "".to_string(),
        };
        custom_partition_values.insert(
            custom_partition_field.trim().to_string(),
            custom_partition_value,
        );
    }
    custom_partition_values
}
Contributor


⚠️ Potential issue

Avoid panicking when a custom partition field is missing.
Using .unwrap() to retrieve values from the JSON can panic if the key is absent. Consider returning an error or defaulting to an empty string rather than panicking, ensuring robust handling of partial or invalid data.

-for custom_partition_field in custom_partition_list {
-    let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+for custom_partition_field in custom_partition_list {
+    let custom_partition_value = match json.get(custom_partition_field.trim()) {
+        Some(v) => v.clone(),
+        None => {
+            // Return error or store a safe default
+            // return Err(PostError::MissingCustomPartition(...)) or use an empty string
+            Value::Null
+        }
+    };

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (1)
src/parseable/streams.rs (1)

888-913: ⚠️ Potential issue

Fix test naming and implementation issues.

  1. The test name time_parition_not_in_json has a typo and its implementation doesn't actually test for a missing key.
  2. The test time_parition_not_parseable_as_datetime uses a valid datetime string.
-fn time_parition_not_in_json() {
+fn time_partition_not_in_json() {
     let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
-    let parsed = get_parsed_timestamp(&json, "timestamp");
+    let parsed = get_parsed_timestamp(&json, "non_existent_key");
     matches!(parsed, Err(PostError::MissingTimePartition(_)));
 }

-fn time_parition_not_parseable_as_datetime() {
+fn time_partition_not_parseable_as_datetime() {
-    let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
+    let json = json!({"timestamp": "invalid date"});
     let parsed = get_parsed_timestamp(&json, "timestamp");
     matches!(parsed, Err(PostError::SerdeError(_)));
 }
🧹 Nitpick comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)

32-37: Consider wrapping these type aliases in newtypes.
While aliases are fine, newtypes can help enforce domain invariants and provide stronger type safety.
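
A small example of what the newtype suggestion could look like (the name TimePartitionLimit here is hypothetical; the PR keeps plain type aliases):

use std::num::NonZeroU32;

// Hypothetical newtype: the constructor is the only way to build one,
// so every value in circulation has already passed validation.
#[derive(Debug, Clone, Copy)]
struct TimePartitionLimit(NonZeroU32);

impl TimePartitionLimit {
    fn new(days: u32) -> Option<Self> {
        NonZeroU32::new(days).map(Self)
    }

    fn days(self) -> u32 {
        self.0.get()
    }
}

fn main() {
    assert!(TimePartitionLimit::new(0).is_none());
    assert_eq!(TimePartitionLimit::new(30).map(TimePartitionLimit::days), Some(30));
}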


116-128: Consider supporting suffixes other than 'd'.
The parse_time_partition_limit function currently only recognizes a 'd' suffix. If future expansions are required (e.g., hours, months), additional logic might be needed.
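
For reference, supporting an additional suffix could look roughly like this sketch, which accepts 'd' and a hypothetical 'h' and normalises to hours; the function in this PR accepts only 'd'.

use std::num::NonZeroU32;

// Hypothetical extension: accept a 'd' (days) or 'h' (hours) suffix.
fn parse_time_partition_limit(raw: &str) -> Option<NonZeroU32> {
    let (digits, hours_per_unit) = if let Some(days) = raw.strip_suffix('d') {
        (days, 24)
    } else if let Some(hours) = raw.strip_suffix('h') {
        (hours, 1)
    } else {
        return None;
    };
    let value: u32 = digits.parse().ok()?;
    NonZeroU32::new(value.checked_mul(hours_per_unit)?)
}

fn main() {
    assert_eq!(parse_time_partition_limit("30d").map(NonZeroU32::get), Some(720));
    assert_eq!(parse_time_partition_limit("12h").map(NonZeroU32::get), Some(12));
    assert_eq!(parse_time_partition_limit("30x"), None);
    assert_eq!(parse_time_partition_limit("0d"), None);
}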

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b7dd383 and 818b719.

📒 Files selected for processing (12)
  • src/handlers/http/ingest.rs (22 hunks)
  • src/handlers/http/logstream.rs (7 hunks)
  • src/handlers/http/modal/utils/logstream_utils.rs (3 hunks)
  • src/metadata.rs (3 hunks)
  • src/migration/mod.rs (2 hunks)
  • src/parseable/mod.rs (12 hunks)
  • src/parseable/streams.rs (16 hunks)
  • src/static_schema.rs (2 hunks)
  • src/storage/mod.rs (4 hunks)
  • src/storage/object_storage.rs (3 hunks)
  • src/utils/json/flatten.rs (7 hunks)
  • src/utils/json/mod.rs (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (24)
src/handlers/http/modal/utils/logstream_utils.rs (8)

19-20: Good use of NonZeroU32 for validation.
Ensures partition limit cannot be zero and enhances safety.


38-46: Well-structured custom error enum.
Leveraging thiserror is a clean approach for descriptive error reporting.


50-52: Clear separation of time partition and custom partitions.
Exposing them as separate fields makes it easier to handle distinct partitioning strategies.


59-61: Robust error handling with TryFrom.
Implementing TryFrom for HeaderMap is a neat way to group header parsing and validation.


73-77: Add checks for empty or invalid custom partitions.
Defaulting to an empty string might mask errors or lead to unintended behavior if partitions must be non-empty.
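
One way the extra validation could look (hypothetical error type; only the at-most-three-partitions limit comes from the code under review):

#[derive(Debug)]
enum HeaderError {
    EmptyPartition,
    TooManyPartitions(usize),
}

// Split the comma-separated header value, rejecting blanks instead of
// silently keeping empty strings.
fn parse_custom_partitions(raw: &str) -> Result<Vec<String>, HeaderError> {
    let parts: Vec<String> = raw.split(',').map(|p| p.trim().to_string()).collect();
    if parts.iter().any(|p| p.is_empty()) {
        return Err(HeaderError::EmptyPartition);
    }
    if parts.len() > 3 {
        return Err(HeaderError::TooManyPartitions(parts.len()));
    }
    Ok(parts)
}

fn main() {
    assert!(parse_custom_partitions("region,tenant").is_ok());
    assert!(matches!(parse_custom_partitions("region,,tenant"), Err(HeaderError::EmptyPartition)));
    assert!(matches!(parse_custom_partitions("a,b,c,d"), Err(HeaderError::TooManyPartitions(4))));
}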


79-82: Neat construction of the PutStreamHeaders struct.
Props for keeping the field assignments concise and centralized.


96-98: Straightforward return of parsed headers.
Returning Ok(headers) at the end of the parse logic is clear and idiomatic.


102-114: Validate for empty partition strings if necessary.
This function ensures at most three partitions, but does not reject empty entries. Confirm whether they’re intended to be valid.

src/static_schema.rs (3)

61-63: Improved parameter handling for time and custom partitions.
Passing them as references to optional/string slices streamlines the function signature.


81-81: Properly flags time partition as a datetime field.
Switching to "datetime" when the field matches the optional time partition is consistent with typical time-based partitions.


119-126: Correctly raises an error when the time partition field is not found.
Ensures schemas remain valid if a time partition is specified.

src/metadata.rs (3)

86-86: Transition to custom_partitions: Vec<String> is consistent with multi-partition support.
Facilitates flexible partitioning strategies instead of a single partition field.


97-99: Optional time_partition and multiple custom_partitions enhance flexibility.
This design is more coherent with the rest of the code where partitions may or may not be specified.


112-114: Assigning time_partition and custom_partitions neatly.
Straightforward field initialization that aligns well with the changes in the constructor signature.

src/storage/mod.rs (1)

110-115: LGTM! The change to support multiple custom partitions is well-structured.

The transition from a single optional custom partition to a vector of custom partitions enhances flexibility while maintaining backward compatibility through custom serialization/deserialization.

Also applies to: 141-145

src/utils/json/mod.rs (1)

173-194: LGTM! Well-implemented serialization functions for backward compatibility.

The implementation correctly handles (see the sketch after this list):

  • Deserialization of comma-separated strings to vector
  • Serialization of vector back to comma-separated string
  • Proper handling of empty vectors
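
A generic sketch of that pattern (not the actual helpers in src/utils/json/mod.rs; field and function names are illustrative):

use serde::{Deserialize, Deserializer, Serialize, Serializer};

// Keep the on-disk format as a single comma-separated string while exposing
// a Vec<String> in memory; an empty vector round-trips as an empty string.
fn serialize_csv<S: Serializer>(items: &[String], s: S) -> Result<S::Ok, S::Error> {
    s.serialize_str(&items.join(","))
}

fn deserialize_csv<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<String>, D::Error> {
    let raw = String::deserialize(d)?;
    if raw.is_empty() {
        return Ok(Vec::new());
    }
    Ok(raw.split(',').map(|s| s.trim().to_string()).collect())
}

#[derive(Serialize, Deserialize)]
struct Format {
    #[serde(serialize_with = "serialize_csv", deserialize_with = "deserialize_csv")]
    custom_partitions: Vec<String>,
}

fn main() {
    let json = r#"{"custom_partitions":"region,tenant"}"#;
    let parsed: Format = serde_json::from_str(json).unwrap();
    assert_eq!(parsed.custom_partitions, vec!["region", "tenant"]);
    assert_eq!(serde_json::to_string(&parsed).unwrap(), json);
}
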
src/migration/mod.rs (1)

275-275: LGTM! Field name updated consistently.

The change from custom_partition to custom_partitions maintains consistency with the struct definition changes.

src/utils/json/flatten.rs (1)

95-136: LGTM! Validation logic properly updated for multiple custom partitions.

The validation function now correctly:

  • Iterates over multiple custom partitions
  • Maintains all the necessary validation checks for each partition
  • Preserves the existing error handling
src/handlers/http/logstream.rs (1)

366-366: LGTM! Consistent with the custom partitions refactoring.

The change from custom_partition to custom_partitions aligns with the broader refactoring to support multiple custom partitions.

src/handlers/http/ingest.rs (2)

72-75: LGTM! Improved encapsulation of stream operations.

The refactoring to use get_stream and call methods directly on the stream object improves encapsulation and maintainability.


128-129: LGTM! Consistent stream object usage across handlers.

The same pattern of retrieving and using stream objects is consistently applied across all handlers, including OTEL ingestion.

Also applies to: 164-165, 197-198

src/storage/object_storage.rs (1)

179-194: LGTM! Enhanced custom partitions handling.

The changes improve the handling of custom partitions:

  1. Method renamed to reflect multiple partitions support
  2. Early return optimization for empty partitions
  3. Type change from optional single partition to slice of partitions
src/parseable/streams.rs (2)

115-136: LGTM! Well-structured log flattening and routing.

The new flatten_and_push_logs method provides clear routing logic for different log sources, with proper error handling for unsupported formats.


138-195: LGTM! Comprehensive log processing implementation.

The push_logs method provides thorough handling of:

  • Time partitioning
  • Custom partitioning
  • Schema versioning
  • Event processing

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
src/static_schema.rs (2)

81-84: Consider validating the field type before mutation.

While the time partition check is correct, consider validating that the field can be safely converted to a datetime type before mutation.

 if time_partition.as_ref().is_some_and(|p| p == &field.name) {
     time_partition_exists = true;
+    if !matches!(field.data_type.as_str(), "datetime" | "string") {
+        return Err(anyhow!("time partition field must be of type datetime or string"));
+    }
     field.data_type = "datetime".to_string();
 }

119-127: Simplify error message formatting.

The format! macro is unnecessary here since anyhow! already supports string interpolation.

-            return Err(anyhow! {
-                format!(
-                    "time partition field {time_partition} does not exist in the schema for the static schema logstream"
-                ),
-            });
+            return Err(anyhow!(
+                "time partition field {time_partition} does not exist in the schema for the static schema logstream"
+            ));
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 818b719 and f27475c.

📒 Files selected for processing (1)
  • src/static_schema.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (2)
src/static_schema.rs (2)

59-63: LGTM! Improved type safety and partition handling.

The function signature changes enhance type safety with optional time partition and better support for multiple custom partitions.


70-78: LGTM! Efficient custom partitions validation.

The validation efficiently checks each custom partition's existence and provides clear error messages.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/static_schema.rs (1)

119-123: Enhance error message clarity for time partition validation.

While the validation logic is correct, the error message could be more specific about whether the time partition is required or optional.

Consider updating the error message to be more explicit:

-            return Err(anyhow!("time partition field {time_partition} does not exist in the schema for the static schema logstream"));
+            return Err(anyhow!("specified time partition field '{time_partition}' does not exist in the schema for the static schema logstream"));
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 67b577f and f75f0c6.

📒 Files selected for processing (1)
  • src/static_schema.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (2)
src/static_schema.rs (2)

61-62: LGTM! Parameter changes improve type safety and flexibility.

The changes to make time_partition optional and use a slice for custom_partitions enhance the API's flexibility while maintaining type safety.


70-78: LGTM! Clear and efficient custom partition validation.

The validation logic is well-structured with (a sketch follows the list):

  • Early return on validation failure
  • Clear error messages including the specific partition name
  • Efficient iteration using .any()
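
Roughly, the check described here has this shape (field and error types are illustrative, not the exact static_schema code):

// Illustrative field list; the real code works on the parsed static schema.
struct Field { name: String }

fn validate_custom_partitions(fields: &[Field], custom_partitions: &[String]) -> Result<(), String> {
    for partition in custom_partitions {
        // Early return with a message naming the offending partition.
        if !fields.iter().any(|f| &f.name == partition) {
            return Err(format!(
                "custom partition field {partition} does not exist in the schema"
            ));
        }
    }
    Ok(())
}

fn main() {
    let fields = vec![Field { name: "region".into() }, Field { name: "level".into() }];
    assert!(validate_custom_partitions(&fields, &["region".to_string()]).is_ok());
    assert!(validate_custom_partitions(&fields, &["tenant".to_string()]).is_err());
}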

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

646-668: Consider consolidating duplicate error handling.

The time partition validation is duplicated between this function and the create_stream method (lines 460-469). Consider extracting this validation into a shared helper function.

+ fn validate_partition_conflicts(
+     time_partition: &Option<String>,
+     custom_partitions: &[String],
+ ) -> Result<(), CreateStreamError> {
+     if let Some(time_partition) = time_partition {
+         if custom_partitions.contains(time_partition) {
+             return Err(CreateStreamError::Custom {
+                 msg: format!("time partition {time_partition:?} cannot be set as custom partition"),
+                 status: StatusCode::BAD_REQUEST,
+             });
+         }
+     }
+     Ok(())
+ }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f75f0c6 and 492a915.

📒 Files selected for processing (1)
  • src/parseable/mod.rs (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (3)
src/parseable/mod.rs (3)

31-31: LGTM! Enhanced module exports.

The addition of Stream to the public exports improves the module's API accessibility.


322-326: LGTM! Improved metadata handling.

The changes correctly handle multiple custom partitions and maintain consistent naming.


460-469: LGTM! Added crucial validation.

The validation prevents potential conflicts by ensuring time partition is not duplicated as a custom partition.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
src/parseable/streams.rs (2)

875-893: ⚠️ Potential issue

Handle missing custom partition fields gracefully.

Using .unwrap() on JSON values can panic if a field is missing.

Consider returning an error or defaulting to an empty string:

-let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+let custom_partition_value = match json.get(custom_partition_field.trim()) {
+    Some(value) => value.to_owned(),
+    None => Value::String("".to_string()),
+};

917-940: ⚠️ Potential issue

Address test naming and correctness issues.

The test cases have several issues:

  1. Misspelled test method names: "parition" instead of "partition"
  2. Tests don't effectively test error conditions

Apply these corrections:

-fn parse_time_parition_from_value() {
+fn parse_time_partition_from_value() {
     let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
     let parsed = get_parsed_timestamp(&json, "timestamp");
     let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
     assert_eq!(parsed.unwrap(), expected);
 }

-fn time_parition_not_in_json() {
+fn time_partition_not_in_json() {
     let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
-    let parsed = get_parsed_timestamp(&json, "timestamp");
+    let parsed = get_parsed_timestamp(&json, "non_existent_key");
     matches!(parsed, Err(PostError::MissingTimePartition(_)));
 }

-fn time_parition_not_parseable_as_datetime() {
+fn time_partition_not_parseable_as_datetime() {
     let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
-    let parsed = get_parsed_timestamp(&json, "timestamp");
+    let json = json!({"timestamp": "invalid date"});
+    let parsed = get_parsed_timestamp(&json, "timestamp");
     matches!(parsed, Err(PostError::SerdeError(_)));
 }
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

776-790: Consider using RwLock::write() without expect.

The expect(LOCK_EXPECT) call could panic if the lock is poisoned.

Consider using a more graceful error handling approach:

-let map = &mut self.metadata.write().expect(LOCK_EXPECT).schema;
+let map = &mut self.metadata.write().map_err(|e| {
+    ArrowError::ExternalError(Box::new(e))
+})?.schema;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 492a915 and 920bcae.

📒 Files selected for processing (3)
  • src/handlers/http/ingest.rs (21 hunks)
  • src/lib.rs (1 hunks)
  • src/parseable/streams.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/lib.rs
  • src/handlers/http/ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (3)
src/parseable/streams.rs (3)

31-37: LGTM! Well-organized imports.

The new imports are logically grouped and all are utilized in the implementation.

Also applies to: 47-47, 52-55, 56-57, 61-61, 65-68


119-158: LGTM! Well-structured log source handling.

The method effectively handles different log sources with clear separation of concerns and proper error handling.


895-902: LGTM! Well-structured timestamp parsing.

The function has proper error handling and clear functionality.

Comment on lines 160 to 223
pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> Result<(), PostError> {
    let time_partition = self.get_time_partition();
    let time_partition_limit = self.get_time_partition_limit();
    let static_schema_flag = self.get_static_schema_flag();
    let custom_partition = self.get_custom_partition();
    let schema_version = self.get_schema_version();

    let data = if time_partition.is_some() || custom_partition.is_some() {
        convert_array_to_object(
            json,
            time_partition.as_ref(),
            time_partition_limit,
            custom_partition.as_ref(),
            schema_version,
            log_source,
        )?
    } else {
        vec![convert_to_array(convert_array_to_object(
            json,
            None,
            None,
            None,
            schema_version,
            log_source,
        )?)?]
    };

    for value in data {
        let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
        let parsed_timestamp = match time_partition.as_ref() {
            Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
            _ => Utc::now().naive_utc(),
        };
        let custom_partition_values = match custom_partition.as_ref() {
            Some(custom_partition) => {
                let custom_partitions = custom_partition.split(',').collect_vec();
                get_custom_partition_values(&value, &custom_partitions)
            }
            None => HashMap::new(),
        };
        let schema = self.metadata.read().expect(LOCK_EXPECT).schema.clone();
        let (rb, is_first_event) = json::Event { data: value }.into_recordbatch(
            &schema,
            static_schema_flag,
            time_partition.as_ref(),
            schema_version,
        )?;

        Event {
            rb,
            stream_name: self.stream_name.to_owned(),
            origin_format: "json",
            origin_size,
            is_first_event,
            parsed_timestamp,
            time_partition: time_partition.clone(),
            custom_partition_values,
            stream_type: StreamType::UserDefined,
        }
        .process(self)
        .await?;
    }
    Ok(())
}
Contributor


⚠️ Potential issue

Handle potential panic in JSON serialization.

The serde_json::to_vec(&value).unwrap() call at line 188 could panic if serialization fails.

Consider handling the error case:

-let origin_size = serde_json::to_vec(&value).unwrap().len() as u64;
+let origin_size = serde_json::to_vec(&value)
+    .map_err(|e| PostError::SerdeError(e))?
+    .len() as u64;

@de-sh de-sh closed this Mar 5, 2025
@de-sh de-sh deleted the refactor branch March 5, 2025 18:18