Skip to content

feat(celery): Queues module producer implementation #3079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ class OP:
LANGCHAIN_AGENT = "ai.agent.langchain"
LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
QUEUE_PROCESS = "queue.process"
QUEUE_PUBLISH = "queue.publish"
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
QUEUE_TASK_ARQ = "queue.task.arq"
QUEUE_SUBMIT_CELERY = "queue.submit.celery"
Expand Down
48 changes: 48 additions & 0 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
from collections.abc import Mapping
from functools import wraps

import sentry_sdk
Expand Down Expand Up @@ -47,6 +48,7 @@
Retry,
SoftTimeLimitExceeded,
)
from kombu import Producer # type: ignore
except ImportError:
raise DidNotEnable("Celery not installed")

Expand Down Expand Up @@ -82,6 +84,7 @@ def setup_once():
_patch_build_tracer()
_patch_task_apply_async()
_patch_worker_exit()
_patch_producer_publish()

# This logger logs every status of every task that ran on the worker.
# Meaning that every task's breadcrumbs are full of stuff like "Task
Expand Down Expand Up @@ -433,3 +436,48 @@ def sentry_workloop(*args, **kwargs):
sentry_sdk.flush()

Worker.workloop = sentry_workloop


def _patch_producer_publish():
# type: () -> None
original_publish = Producer.publish

@ensure_integration_enabled(CeleryIntegration, original_publish)
def sentry_publish(self, *args, **kwargs):
# type: (Producer, *Any, **Any) -> Any
kwargs_headers = kwargs.get("headers", {})
if not isinstance(kwargs_headers, Mapping):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If kwargs_headers is a list [("key", "value"), ("k", "v")] then it would be reset to an empty dict?

I am not sure if this list of tuples could happen, but I do not understand this if.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, what I am trying to do here is to make sure that the get operations (lines 453 to 455) are successful.

Mapping (full name collections.abc.Mapping) is the abstract base class for mapping types in Python. dict is registered as a Mapping (through MutableMapping), so this isinstance is true for any dictionary, but it would also be possible to create a custom Mapping type, which provides similar functionality to a dict, but which does not inherit from dict. This isinstance also evaluates to True for any of these dict-like data structures.

Regarding your example, isinstance([("key", "value"), ("k", "v")], Mapping) would evaluate to False since list does not inherit from Mapping (it does not have a get method, which all Mapping types have). So, we would replace it with an empty dictionary to ensure that the get calls don't raise an exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now I get it!

# Ensure kwargs_headers is a Mapping, so we can safely call get().
# We don't expect this to happen, but it's better to be safe. Even
# if it does happen, only our instrumentation breaks. This line
# does not overwrite kwargs["headers"], so the original publish
# method will still work.
kwargs_headers = {}

task_name = kwargs_headers.get("task")
task_id = kwargs_headers.get("id")
retries = kwargs_headers.get("retries")

routing_key = kwargs.get("routing_key")
exchange = kwargs.get("exchange")

with sentry_sdk.start_span(op=OP.QUEUE_PUBLISH, description=task_name) as span:
if task_id is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)

if exchange == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning messages are
# routed to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)

if retries is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)

with capture_internal_exceptions():
span.set_data(
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
)

return original_publish(self, *args, **kwargs)

Producer.publish = sentry_publish
32 changes: 32 additions & 0 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
import kombu
from unittest import mock

import pytest
Expand Down Expand Up @@ -722,3 +723,34 @@ def task(): ...
(event,) = events
(span,) = event["spans"]
assert span["data"]["messaging.system"] == system


@pytest.mark.parametrize("system", ("amqp", "redis"))
def test_producer_span_data(system, monkeypatch, sentry_init, capture_events):
    """Check the data recorded on the ``queue.publish`` span for a sent task.

    The test is run once per broker URL scheme in ``system``. It sends a task
    with ``apply_async`` inside a transaction and asserts that the resulting
    ``queue.publish`` span carries the messaging system, destination name,
    message id and retry count.
    """

    def publish(*args, **kwargs):
        # No-op replacement for Producer._publish; presumably this keeps the
        # test from needing a reachable broker.
        pass

    # The monkeypatch fixture reverts this setattr at teardown, also when an
    # assertion below fails, so the original attribute is not restored by hand.
    monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish)

    sentry_init(integrations=[CeleryIntegration()], enable_tracing=True)
    celery = Celery(__name__, broker=f"{system}://example.com")  # noqa: E231
    events = capture_events()

    @celery.task()
    def task(): ...

    with start_transaction():
        task.apply_async()

    # Exactly one event (the transaction) is expected.
    (event,) = events
    span = next(span for span in event["spans"] if span["op"] == "queue.publish")

    assert span["data"]["messaging.system"] == system

    assert span["data"]["messaging.destination.name"] == "celery"
    assert "messaging.message.id" in span["data"]
    assert span["data"]["messaging.message.retry.count"] == 0