Skip to content

Commit 0a72ae0

Browse files
feat(celery): Queues module producer implementation
1 parent 4bec867 commit 0a72ae0

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ class OP:
388388
LANGCHAIN_AGENT = "ai.agent.langchain"
389389
LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
390390
QUEUE_PROCESS = "queue.process"
391+
QUEUE_PUBLISH = "queue.publish"
391392
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
392393
QUEUE_TASK_ARQ = "queue.task.arq"
393394
QUEUE_SUBMIT_CELERY = "queue.submit.celery"

sentry_sdk/integrations/celery/__init__.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import sys
2+
from collections.abc import Mapping
23
from functools import wraps
34

45
import sentry_sdk
@@ -47,6 +48,7 @@
4748
Retry,
4849
SoftTimeLimitExceeded,
4950
)
51+
from kombu import Producer
5052
except ImportError:
5153
raise DidNotEnable("Celery not installed")
5254

@@ -82,6 +84,7 @@ def setup_once():
8284
_patch_build_tracer()
8385
_patch_task_apply_async()
8486
_patch_worker_exit()
87+
_patch_producer_publish()
8588

8689
# This logger logs every status of every task that ran on the worker.
8790
# Meaning that every task's breadcrumbs are full of stuff like "Task
@@ -433,3 +436,44 @@ def sentry_workloop(*args, **kwargs):
433436
sentry_sdk.flush()
434437

435438
Worker.workloop = sentry_workloop
439+
440+
441+
def _patch_producer_publish():
442+
# type: () -> None
443+
original_publish = Producer.publish
444+
445+
@ensure_integration_enabled(CeleryIntegration, original_publish)
446+
def sentry_publish(self, *args, **kwargs):
447+
# type: (Producer, *Any, **Any) -> Any
448+
kwargs_headers = kwargs.get("headers", {})
449+
if not isinstance(kwargs_headers, Mapping):
450+
# Ensure kwargs_headers is a Mapping, so we can safely call get()
451+
kwargs_headers = {}
452+
453+
task_name = kwargs_headers.get("task")
454+
task_id = kwargs_headers.get("id")
455+
retries = kwargs_headers.get("retries")
456+
457+
routing_key = kwargs.get("routing_key")
458+
exchange = kwargs.get("exchange")
459+
460+
with sentry_sdk.start_span(op=OP.QUEUE_PUBLISH, description=task_name) as span:
461+
if task_id is not None:
462+
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
463+
464+
if exchange == "" and routing_key is not None:
465+
# Empty exchange indicates the default exchange, meaning messages are
466+
# routed to the queue with the same name as the routing key.
467+
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
468+
469+
if kwargs_headers.get("retries") is not None:
470+
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
471+
472+
with capture_internal_exceptions():
473+
span.set_data(
474+
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
475+
)
476+
477+
return original_publish(self, *args, **kwargs)
478+
479+
Producer.publish = sentry_publish

tests/integrations/celery/test_celery.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,19 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs):
7474
return inner
7575

7676

77+
@pytest.fixture
78+
def init_celery_producer_only():
79+
"""
80+
Initialize Celery integration without the worker. We mock out the actual
81+
publishing of messages to the broker, so tasks are not actually executed.
82+
This allows testing without `always_eager`.
83+
84+
Unline `init_celery`, this fixture does not initialize sentry_sdk.
85+
"""
86+
with mock.patch("kombu.messaging.Producer.publish"):
87+
yield Celery
88+
89+
7790
@pytest.fixture
7891
def celery(init_celery):
7992
return init_celery()
@@ -722,3 +735,27 @@ def task(): ...
722735
(event,) = events
723736
(span,) = event["spans"]
724737
assert span["data"]["messaging.system"] == system
738+
739+
740+
@pytest.mark.forked
741+
@pytest.mark.parametrize("system", ("amqp", "redis"))
742+
def test_producer_span_data(
743+
system, sentry_init, init_celery_producer_only, capture_events
744+
):
745+
sentry_init(integrations=[CeleryIntegration()], enable_tracing=True)
746+
celery = init_celery_producer_only(__name__, broker=f"{system}://example.com")
747+
events = capture_events()
748+
749+
@celery.task()
750+
def task(): ...
751+
752+
with start_transaction():
753+
task.apply_async()
754+
755+
(event,) = events
756+
span = next(span for span in event["spans"] if span["op"] == "queue.publish")
757+
assert span["data"]["messaging.system"] == system
758+
759+
assert span["data"]["messaging.destination.name"] == "celery"
760+
assert "messaging.message.id" in span["data"]
761+
assert span["data"]["messaging.message.retry.count"] == 0

0 commit comments

Comments
 (0)