Skip to content

feat(integrations): Add support for celery-redbeat cron tasks #2643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
except ImportError:
raise DidNotEnable("Celery not installed")

try:
from redbeat.schedulers import RedBeatScheduler # type: ignore
except ImportError:
RedBeatScheduler = None


CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)

Expand All @@ -76,6 +81,7 @@ def __init__(

if monitor_beat_tasks:
_patch_beat_apply_entry()
_patch_redbeat_maybe_due()
_setup_celery_beat_signals()

@staticmethod
Expand Down Expand Up @@ -535,6 +541,62 @@ def sentry_apply_entry(*args, **kwargs):
Scheduler.apply_entry = sentry_apply_entry


def _patch_redbeat_maybe_due():
# type: () -> None

if RedBeatScheduler is None:
return

original_maybe_due = RedBeatScheduler.maybe_due

def sentry_maybe_due(*args, **kwargs):
# type: (*Any, **Any) -> None
scheduler, schedule_entry = args
app = scheduler.app

celery_schedule = schedule_entry.schedule
monitor_name = schedule_entry.name

hub = Hub.current
integration = hub.get_integration(CeleryIntegration)
if integration is None:
return original_maybe_due(*args, **kwargs)

if match_regex_list(monitor_name, integration.exclude_beat_tasks):
return original_maybe_due(*args, **kwargs)

with hub.configure_scope() as scope:
# When tasks are started from Celery Beat, make sure each task has its own trace.
scope.set_new_propagation_context()

monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)

check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers

return original_maybe_due(*args, **kwargs)

RedBeatScheduler.maybe_due = sentry_maybe_due


def _setup_celery_beat_signals():
# type: () -> None
task_success.connect(crons_task_success)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def get_file_text(file_name):
"beam": ["apache-beam>=2.12"],
"bottle": ["bottle>=0.12.13"],
"celery": ["celery>=3"],
"celery-redbeat": ["celery-redbeat>=2"],
"chalice": ["chalice>=1.16.0"],
"clickhouse-driver": ["clickhouse-driver>=0.2.0"],
"django": ["django>=1.8"],
Expand Down
54 changes: 54 additions & 0 deletions tests/integrations/celery/test_celery_beat_crons.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
_get_humanized_interval,
_get_monitor_config,
_patch_beat_apply_entry,
_patch_redbeat_maybe_due,
crons_task_success,
crons_task_failure,
crons_task_retry,
Expand Down Expand Up @@ -447,3 +448,56 @@ def test_exclude_beat_tasks_option(
# The original Scheduler.apply_entry() is called, AND _get_monitor_config is called.
assert fake_apply_entry.call_count == 1
assert _get_monitor_config.call_count == 1


@pytest.mark.parametrize(
"task_name,exclude_beat_tasks,task_in_excluded_beat_tasks",
[
["some_task_name", ["xxx", "some_task.*"], True],
["some_task_name", ["xxx", "some_other_task.*"], False],
],
)
def test_exclude_redbeat_tasks_option(
task_name, exclude_beat_tasks, task_in_excluded_beat_tasks
):
"""
Test excluding Celery RedBeat tasks from automatic instrumentation.
"""
fake_maybe_due = MagicMock()

fake_redbeat_scheduler = MagicMock()
fake_redbeat_scheduler.maybe_due = fake_maybe_due

fake_integration = MagicMock()
fake_integration.exclude_beat_tasks = exclude_beat_tasks

fake_schedule_entry = MagicMock()
fake_schedule_entry.name = task_name

fake_get_monitor_config = MagicMock()

with mock.patch(
"sentry_sdk.integrations.celery.RedBeatScheduler", fake_redbeat_scheduler
) as RedBeatScheduler: # noqa: N806
with mock.patch(
"sentry_sdk.integrations.celery.Hub.current.get_integration",
return_value=fake_integration,
):
with mock.patch(
"sentry_sdk.integrations.celery._get_monitor_config",
fake_get_monitor_config,
) as _get_monitor_config:
# Mimic CeleryIntegration patching of RedBeatScheduler.maybe_due()
_patch_redbeat_maybe_due()
# Mimic Celery RedBeat calling a task from the RedBeat schedule
RedBeatScheduler.maybe_due(fake_redbeat_scheduler, fake_schedule_entry)

if task_in_excluded_beat_tasks:
# Only the original RedBeatScheduler.maybe_due() is called, _get_monitor_config is NOT called.
assert fake_maybe_due.call_count == 1
_get_monitor_config.assert_not_called()

else:
# The original RedBeatScheduler.maybe_due() is called, AND _get_monitor_config is called.
assert fake_maybe_due.call_count == 1
assert _get_monitor_config.call_count == 1