Skip to content

Conversation

nikhilsinhaparseable
Copy link
Contributor

@nikhilsinhaparseable nikhilsinhaparseable commented Mar 22, 2025

log ingestion is not allowed
if stream is already associated with otel metrics or traces

metrics ingestion is not allowed
if stream is already associated with otel traces or any log formats

similarly, traces ingestion is not allowed
if stream is already associated with otel metrics or any log formats

otel logs can be ingested with other log formats

Summary by CodeRabbit

  • New Features

    • Enhanced error handling for log ingestion, providing users with clearer notifications when unsupported log formats are detected.
  • Refactor

    • Streamlined the logic for handling existing streams, ensuring more accurate confirmation of stream existence and improving overall ingestion efficiency.

Copy link
Contributor

coderabbitai bot commented Mar 22, 2025

Walkthrough

This pull request enhances the log ingestion process by adding error handling and validation for log sources. In the HTTP ingestion handler, checks are introduced to verify that a log source’s format is supported before proceeding. A new error variant (IncorrectLogFormat) is added for clarity. Additionally, the logic in the stream creation process within the parseable module is streamlined by removing an unnecessary update loop and adjusting the return value to correctly indicate stream presence.

Changes

File(s) Change Summary
src/handlers/.../ingest.rs Added error handling for log source validation in ingestion functions. Introduced a new error variant IncorrectLogFormat in the PostError enum. Adjusted the sequence of operations by defining p_custom_fields after log source processing and ensured proper instance passing via cloning.
src/parseable/mod.rs Removed the loop that updated log sources in the create_stream_if_not_exists method. Modified the return value for streams found in storage from Ok(false) to Ok(true) to accurately signal stream existence, thereby simplifying the control flow within the method.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant IngestionHandler
    participant StreamManager
    participant FormatValidator

    Client->>IngestionHandler: Send ingestion request
    IngestionHandler->>StreamManager: Check if stream exists / Create stream if needed
    alt Stream exists
        StreamManager-->>IngestionHandler: Return log source entry
        IngestionHandler->>FormatValidator: Validate log source format
        alt Unsupported format
            FormatValidator-->>IngestionHandler: Return IncorrectLogFormat error
            IngestionHandler->>Client: Return error response
        else Supported format
            FormatValidator-->>IngestionHandler: Validation passed
            IngestionHandler->>Client: Proceed with ingestion
        end
    else Stream does not exist
        StreamManager-->>IngestionHandler: Stream created
        IngestionHandler->>Client: Proceed with ingestion
    end
Loading

Possibly related PRs

Poem

I'm a bunny coder, hopping through the code,
Fixing log streams on my merry road.
With errors checked and formats right,
My little paws make bugs take flight.
Hoppy changes make our code glow bright! 🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai plan to trigger planning for file edits and PR creation.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/ingest.rs (1)

161-161: Consider consolidating repeated ingestion checks
The logic for verifying the stream against OTEL traces or metrics before ingesting OTEL logs is nearly identical to the block in ingest. Factor out a helper function to reduce duplication and improve maintainability.

Also applies to: 164-181

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8ac3105 and 6ef4529.

