Skip to content

Commit bc70091

Browse files
feat: add custom fields to events
p_user_agent — fetched from the request's `User-Agent` header. p_src_ip — fetched from the request's connection info. Users can add additional headers to the ingest APIs in the format `x-p-<key-name>: <value>`; e.g. with `x-p-environment: dev`, the server adds an `environment` field with the value `dev` to each event. Multiple custom headers may be supplied, and each is inserted as a separate field in the event.
1 parent 7596825 commit bc70091

File tree

7 files changed

+174
-49
lines changed

7 files changed

+174
-49
lines changed

src/event/format/json.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ impl EventFormat for Event {
148148
time_partition: Option<&String>,
149149
schema_version: SchemaVersion,
150150
stream_type: StreamType,
151+
p_custom_fields: &HashMap<String, String>,
151152
) -> Result<super::Event, anyhow::Error> {
152153
let custom_partition_values = match custom_partitions.as_ref() {
153154
Some(custom_partition) => {
@@ -167,6 +168,7 @@ impl EventFormat for Event {
167168
static_schema_flag,
168169
time_partition,
169170
schema_version,
171+
p_custom_fields,
170172
)?;
171173

172174
Ok(super::Event {

src/event/format/mod.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use serde_json::Value;
3333
use crate::{
3434
metadata::SchemaVersion,
3535
storage::StreamType,
36-
utils::arrow::{get_field, get_timestamp_array, replace_columns},
36+
utils::arrow::{add_parseable_fields, get_field},
3737
};
3838

3939
use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -115,9 +115,10 @@ pub trait EventFormat: Sized {
115115
static_schema_flag: bool,
116116
time_partition: Option<&String>,
117117
schema_version: SchemaVersion,
118+
p_custom_fields: &HashMap<String, String>,
118119
) -> Result<(RecordBatch, bool), AnyError> {
119120
let p_timestamp = self.get_p_timestamp();
120-
let (data, mut schema, is_first) =
121+
let (data, schema, is_first) =
121122
self.to_data(storage_schema, time_partition, schema_version)?;
122123

123124
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
@@ -127,16 +128,6 @@ pub trait EventFormat: Sized {
127128
));
128129
};
129130

130-
// add the p_timestamp field to the event schema to the 0th index
131-
schema.insert(
132-
0,
133-
Arc::new(Field::new(
134-
DEFAULT_TIMESTAMP_KEY,
135-
DataType::Timestamp(TimeUnit::Millisecond, None),
136-
true,
137-
)),
138-
);
139-
140131
// prepare the record batch and new fields to be added
141132
let mut new_schema = Arc::new(Schema::new(schema));
142133
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -145,13 +136,9 @@ pub trait EventFormat: Sized {
145136
new_schema =
146137
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
147138

148-
let mut rb = Self::decode(data, new_schema.clone())?;
149-
rb = replace_columns(
150-
rb.schema(),
151-
&rb,
152-
&[0],
153-
&[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
154-
);
139+
let rb = Self::decode(data, new_schema.clone())?;
140+
141+
let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields);
155142

156143
Ok((rb, is_first))
157144
}
@@ -189,6 +176,7 @@ pub trait EventFormat: Sized {
189176
time_partition: Option<&String>,
190177
schema_version: SchemaVersion,
191178
stream_type: StreamType,
179+
p_custom_fields: &HashMap<String, String>,
192180
) -> Result<Event, AnyError>;
193181
}
194182

src/event/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use chrono::NaiveDateTime;
3535
use std::collections::HashMap;
3636

3737
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
38+
pub const USER_AGENT_KEY: &str = "p_user_agent";
39+
pub const SOURCE_IP_KEY: &str = "p_src_ip";
3840

3941
#[derive(Clone)]
4042
pub struct Event {

src/handlers/http/audit.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use actix_web::{
2323
middleware::Next,
2424
};
2525
use actix_web_httpauth::extractors::basic::BasicAuth;
26+
use http::header::USER_AGENT;
2627
use ulid::Ulid;
2728

2829
use crate::{
@@ -85,7 +86,7 @@ pub async fn audit_log_middleware(
8586
)
8687
.with_user_agent(
8788
req.headers()
88-
.get("User-Agent")
89+
.get(USER_AGENT)
8990
.and_then(|a| a.to_str().ok())
9091
.unwrap_or_default(),
9192
)

src/handlers/http/ingest.rs

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::utils::header_parsing::ParseHeaderError;
3838
use crate::utils::json::flatten::JsonFlattenError;
3939

4040
use super::logstream::error::{CreateStreamError, StreamError};
41-
use super::modal::utils::ingest_utils::flatten_and_push_logs;
41+
use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
4242
use super::users::dashboards::DashboardError;
4343
use super::users::filters::FiltersError;
4444

@@ -72,7 +72,9 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7272
return Err(PostError::OtelNotSupported);
7373
}
7474

75-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
75+
let p_custom_fields = get_custom_fields_from_header(req);
76+
77+
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
7678

7779
Ok(HttpResponse::Ok().finish())
7880
}
@@ -93,6 +95,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9395
None,
9496
SchemaVersion::V0,
9597
StreamType::Internal,
98+
&HashMap::new(),
9699
)?
97100
.process()?;
98101

@@ -122,8 +125,9 @@ pub async fn handle_otel_logs_ingestion(
122125
PARSEABLE
123126
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
124127
.await?;
128+
let p_custom_fields = get_custom_fields_from_header(req);
125129

126-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
130+
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
127131

128132
Ok(HttpResponse::Ok().finish())
129133
}
@@ -145,6 +149,7 @@ pub async fn handle_otel_metrics_ingestion(
145149
if log_source != LogSource::OtelMetrics {
146150
return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
147151
}
152+
148153
let stream_name = stream_name.to_str().unwrap().to_owned();
149154
PARSEABLE
150155
.create_stream_if_not_exists(
@@ -154,7 +159,9 @@ pub async fn handle_otel_metrics_ingestion(
154159
)
155160
.await?;
156161

157-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
162+
let p_custom_fields = get_custom_fields_from_header(req);
163+
164+
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
158165

159166
Ok(HttpResponse::Ok().finish())
160167
}
@@ -182,7 +189,9 @@ pub async fn handle_otel_traces_ingestion(
182189
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
183190
.await?;
184191

185-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
192+
let p_custom_fields = get_custom_fields_from_header(req);
193+
194+
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
186195

187196
Ok(HttpResponse::Ok().finish())
188197
}
@@ -231,7 +240,8 @@ pub async fn post_event(
231240
return Err(PostError::OtelNotSupported);
232241
}
233242

234-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
243+
let p_custom_fields = get_custom_fields_from_header(req);
244+
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
235245

236246
Ok(HttpResponse::Ok().finish())
237247
}
@@ -381,7 +391,13 @@ mod tests {
381391
});
382392

383393
let (rb, _) = json::Event::new(json)
384-
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
394+
.into_recordbatch(
395+
&HashMap::default(),
396+
false,
397+
None,
398+
SchemaVersion::V0,
399+
&HashMap::new(),
400+
)
385401
.unwrap();
386402

387403
assert_eq!(rb.num_rows(), 1);
@@ -409,7 +425,13 @@ mod tests {
409425
});
410426

