diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index fa699d7af..485b071a9 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -89,16 +89,32 @@ pub async fn ingest( }; let log_source_entry = LogSourceEntry::new(log_source.clone(), fields); - let p_custom_fields = get_custom_fields_from_header(req); - + PARSEABLE .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + //if stream exists, fetch the stream log source + //return error if the stream log source is otel traces or otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) @@ -159,9 +175,27 @@ pub async fn handle_otel_logs_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + + //if stream exists, fetch the stream log source + //return error if the stream log source is otel traces or otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -188,6 +222,7 @@ pub async fn handle_otel_metrics_ingestion( } let stream_name = stream_name.to_str().unwrap().to_owned(); + let log_source_entry = LogSourceEntry::new( log_source.clone(), OTEL_METRICS_KNOWN_FIELD_LIST @@ -199,10 +234,26 @@ pub async fn handle_otel_metrics_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + //if stream exists, fetch the stream log source + //return error if the stream log source is not otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format == log_source.clone() + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -229,6 +280,7 @@ pub async fn handle_otel_traces_ingestion( return Err(PostError::IncorrectLogSource(LogSource::OtelTraces)); } let stream_name = stream_name.to_str().unwrap().to_owned(); + let log_source_entry = LogSourceEntry::new( log_source.clone(), OTEL_TRACES_KNOWN_FIELD_LIST @@ -241,10 +293,26 @@ pub async fn handle_otel_traces_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + //if stream exists, fetch the stream log source + //return error if the stream log source is not otel traces + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format == log_source.clone() + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -304,6 +372,18 @@ pub async fn post_event( _ => {} } + //if stream exists, fetch the stream log source + //return error if the stream log source is otel traces or otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -373,6 +453,8 @@ pub enum PostError { MissingTimePartition(String), #[error("{0}")] KnownFormat(#[from] known_schema::Error), + #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")] + IncorrectLogFormat(String), } impl actix_web::ResponseError for PostError { @@ -400,6 +482,7 @@ impl actix_web::ResponseError for PostError { PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST, PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, + PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index b10a34bdd..ed6354eb8 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -427,11 +427,6 @@ impl Parseable { log_source: Vec, ) -> Result { if self.streams.contains(stream_name) { - for stream_log_source in log_source { - self.add_update_log_source(stream_name, stream_log_source) - .await?; - } - return Ok(true); } @@ -443,7 +438,7 @@ impl Parseable { .create_stream_and_schema_from_storage(stream_name) .await? { - return Ok(false); + return Ok(true); } self.create_stream(