diff --git a/CHANGELOG.md b/CHANGELOG.md index e8e228c37c..3219f0d4b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ - Bump the revision of `sysinfo` to the revision at `15b3be3273ba286740122fed7bb7dccd2a79dc8f`. ([#4613](https://github.com/getsentry/relay/pull/4613)) - Switch the processor and store to `async`. ([#4552](https://github.com/getsentry/relay/pull/4552)) - Validate the spooling memory configuration on startup. ([#4617](https://github.com/getsentry/relay/pull/4617)) +- Rework currently unused 'log' protocol / envelope item type schema. ([#4592](https://github.com/getsentry/relay/pull/4592)) - Improve descriptiveness of autoscaling metric name. ([#4629](https://github.com/getsentry/relay/pull/4629)) ## 25.3.0 diff --git a/relay-event-schema/src/processor/traits.rs b/relay-event-schema/src/processor/traits.rs index 9c5491872d..99f1fa55a7 100644 --- a/relay-event-schema/src/processor/traits.rs +++ b/relay-event-schema/src/processor/traits.rs @@ -113,7 +113,6 @@ pub trait Processor: Sized { process_method!(process_trace_context, crate::protocol::TraceContext); process_method!(process_native_image_path, crate::protocol::NativeImagePath); process_method!(process_contexts, crate::protocol::Contexts); - process_method!(process_attribute_value, crate::protocol::AttributeValue); fn process_other( &mut self, diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index 753493e2cd..7e2fc74de8 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -1,42 +1,29 @@ -use relay_protocol::{ - Annotated, Empty, Error, FromValue, IntoValue, Object, SkipSerialization, Value, -}; +use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, SkipSerialization, Value}; +use std::fmt::{self, Display}; -use serde::ser::SerializeMap; +use serde::{Serialize, Serializer}; use crate::processor::ProcessValue; -use crate::protocol::{SpanId, TraceId}; +use crate::protocol::{SpanId, Timestamp, TraceId}; #[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] #[metastructure(process_func = "process_ourlog", value_type = "OurLog")] pub struct OurLog { - /// Time when the event occurred. - #[metastructure(required = true, trim = false)] - pub timestamp_nanos: Annotated, - - /// Time when the event was observed. - #[metastructure(required = true, trim = false)] - pub observed_timestamp_nanos: Annotated, + /// Timestamp when the log was created. + #[metastructure(required = true)] + pub timestamp: Annotated, /// The ID of the trace the log belongs to. - #[metastructure(required = false, trim = false)] + #[metastructure(required = true, trim = false)] pub trace_id: Annotated, - /// The Span id. - /// + + /// The Span this log entry belongs to. #[metastructure(required = false, trim = false)] pub span_id: Annotated, - /// Trace flag bitfield. - #[metastructure(required = false)] - pub trace_flags: Annotated, - - /// This is the original string representation of the severity as it is known at the source - #[metastructure(required = false, max_chars = 32, pii = "true", trim = false)] - pub severity_text: Annotated, - - /// Numerical representation of the severity level - #[metastructure(required = false)] - pub severity_number: Annotated, + /// The log level. + #[metastructure(required = true)] + pub level: Annotated, /// Log body. #[metastructure(required = true, pii = "true", trim = false)] @@ -44,50 +31,137 @@ pub struct OurLog { /// Arbitrary attributes on a log. #[metastructure(pii = "true", trim = false)] - pub attributes: Annotated>, + pub attributes: Annotated>, /// Additional arbitrary fields for forwards compatibility. - #[metastructure(additional_properties, retain = true, pii = "maybe", trim = false)] + #[metastructure(additional_properties, retain = true, pii = "maybe")] pub other: Object, } -#[derive(Debug, Clone, PartialEq, ProcessValue)] -pub enum AttributeValue { - #[metastructure(field = "string_value", pii = "true")] - StringValue(String), - #[metastructure(field = "int_value", pii = "true")] - IntValue(i64), - #[metastructure(field = "double_value", pii = "true")] - DoubleValue(f64), - #[metastructure(field = "bool_value", pii = "true")] - BoolValue(bool), - /// Any other unknown attribute value. - /// - /// This exists to ensure other attribute values such as array and object can be added in the future. +impl OurLog { + pub fn attribute(&self, key: &str) -> Option> { + Some( + self.attributes + .value()? + .get(key)? + .value()? + .value + .clone() + .value, + ) + } +} + +#[derive(Clone, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +pub struct OurLogAttribute { + #[metastructure(flatten)] + pub value: OurLogAttributeValue, + + /// Additional arbitrary fields for forwards compatibility. + #[metastructure(additional_properties)] + pub other: Object, +} + +impl OurLogAttribute { + pub fn new(attribute_type: OurLogAttributeType, value: Value) -> Self { + Self { + value: OurLogAttributeValue { + ty: Annotated::new(attribute_type), + value: Annotated::new(value), + }, + other: Object::new(), + } + } +} + +impl fmt::Debug for OurLogAttribute { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OurLogAttribute") + .field("value", &self.value.value) + .field("type", &self.value.ty) + .field("other", &self.other) + .finish() + } +} + +#[derive(Debug, Clone, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +pub struct OurLogAttributeValue { + #[metastructure(field = "type", required = true, trim = false)] + pub ty: Annotated, + #[metastructure(required = true, pii = "true")] + pub value: Annotated, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OurLogAttributeType { + Boolean, + Integer, + Double, + String, Unknown(String), } -impl IntoValue for AttributeValue { - fn into_value(self) -> Value { - let mut map = Object::new(); +impl ProcessValue for OurLogAttributeType {} + +impl OurLogAttributeType { + pub fn as_str(&self) -> &str { match self { - AttributeValue::StringValue(v) => { - map.insert("string_value".to_string(), Annotated::new(Value::String(v))); - } - AttributeValue::IntValue(v) => { - map.insert("int_value".to_string(), Annotated::new(Value::I64(v))); - } - AttributeValue::DoubleValue(v) => { - map.insert("double_value".to_string(), Annotated::new(Value::F64(v))); - } - AttributeValue::BoolValue(v) => { - map.insert("bool_value".to_string(), Annotated::new(Value::Bool(v))); - } - AttributeValue::Unknown(v) => { - map.insert("unknown".to_string(), Annotated::new(Value::String(v))); - } + Self::Boolean => "boolean", + Self::Integer => "integer", + Self::Double => "double", + Self::String => "string", + Self::Unknown(value) => value, } - Value::Object(map) + } + + pub fn unknown_string() -> String { + "unknown".to_string() + } +} + +impl fmt::Display for OurLogAttributeType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl From for OurLogAttributeType { + fn from(value: String) -> Self { + match value.as_str() { + "boolean" => Self::Boolean, + "integer" => Self::Integer, + "double" => Self::Double, + "string" => Self::String, + _ => Self::Unknown(value), + } + } +} + +impl Empty for OurLogAttributeType { + #[inline] + fn is_empty(&self) -> bool { + false + } +} + +impl FromValue for OurLogAttributeType { + fn from_value(value: Annotated) -> Annotated { + match String::from_value(value) { + Annotated(Some(value), meta) => Annotated(Some(value.into()), meta), + Annotated(None, meta) => Annotated(None, meta), + } + } +} + +impl IntoValue for OurLogAttributeType { + fn into_value(self) -> Value + where + Self: Sized, + { + Value::String(match self { + Self::Unknown(s) => s, + s => s.to_string(), + }) } fn serialize_payload(&self, s: S, _behavior: SkipSerialization) -> Result @@ -95,149 +169,275 @@ impl IntoValue for AttributeValue { Self: Sized, S: serde::Serializer, { - let mut map = s.serialize_map(None)?; - match self { - AttributeValue::StringValue(v) => { - map.serialize_entry("string_value", v)?; - } - AttributeValue::IntValue(v) => { - map.serialize_entry("int_value", v)?; - } - AttributeValue::DoubleValue(v) => { - map.serialize_entry("double_value", v)?; - } - AttributeValue::BoolValue(v) => { - map.serialize_entry("bool_value", v)?; - } - AttributeValue::Unknown(v) => { - map.serialize_entry("unknown", v)?; - } - } - map.end() + serde::ser::Serialize::serialize(self.as_str(), s) } } -impl AttributeValue { - pub fn string_value(&self) -> Option<&String> { +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum OurLogLevel { + Trace, + Debug, + Info, + Warn, + Error, + Fatal, + /// Unknown status, for forward compatibility. + Unknown(String), +} + +impl OurLogLevel { + fn as_str(&self) -> &str { match self { - AttributeValue::StringValue(s) => Some(s), - _ => None, + OurLogLevel::Trace => "trace", + OurLogLevel::Debug => "debug", + OurLogLevel::Info => "info", + OurLogLevel::Warn => "warn", + OurLogLevel::Error => "error", + OurLogLevel::Fatal => "fatal", + OurLogLevel::Unknown(s) => s.as_str(), } } - pub fn int_value(&self) -> Option { - match self { - AttributeValue::IntValue(i) => Some(*i), - _ => None, - } +} + +impl Display for OurLogLevel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) } - pub fn double_value(&self) -> Option { - match self { - AttributeValue::DoubleValue(d) => Some(*d), - _ => None, +} + +impl From for OurLogLevel { + fn from(value: String) -> Self { + match value.as_str() { + "trace" => OurLogLevel::Trace, + "debug" => OurLogLevel::Debug, + "info" => OurLogLevel::Info, + "warn" => OurLogLevel::Warn, + "error" => OurLogLevel::Error, + "fatal" => OurLogLevel::Fatal, + _ => OurLogLevel::Unknown(value), } } - pub fn bool_value(&self) -> Option { - match self { - AttributeValue::BoolValue(b) => Some(*b), - _ => None, +} + +impl FromValue for OurLogLevel { + fn from_value(value: Annotated) -> Annotated { + match String::from_value(value) { + Annotated(Some(value), meta) => Annotated(Some(value.into()), meta), + Annotated(None, meta) => Annotated(None, meta), } } } -impl Empty for AttributeValue { - #[inline] - fn is_empty(&self) -> bool { - matches!(self, Self::Unknown(_)) +impl IntoValue for OurLogLevel { + fn into_value(self) -> Value { + Value::String(self.to_string()) + } + + fn serialize_payload(&self, s: S, _behavior: SkipSerialization) -> Result + where + Self: Sized, + S: Serializer, + { + Serialize::serialize(self.as_str(), s) } } -impl FromValue for AttributeValue { - fn from_value(value: Annotated) -> Annotated { - match value { - Annotated(Some(Value::String(value)), meta) => { - Annotated(Some(AttributeValue::StringValue(value)), meta) - } - Annotated(Some(Value::I64(value)), meta) => { - Annotated(Some(AttributeValue::IntValue(value)), meta) - } - Annotated(Some(Value::F64(value)), meta) => { - Annotated(Some(AttributeValue::DoubleValue(value)), meta) - } - Annotated(Some(Value::Bool(value)), meta) => { - Annotated(Some(AttributeValue::BoolValue(value)), meta) - } - Annotated(Some(value), mut meta) => { - meta.add_error(Error::expected( - "a valid attribute value (string, int, double, bool)", - )); - meta.set_original_value(Some(value)); - Annotated(None, meta) - } - Annotated(None, meta) => Annotated(None, meta), - } +impl ProcessValue for OurLogLevel {} + +impl Empty for OurLogLevel { + #[inline] + fn is_empty(&self) -> bool { + false } } #[cfg(test)] mod tests { use super::*; + use relay_protocol::SerializableAnnotated; #[test] fn test_ourlog_serialization() { let json = r#"{ - "timestamp_nanos": 1544712660300000000, - "observed_timestamp_nanos": 1544712660300000000, - "trace_id": "5b8efff798038103d269b633813fc60c", - "span_id": "eee19b7ec3c1b174", - "severity_text": "Information", - "severity_number": 10, - "body": "Example log record", - "attributes": { - "boolean.attribute": { - "bool_value": true - }, - "double.attribute": { - "double_value": 637.704 - }, - "int.attribute": { - "int_value": 10 - }, - "string.attribute": { - "string_value": "some string" - } - } -}"#; - - let mut attributes = Object::new(); - attributes.insert( - "string.attribute".into(), - Annotated::new(AttributeValue::StringValue("some string".into())), - ); - attributes.insert( - "boolean.attribute".into(), - Annotated::new(AttributeValue::BoolValue(true)), - ); - attributes.insert( - "int.attribute".into(), - Annotated::new(AttributeValue::IntValue(10)), - ); - attributes.insert( - "double.attribute".into(), - Annotated::new(AttributeValue::DoubleValue(637.704)), - ); - - let log = Annotated::new(OurLog { - timestamp_nanos: Annotated::new(1544712660300000000), - observed_timestamp_nanos: Annotated::new(1544712660300000000), - severity_number: Annotated::new(10), - severity_text: Annotated::new("Information".to_string()), - trace_id: Annotated::new(TraceId("5b8efff798038103d269b633813fc60c".into())), - span_id: Annotated::new(SpanId("eee19b7ec3c1b174".into())), - body: Annotated::new("Example log record".to_string()), - attributes: Annotated::new(attributes), - ..Default::default() - }); - - assert_eq!(json, log.to_json_pretty().unwrap()); + "timestamp": 1544719860.0, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "level": "info", + "body": "Example log record", + "attributes": { + "boolean.attribute": { + "value": true, + "type": "boolean" + }, + "double.attribute": { + "value": 1.23, + "type": "double" + }, + "string.attribute": { + "value": "some string", + "type": "string" + }, + "sentry.severity_text": { + "value": "info", + "type": "string" + }, + "sentry.severity_number": { + "value": "10", + "type": "integer" + }, + "sentry.observed_timestamp_nanos": { + "value": "1544712660300000000", + "type": "integer" + }, + "sentry.trace_flags": { + "value": "10", + "type": "integer" + } + } + }"#; + + let data = Annotated::::from_json(json).unwrap(); + insta::assert_debug_snapshot!(data, @r###" + OurLog { + timestamp: Timestamp( + 2018-12-13T16:51:00Z, + ), + trace_id: TraceId( + "5b8efff798038103d269b633813fc60c", + ), + span_id: SpanId( + "eee19b7ec3c1b174", + ), + level: Info, + body: "Example log record", + attributes: { + "boolean.attribute": OurLogAttribute { + value: Bool( + true, + ), + type: Boolean, + other: {}, + }, + "double.attribute": OurLogAttribute { + value: F64( + 1.23, + ), + type: Double, + other: {}, + }, + "sentry.observed_timestamp_nanos": OurLogAttribute { + value: String( + "1544712660300000000", + ), + type: Integer, + other: {}, + }, + "sentry.severity_number": OurLogAttribute { + value: String( + "10", + ), + type: Integer, + other: {}, + }, + "sentry.severity_text": OurLogAttribute { + value: String( + "info", + ), + type: String, + other: {}, + }, + "sentry.trace_flags": OurLogAttribute { + value: String( + "10", + ), + type: Integer, + other: {}, + }, + "string.attribute": OurLogAttribute { + value: String( + "some string", + ), + type: String, + other: {}, + }, + }, + other: {}, + } + "###); + + insta::assert_json_snapshot!(SerializableAnnotated(&data), @r###" + { + "timestamp": 1544719860.0, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "level": "info", + "body": "Example log record", + "attributes": { + "boolean.attribute": { + "type": "boolean", + "value": true + }, + "double.attribute": { + "type": "double", + "value": 1.23 + }, + "sentry.observed_timestamp_nanos": { + "type": "integer", + "value": "1544712660300000000" + }, + "sentry.severity_number": { + "type": "integer", + "value": "10" + }, + "sentry.severity_text": { + "type": "string", + "value": "info" + }, + "sentry.trace_flags": { + "type": "integer", + "value": "10" + }, + "string.attribute": { + "type": "string", + "value": "some string" + } + } + } + "###); + } + + #[test] + fn test_invalid_int_attribute() { + let json = r#"{ + "timestamp": 1544719860.0, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "level": "info", + "body": "Example log record", + "attributes": { + "sentry.severity_number": { + "value": 10, + "type": "integer" + } + } + }"#; + + let data = Annotated::::from_json(json).unwrap(); + + insta::assert_json_snapshot!(SerializableAnnotated(&data), @r###" + { + "timestamp": 1544719860.0, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "level": "info", + "body": "Example log record", + "attributes": { + "sentry.severity_number": { + "type": "integer", + "value": 10 + } + } + } + "###); } } diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs index 38130a4a4c..8498ba09e0 100644 --- a/relay-ourlogs/src/lib.rs +++ b/relay-ourlogs/src/lib.rs @@ -7,7 +7,7 @@ )] pub use crate::ourlog::otel_to_sentry_log; - +pub use crate::ourlog::ourlog_merge_otel; pub use opentelemetry_proto::tonic::logs::v1::LogRecord as OtelLog; mod ourlog; diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index c6f2fafd30..9baca62aa3 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -1,9 +1,41 @@ +use chrono::{TimeZone, Utc}; use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; use crate::OtelLog; use relay_common::time::UnixTimestamp; -use relay_event_schema::protocol::{AttributeValue, OurLog, SpanId, TraceId}; -use relay_protocol::{Annotated, Object}; +use relay_event_schema::protocol::{ + OurLog, OurLogAttribute, OurLogAttributeType, OurLogLevel, SpanId, Timestamp, TraceId, +}; +use relay_protocol::{Annotated, Object, Value}; + +fn otel_value_to_log_attribute(value: OtelValue) -> Option { + match value { + OtelValue::BoolValue(v) => Some(OurLogAttribute::new( + OurLogAttributeType::Boolean, + Value::Bool(v), + )), + OtelValue::DoubleValue(v) => Some(OurLogAttribute::new( + OurLogAttributeType::Double, + Value::F64(v), + )), + OtelValue::IntValue(v) => Some(OurLogAttribute::new( + OurLogAttributeType::Integer, + Value::I64(v), + )), + OtelValue::StringValue(v) => Some(OurLogAttribute::new( + OurLogAttributeType::String, + Value::String(v), + )), + OtelValue::BytesValue(v) => String::from_utf8(v).map_or(None, |str| { + Some(OurLogAttribute::new( + OurLogAttributeType::String, + Value::String(str), + )) + }), + OtelValue::ArrayValue(_) => None, + OtelValue::KvlistValue(_) => None, + } +} /// Transform an OtelLog to a Sentry log. pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { @@ -14,11 +46,14 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { attributes, trace_id, span_id, + time_unix_nano, .. } = otel_log; - let span_id = hex::encode(span_id); - let trace_id = hex::encode(trace_id); + let span_id = SpanId(hex::encode(span_id)); + let trace_id = TraceId(hex::encode(trace_id)); + let nanos = time_unix_nano; + let timestamp = Utc.timestamp_nanos(nanos as i64); let body = body .and_then(|v| v.value) @@ -34,54 +69,224 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { // We may change this in the future with forwarding Relays. let observed_time_unix_nano = UnixTimestamp::now().as_nanos(); + attribute_data.insert( + "sentry.severity_text".to_string(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::String, + Value::String(severity_text.clone()), + )), + ); + attribute_data.insert( + "sentry.severity_number".to_string(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::Integer, + Value::I64(severity_number as i64), + )), + ); + attribute_data.insert( + "sentry.timestamp_nanos".to_string(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::String, + Value::String(time_unix_nano.to_string()), + )), + ); + attribute_data.insert( + "sentry.observed_timestamp_nanos".to_string(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::String, + Value::String(observed_time_unix_nano.to_string()), + )), + ); + attribute_data.insert( + "sentry.trace_flags".to_string(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::Integer, + Value::I64(0), + )), + ); + for attribute in attributes.into_iter() { if let Some(value) = attribute.value.and_then(|v| v.value) { let key = attribute.key; - match value { - OtelValue::ArrayValue(_) => {} - OtelValue::BoolValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::BoolValue(v))); - } - OtelValue::BytesValue(v) => { - if let Ok(v) = String::from_utf8(v) { - attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); - } - } - OtelValue::DoubleValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::DoubleValue(v))); - } - OtelValue::IntValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::IntValue(v))); - } - OtelValue::KvlistValue(_) => {} - OtelValue::StringValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); - } + if let Some(v) = otel_value_to_log_attribute(value) { + attribute_data.insert(key, Annotated::new(v)); } } } + // Map severity_number to OurLogLevel, falling back to severity_text if it's not a number. + // Finally default to Info if severity_number is not in range and severity_text is not a valid + // log level. + let level = match severity_number { + 1..=4 => OurLogLevel::Trace, + 5..=8 => OurLogLevel::Debug, + 9..=12 => OurLogLevel::Info, + 13..=16 => OurLogLevel::Warn, + 17..=20 => OurLogLevel::Error, + 21..=24 => OurLogLevel::Fatal, + _ => match severity_text.as_str() { + "trace" => OurLogLevel::Trace, + "debug" => OurLogLevel::Debug, + "info" => OurLogLevel::Info, + "warn" => OurLogLevel::Warn, + "error" => OurLogLevel::Error, + "fatal" => OurLogLevel::Fatal, + _ => OurLogLevel::Info, + }, + }; + + let mut other = Object::default(); + other.insert( + "severity_text".to_string(), + Annotated::new(Value::String(severity_text)), + ); + other.insert( + "severity_number".to_string(), + Annotated::new(Value::I64(severity_number as i64)), + ); + other.insert("trace_flags".to_string(), Annotated::new(Value::I64(0))); + other.insert( + "timestamp_nanos".to_string(), + Annotated::new(Value::U64(otel_log.time_unix_nano)), + ); + other.insert( + "observed_timestamp_nanos".to_string(), + Annotated::new(Value::U64(observed_time_unix_nano)), + ); + OurLog { - timestamp_nanos: Annotated::new(otel_log.time_unix_nano), - observed_timestamp_nanos: Annotated::new(observed_time_unix_nano), - trace_id: TraceId(trace_id).into(), - span_id: Annotated::new(SpanId(span_id)), - trace_flags: Annotated::new(0), - severity_text: severity_text.into(), - severity_number: Annotated::new(severity_number as i64), - attributes: attribute_data.into(), + timestamp: Annotated::new(Timestamp(timestamp)), + trace_id: Annotated::new(trace_id), + span_id: Annotated::new(span_id), + level: Annotated::new(level), + attributes: Annotated::new(attribute_data), body: Annotated::new(body), - ..Default::default() + other, + } +} + +/// This fills attributes with OTel specific fields to be compatible with the otel schema. +/// +/// This also currently backfills data into deprecated fields (other) on the OurLog protocol in order to continue working with the snuba consumers. +/// +/// This will need to transform all fields into attributes to be ported to using the generic trace items consumers once they're done. +pub fn ourlog_merge_otel(ourlog: Annotated) -> Annotated { + let mut ourlog = ourlog; + if let Some(ourlog_value) = ourlog.value_mut() { + let attributes = ourlog_value.attributes.value_mut().get_or_insert_default(); + attributes.insert( + "sentry.severity_number".to_string(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::Integer, + Value::I64(level_to_otel_severity_number( + ourlog_value.level.value().cloned(), + )), + )), + ); + attributes.insert( + "sentry.severity_text".to_owned(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::String, + Value::String( + ourlog_value + .level + .value() + .map(|level| level.to_string()) + .unwrap_or_else(|| "info".to_string()), + ), + )), + ); + + if let Some(value) = ourlog_value.attribute("sentry.severity_text") { + if let Some(s) = value.as_str() { + ourlog_value.other.insert( + "severity_text".to_string(), + Annotated::new(Value::String(s.to_string())), + ); + } + } + + if let Some(value) = ourlog_value.attribute("sentry.severity_number") { + if let Some(v) = value.value() { + ourlog_value + .other + .insert("severity_number".to_string(), Annotated::new(v.clone())); + } + } + + if let Some(value) = ourlog_value.attribute("sentry.trace_flags") { + if let Some(v) = value.value() { + ourlog_value + .other + .insert("trace_flags".to_string(), Annotated::new(v.clone())); + } + } + + if let Some(value) = ourlog_value.attribute("sentry.observed_timestamp_nanos") { + if let Some(s) = value.as_str() { + if let Ok(nanos) = s.parse::() { + ourlog_value.other.insert( + "observed_timestamp_nanos".to_string(), + Annotated::new(Value::U64(nanos)), + ); + } + } + } + + if let Some(value) = ourlog_value.attribute("sentry.timestamp_nanos") { + if let Some(s) = value.as_str() { + if let Ok(nanos) = s.parse::() { + ourlog_value.other.insert( + "timestamp_nanos".to_string(), + Annotated::new(Value::U64(nanos)), + ); + } + } + } + + // We ignore the passed observed time since Relay always acts as the collector in Sentry. + // We may change this in the future with forwarding Relays. + let observed_time_unix_nano = UnixTimestamp::now().as_nanos(); + ourlog_value.other.insert( + "observed_timestamp_nanos".to_string(), + Annotated::new(Value::U64(observed_time_unix_nano)), + ); + + if let Some(timestamp_nanos) = ourlog_value + .timestamp + .value() + .and_then(|timestamp| timestamp.0.timestamp_nanos_opt()) + { + ourlog_value.other.insert( + "timestamp_nanos".to_string(), + Annotated::new(Value::U64(timestamp_nanos as u64)), + ); + } + } + ourlog +} + +fn level_to_otel_severity_number(level: Option) -> i64 { + match level { + Some(OurLogLevel::Trace) => 1, + Some(OurLogLevel::Debug) => 5, + Some(OurLogLevel::Info) => 9, + Some(OurLogLevel::Warn) => 13, + Some(OurLogLevel::Error) => 17, + Some(OurLogLevel::Fatal) => 21, + // 0 is the default value. + // https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/68e1d6cd94bfca9bdf725327d4221f97ce0e0564/pkg/stanza/docs/types/severity.md + _ => 0, } } #[cfg(test)] mod tests { use super::*; - use relay_protocol::{get_path, get_value}; + use relay_protocol::{get_path, SerializableAnnotated}; #[test] - fn parse_log() { + fn parse_otel_log() { // https://github.com/open-telemetry/opentelemetry-proto/blob/c4214b8168d0ce2a5236185efb8a1c8950cccdd6/examples/logs.json let json = r#"{ "timeUnixNano": "1544712660300000000", @@ -161,7 +366,28 @@ mod tests { } #[test] - fn parse_log_with_db_attributes() { + fn parse_otellog_with_invalid_trace_id() { + let json = r#"{ + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "", + "spanId": "EEE19B7EC3C1B174" + }"#; + + let otel_log: OtelLog = serde_json::from_str(json).unwrap(); + let our_log = otel_to_sentry_log(otel_log); + let annotated_log: Annotated = Annotated::new(our_log); + + assert_eq!( + get_path!(annotated_log.trace_id), + Some(&Annotated::new(TraceId("".into()))) + ); + } + + #[test] + fn parse_otel_log_with_db_attributes() { let json = r#"{ "timeUnixNano": "1544712660300000000", "observedTimeUnixNano": "1544712660300000000", @@ -202,13 +428,18 @@ mod tests { Some(&Annotated::new("Database query executed".into())) ); assert_eq!( - get_value!(annotated_log.attributes["db.statement"]!).string_value(), - Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into()) + annotated_log + .value() + .and_then(|v| v.attribute("db.statement")) + .unwrap() + .value() + .and_then(|v| v.as_str()), + Some("SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s") ); } #[test] - fn parse_log_without_observed_time() { + fn parse_otel_log_without_observed_time() { let json_without_observed_time = r#"{ "timeUnixNano": "1544712660300000000", "observedTimeUnixNano": "0", @@ -227,14 +458,19 @@ mod tests { let our_log: OurLog = otel_to_sentry_log(otel_log); let after_test = UnixTimestamp::now().as_nanos(); - let observed_time = our_log.observed_timestamp_nanos.value().unwrap(); - assert!(*observed_time > 0); - assert!(*observed_time >= before_test); - assert!(*observed_time <= after_test); + // Get the observed timestamp from attributes + let observed_timestamp = our_log + .attribute("sentry.observed_timestamp_nanos") + .and_then(|value| value.as_str().and_then(|s| s.parse::().ok())) + .unwrap_or(0); + + assert!(observed_timestamp > 0); + assert!(observed_timestamp >= before_test); + assert!(observed_timestamp <= after_test); } #[test] - fn parse_log_ignores_observed_time() { + fn parse_otel_log_ignores_observed_time() { let json_with_observed_time = r#"{ "timeUnixNano": "1544712660300000000", "observedTimeUnixNano": "1544712660300000000", @@ -253,14 +489,218 @@ mod tests { let our_log: OurLog = otel_to_sentry_log(otel_log); let after_test = UnixTimestamp::now().as_nanos(); - let observed_time = our_log.observed_timestamp_nanos.value().unwrap(); - assert!(*observed_time > 0); - assert!(*observed_time >= before_test); - assert!(*observed_time <= after_test); + // Get the observed timestamp from attributes + let observed_timestamp = our_log + .attribute("sentry.observed_timestamp_nanos") + .and_then(|value| value.as_str().and_then(|s| s.parse::().ok())) + .unwrap_or(0); + + assert!(observed_timestamp > 0); + assert!(observed_timestamp >= before_test); + assert!(observed_timestamp <= after_test); + assert_ne!(observed_timestamp, 1544712660300000000); + } + + #[test] + fn ourlog_merge_otel_log() { + let json = r#"{ + "timestamp": 946684800.0, + "level": "info", + "trace_id": "5B8EFFF798038103D269B633813FC60C", + "span_id": "EEE19B7EC3C1B174", + "body": "Example log record", + "attributes": { + "foo": { + "value": "9", + "type": "string" + } + } + }"#; + + let data = Annotated::::from_json(json).unwrap(); + let mut merged_log = ourlog_merge_otel(data); + if let Some(log) = merged_log.value_mut() { + log.other.insert( + "observed_timestamp_nanos".to_string(), + Annotated::new(Value::U64(1742481864000000000)), + ); + } + + insta::assert_debug_snapshot!(merged_log, @r###" + OurLog { + timestamp: Timestamp( + 2000-01-01T00:00:00Z, + ), + trace_id: TraceId( + "5b8efff798038103d269b633813fc60c", + ), + span_id: SpanId( + "eee19b7ec3c1b174", + ), + level: Info, + body: "Example log record", + attributes: { + "foo": OurLogAttribute { + value: String( + "9", + ), + type: String, + other: {}, + }, + "sentry.severity_number": OurLogAttribute { + value: I64( + 9, + ), + type: Integer, + other: {}, + }, + "sentry.severity_text": OurLogAttribute { + value: String( + "info", + ), + type: String, + other: {}, + }, + }, + other: { + "observed_timestamp_nanos": U64( + 1742481864000000000, + ), + "severity_number": I64( + 9, + ), + "severity_text": String( + "info", + ), + "timestamp_nanos": U64( + 946684800000000000, + ), + }, + } + "###); + } + + #[test] + fn ourlog_merge_otel_log_with_unknown_severity_number() { + let json = r#"{ + "timestamp": 946684800.0, + "level": "abc", + "trace_id": "5B8EFFF798038103D269B633813FC60C", + "span_id": "EEE19B7EC3C1B174", + "body": "Example log record", + "attributes": { + "foo": { + "value": "9", + "type": "string" + } + } + }"#; - assert_ne!( - our_log.observed_timestamp_nanos, - Annotated::new(1544712660300000000) + let data = Annotated::::from_json(json).unwrap(); + let merged_log = ourlog_merge_otel(data); + assert_eq!( + merged_log.value().unwrap().other.get("severity_number"), + Some(&Annotated::new(Value::I64(0))) ); } + + #[test] + #[allow(deprecated)] + fn ourlog_merge_otel_log_with_timestamp() { + let mut attributes = Object::new(); + attributes.insert( + "foo".to_string(), + Annotated::new(OurLogAttribute::new( + OurLogAttributeType::String, + Value::String("9".to_string()), + )), + ); + let datetime = Utc.with_ymd_and_hms(2021, 11, 29, 0, 0, 0).unwrap(); + let ourlog = OurLog { + timestamp: Annotated::new(Timestamp(datetime)), + attributes: Annotated::new(attributes), + ..Default::default() + }; + + let mut merged_log = ourlog_merge_otel(Annotated::new(ourlog)); + if let Some(log) = merged_log.value_mut() { + log.other.insert( + "observed_timestamp_nanos".to_string(), + Annotated::new(Value::U64(1742481864000000000)), + ); + } + + insta::assert_debug_snapshot!(merged_log, @r###" + OurLog { + timestamp: Timestamp( + 2021-11-29T00:00:00Z, + ), + trace_id: ~, + span_id: ~, + level: ~, + body: ~, + attributes: { + "foo": OurLogAttribute { + value: String( + "9", + ), + type: String, + other: {}, + }, + "sentry.severity_number": OurLogAttribute { + value: I64( + 0, + ), + type: Integer, + other: {}, + }, + "sentry.severity_text": OurLogAttribute { + value: String( + "info", + ), + type: String, + other: {}, + }, + }, + other: { + "observed_timestamp_nanos": U64( + 1742481864000000000, + ), + "severity_number": I64( + 0, + ), + "severity_text": String( + "info", + ), + "timestamp_nanos": U64( + 1638144000000000000, + ), + }, + } + "###); + + insta::assert_json_snapshot!(SerializableAnnotated(&merged_log), @r###" + { + "timestamp": 1638144000.0, + "attributes": { + "foo": { + "type": "string", + "value": "9" + }, + "sentry.severity_number": { + "type": "integer", + "value": 0 + }, + "sentry.severity_text": { + "type": "string", + "value": "info" + } + }, + "observed_timestamp_nanos": 1742481864000000000, + "severity_number": 0, + "severity_text": "info", + "timestamp_nanos": 1638144000000000000 + } + "###); + } } diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs index 6a8209a6ff..77d8208362 100644 --- a/relay-server/src/services/processor/ourlog.rs +++ b/relay-server/src/services/processor/ourlog.rs @@ -17,11 +17,12 @@ use { crate::services::outcome::{DiscardReason, Outcome}, crate::services::processor::ProcessingError, relay_dynamic_config::ProjectConfig, + relay_event_normalization::SchemaProcessor, relay_event_schema::processor::{process_value, ProcessingState}, - relay_event_schema::protocol::OurLog, + relay_event_schema::protocol::{OurLog, OurLogAttributeType}, relay_ourlogs::OtelLog, relay_pii::PiiProcessor, - relay_protocol::Annotated, + relay_protocol::{Annotated, ErrorKind, Value}, }; /// Removes logs from the envelope if the feature is not enabled. @@ -60,7 +61,7 @@ pub fn process(managed_envelope: &mut TypedEnvelope, project_info: Arc } }, ItemType::Log => match Annotated::::from_json_bytes(&item.payload()) { - Ok(our_log) => our_log, + Ok(our_log) => relay_ourlogs::ourlog_merge_otel(our_log), Err(err) => { relay_log::debug!("failed to parse Sentry Log: {}", err); return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); @@ -75,6 +76,11 @@ pub fn process(managed_envelope: &mut TypedEnvelope, project_info: Arc return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); } + if let Err(e) = normalize(&mut annotated_log) { + relay_log::debug!("failed to normalize log: {}", e); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + }; + let mut new_item = Item::new(ItemType::Log); let payload = match annotated_log.to_json() { Ok(payload) => payload, @@ -91,6 +97,19 @@ pub fn process(managed_envelope: &mut TypedEnvelope, project_info: Arc }); } +#[cfg(feature = "processing")] +fn normalize(annotated_log: &mut Annotated) -> Result<(), ProcessingError> { + process_value(annotated_log, &mut SchemaProcessor, ProcessingState::root())?; + + let Some(log) = annotated_log.value_mut() else { + return Err(ProcessingError::NoEventPayload); + }; + + process_attribute_types(log); + + Ok(()) +} + #[cfg(feature = "processing")] fn scrub( annotated_log: &mut Annotated, @@ -112,6 +131,53 @@ fn scrub( Ok(()) } +#[cfg(feature = "processing")] +fn process_attribute_types(ourlog: &mut OurLog) { + let Some(attributes) = ourlog.attributes.value_mut() else { + return; + }; + + let attributes = attributes.iter_mut().map(|(_, attr)| attr); + + for attribute in attributes { + use OurLogAttributeType::*; + + let Some(inner) = attribute.value_mut() else { + continue; + }; + + match (&mut inner.value.ty, &mut inner.value.value) { + (Annotated(Some(Boolean), _), Annotated(Some(Value::Bool(_)), _)) => (), + (Annotated(Some(Integer), _), Annotated(Some(Value::I64(_)), _)) => (), + (Annotated(Some(Integer), _), Annotated(Some(Value::U64(_)), _)) => (), + (Annotated(Some(Double), _), Annotated(Some(Value::I64(_)), _)) => (), + (Annotated(Some(Double), _), Annotated(Some(Value::U64(_)), _)) => (), + (Annotated(Some(Double), _), Annotated(Some(Value::F64(_)), _)) => (), + (Annotated(Some(String), _), Annotated(Some(Value::String(_)), _)) => (), + // Note: currently the mapping to Kafka requires that invalid or unknown combinations + // of types and values are removed from the mapping. + // + // Usually Relay would only modify the offending values, but for now, until there + // is better support in the pipeline here, we need to remove the entire attribute. + (Annotated(Some(Unknown(_)), _), _) => { + let original = attribute.value_mut().take(); + attribute.meta_mut().add_error(ErrorKind::InvalidData); + attribute.meta_mut().set_original_value(original); + } + (Annotated(Some(_), _), Annotated(Some(_), _)) => { + let original = attribute.value_mut().take(); + attribute.meta_mut().add_error(ErrorKind::InvalidData); + attribute.meta_mut().set_original_value(original); + } + (Annotated(None, _), _) | (_, Annotated(None, _)) => { + let original = attribute.value_mut().take(); + attribute.meta_mut().add_error(ErrorKind::MissingAttribute); + attribute.meta_mut().set_original_value(original); + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -119,6 +185,7 @@ mod tests { use crate::services::processor::ProcessingGroup; use crate::utils::ManagedEnvelope; use bytes::Bytes; + use relay_dynamic_config::GlobalConfig; use relay_system::Addr; @@ -205,4 +272,246 @@ mod tests { managed_envelope.envelope() ); } + + #[test] + #[cfg(feature = "processing")] + fn test_process_attribute_types() { + let json = r#"{ + "timestamp": 1544719860.0, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "level": "info", + "body": "Test log message", + "attributes": { + "valid_bool": { + "type": "boolean", + "value": true + }, + "valid_int_i64": { + "type": "integer", + "value": -42 + }, + "valid_int_u64": { + "type": "integer", + "value": 42 + }, + "valid_int_from_string": { + "type": "integer", + "value": "42" + }, + "valid_double": { + "type": "double", + "value": 42.5 + }, + "double_with_i64": { + "type": "double", + "value": -42 + }, + "valid_double_with_u64": { + "type": "double", + "value": 42 + }, + "valid_string": { + "type": "string", + "value": "test" + }, + "valid_string_with_other": { + "type": "string", + "value": "test", + "some_other_field": "some_other_value" + }, + "unknown_type": { + "type": "custom", + "value": "test" + }, + "invalid_int_from_invalid_string": { + "type": "integer", + "value": "abc" + }, + "missing_type": { + "value": "value with missing type" + }, + "missing_value": { + "type": "string" + } + } + }"#; + + let mut data = Annotated::::from_json(json).unwrap(); + + if let Some(log) = data.value_mut() { + process_attribute_types(log); + } + + insta::assert_debug_snapshot!(data.value().unwrap().attributes, @r###" + { + "double_with_i64": OurLogAttribute { + value: I64( + -42, + ), + type: Double, + other: {}, + }, + "invalid_int_from_invalid_string": Meta { + remarks: [], + errors: [ + Error { + kind: InvalidData, + data: {}, + }, + ], + original_length: None, + original_value: Some( + Object( + { + "type": String( + "integer", + ), + "value": String( + "abc", + ), + }, + ), + ), + }, + "missing_type": Meta { + remarks: [], + errors: [ + Error { + kind: MissingAttribute, + data: {}, + }, + ], + original_length: None, + original_value: Some( + Object( + { + "type": ~, + "value": String( + "value with missing type", + ), + }, + ), + ), + }, + "missing_value": Meta { + remarks: [], + errors: [ + Error { + kind: MissingAttribute, + data: {}, + }, + ], + original_length: None, + original_value: Some( + Object( + { + "type": String( + "string", + ), + "value": ~, + }, + ), + ), + }, + "unknown_type": Meta { + remarks: [], + errors: [ + Error { + kind: InvalidData, + data: {}, + }, + ], + original_length: None, + original_value: Some( + Object( + { + "type": String( + "custom", + ), + "value": String( + "test", + ), + }, + ), + ), + }, + "valid_bool": OurLogAttribute { + value: Bool( + true, + ), + type: Boolean, + other: {}, + }, + "valid_double": OurLogAttribute { + value: F64( + 42.5, + ), + type: Double, + other: {}, + }, + "valid_double_with_u64": OurLogAttribute { + value: I64( + 42, + ), + type: Double, + other: {}, + }, + "valid_int_from_string": Meta { + remarks: [], + errors: [ + Error { + kind: InvalidData, + data: {}, + }, + ], + original_length: None, + original_value: Some( + Object( + { + "type": String( + "integer", + ), + "value": String( + "42", + ), + }, + ), + ), + }, + "valid_int_i64": OurLogAttribute { + value: I64( + -42, + ), + type: Integer, + other: {}, + }, + "valid_int_u64": OurLogAttribute { + value: I64( + 42, + ), + type: Integer, + other: {}, + }, + "valid_string": OurLogAttribute { + value: String( + "test", + ), + type: String, + other: {}, + }, + "valid_string_with_other": OurLogAttribute { + value: String( + "test", + ), + type: String, + other: { + "some_other_field": String( + "some_other_value", + ), + }, + }, + } + "###); + } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 7499a7c6dd..ffb29d0733 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -996,6 +996,7 @@ impl StoreService { log.project_id = scoping.project_id.value(); log.retention_days = retention_days; log.received = safe_timestamp(received_at); + let message = KafkaMessage::Log { headers: BTreeMap::from([("project_id".to_string(), scoping.project_id.to_string())]), message: log, @@ -1124,7 +1125,7 @@ where .serialize(serializer) } -pub fn serialize_btreemap_skip_nulls( +fn serialize_btreemap_skip_nulls( map: &Option>>, serializer: S, ) -> Result @@ -1143,6 +1144,66 @@ where m.end() } +fn serialize_log_attributes( + map: &Option>>, + serializer: S, +) -> Result +where + S: serde::Serializer, +{ + let Some(map) = map else { + return serializer.serialize_none(); + }; + let mut m = serializer.serialize_map(Some(map.len()))?; + for (key, value) in map.iter() { + if let Some(value) = value { + if let Some(LogAttributeValue::Unknown(_)) = value.value { + continue; + } + m.serialize_entry(key, value)?; + } + } + m.end() +} + +/** + * This shouldn't be necessary with enum serialization, but since serde's tag doesn't work with `type` as it has to be renamed due to being a keyword, + * this allows us to not emit the 'type' field, which would otherwise break the ourlogs consumer. + */ +fn serialize_log_attribute_value( + attr: &Option, + serializer: S, +) -> Result +where + S: serde::Serializer, +{ + let Some(attr) = attr else { + return serializer.serialize_none(); + }; + + if let LogAttributeValue::Unknown(_) = attr { + return serializer.serialize_none(); + } + + let mut map = serializer.serialize_map(Some(1))?; + match attr { + LogAttributeValue::String(value) => { + map.serialize_entry("string_value", value)?; + } + LogAttributeValue::Int(value) => { + map.serialize_entry("int_value", value)?; + } + LogAttributeValue::Bool(value) => { + map.serialize_entry("bool_value", value)?; + } + LogAttributeValue::Double(value) => { + map.serialize_entry("double_value", value)?; + } + LogAttributeValue::Unknown(_) => (), + } + map.end() +} + /// Container payload for event messages. #[derive(Debug, Serialize)] struct EventKafkaMessage { @@ -1394,6 +1455,29 @@ struct SpanKafkaMessage<'a> { platform: Cow<'a, str>, // We only use this for logging for now } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "type", content = "value")] +enum LogAttributeValue { + #[serde(rename = "string")] + String(String), + #[serde(rename = "boolean")] + Bool(bool), + #[serde(rename = "integer")] + Int(i64), + #[serde(rename = "double")] + Double(f64), + #[serde(rename = "unknown")] + Unknown(String), +} + +/// This is a temporary struct to convert the old attribute format to the new one. +#[derive(Debug, Serialize, Deserialize)] +#[allow(dead_code)] +struct LogAttribute { + #[serde(flatten, serialize_with = "serialize_log_attribute_value")] + value: Option, +} + #[derive(Debug, Deserialize, Serialize)] struct LogKafkaMessage<'a> { #[serde(default)] @@ -1417,8 +1501,12 @@ struct LogKafkaMessage<'a> { severity_text: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] severity_number: Option, - #[serde(default, skip_serializing_if = "none_or_empty_object")] - attributes: Option<&'a RawValue>, + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "serialize_log_attributes" + )] + attributes: Option>>, #[serde(default, skip_serializing_if = "Option::is_none")] trace_flags: Option, } diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py index 31b738a1a6..01784fe629 100644 --- a/tests/integration/test_ourlogs.py +++ b/tests/integration/test_ourlogs.py @@ -14,7 +14,18 @@ } -def envelope_with_ourlogs(start: datetime, end: datetime) -> Envelope: +def envelope_with_sentry_logs(payload: dict) -> Envelope: + envelope = Envelope() + envelope.add_item( + Item( + type="log", + payload=PayloadRef(json=payload), + ) + ) + return envelope + + +def envelope_with_otel_logs(start: datetime) -> Envelope: envelope = Envelope() envelope.add_item( Item( @@ -23,7 +34,7 @@ def envelope_with_ourlogs(start: datetime, end: datetime) -> Envelope: bytes=json.dumps( { "timeUnixNano": str(int(start.timestamp() * 1e9)), - "observedTimeUnixNano": str(int(end.timestamp() * 1e9)), + "observedTimeUnixNano": str(int(start.timestamp() * 1e9)), "severityNumber": 10, "severityText": "Information", "traceId": "5B8EFFF798038103D269B633813FC60C", @@ -49,7 +60,7 @@ def envelope_with_ourlogs(start: datetime, end: datetime) -> Envelope: return envelope -def test_ourlog_extraction( +def test_ourlog_extraction_with_otel_logs( mini_sentry, relay_with_processing, ourlogs_consumer, @@ -63,42 +74,169 @@ def test_ourlog_extraction( relay = relay_with_processing(options=TEST_CONFIG) + start = datetime.now(timezone.utc) + duration = timedelta(milliseconds=500) now = datetime.now(timezone.utc) end = now - timedelta(seconds=1) start = end - duration - # Send OTel log and sentry log via envelope - envelope = envelope_with_ourlogs(start, end) + envelope = envelope_with_otel_logs(start) relay.send_envelope(project_id, envelope) ourlogs = ourlogs_consumer.get_ourlogs() - assert len(ourlogs) == 1 expected = { "organization_id": 1, "project_id": 42, "retention_days": 90, "timestamp_nanos": int(start.timestamp() * 1e9), - "observed_timestamp_nanos": time_within_delta( - start.timestamp(), expect_resolution="ns" - ), + "observed_timestamp_nanos": time_within_delta(start, expect_resolution="ns"), "trace_id": "5b8efff798038103d269b633813fc60c", "body": "Example log record", "trace_flags": 0, "span_id": "eee19b7ec3c1b174", "severity_text": "Information", "severity_number": 10, + "received": time_within_delta(start, expect_resolution="s"), "attributes": { "string.attribute": {"string_value": "some string"}, "boolean.attribute": {"bool_value": True}, "int.attribute": {"int_value": 10}, "double.attribute": {"double_value": 637.704}, + "sentry.severity_number": {"int_value": 10}, + "sentry.severity_text": {"string_value": "Information"}, + "sentry.timestamp_nanos": { + "string_value": str(int(start.timestamp() * 1e9)) + }, + "sentry.trace_flags": {"int_value": 0}, + }, + } + + del ourlogs[0]["attributes"]["sentry.observed_timestamp_nanos"] + + assert ourlogs == [expected] + + ourlogs_consumer.assert_empty() + + +def test_ourlog_extraction_with_sentry_logs( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + ourlogs_consumer = ourlogs_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + ] + + relay = relay_with_processing(options=TEST_CONFIG) + + start = datetime.now(timezone.utc) + + envelope = envelope_with_sentry_logs( + { + "timestamp": start.timestamp(), + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "level": "info", + "body": "Example log record", + "severity_number": 10, + "attributes": { + "boolean.attribute": {"value": True, "type": "boolean"}, + "integer.attribute": {"value": 42, "type": "integer"}, + "double.attribute": {"value": 1.23, "type": "double"}, + "string.attribute": {"value": "some string", "type": "string"}, + "pii": {"value": "4242 4242 4242 4242", "type": "string"}, + "sentry.severity_text": {"value": "info", "type": "string"}, + "unknown_type": {"value": "info", "type": "unknown"}, + "broken_type": {"value": "info", "type": "not_a_real_type"}, + "mismatched_type": {"value": "some string", "type": "boolean"}, + "valid_string_with_other": { + "value": "test", + "type": "string", + "some_other_field": "some_other_value", + }, + }, + } + ) + relay.send_envelope(project_id, envelope) + + ourlogs = ourlogs_consumer.get_ourlogs() + expected = { + "organization_id": 1, + "project_id": 42, + "retention_days": 90, + "timestamp_nanos": time_within_delta(start, expect_resolution="ns"), + "observed_timestamp_nanos": time_within_delta(start, expect_resolution="ns"), + "received": time_within_delta(start, expect_resolution="s"), + "trace_id": "5b8efff798038103d269b633813fc60c", + "body": "Example log record", + "span_id": "eee19b7ec3c1b174", + "severity_text": "info", + "severity_number": 9, + "attributes": { + "boolean.attribute": {"bool_value": True}, + "double.attribute": {"double_value": 1.23}, + "integer.attribute": {"int_value": 42}, + "string.attribute": {"string_value": "some string"}, + "valid_string_with_other": {"string_value": "test"}, + "pii": {"string_value": "[creditcard]"}, + "sentry.severity_number": {"int_value": 9}, + "sentry.severity_text": {"string_value": "info"}, }, } - del ourlogs[0]["received"] - assert ourlogs[0] == expected + assert ourlogs == [expected] + ourlogs_consumer.assert_empty() + + +def test_ourlog_extraction_with_sentry_logs_with_missing_fields( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + + ourlogs_consumer = ourlogs_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + ] + + relay = relay_with_processing(options=TEST_CONFIG) + + start = datetime.now(timezone.utc) + + envelope = envelope_with_sentry_logs( + { + "timestamp": start.timestamp(), + "trace_id": "5b8efff798038103d269b633813fc60c", + "level": "warn", + "body": "Example log record 2", + } + ) + relay.send_envelope(project_id, envelope) + ourlogs = ourlogs_consumer.get_ourlogs() + expected = { + "organization_id": 1, + "project_id": 42, + "retention_days": 90, + "timestamp_nanos": time_within_delta(start, expect_resolution="ns"), + "observed_timestamp_nanos": time_within_delta(start, expect_resolution="ns"), + "received": time_within_delta(start, expect_resolution="s"), + "trace_id": "5b8efff798038103d269b633813fc60c", + "body": "Example log record 2", + "severity_text": "warn", + "severity_number": 13, + "attributes": { + "sentry.severity_number": {"int_value": 13}, + "sentry.severity_text": {"string_value": "warn"}, + }, + } + assert ourlogs == [expected] ourlogs_consumer.assert_empty() @@ -113,15 +251,12 @@ def test_ourlog_extraction_is_disabled_without_feature( project_config = mini_sentry.add_full_project_config(project_id) project_config["config"]["features"] = [] - duration = timedelta(milliseconds=500) - now = datetime.now(timezone.utc) - end = now - timedelta(seconds=1) - start = end - duration + start = datetime.now(timezone.utc) - envelope = envelope_with_ourlogs(start, end) + envelope = envelope_with_otel_logs(start) relay.send_envelope(project_id, envelope) ourlogs = ourlogs_consumer.get_ourlogs() - assert len(ourlogs) == 0 + assert len(ourlogs) == 0 ourlogs_consumer.assert_empty()