Skip to content

Commit 5ed9fd3

Browse files
include null, add query param in prism home api
1 parent ee06b73 commit 5ed9fd3

File tree

4 files changed

+144
-55
lines changed

4 files changed

+144
-55
lines changed

src/handlers/http/prism_home.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
};
2727

2828
const HOME_SEARCH_QUERY_PARAM: &str = "key";
29-
29+
const HOME_QUERY_PARAM: &str = "includeInternal";
3030
/// Fetches the data to populate Prism's home
3131
///
3232
///
@@ -36,8 +36,14 @@ const HOME_SEARCH_QUERY_PARAM: &str = "key";
3636
pub async fn home_api(req: HttpRequest) -> Result<impl Responder, PrismHomeError> {
3737
let key = extract_session_key_from_req(&req)
3838
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
39+
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
40+
.map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?;
3941

40-
let res = generate_home_response(&key).await?;
42+
let include_internal = query_map
43+
.get(HOME_QUERY_PARAM)
44+
.map_or(false, |v| v == "true");
45+
46+
let res = generate_home_response(&key, include_internal).await?;
4147

4248
Ok(web::Json(res))
4349
}
@@ -52,11 +58,12 @@ pub async fn home_search(req: HttpRequest) -> Result<impl Responder, PrismHomeEr
5258
return Ok(web::Json(serde_json::json!({})));
5359
}
5460

55-
let query_value = query_map
61+
let query_key = query_map
5662
.get(HOME_SEARCH_QUERY_PARAM)
5763
.ok_or_else(|| PrismHomeError::InvalidQueryParameter(HOME_SEARCH_QUERY_PARAM.to_string()))?
5864
.to_lowercase();
59-
let res = generate_home_search_response(&key, &query_value).await?;
65+
66+
let res = generate_home_search_response(&key, &query_key).await?;
6067
let json_res = serde_json::to_value(res)
6168
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
6269

src/otel/traces.rs

Lines changed: 115 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,6 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
336336
span_records_json
337337
}
338338

