95 changes: 89 additions & 6 deletions src/handlers/http/ingest.rs
@@ -89,16 +89,32 @@ pub async fn ingest(
};

let log_source_entry = LogSourceEntry::new(log_source.clone(), fields);
let p_custom_fields = get_custom_fields_from_header(req);


PARSEABLE
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
vec![log_source_entry],
vec![log_source_entry.clone()],
)
.await?;

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
stream
.get_log_source()
.iter()
.find(|&stream_log_source_entry| {
stream_log_source_entry.log_source_format != LogSource::OtelTraces
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}

PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;
let p_custom_fields = get_custom_fields_from_header(req);
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
@@ -159,9 +175,27 @@ pub async fn handle_otel_logs_ingestion(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
vec![log_source_entry],
vec![log_source_entry.clone()],
)
.await?;

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
stream
.get_log_source()
.iter()
.find(|&stream_log_source_entry| {
stream_log_source_entry.log_source_format != LogSource::OtelTraces
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}

PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -188,6 +222,7 @@ pub async fn handle_otel_metrics_ingestion(
}

let stream_name = stream_name.to_str().unwrap().to_owned();

let log_source_entry = LogSourceEntry::new(
log_source.clone(),
OTEL_METRICS_KNOWN_FIELD_LIST
@@ -199,10 +234,26 @@
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
vec![log_source_entry],
vec![log_source_entry.clone()],
)
.await?;

//if stream exists, fetch the stream log source
//return error if the stream log source is not otel metrics
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
stream
.get_log_source()
.iter()
.find(|&stream_log_source_entry| {
stream_log_source_entry.log_source_format == log_source.clone()
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}

PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -229,6 +280,7 @@ pub async fn handle_otel_traces_ingestion(
return Err(PostError::IncorrectLogSource(LogSource::OtelTraces));
}
let stream_name = stream_name.to_str().unwrap().to_owned();

let log_source_entry = LogSourceEntry::new(
log_source.clone(),
OTEL_TRACES_KNOWN_FIELD_LIST
@@ -241,10 +293,26 @@
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
vec![log_source_entry],
vec![log_source_entry.clone()],
)
.await?;

//if stream exists, fetch the stream log source
//return error if the stream log source is not otel traces
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
stream
.get_log_source()
.iter()
.find(|&stream_log_source_entry| {
stream_log_source_entry.log_source_format == log_source.clone()
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}
Comment on lines +300 to +310
coderabbitai bot (Contributor) commented on Mar 25, 2025:

Similar concern with OTEL traces ingestion logic

Like the metrics ingestion, this logic only allows OTEL traces ingestion if the stream already has an OtelTraces entry. This could prevent ingestion to new streams with no log sources yet.

The PR objectives state "traces ingestion is not allowed if the stream is already connected to otel metrics or any log formats", but this implementation seems to require that traces already exist.


🏁 Script executed:

#!/bin/bash
# Look for any special handling of empty streams in the codebase
rg "get_log_source\(\)" -A 5 -B 5

Length of output: 4622


Attention: OTEL Traces Ingestion Logic May Block New Streams

The current implementation in src/handlers/http/ingest.rs (lines ~300–310) requires that a stream already includes a log source entry matching the provided trace format. This check

.find(|&stream_log_source_entry| {
    stream_log_source_entry.log_source_format == log_source.clone()
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;

prevents ingestion on new streams that have no log source entries. This is at odds with the PR objective stating that "traces ingestion is not allowed if the stream is already connected to otel metrics or any log formats"—which implies that a trace ingestion should be rejected only when a conflicting log source already exists, not when no log source is present.

  • File: src/handlers/http/ingest.rs
  • Lines: ~300–310
  • Issue: The equality check forces a pre-existing matching OTEL traces entry, effectively blocking ingestion on new streams with no log sources.
  • Request: Please revisit this condition to ensure that OTEL traces ingestion is allowed for new streams while still preventing ingestion if a stream is already connected to OTEL metrics or any log formats.

nikhilsinhaparseable (Contributor, Author) replied:
In Parseable, a stream is always associated with a log_source; no stream can exist without one. The idea is: if a user wants to ingest otel-traces and has provided a stream that already contains a different log_source, the server should reject it, and the same applies to otel-metrics. The server does allow logs from multiple log_sources into a single stream. Is this clear?

coderabbitai bot (Contributor) replied:
@nikhilsinhaparseable Thank you for the clarification. I understand now:

  1. In Parseable, every stream must have an associated log_source - no stream exists without one
  2. For otel-traces and otel-metrics, you're implementing strict restrictions: if a stream already has a different log_source format, ingestion is rejected
  3. For regular logs, multiple log_sources can coexist in a single stream

This makes the implementation in handle_otel_traces_ingestion correct as it enforces that a stream can only accept otel-traces if it was created for that purpose.

My apologies for the confusion in the initial review. The check is working as intended for your architecture.


✏️ Learnings added
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.031Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-03-19T20:28:50.867Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

nikhilsinhaparseable (Contributor, Author) replied:
thank you!

coderabbitai bot (Contributor) replied:
@nikhilsinhaparseable You're welcome! I'm glad I could understand the implementation correctly. Let me know if you need any further assistance with this PR.


🧠 Learnings used
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-03-19T20:28:50.867Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
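
Based on the rule settled in this thread, here is a minimal, standalone sketch of the compatibility check (illustrative types and a hypothetical `ingestion_allowed` helper, not the PR's actual code, which does this inline via `get_log_source()` and `find()`): OTEL traces/metrics ingestion is rejected when the stream already carries a different log_source format, while regular log formats may coexist with one another but not with a traces/metrics stream.

```rust
// Illustrative sketch only; Parseable's real types live in the crate.
#[derive(Debug, Clone, PartialEq)]
enum LogSource {
    Json,
    OtelLogs,
    OtelTraces,
    OtelMetrics,
}

/// Should ingestion of `incoming` be allowed into a stream whose existing
/// log-source formats are `existing`?
fn ingestion_allowed(existing: &[LogSource], incoming: &LogSource) -> bool {
    match incoming {
        // Traces and metrics are exclusive: every existing entry must already
        // be the same OTEL format.
        LogSource::OtelTraces | LogSource::OtelMetrics => {
            existing.iter().all(|s| s == incoming)
        }
        // Regular log formats may share a stream, but not one already bound
        // to OTEL traces or metrics.
        _ => existing
            .iter()
            .all(|s| *s != LogSource::OtelTraces && *s != LogSource::OtelMetrics),
    }
}

fn main() {
    let traces_stream = vec![LogSource::OtelTraces];
    let logs_stream = vec![LogSource::Json, LogSource::OtelLogs];

    assert!(ingestion_allowed(&traces_stream, &LogSource::OtelTraces)); // same format: ok
    assert!(!ingestion_allowed(&traces_stream, &LogSource::Json)); // conflicting format: reject
    assert!(ingestion_allowed(&logs_stream, &LogSource::OtelLogs)); // log formats coexist: ok
    assert!(!ingestion_allowed(&logs_stream, &LogSource::OtelMetrics)); // metrics into a logs stream: reject
    println!("compatibility rules behave as described in the review thread");
}
```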


PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -304,6 +372,18 @@ pub async fn post_event(
_ => {}
}

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
stream
.get_log_source()
.iter()
.find(|&stream_log_source_entry| {
stream_log_source_entry.log_source_format != LogSource::OtelTraces
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}
let p_custom_fields = get_custom_fields_from_header(req);
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

@@ -373,6 +453,8 @@ pub enum PostError {
MissingTimePartition(String),
#[error("{0}")]
KnownFormat(#[from] known_schema::Error),
#[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
IncorrectLogFormat(String),
}

impl actix_web::ResponseError for PostError {
@@ -400,6 +482,7 @@ impl actix_web::ResponseError for PostError {
PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST,
PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
}
}

7 changes: 1 addition & 6 deletions src/parseable/mod.rs
@@ -427,11 +427,6 @@ impl Parseable {
log_source: Vec<LogSourceEntry>,
) -> Result<bool, PostError> {
if self.streams.contains(stream_name) {
for stream_log_source in log_source {
self.add_update_log_source(stream_name, stream_log_source)
.await?;
}

return Ok(true);
}

@@ -443,7 +438,7 @@
.create_stream_and_schema_from_storage(stream_name)
.await?
{
return Ok(false);
return Ok(true);
}

self.create_stream(