Skip to content

Commit 9712581

Browse files
feat: force events to be 1 hr apart at the max (#1411)
1 parent 9f6556e commit 9712581

File tree

3 files changed

+58
-44
lines changed

3 files changed

+58
-44
lines changed

src/cli.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,17 @@ pub struct Options {
475475
help = "OIDC scope to request (default: openid profile email)"
476476
)]
477477
pub scope: String,
478+
479+
// event's maximum chunk age in hours
480+
#[arg(
481+
long,
482+
env = "P_EVENT_MAX_CHUNK_AGE",
483+
// Accept 0 to disallow older-than-reference events; the value is capped at 168 hours (one week) and defaults to 1 hour.
484+
value_parser = clap::value_parser!(u64).range(0..=168),
485+
default_value = "1",
486+
help = "Max allowed age gap (in hours) between events within the same node, relative to the reference event"
487+
)]
488+
pub event_max_chunk_age: u64,
478489
}
479490

480491
#[derive(Parser, Debug)]

src/utils/json/flatten.rs

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use std::collections::BTreeMap;
2020
use std::num::NonZeroU32;
21+
use std::sync::Mutex;
2122

2223
use chrono::{DateTime, Duration, Utc};
2324
use serde_json::map::Map;
@@ -27,6 +28,9 @@ use thiserror::Error;
2728

2829
use crate::parseable::PARSEABLE;
2930

31+
// Global reference timestamp: the first timestamp that passes the age-limit check during validation
32+
static REFERENCE_TIMESTAMP: Mutex<Option<DateTime<Utc>>> = Mutex::new(None);
33+
3034
#[derive(Error, Debug)]
3135
pub enum JsonFlattenError {
3236
#[error("Cannot flatten this JSON")]
@@ -45,8 +49,12 @@ pub enum JsonFlattenError {
4549
FieldNotString(String),
4650
#[error("Field {0} is not in the correct datetime format")]
4751
InvalidDatetimeFormat(String),
48-
#[error("Field {0} value is more than {1} days old")]
49-
TimestampTooOld(String, i64),
52+
#[error("Field {0} value '{2}' is more than {1} days old")]
53+
TimestampTooOld(String, i64, DateTime<Utc>),
54+
#[error(
55+
"Field {0} timestamp '{2}' is more than {1} hours older than reference timestamp '{3}'"
56+
)]
57+
TimestampTooOldRelative(String, i64, DateTime<Utc>, DateTime<Utc>),
5058
#[error("Expected object in array of objects")]
5159
ExpectedObjectInArray,
5260
#[error("Found non-object element while flattening array of objects")]
@@ -169,14 +177,43 @@ pub fn validate_time_partition(
169177
partition_key.to_owned(),
170178
));
171179
};
172-
let cutoff_date = Utc::now().naive_utc() - Duration::days(limit_days);
173-
if parsed_timestamp.naive_utc() >= cutoff_date {
174-
Ok(())
175-
} else {
176-
Err(JsonFlattenError::TimestampTooOld(
177-
partition_key.to_owned(),
178-
limit_days,
179-
))
180+
181+
// Access the global reference timestamp and handle poisoning
182+
let mut reference_timestamp = REFERENCE_TIMESTAMP
183+
.lock()
184+
.unwrap_or_else(|p| p.into_inner());
185+
186+
match *reference_timestamp {
187+
None => {
188+
// First timestamp encountered - validate against cutoff date
189+
let cutoff_ts = Utc::now() - Duration::days(limit_days);
190+
if parsed_timestamp >= cutoff_ts {
191+
// Set the reference timestamp
192+
*reference_timestamp = Some(parsed_timestamp);
193+
Ok(())
194+
} else {
195+
Err(JsonFlattenError::TimestampTooOld(
196+
partition_key.to_owned(),
197+
limit_days,
198+
parsed_timestamp,
199+
))
200+
}
201+
}
202+
Some(ref_timestamp) => {
203+
// Subsequent timestamps - validate they're not more than configured hours older than reference
204+
let max_age_hours = PARSEABLE.options.event_max_chunk_age as i64;
205+
let max_age_before_ref = ref_timestamp - Duration::hours(max_age_hours);
206+
if parsed_timestamp >= max_age_before_ref {
207+
Ok(())
208+
} else {
209+
Err(JsonFlattenError::TimestampTooOldRelative(
210+
partition_key.to_owned(),
211+
max_age_hours,
212+
parsed_timestamp,
213+
ref_timestamp,
214+
))
215+
}
216+
}
180217
}
181218
}
182219

src/utils/json/mod.rs

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -480,40 +480,6 @@ mod tests {
480480
);
481481
}
482482

483-
#[test]
484-
fn test_convert_array_to_object_with_time_partition() {
485-
let json = json!([
486-
{
487-
"a": "b",
488-
"source_time": "2025-08-01T00:00:00.000Z"
489-
},
490-
{
491-
"a": "b",
492-
"source_time": "2025-08-01T00:01:00.000Z"
493-
}
494-
]);
495-
496-
let time_partition = Some("source_time".to_string());
497-
let result = convert_array_to_object(
498-
json,
499-
time_partition.as_ref(),
500-
None,
501-
None,
502-
SchemaVersion::V0,
503-
&crate::event::format::LogSource::default(),
504-
);
505-
506-
assert!(result.is_ok());
507-
let objects = result.unwrap();
508-
509-
// Should return 2 separate objects, not wrapped in an array
510-
assert_eq!(objects.len(), 2);
511-
assert_eq!(objects[0]["a"], "b");
512-
assert_eq!(objects[0]["source_time"], "2025-08-01T00:00:00.000Z");
513-
assert_eq!(objects[1]["a"], "b");
514-
assert_eq!(objects[1]["source_time"], "2025-08-01T00:01:00.000Z");
515-
}
516-
517483
#[test]
518484
fn test_convert_array_to_object_without_time_partition() {
519485
let json = json!([

0 commit comments

Comments
 (0)