339-
340339
#[cfg(test)]
341340
mod tests {
342341
use super::*;
@@ -360,13 +359,21 @@ mod tests {
360359
KeyValue {
361360
key: "service.name".to_string(),
362361
value: Some(AnyValue {
363-
value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("test-service".to_string())),
362+
value: Some(
363+
opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(
364+
"test-service".to_string(),
365+
),
366+
),
364367
}),
365368
},
366369
KeyValue {
367370
key: "http.method".to_string(),
368371
value: Some(AnyValue {
369-
value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("GET".to_string())),
372+
value: Some(
373+
opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(
374+
"GET".to_string(),
375+
),
376+
),
370377
}),
371378
},
372379
]
@@ -398,7 +405,8 @@ mod tests {
398405
assert_eq!(
399406
result.get("span_status_description").unwrap(),
400407
&Value::String(expected_description.to_string()),
401-
"Status description should match expected value for code {}", code
408+
"Status description should match expected value for code {}",
409+
code
402410
);
403411
assert_eq!(
404412
result.get("span_status_message").unwrap(),
@@ -432,7 +440,8 @@ mod tests {
432440
assert_eq!(
433441
result.get("span_kind_description").unwrap(),
434442
&Value::String(expected_description.to_string()),
435-
"Span kind description should match expected value for kind {}", kind
443+
"Span kind description should match expected value for kind {}",
444+
kind
436445
);
437446
}
438447
}
@@ -459,7 +468,8 @@ mod tests {
459468
assert_eq!(
460469
result.get("span_flags_description").unwrap(),
461470
&Value::String(expected_description.to_string()),
462-
"Span flags description should match expected value for flags {}", flags
471+
"Span flags description should match expected value for flags {}",
472+
flags
463473
);
464474
}
465475
}
@@ -488,7 +498,10 @@ mod tests {
488498

489499
// Check first event
490500
let first_event = &result[0];
491-
assert!(first_event.contains_key("event_time_unix_nano"), "Should contain timestamp");
501+
assert!(
502+
first_event.contains_key("event_time_unix_nano"),
503+
"Should contain timestamp"
504+
);
492505
assert_eq!(
493506
first_event.get("event_name").unwrap(),
494507
&Value::String("request.start".to_string()),
@@ -499,7 +512,10 @@ mod tests {
499512
&Value::Number(2.into()),
500513
"Dropped attributes count should be preserved"
501514
);
502-
assert!(first_event.contains_key("service.name"), "Should contain flattened attributes");
515+
assert!(
516+
first_event.contains_key("service.name"),
517+
"Should contain flattened attributes"
518+
);
503519

504520
// Check second event
505521
let second_event = &result[1];
@@ -518,16 +534,14 @@ mod tests {
518534
#[test]
519535
fn test_flatten_links_structure() {
520536
// Test that links are properly flattened with all expected fields
521-
let links = vec![
522-
Link {
523-
trace_id: sample_trace_id(),
524-
span_id: sample_span_id(),
525-
trace_state: "state1".to_string(),
526-
attributes: sample_attributes(),
527-
dropped_attributes_count: 1,
528-
flags: 0,
529-
},
530-
];
537+
let links = vec![Link {
538+
trace_id: sample_trace_id(),
539+
span_id: sample_span_id(),
540+
trace_state: "state1".to_string(),
541+
attributes: sample_attributes(),
542+
dropped_attributes_count: 1,
543+
flags: 0,
544+
}];
531545

532546
let result = flatten_links(&links);
533547

@@ -549,7 +563,10 @@ mod tests {
549563
&Value::Number(1.into()),
550564
"Dropped attributes count should be preserved"
551565
);
552-
assert!(link.contains_key("service.name"), "Should contain flattened attributes");
566+
assert!(
567+
link.contains_key("service.name"),
568+
"Should contain flattened attributes"
569+
);
553570
}
554571

555572
#[test]
@@ -611,12 +628,30 @@ mod tests {
611628
&Value::String("SPAN_KIND_SERVER".to_string()),
612629
"All records should contain span kind description"
613630
);
614-
assert!(record.contains_key("span_trace_id"), "Should contain trace ID");
615-
assert!(record.contains_key("span_span_id"), "Should contain span ID");
616-
assert!(record.contains_key("span_start_time_unix_nano"), "Should contain start time");
617-
assert!(record.contains_key("span_end_time_unix_nano"), "Should contain end time");
618-
assert!(record.contains_key("service.name"), "Should contain span attributes");
619-
assert!(record.contains_key("span_status_code"), "Should contain status");
631+
assert!(
632+
record.contains_key("span_trace_id"),
633+
"Should contain trace ID"
634+
);
635+
assert!(
636+
record.contains_key("span_span_id"),
637+
"Should contain span ID"
638+
);
639+
assert!(
640+
record.contains_key("span_start_time_unix_nano"),
641+
"Should contain start time"
642+
);
643+
assert!(
644+
record.contains_key("span_end_time_unix_nano"),
645+
"Should contain end time"
646+
);
647+
assert!(
648+
record.contains_key("service.name"),
649+
"Should contain span attributes"
650+
);
651+
assert!(
652+
record.contains_key("span_status_code"),
653+
"Should contain status"
654+
);
620655
}
621656

622657
// One record should be an event, one should be a link
@@ -650,17 +685,30 @@ mod tests {
650685

651686
let result = flatten_span_record(&span);
652687

653-
assert_eq!(result.len(), 1, "Should have exactly one record for span without events/links");
688+
assert_eq!(
689+
result.len(),
690+
1,
691+
"Should have exactly one record for span without events/links"
692+
);
654693

655694
let record = &result[0];
656695
assert_eq!(
657696
record.get("span_name").unwrap(),
658697
&Value::String("simple-span".to_string()),
659698
"Should contain span name"
660699
);
661-
assert!(!record.contains_key("event_name"), "Should not contain event fields");
662-
assert!(!record.contains_key("link_trace_id"), "Should not contain link fields");
663-
assert!(!record.contains_key("span_status_code"), "Should not contain status when none provided");
700+
assert!(
701+
!record.contains_key("event_name"),
702+
"Should not contain event fields"
703+
);
704+
assert!(
705+
!record.contains_key("link_trace_id"),
706+
"Should not contain link fields"
707+
);
708+
assert!(
709+
!record.contains_key("span_status_code"),
710+
"Should not contain status when none provided"
711+
);
664712
}
665713

666714
#[test]
@@ -705,10 +753,16 @@ mod tests {
705753
assert_eq!(hex_span_id, "12345678", "Span ID should be lowercase hex");
706754
}
707755
if let Some(Value::String(hex_parent_span_id)) = record.get("span_parent_span_id") {
708-
assert_eq!(hex_parent_span_id, "87654321", "Parent span ID should be lowercase hex");
756+
assert_eq!(
757+
hex_parent_span_id, "87654321",
758+
"Parent span ID should be lowercase hex"
759+
);
709760
}
710761
if let Some(Value::String(link_trace_id)) = record.get("link_trace_id") {
711-
assert_eq!(link_trace_id, "ffabcdef", "Link trace ID should be lowercase hex");
762+
assert_eq!(
763+
link_trace_id, "ffabcdef",
764+
"Link trace ID should be lowercase hex"
765+
);
712766
}
713767
}
714768
}
@@ -823,15 +877,36 @@ mod tests {
823877
fn test_known_field_list_completeness() {
824878
// Test that the OTEL_TRACES_KNOWN_FIELD_LIST contains all expected fields
825879
let expected_fields = [
826-
"scope_name", "scope_version", "scope_schema_url", "scope_dropped_attributes_count",
827-
"resource_schema_url", "resource_dropped_attributes_count",
828-
"span_trace_id", "span_span_id", "span_name", "span_parent_span_id", "name",
829-
"span_kind", "span_kind_description", "span_start_time_unix_nano", "span_end_time_unix_nano",
830-
"event_name", "event_time_unix_nano", "event_dropped_attributes_count",
831-
"link_span_id", "link_trace_id", "link_dropped_attributes_count",
832-
"span_dropped_events_count", "span_dropped_links_count", "span_dropped_attributes_count",
833-
"span_trace_state", "span_flags", "span_flags_description",
834-
"span_status_code", "span_status_description", "span_status_message",
880+
"scope_name",
881+
"scope_version",
882+
"scope_schema_url",
883+
"scope_dropped_attributes_count",
884+
"resource_schema_url",
885+
"resource_dropped_attributes_count",
886+
"span_trace_id",
887+
"span_span_id",
888+
"span_name",
889+
"span_parent_span_id",
890+
"name",
891+
"span_kind",
892+
"span_kind_description",
893+
"span_start_time_unix_nano",
894+
"span_end_time_unix_nano",
895+
"event_name",
896+
"event_time_unix_nano",
897+
"event_dropped_attributes_count",
898+
"link_span_id",
899+
"link_trace_id",
900+
"link_dropped_attributes_count",
901+
"span_dropped_events_count",
902+
"span_dropped_links_count",
903+
"span_dropped_attributes_count",
904+
"span_trace_state",
905+
"span_flags",
906+
"span_flags_description",
907+
"span_status_code",
908+
"span_status_description",
909+
"span_status_message",
835910
];
836911

837912
assert_eq!(

src/parseable/streams.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,7 @@ use crate::{
5252
format::{LogSource, LogSourceEntry},
5353
DEFAULT_TIMESTAMP_KEY,
5454
},
55-
handlers::http::{
56-
cluster::INTERNAL_STREAM_NAME, ingest::PostError,
57-
modal::utils::ingest_utils::flatten_and_push_logs,
58-
},
55+
handlers::http::{ingest::PostError, modal::utils::ingest_utils::flatten_and_push_logs},
5956
metadata::{LogStreamMetadata, SchemaVersion},
6057
metrics,
6158
option::Mode,
@@ -73,7 +70,7 @@ use super::{
7370
},
7471
LogStream, ARROW_FILE_EXTENSION,
7572
};
76-
73+
const DATASET_STATS_STREAM_NAME: &str = "pstats";
7774
const MAX_CONCURRENT_FIELD_STATS: usize = 10;
7875

7976
#[derive(Serialize, Debug)]
@@ -790,11 +787,10 @@ impl Stream {
790787
record_batches: Vec<RecordBatch>,
791788
schema: Arc<Schema>,
792789
) -> Result<(), PostError> {
793-
let stats_dataset_name = format!("dataset_{INTERNAL_STREAM_NAME}");
794790
let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
795791
PARSEABLE
796792
.create_stream_if_not_exists(
797-
&stats_dataset_name,
793+
DATASET_STATS_STREAM_NAME,
798794
StreamType::Internal,
799795
vec![log_source_entry],
800796
)
@@ -818,7 +814,7 @@ impl Stream {
818814

819815
flatten_and_push_logs(
820816
stats_value,
821-
&stats_dataset_name,
817+
DATASET_STATS_STREAM_NAME,
822818
&LogSource::Json,
823819
&HashMap::new(),
824820
)
@@ -946,7 +942,7 @@ impl Stream {
946942
field_name: &str,
947943
) -> Vec<DistinctStat> {
948944
let sql = format!(
949-
"select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit {}",
945+
"select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}",
950946
PARSEABLE.options.max_field_statistics
951947
);
952948
let mut distinct_stats = Vec::new();

src/prism/home/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
3434
parseable::PARSEABLE,
3535
rbac::{map::SessionKey, role::Action, Users},
36-
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
36+
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
3737
users::{dashboards::DASHBOARDS, filters::FILTERS},
3838
};
3939

@@ -88,7 +88,10 @@ pub struct HomeSearchResponse {
8888
resources: Vec<Resource>,
8989
}
9090

91-
pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
91+
pub async fn generate_home_response(
92+
key: &SessionKey,
93+
include_internal: bool,
94+
) -> Result<HomeResponse, PrismHomeError> {
9295
// Execute these operations concurrently
9396
let (stream_titles_result, alerts_info_result) =
9497
tokio::join!(get_stream_titles(key), get_alerts_info());
@@ -120,6 +123,14 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
120123
for result in stream_metadata_results {
121124
match result {
122125
Ok((stream, metadata, dataset_type)) => {
126+
// Skip internal streams if the flag is false
127+
if !include_internal
128+
&& metadata
129+
.iter()
130+
.any(|m| m.stream_type == StreamType::Internal)
131+
{
132+
continue;
133+
}
123134
stream_wise_stream_json.insert(stream.clone(), metadata);
124135
datasets.push(DataSet {
125136
title: stream,

0 commit comments

Comments
 (0)