fix(profiling): Classify profile chunks for rate limits and outcomes #4595

Merged · 3 commits · Mar 21, 2025

4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@

- Add experimental playstation endpoint. ([#4555](https://github.com/getsentry/relay/pull/4555))

**Bug Fixes**:

- Separate profiles into backend and ui profiles. ([#4595](https://github.com/getsentry/relay/pull/4595))

**Internal**:

- Add ui chunk profiling data category. ([#4593](https://github.com/getsentry/relay/pull/4593))
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions relay-profiling/Cargo.toml
@@ -14,6 +14,7 @@ workspace = true

[dependencies]
android_trace_log = { workspace = true, features = ["serde"] }
bytes = { workspace = true }
chrono = { workspace = true }
data-encoding = { workspace = true }
itertools = { workspace = true }
99 changes: 69 additions & 30 deletions relay-profiling/src/lib.rs
@@ -42,6 +42,8 @@
use std::error::Error;
use std::net::IpAddr;
use std::time::Duration;

use bytes::Bytes;
use url::Url;

use relay_base_schema::project::ProjectId;
@@ -80,6 +82,12 @@ const MAX_PROFILE_CHUNK_DURATION: Duration = Duration::from_secs(66);
/// Same format as event IDs.
pub type ProfileId = EventId;

#[derive(Debug, Clone, Copy)]
pub enum ProfileType {
Backend,
Ui,
}

#[derive(Debug, Deserialize)]
struct MinimalProfile {
#[serde(alias = "profile_id", alias = "chunk_id")]
@@ -275,41 +283,72 @@ pub fn expand_profile(
}
}

pub fn expand_profile_chunk(
payload: &[u8],
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<Vec<u8>, ProfileError> {
let profile = match minimal_profile_from_json(payload) {
Ok(profile) => profile,
Err(err) => {
relay_log::warn!(
error = &err as &dyn Error,
from = "minimal",
"invalid profile chunk",
);
return Err(ProfileError::InvalidJson(err));
/// Intermediate type for all processing on a profile chunk.
pub struct ProfileChunk {
profile: MinimalProfile,
payload: Bytes,
}

impl ProfileChunk {
/// Parses a new [`Self`] from raw bytes.
pub fn new(payload: Bytes) -> Result<Self, ProfileError> {
match minimal_profile_from_json(&payload) {
Ok(profile) => Ok(Self { profile, payload }),
Err(err) => {
relay_log::debug!(
error = &err as &dyn Error,
from = "minimal",
"invalid profile chunk",
);
Err(ProfileError::InvalidJson(err))
}
}
};
}

if let Err(filter_stat_key) = relay_filter::should_filter(
&profile,
client_ip,
filter_settings,
global_config.filters(),
) {
return Err(ProfileError::Filtered(filter_stat_key));
/// Returns the [`ProfileType`] this chunk belongs to.
///
/// The profile type is currently determined based on the contained profile
/// platform. It determines the data category this profile chunk belongs to.
///
/// This needs to be synchronized with the implementation in Sentry:

[Review thread]
Contributor: If we can add the profile_type in the payload before we send it, this would become the source of truth.
Contributor: I think we can add it to the Kafka message (https://github.com/getsentry/relay/blob/master/relay-server/src/services/store.rs#L1035-L1041) and the worker will handle it, no need to deserialize/serialize again.
Member (author): Sure, that works. Let's figure out in a follow-up where we want to put it (payload / Kafka header) and how it should look.
Contributor: I would add it to the payload and not the header; it's easier to manipulate later on.

/// <https://github.com/getsentry/sentry/blob/ed2e1c8bcd0d633e6f828fcfbeefbbdd98ef3dba/src/sentry/profiles/task.py#L995>
pub fn profile_type(&self) -> ProfileType {
match self.profile.platform.as_str() {
"cocoa" | "android" | "javascript" => ProfileType::Ui,
_ => ProfileType::Backend,
}
}
[Review thread on lines +315 to 320] Contributor: 👍

match (profile.platform.as_str(), profile.version) {
("android", _) => android::chunk::parse(payload),
(_, sample::Version::V2) => {
let mut profile = sample::v2::parse(payload)?;
profile.normalize()?;
serde_json::to_vec(&profile).map_err(|_| ProfileError::CannotSerializePayload)
/// Applies inbound filters to the profile chunk.
///
/// The profile needs to be filtered (rejected) when this returns an error.
pub fn filter(
&self,
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<(), ProfileError> {
relay_filter::should_filter(
&self.profile,
client_ip,
filter_settings,
global_config.filters(),
)
.map_err(ProfileError::Filtered)
}

/// Normalizes and 'expands' the profile chunk into its normalized form Sentry expects.
pub fn expand(&self) -> Result<Vec<u8>, ProfileError> {
match (self.profile.platform.as_str(), self.profile.version) {
("android", _) => android::chunk::parse(&self.payload),
(_, sample::Version::V2) => {
let mut profile = sample::v2::parse(&self.payload)?;
profile.normalize()?;
Ok(serde_json::to_vec(&profile)
.map_err(|_| ProfileError::CannotSerializePayload)?)
}
(_, _) => Err(ProfileError::PlatformNotSupported),
}
(_, _) => Err(ProfileError::PlatformNotSupported),
}
}

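Taken together, the new `ProfileChunk` API splits the old `expand_profile_chunk` function into explicit parse, classify, filter, and expand steps. Below is a minimal sketch of how a caller could chain them, assuming the crate paths visible in this diff (`relay_profiling`, `relay_filter`, `relay_dynamic_config`); the `handle_chunk` helper itself is hypothetical and not part of this PR.

```rust
use std::net::IpAddr;

use bytes::Bytes;
use relay_dynamic_config::GlobalConfig;
use relay_filter::ProjectFiltersConfig;
use relay_profiling::{ProfileChunk, ProfileError, ProfileType};

/// Hypothetical caller showing the intended order of operations.
fn handle_chunk(
    payload: Bytes,
    client_ip: Option<IpAddr>,
    filter_settings: &ProjectFiltersConfig,
    global_config: &GlobalConfig,
) -> Result<(ProfileType, Vec<u8>), ProfileError> {
    // Parse only the minimal fields (platform, version) needed for routing.
    let chunk = ProfileChunk::new(payload)?;

    // Classify first, so rate limits and outcomes land in the correct data
    // category even if the chunk is rejected later.
    let profile_type = chunk.profile_type();

    // Apply inbound filters; an error here means the chunk must be dropped.
    chunk.filter(client_ip, filter_settings, global_config)?;

    // Normalize into the JSON form Sentry expects downstream.
    let expanded = chunk.expand()?;

    Ok((profile_type, expanded))
}
```
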
2 changes: 1 addition & 1 deletion relay-profiling/src/sample/mod.rs
@@ -7,7 +7,7 @@ pub mod v1;
pub mod v2;

/// Possible values for the version field of the Sample Format.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[derive(Debug, Serialize, Deserialize, Copy, Clone, Default, PartialEq, Eq)]
pub enum Version {
#[default]
Unknown,
24 changes: 23 additions & 1 deletion relay-server/src/envelope.rs
@@ -31,6 +31,7 @@
//! ```

use relay_base_schema::project::ProjectKey;
use relay_profiling::ProfileType;
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
@@ -596,6 +597,12 @@ pub struct ItemHeaders {
#[serde(default, skip)]
ingest_span_in_eap: bool,

/// Tracks whether the item is a backend or ui profile chunk.
///
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
profile_type: Option<ProfileType>,

/// Other attributes for forward compatibility.
#[serde(flatten)]
other: BTreeMap<String, Value>,
@@ -673,6 +680,7 @@ impl Item {
sampled: true,
fully_normalized: false,
ingest_span_in_eap: false,
profile_type: None,
},
payload: Bytes::new(),
}
@@ -725,7 +733,11 @@ impl Item {
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
ItemType::ProfileChunk => smallvec![(DataCategory::ProfileChunk, 1)], // TODO: should be seconds?
ItemType::ProfileChunk => match self.headers.profile_type {
Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, 1)],
Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, 1)],
None => smallvec![],
},
ItemType::Unknown(_) => smallvec![],
}
}
@@ -890,6 +902,16 @@ impl Item {
self.headers.ingest_span_in_eap = ingest_span_in_eap;
}

/// Returns the associated profile type of a profile chunk.
pub fn profile_type(&self) -> Option<ProfileType> {
self.headers.profile_type
}

/// Set the profile type of the profile chunk.
pub fn set_profile_type(&mut self, profile_type: ProfileType) {
self.headers.profile_type = Some(profile_type);
}

/// Gets the `sampled` flag.
pub fn sampled(&self) -> bool {
self.headers.sampled
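The new `profile_type` header is what routes a profile chunk into its data category. A rough sketch of the resulting behavior of `quantities()`, assuming the module's `Item`, `ItemType`, and `DataCategory` types are in scope; this is an illustration, not a test from this PR.

```rust
use relay_profiling::ProfileType;

// A freshly created profile chunk item is unclassified: it reports no
// quantities, so it is neither rate limited nor counted yet.
let mut item = Item::new(ItemType::ProfileChunk);
assert!(item.quantities().is_empty());

// Once the processor classifies the chunk, it counts against the matching
// category: ProfileChunk for backend platforms, ProfileChunkUi for UI ones.
item.set_profile_type(ProfileType::Ui);
assert_eq!(
    item.quantities().into_vec(),
    vec![(DataCategory::ProfileChunkUi, 1)]
);
```
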
22 changes: 17 additions & 5 deletions relay-server/src/services/processor.rs
@@ -1922,15 +1922,25 @@ impl EnvelopeProcessorService {
&self,
managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
project_info: Arc<ProjectInfo>,
_rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
profile_chunk::filter(managed_envelope, project_info.clone());
if_processing!(self.inner.config, {
profile_chunk::process(
managed_envelope,
project_info,
project_info.clone(),
&self.inner.global_config.current(),
&self.inner.config,
);

self.enforce_quotas(
managed_envelope,
Annotated::empty(),
&mut ProcessingExtractedMetrics::new(),
project_info,
_rate_limits,
)
.await?;
});

Ok(None)
@@ -1943,7 +1953,7 @@ impl EnvelopeProcessorService {
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
_rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
#[allow(unused_mut)]
let mut extracted_metrics = ProcessingExtractedMetrics::new();
@@ -1964,7 +1974,7 @@
Annotated::empty(),
&mut extracted_metrics,
project_info.clone(),
rate_limits,
_rate_limits,
)
.await?;
});
@@ -2297,7 +2307,9 @@ impl EnvelopeProcessorService {
rate_limits,
reservoir_counters
),
ProcessingGroup::ProfileChunk => run!(process_profile_chunks, project_info),
ProcessingGroup::ProfileChunk => {
run!(process_profile_chunks, project_info, rate_limits)
}
// Currently is not used.
ProcessingGroup::Metrics => {
// In proxy mode we simply forward the metrics.
@@ -3650,7 +3662,7 @@ impl UpstreamRequest for SendMetricsRequest {

/// Container for global and project level [`Quota`].
#[cfg(feature = "processing")]
#[derive(Copy, Clone)]
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
global_quotas: &'a [Quota],
project_quotas: &'a [Quota],
60 changes: 36 additions & 24 deletions relay-server/src/services/processor/profile_chunk.rs
@@ -14,6 +14,7 @@ use {
crate::services::processor::ProfileChunkGroup,
relay_config::Config,
relay_dynamic_config::GlobalConfig,
relay_profiling::ProfileError,
};

/// Removes profile chunks from the envelope if the feature is not enabled.
@@ -40,44 +41,55 @@ pub fn process(
) {
let client_ip = managed_envelope.envelope().meta().client_addr();
let filter_settings = &project_info.config.filter_settings;

let continuous_profiling_enabled =
if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
project_info.has_feature(Feature::ContinuousProfilingBeta)
} else {
project_info.has_feature(Feature::ContinuousProfiling)
};

managed_envelope.retain_items(|item| match item.ty() {
ItemType::ProfileChunk => {
if !continuous_profiling_enabled {
return ItemAction::DropSilently;
}

match relay_profiling::expand_profile_chunk(
&item.payload(),
client_ip,
filter_settings,
global_config,
) {
Ok(payload) => {
if payload.len() <= config.max_profile_size() {
item.set_payload(ContentType::Json, payload);
ItemAction::Keep
} else {
ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(
relay_profiling::ProfileError::ExceedSizeLimit,
),
)))
}
}
Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
ItemAction::Drop(Outcome::Filtered(filter_stat_key))
}
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
let chunk = match relay_profiling::ProfileChunk::new(item.payload()) {
Ok(chunk) => chunk,
Err(err) => return error_to_action(err),
};
// Important: set the profile type to get outcomes in the correct category.
item.set_profile_type(chunk.profile_type());

if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
return error_to_action(err);
}

let payload = match chunk.expand() {
Ok(expanded) => expanded,
Err(err) => return error_to_action(err),
};

if payload.len() > config.max_profile_size() {
return error_to_action(relay_profiling::ProfileError::ExceedSizeLimit);
}

item.set_payload(ContentType::Json, payload);
ItemAction::Keep
}
_ => ItemAction::Keep,
});
}

#[cfg(feature = "processing")]
fn error_to_action(err: ProfileError) -> ItemAction {
match err {
ProfileError::Filtered(filter_stat_key) => {
ItemAction::Drop(Outcome::Filtered(filter_stat_key))
}
err => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
}
}
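
A hypothetical test (not part of this PR) that pins down the mapping in `error_to_action`: filter rejections keep their filter key as a `Filtered` outcome, while any other `ProfileError` becomes an `Invalid` outcome with a profiling discard reason. `relay_filter::FilterStatKey::ErrorMessage` is used here only as a convenient example key.

```rust
#[cfg(all(test, feature = "processing"))]
mod tests {
    use super::*;

    #[test]
    fn error_to_action_maps_outcomes() {
        // Filter rejections are dropped with a Filtered outcome.
        let filtered = error_to_action(ProfileError::Filtered(
            relay_filter::FilterStatKey::ErrorMessage,
        ));
        assert!(matches!(filtered, ItemAction::Drop(Outcome::Filtered(_))));

        // Every other error maps to an Invalid outcome with a discard reason.
        let invalid = error_to_action(ProfileError::ExceedSizeLimit);
        assert!(matches!(invalid, ItemAction::Drop(Outcome::Invalid(_))));
    }
}
```
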
8 changes: 8 additions & 0 deletions relay-server/src/utils/managed_envelope.rs
@@ -470,6 +470,14 @@ impl ManagedEnvelope {
);
}

if self.context.summary.profile_chunk_ui_quantity > 0 {
self.track_outcome(
outcome.clone(),
DataCategory::ProfileChunkUi,
self.context.summary.profile_chunk_ui_quantity,
);
}

self.finish(RelayCounters::EnvelopeRejected, handling);
}
