Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use relay_event_schema::protocol::OurLog;
use relay_event_schema::protocol::{OurLog, Span, SpanV2};
use relay_protocol::Annotated;
use relay_quotas::DataCategory;
use smallvec::SmallVec;

Expand Down Expand Up @@ -87,6 +88,18 @@ impl Counted for WithHeader<OurLog> {
}
}

impl Counted for WithHeader<SpanV2> {
fn quantities(&self) -> Quantities {
smallvec::smallvec![(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)]
}
}

impl Counted for Annotated<Span> {
fn quantities(&self) -> Quantities {
smallvec::smallvec![(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)]
}
}

impl<T> Counted for &T
where
T: Counted,
Expand Down
14 changes: 14 additions & 0 deletions relay-server/src/managed/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,20 @@ impl RecordKeeper<'_> {
}
err
}

/// Rejects an item with an internal error.
///
/// See also: [`Managed::internal_error`].
#[track_caller]
pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
where
E: std::error::Error + 'static,
Q: Counted,
{
relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
debug_assert!(false, "internal error: {error}");
self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
}
}

/// Iterator returned by [`Managed::split`].
Expand Down
104 changes: 82 additions & 22 deletions relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use relay_event_schema::protocol::SpanV2;
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
use crate::envelope::{ContainerItems, EnvelopeHeaders, Item, ItemType};
use crate::envelope::{
ContainerItems, ContainerWriteError, EnvelopeHeaders, Item, ItemContainer, ItemType, Items,
};
use crate::managed::{
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
Expand Down Expand Up @@ -72,14 +74,14 @@ impl processing::Processor for SpansProcessor {
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
let _headers = envelope.envelope().headers().clone();
let headers = envelope.envelope().headers().clone();

let spans = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::Span))
.into_vec();

let work = SerializedSpans { _headers, spans };
let work = SerializedSpans { headers, spans };
Some(Managed::from_envelope(envelope, work))
}

Expand Down Expand Up @@ -127,39 +129,74 @@ pub enum SpanOutput {

impl Forward for SpanOutput {
fn serialize_envelope(self) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
debug_assert!(false, "Not Implemented Yet");
Err(match self {
Self::NotProcessed(spans) => {
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
}
Self::Processed(spans) => {
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
}
})
let spans = match self {
Self::NotProcessed(spans) => spans,
Self::Processed(spans) => spans.try_map(|spans, _| {
spans
.serialize()
.map_err(drop)
.with_outcome(Outcome::Invalid(DiscardReason::Internal))
})?,
};

Ok(spans.map(|spans, _| spans.serialize_envelope()))
}

#[cfg(feature = "processing")]
fn forward_store(
self,
_s: &relay_system::Addr<crate::services::store::Store>,
s: &relay_system::Addr<crate::services::store::Store>,
) -> Result<(), Rejected<()>> {
debug_assert!(false, "Not Implemented Yet");
Err(match self {
Self::NotProcessed(spans) => {
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
use crate::envelope::ContentType;
use crate::services::store::StoreEnvelope;

let spans = match self {
SpanOutput::NotProcessed(spans) => {
return Err(spans.internal_error(
"spans must be processed before they can be forwarded to the store",
));
}
Self::Processed(spans) => {
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
SpanOutput::Processed(spans) => spans,
};

// Converts all SpanV2 spans into their SpanV1 counterparts and packages them into an
// envelope to forward them.
//
// This is temporary until we have proper mapping code from SpanV2 -> SpanKafka,
// similar to what we do for logs.
let envelope = spans.map(|spans, records| {
let mut items = Items::with_capacity(spans.spans.len());
for span in spans.spans {
let span = span.value.map_value(relay_spans::span_v2_to_span_v1);

let mut item = Item::new(ItemType::Span);
let payload = match span.to_json() {
Ok(payload) => payload,
Err(error) => {
records.internal_error(error, span);
continue;
}
};
item.set_payload(ContentType::Json, payload);
items.push(item);
}
})

Envelope::from_parts(spans.headers, items)
});

s.send(StoreEnvelope {
envelope: ManagedEnvelope::from(envelope).into_processed(),
});

Ok(())
}
}

/// Spans in their serialized state, as transported in an envelope.
#[derive(Debug)]
pub struct SerializedSpans {
/// Original envelope headers.
_headers: EnvelopeHeaders,
headers: EnvelopeHeaders,

/// A list of spans waiting to be processed.
///
Expand All @@ -176,6 +213,10 @@ impl SerializedSpans {
let c: u32 = self.spans.iter().filter_map(|item| item.item_count()).sum();
c as usize
}

fn serialize_envelope(self) -> Box<Envelope> {
Envelope::from_parts(self.headers, Items::from_vec(self.spans))
}
}

impl Counted for SerializedSpans {
Expand All @@ -196,12 +237,31 @@ impl CountRateLimited for Managed<SerializedSpans> {
#[derive(Debug)]
pub struct ExpandedSpans {
/// Original envelope headers.
_headers: EnvelopeHeaders,
headers: EnvelopeHeaders,

/// Expanded and parsed spans.
spans: ContainerItems<SpanV2>,
}

impl ExpandedSpans {
fn serialize(self) -> Result<SerializedSpans, ContainerWriteError> {
let mut spans = Vec::new();

if !self.spans.is_empty() {
let mut item = Item::new(ItemType::Span);
ItemContainer::from(self.spans)
.write_to(&mut item)
.inspect_err(|err| relay_log::error!("failed to serialize spans: {err}"))?;
spans.push(item);
}

Ok(SerializedSpans {
headers: self.headers,
spans,
})
}
}

impl Counted for ExpandedSpans {
fn quantities(&self) -> Quantities {
let quantity = self.spans.len();
Expand Down
6 changes: 2 additions & 4 deletions relay-server/src/processing/spans/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ use relay_event_schema::protocol::SpanV2;

use crate::envelope::{ContainerItems, Item, ItemContainer};
use crate::managed::Managed;
use crate::processing::spans::{Error, ExpandedSpans, Result};
use crate::processing::spans::{Error, ExpandedSpans, Result, SerializedSpans};
use crate::services::outcome::DiscardReason;

use super::SerializedSpans;

/// Parses all serialized spans.
///
/// Individual, invalid spans are discarded.
Expand All @@ -21,7 +19,7 @@ pub fn expand(spans: Managed<SerializedSpans>) -> Managed<ExpandedSpans> {
}

ExpandedSpans {
_headers: spans._headers,
headers: spans.headers,
spans: all_spans,
}
})
Expand Down
Loading