Skip to content

feat(metrics): Implement metric_bucket rate limits #2933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,22 @@ def _parse_rate_limits(header, now=None):

for limit in header.split(","):
try:
retry_after, categories, _ = limit.strip().split(":", 2)
parameters = limit.strip().split(":")
retry_after, categories = parameters[:2]

retry_after = now + timedelta(seconds=int(retry_after))
for category in categories and categories.split(";") or (None,):
yield category, retry_after
if category == "metric_bucket":
try:
namespaces = parameters[4].split(";")
except IndexError:
namespaces = []

if not namespaces or "custom" in namespaces:
yield category, retry_after

else:
yield category, retry_after
except (LookupError, ValueError):
continue

Expand Down Expand Up @@ -210,6 +222,12 @@ def record_lost_event(
# quantity of 0 is actually 1 as we do not want to count
# empty attachments as actually empty.
quantity = len(item.get_bytes()) or 1
if data_category == "statsd":
# The envelope item type used for metrics is statsd
# whereas the client report category for discarded events
# is metric_bucket
data_category = "metric_bucket"

elif data_category is None:
raise TypeError("data category not provided")

Expand Down Expand Up @@ -336,7 +354,14 @@ def _check_disabled(self, category):
# type: (str) -> bool
def _disabled(bucket):
    # type: (Any) -> bool
    """Return True while a rate limit recorded for ``bucket`` is in effect."""
    # Metrics travel as "statsd" envelope items, but their rate limit
    # is stored under the "metric_bucket" category, so translate the
    # key before looking it up.
    lookup_key = "metric_bucket" if bucket == "statsd" else bucket

    disabled_until = self._disabled_until.get(lookup_key)
    if disabled_until is None:
        # No limit has been recorded for this category.
        return False

    return disabled_until > datetime_utcnow()

return _disabled(category) or _disabled(None)
Expand Down Expand Up @@ -402,7 +427,7 @@ def _send_envelope(
new_items = []
for item in envelope.items:
if self._check_disabled(item.data_category):
if item.data_category in ("transaction", "error", "default"):
if item.data_category in ("transaction", "error", "default", "statsd"):
self.on_dropped_event("self_rate_limits")
self.record_lost_event("ratelimit_backoff", item=item)
else:
Expand Down
113 changes: 112 additions & 1 deletion tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from sentry_sdk import Hub, Client, add_breadcrumb, capture_message, Scope
from sentry_sdk._compat import datetime_utcnow
from sentry_sdk.transport import KEEP_ALIVE_SOCKET_OPTIONS, _parse_rate_limits
from sentry_sdk.envelope import Envelope, parse_json
from sentry_sdk.envelope import Envelope, Item, parse_json
from sentry_sdk.integrations.logging import LoggingIntegration

try:
Expand Down Expand Up @@ -466,3 +466,114 @@ def test_complex_limits_without_data_category(
client.flush()

assert len(capturing_server.captured) == 0


@pytest.mark.parametrize("response_code", [200, 429])
def test_metric_bucket_limits(capturing_server, response_code, make_client):
    """Check a ``metric_bucket`` limit listing the ``custom`` namespace.

    The first statsd envelope must reach the server; after the rate limit
    header is received, ``metric_bucket`` must be the only disabled category,
    a second statsd envelope must be dropped, and the drop must show up in a
    client report while a transaction is still delivered.
    """
    rate_limit_header = (
        "4711:metric_bucket:organization:quota_exceeded:custom"
    )

    client = make_client()
    capturing_server.respond_with(
        code=response_code,
        headers={"X-Sentry-Rate-Limits": rate_limit_header},
    )

    # First attempt: nothing is rate limited yet, so the item goes out.
    statsd_item = Item(payload=b"{}", type="statsd")
    metrics_envelope = Envelope()
    metrics_envelope.add_item(statsd_item)
    client.transport.capture_envelope(metrics_envelope)
    client.flush()

    assert len(capturing_server.captured) == 1
    assert capturing_server.captured[0].path == "/api/132/envelope/"
    capturing_server.clear_captured()

    assert set(client.transport._disabled_until) == {"metric_bucket"}

    # Second attempt: the statsd item is dropped, the transaction is not.
    client.transport.capture_envelope(metrics_envelope)
    client.capture_event({"type": "transaction"})
    client.flush()

    assert len(capturing_server.captured) == 2

    transaction_envelope = capturing_server.captured[0].envelope
    assert transaction_envelope.items[0].type == "transaction"

    report_envelope = capturing_server.captured[1].envelope
    assert report_envelope.items[0].type == "client_report"

    client_report = parse_json(report_envelope.items[0].get_bytes())
    expected_discards = [
        {"category": "metric_bucket", "reason": "ratelimit_backoff", "quantity": 1},
    ]
    assert client_report["discarded_events"] == expected_discards


@pytest.mark.parametrize("response_code", [200, 429])
def test_metric_bucket_limits_with_namespace(
    capturing_server, response_code, make_client
):
    """Check a ``metric_bucket`` limit that lists only the ``foo`` namespace.

    No category may end up disabled after the response, and a second statsd
    envelope must still be delivered alongside a transaction.
    """
    rate_limit_header = "4711:metric_bucket:organization:quota_exceeded:foo"

    client = make_client()
    capturing_server.respond_with(
        code=response_code,
        headers={"X-Sentry-Rate-Limits": rate_limit_header},
    )

    # First attempt: nothing is rate limited yet, so the item goes out.
    statsd_item = Item(payload=b"{}", type="statsd")
    metrics_envelope = Envelope()
    metrics_envelope.add_item(statsd_item)
    client.transport.capture_envelope(metrics_envelope)
    client.flush()

    assert len(capturing_server.captured) == 1
    assert capturing_server.captured[0].path == "/api/132/envelope/"
    capturing_server.clear_captured()

    # The limit must not have been recorded for any category.
    assert set(client.transport._disabled_until) == set()

    # Second attempt: both the statsd item and the transaction are sent.
    client.transport.capture_envelope(metrics_envelope)
    client.capture_event({"type": "transaction"})
    client.flush()

    assert len(capturing_server.captured) == 2

    first_envelope = capturing_server.captured[0].envelope
    assert first_envelope.items[0].type == "statsd"

    second_envelope = capturing_server.captured[1].envelope
    assert second_envelope.items[0].type == "transaction"


@pytest.mark.parametrize("response_code", [200, 429])
def test_metric_bucket_limits_with_all_namespaces(
    capturing_server, response_code, make_client
):
    """Check a ``metric_bucket`` limit whose header has no namespace field.

    After the response, ``metric_bucket`` must be the only disabled category,
    a second statsd envelope must be dropped and reported in a client report,
    and a transaction must still be delivered.
    """
    rate_limit_header = "4711:metric_bucket:organization:quota_exceeded"

    client = make_client()
    capturing_server.respond_with(
        code=response_code,
        headers={"X-Sentry-Rate-Limits": rate_limit_header},
    )

    # First attempt: nothing is rate limited yet, so the item goes out.
    statsd_item = Item(payload=b"{}", type="statsd")
    metrics_envelope = Envelope()
    metrics_envelope.add_item(statsd_item)
    client.transport.capture_envelope(metrics_envelope)
    client.flush()

    assert len(capturing_server.captured) == 1
    assert capturing_server.captured[0].path == "/api/132/envelope/"
    capturing_server.clear_captured()

    assert set(client.transport._disabled_until) == {"metric_bucket"}

    # Second attempt: the statsd item is dropped, the transaction is not.
    client.transport.capture_envelope(metrics_envelope)
    client.capture_event({"type": "transaction"})
    client.flush()

    assert len(capturing_server.captured) == 2

    transaction_envelope = capturing_server.captured[0].envelope
    assert transaction_envelope.items[0].type == "transaction"

    report_envelope = capturing_server.captured[1].envelope
    assert report_envelope.items[0].type == "client_report"

    client_report = parse_json(report_envelope.items[0].get_bytes())
    expected_discards = [
        {"category": "metric_bucket", "reason": "ratelimit_backoff", "quantity": 1},
    ]
    assert client_report["discarded_events"] == expected_discards