📒 Files selected for processing (2)
  • src/handlers/http/ingest.rs (10 hunks)
  • src/parseable/mod.rs (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/handlers/http/ingest.rs (1)
src/handlers/http/modal/utils/ingest_utils.rs (1) (1)
  • get_custom_fields_from_header (149-207)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (6)
src/parseable/mod.rs (1)

441-441: Returning Ok(true) simplifies existing-stream handling
This change ensures that the function accurately indicates the stream already exists, preventing unnecessary re-creation. Verify that callers handle the boolean return appropriately if additional updates to the stream are ever needed.

src/handlers/http/ingest.rs (5)

209-209: Otel metrics ingestion validation
These checks ensure only an OTEL metrics stream remains compatible with further OTEL metrics ingestion. This aligns well with restricting cross-format ingestion. No immediate issues spotted; the logic is consistent with your PR objectives.

Also applies to: 220-220, 224-239


266-266: Consistent tracing format enforcement
Similar to metrics handling, this block confirms the stream is dedicated to OTEL traces if it already exists. The logic is coherent and follows the same pattern.

Also applies to: 279-279, 283-293, 295-298


316-321: Parallel checks in post_event
These lines replicate the “non-OTEL” check from ingest. If that duplication is intended, no functional problem is apparent. Otherwise, consider refactoring for consistency.

Also applies to: 350-361


429-430: New IncorrectLogFormat error
Introducing this variant clarifies the user-facing error when an incompatible OTEL format is encountered on a stream. The choice to map this variant to BAD_REQUEST status aligns well with typical client-side error semantics.

Also applies to: 457-457


80-80:

❓ Verification inconclusive

Ensure correct logic when mixing log sources
You are only allowing ingestion if at least one log source in the stream is not OtelTraces or OtelMetrics. If any existing log source is exclusively Otel-based, the code raises IncorrectLogFormat. This matches your stated requirement to disallow normal log ingestion on a stream that is tied strictly to OTEL. However, if the presence of any OTEL format among multiple log sources should block ingestion, verify that this condition is logically inverted.

Would you like to run a test script that creates a stream with mixed OTEL and non-OTEL LogSourceEntrys to confirm this behavior is correct?

Also applies to: 84-100


🏁 Script executed:

#!/bin/bash
# Attempt to create a stream and add OtelTraces, then add normal logs.

# We'll mock an API call by echoing JSON and verifying the response codes/logic.
# This script stands in for an actual test that checks mixing multiple log sources.
echo "Simulating multi log-source creation..."

Length of output: 88


Confirm Log Ingestion Behavior with Mixed OTEL Sources

The test script output simply echoed a message and did not verify the actual outcome when mixing OTEL and non-OTEL log sources. Please expand on the test by asserting the expected responses: specifically, ensure that a stream composed solely of OTEL entries triggers an IncorrectLogFormat error, while a stream containing at least one non-OTEL source proceeds with normal ingestion. Also, verify that the logic using vec![log_source_entry.clone()] in src/handlers/http/ingest.rs (lines 80-80) correctly implements this behavior.

coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 22, 2025
log ingestion is not allowed
if stream is already associated with otel metrics or traces

metrics ingestion is not allowed
if stream is already associated with otel traces or any log formats

similarly, traces ingestion is not allowed
if stream is already associated with otel metrics or any log formats

otel logs can be ingested with other log formats
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/handlers/http/ingest.rs (1)

237-251: Consider refactoring repetitive validation logic

The validation logic is duplicated across multiple handler functions. Consider creating a helper function to centralize this logic and reduce code duplication.

You could create a helper function like:

+ fn validate_log_source_compatibility(
+     stream_name: &str,
+     expected_format: &LogSource,
+     exclusive: bool
+ ) -> Result<(), PostError> {
+     if let Ok(stream) = PARSEABLE.get_stream(stream_name) {
+         let found = stream
+             .get_log_source()
+             .iter()
+             .find(|&entry| {
+                 if exclusive {
+                     // For metrics and traces: must match exactly
+                     entry.log_source_format == *expected_format
+                 } else {
+                     // For logs: must not be incompatible formats
+                     entry.log_source_format != LogSource::OtelTraces
+                         && entry.log_source_format != LogSource::OtelMetrics
+                 }
+             });
+         
+         if found.is_none() {
+             return Err(PostError::IncorrectLogFormat(stream_name.clone()));
+         }
+     }
+     Ok(())
+ }

Then use it in each handler:

- //if stream exists, fetch the stream log source
- //return error if the stream log source is not otel metrics
- if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
-     stream
-         .get_log_source()
-         .iter()
-         .find(|&stream_log_source_entry| {
-             stream_log_source_entry.log_source_format == log_source.clone()
-         })
-         .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
- }
+ validate_log_source_compatibility(&stream_name, &log_source, true)?;

Also applies to: 253-255, 300-310, 312-314

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d42d937 and a99e1c0.

📒 Files selected for processing (2)
  • src/handlers/http/ingest.rs (9 hunks)
  • src/parseable/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (9)
src/handlers/http/ingest.rs (9)

97-97: Good use of clone() to avoid consumption

Using clone() here is appropriate since you need to keep the original log_source_entry for later use.


101-112: Implement the first part of the OTEL format restriction

This block correctly implements the restriction logic, preventing log ingestion if the stream is already associated with OtelTraces or OtelMetrics formats.


114-117: Good sequencing of operations

Adding the log source entry before retrieving custom fields ensures proper initialization order. The code correctly updates the stream with the log source entry.


178-179: Good use of clone() in OTEL logs ingestion

Similar to the change in the main ingest function, using clone() here prevents consumption of the log_source_entry.


182-193: Consistent restriction implementation for OTEL logs

This block implements the same restriction as in the main ingest function, preventing OTEL logs ingestion if the stream already has OTEL traces or metrics entries.