411427
let (rb, _) = json::Event::new(json)
412-
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
428+
.into_recordbatch(
429+
&HashMap::default(),
430+
false,
431+
None,
432+
SchemaVersion::V0,
433+
&HashMap::new(),
434+
)
413435
.unwrap();
414436

415437
assert_eq!(rb.num_rows(), 1);
@@ -441,7 +463,7 @@ mod tests {
441463
);
442464

443465
let (rb, _) = json::Event::new(json)
444-
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
466+
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
445467
.unwrap();
446468

447469
assert_eq!(rb.num_rows(), 1);
@@ -473,7 +495,7 @@ mod tests {
473495
);
474496

475497
assert!(json::Event::new(json)
476-
.into_recordbatch(&schema, false, None, SchemaVersion::V0,)
498+
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
477499
.is_err());
478500
}
479501

@@ -491,7 +513,7 @@ mod tests {
491513
);
492514

493515
let (rb, _) = json::Event::new(json)
494-
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
516+
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
495517
.unwrap();
496518

497519
assert_eq!(rb.num_rows(), 1);
@@ -532,7 +554,13 @@ mod tests {
532554
]);
533555

534556
let (rb, _) = json::Event::new(json)
535-
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
557+
.into_recordbatch(
558+
&HashMap::default(),
559+
false,
560+
None,
561+
SchemaVersion::V0,
562+
&HashMap::new(),
563+
)
536564
.unwrap();
537565

538566
assert_eq!(rb.num_rows(), 3);
@@ -580,7 +608,13 @@ mod tests {
580608
]);
581609

582610
let (rb, _) = json::Event::new(json)
583-
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
611+
.into_recordbatch(
612+
&HashMap::default(),
613+
false,
614+
None,
615+
SchemaVersion::V0,
616+
&HashMap::new(),
617+
)
584618
.unwrap();
585619

586620
assert_eq!(rb.num_rows(), 3);
@@ -629,7 +663,7 @@ mod tests {
629663
);
630664

631665
let (rb, _) = json::Event::new(json)
632-
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
666+
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
633667
.unwrap();
634668

635669
assert_eq!(rb.num_rows(), 3);
@@ -678,7 +712,7 @@ mod tests {
678712
);
679713

680714
assert!(json::Event::new(json)
681-
.into_recordbatch(&schema, false, None, SchemaVersion::V0,)
715+
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
682716
.is_err());
683717
}
684718

@@ -718,7 +752,13 @@ mod tests {
718752
.unwrap();
719753

720754
let (rb, _) = json::Event::new(flattened_json)
721-
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
755+
.into_recordbatch(
756+
&HashMap::default(),
757+
false,
758+
None,
759+
SchemaVersion::V0,
760+
&HashMap::new(),
761+
)
722762
.unwrap();
723763
assert_eq!(rb.num_rows(), 4);
724764
assert_eq!(rb.num_columns(), 5);
@@ -801,7 +841,13 @@ mod tests {
801841
.unwrap();
802842

803843
let (rb, _) = json::Event::new(flattened_json)
804-
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
844+
.into_recordbatch(
845+
&HashMap::default(),
846+
false,
847+
None,
848+
SchemaVersion::V1,
849+
&HashMap::new(),
850+
)
805851
.unwrap();
806852

807853
assert_eq!(rb.num_rows(), 4);

0 commit comments

Comments (0)