Skip to content

Commit 6ef4529

Browse files
restrict otel ingestion
log ingestion is not allowed if stream is already associated with otel metrics or traces metrics ingestion is not allowed if stream is already associated with otel traces or any log formats similarly, traces ingestion is not allowed if stream is already associated with otel metrics or any log formats otel logs can be ingested with other log formats
1 parent 8ac3105 commit 6ef4529

File tree

2 files changed

+95
-18
lines changed

2 files changed

+95
-18
lines changed

src/handlers/http/ingest.rs

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,32 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7272
return Err(PostError::OtelNotSupported);
7373
}
7474

75-
let p_custom_fields = get_custom_fields_from_header(req);
76-
7775
let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
7876
PARSEABLE
7977
.create_stream_if_not_exists(
8078
&stream_name,
8179
StreamType::UserDefined,
82-
vec![log_source_entry],
80+
vec![log_source_entry.clone()],
8381
)
8482
.await?;
8583

84+
//if stream exists, fetch the stream log source
85+
//return error if the stream log source is otel traces or otel metrics
86+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
87+
stream
88+
.get_log_source()
89+
.iter()
90+
.find(|&stream_log_source_entry| {
91+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
92+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
93+
})
94+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
95+
}
96+
97+
PARSEABLE
98+
.add_update_log_source(&stream_name, log_source_entry)
99+
.await?;
100+
let p_custom_fields = get_custom_fields_from_header(req);
86101
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
87102

88103
Ok(HttpResponse::Ok().finish())
@@ -143,9 +158,27 @@ pub async fn handle_otel_logs_ingestion(
143158
.create_stream_if_not_exists(
144159
&stream_name,
145160
StreamType::UserDefined,
146-
vec![log_source_entry],
161+
vec![log_source_entry.clone()],
147162
)
148163
.await?;
164+
165+
//if stream exists, fetch the stream log source
166+
//return error if the stream log source is otel traces or otel metrics
167+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
168+
stream
169+
.get_log_source()
170+
.iter()
171+
.find(|&stream_log_source_entry| {
172+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
173+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
174+
})
175+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
176+
}
177+
178+
PARSEABLE
179+
.add_update_log_source(&stream_name, log_source_entry)
180+
.await?;
181+
149182
let p_custom_fields = get_custom_fields_from_header(req);
150183

151184
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -172,6 +205,7 @@ pub async fn handle_otel_metrics_ingestion(
172205
}
173206

174207
let stream_name = stream_name.to_str().unwrap().to_owned();
208+
175209
let log_source_entry = LogSourceEntry::new(
176210
log_source.clone(),
177211
OTEL_METRICS_KNOWN_FIELD_LIST
@@ -183,10 +217,26 @@ pub async fn handle_otel_metrics_ingestion(
183217
.create_stream_if_not_exists(
184218
&stream_name,
185219
StreamType::UserDefined,
186-
vec![log_source_entry],
220+
vec![log_source_entry.clone()],
187221
)
188222
.await?;
189223

224+
//if stream exists, fetch the stream log source
225+
//return error if the stream log source is not otel metrics
226+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
227+
stream
228+
.get_log_source()
229+
.iter()
230+
.find(|&stream_log_source_entry| {
231+
stream_log_source_entry.log_source_format == log_source.clone()
232+
})
233+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
234+
}
235+
236+
PARSEABLE
237+
.add_update_log_source(&stream_name, log_source_entry)
238+
.await?;
239+
190240
let p_custom_fields = get_custom_fields_from_header(req);
191241

192242
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -213,6 +263,7 @@ pub async fn handle_otel_traces_ingestion(
213263
return Err(PostError::IncorrectLogSource(LogSource::OtelTraces));
214264
}
215265
let stream_name = stream_name.to_str().unwrap().to_owned();
266+
216267
let log_source_entry = LogSourceEntry::new(
217268
log_source.clone(),
218269
OTEL_TRACES_KNOWN_FIELD_LIST
@@ -225,10 +276,26 @@ pub async fn handle_otel_traces_ingestion(
225276
.create_stream_if_not_exists(
226277
&stream_name,
227278
StreamType::UserDefined,
228-
vec![log_source_entry],
279+
vec![log_source_entry.clone()],
229280
)
230281
.await?;
231282

283+
//if stream exists, fetch the stream log source
284+
//return error if the stream log source is not otel traces
285+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
286+
stream
287+
.get_log_source()
288+
.iter()
289+
.find(|&stream_log_source_entry| {
290+
stream_log_source_entry.log_source_format == log_source.clone()
291+
})
292+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
293+
}
294+
295+
PARSEABLE
296+
.add_update_log_source(&stream_name, log_source_entry)
297+
.await?;
298+
232299
let p_custom_fields = get_custom_fields_from_header(req);
233300

234301
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -246,6 +313,12 @@ pub async fn post_event(
246313
) -> Result<HttpResponse, PostError> {
247314
let stream_name = stream_name.into_inner();
248315

316+
let log_source = req
317+
.headers()
318+
.get(LOG_SOURCE_KEY)
319+
.and_then(|h| h.to_str().ok())
320+
.map_or(LogSource::default(), LogSource::from);
321+
249322
let internal_stream_names = PARSEABLE.streams.list_internal_streams();
250323
if internal_stream_names.contains(&stream_name) {
251324
return Err(PostError::InternalStream(stream_name));
@@ -267,19 +340,25 @@ pub async fn post_event(
267340
}
268341
}
269342

270-
let log_source = req
271-
.headers()
272-
.get(LOG_SOURCE_KEY)
273-
.and_then(|h| h.to_str().ok())
274-
.map_or(LogSource::default(), LogSource::from);
275-
276343
if matches!(
277344
log_source,
278345
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
279346
) {
280347
return Err(PostError::OtelNotSupported);
281348
}
282349

350+
//if stream exists, fetch the stream log source
351+
//return error if the stream log source is otel traces or otel metrics
352+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
353+
stream
354+
.get_log_source()
355+
.iter()
356+
.find(|&stream_log_source_entry| {
357+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
358+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
359+
})
360+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
361+
}
283362
let p_custom_fields = get_custom_fields_from_header(req);
284363
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
285364

@@ -347,6 +426,8 @@ pub enum PostError {
347426
IngestionNotAllowed,
348427
#[error("Missing field for time partition in json: {0}")]
349428
MissingTimePartition(String),
429+
#[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
430+
IncorrectLogFormat(String),
350431
}
351432

352433
impl actix_web::ResponseError for PostError {
@@ -373,6 +454,7 @@ impl actix_web::ResponseError for PostError {
373454
PostError::IncorrectLogSource(_) => StatusCode::BAD_REQUEST,
374455
PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST,
375456
PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
457+
PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
376458
}
377459
}
378460

src/parseable/mod.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -427,11 +427,6 @@ impl Parseable {
427427
log_source: Vec<LogSourceEntry>,
428428
) -> Result<bool, PostError> {
429429
if self.streams.contains(stream_name) {
430-
for stream_log_source in log_source {
431-
self.add_update_log_source(stream_name, stream_log_source)
432-
.await?;
433-
}
434-
435430
return Ok(true);
436431
}
437432

@@ -443,7 +438,7 @@ impl Parseable {
443438
.create_stream_and_schema_from_storage(stream_name)
444439
.await?
445440
{
446-
return Ok(false);
441+
return Ok(true);
447442
}
448443

449444
self.create_stream(

0 commit comments

Comments
 (0)