195-197: Properly update log source entry

This ensures the log source is correctly updated in the stream after validating format compatibility.


375-386: Consistent restriction in post_event function

This correctly implements the same restriction logic for the post_event function, ensuring consistent behavior across different ingestion endpoints.


456-457: New error variant properly added

The new error variant IncorrectLogFormat is well-defined with a clear error message and appropriate status code mapping.

Also applies to: 485-485


241-251:

✅ Verification successful

Logic may be too restrictive for OTEL metrics ingestion

This implementation appears to only allow OTEL metrics ingestion if the stream already has an OtelMetrics entry. This could prevent ingestion to new streams with no log sources yet, which may not be the intention.

Is the intention to only allow OTEL metrics ingestion if the stream already has OTEL metrics entries? Or is it supposed to prevent metrics ingestion if the stream has any incompatible formats?


🏁 Script executed:

#!/bin/bash
# Check how empty streams are handled in the codebase
rg -A 5 "create_stream_if_not_exists" 

Length of output: 2446


OTEL Metrics Ingestion Logic Validated

After verifying the surrounding code, it’s clear that the current logic is intentional. New streams (which have no log sources yet) are created via create_stream_if_not_exists with the correct OTEL metrics entry. The check in question applies only to existing streams—ensuring that if a stream already exists, its log source is compatible—thereby preventing incompatible ingestions. No changes are required.

Comment on lines +300 to +310
//if stream exists, fetch the stream log source
//return error if the stream log source is not otel traces
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
stream
.get_log_source()
.iter()
.find(|&stream_log_source_entry| {
stream_log_source_entry.log_source_format == log_source.clone()
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}
Copy link
Contributor

@coderabbitai coderabbitai bot Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Similar concern with OTEL traces ingestion logic

Like the metrics ingestion, this logic only allows OTEL traces ingestion if the stream already has an OtelTraces entry. This could prevent ingestion to new streams with no log sources yet.

The PR objectives state "traces ingestion is not allowed if the stream is already connected to otel metrics or any log formats", but this implementation seems to require that traces already exist.


🏁 Script executed:

#!/bin/bash
# Look for any special handling of empty streams in the codebase
rg "get_log_source\(\)" -A 5 -B 5

Length of output: 4622


Attention: OTEL Traces Ingestion Logic May Block New Streams

The current implementation in src/handlers/http/ingest.rs (lines ~300–310) requires that a stream already includes a log source entry matching the provided trace format. This check

.find(|&stream_log_source_entry| {
    stream_log_source_entry.log_source_format == log_source.clone()
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;

prevents ingestion on new streams that have no log source entries. This is at odds with the PR objective stating that "traces ingestion is not allowed if the stream is already connected to otel metrics or any log formats"—which implies that a trace ingestion should be rejected only when a conflicting log source already exists, not when no log source is present.

  • File: src/handlers/http/ingest.rs
  • Lines: ~300–310
  • Issue: The equality check forces a pre-existing matching OTEL traces entry, effectively blocking ingestion on new streams with no log sources.
  • Request: Please revisit this condition to ensure that OTEL traces ingestion is allowed for new streams while still preventing ingestion if a stream is already connected to OTEL metrics or any log formats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in Parseable, a stream is always associated with a log_source, no stream can exist without a log_source, and the idea is - if user wants to ingest otel-traces, and has provided a stream which already contains a different log_source, server should reject, same with otel-metrics, server allows logs from multiple log_sources to a single stream, is this clear?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikhilsinhaparseable Thank you for the clarification. I understand now:

  1. In Parseable, every stream must have an associated log_source - no stream exists without one
  2. For otel-traces and otel-metrics, you're implementing strict restrictions: if a stream already has a different log_source format, ingestion is rejected
  3. For regular logs, multiple log_sources can coexist in a single stream

This makes the implementation in handle_otel_traces_ingestion correct as it enforces that a stream can only accept otel-traces if it was created for that purpose.

My apologies for the confusion in the initial review. The check is working as intended for your architecture.


✏️ Learnings added
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.031Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-03-19T20:28:50.867Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikhilsinhaparseable You're welcome! I'm glad I could understand the implementation correctly. Let me know if you need any further assistance with this PR.


🧠 Learnings used
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-03-19T20:28:50.867Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

@nitisht nitisht merged commit 003539b into parseablehq:main Mar 26, 2025
14 checks passed
@nikhilsinhaparseable nikhilsinhaparseable deleted the otel-restriction branch July 12, 2025 08:58
This was referenced Jul 28, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants