diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 16d183ffb0..2927f40495 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -15,7 +15,6 @@ get_default_release, handle_in_app, logger, - is_gevent, ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace, has_tracing_enabled @@ -251,18 +250,14 @@ def _capture_envelope(envelope): self.metrics_aggregator = None # type: Optional[MetricsAggregator] experiments = self.options.get("_experiments", {}) if experiments.get("enable_metrics", True): - if is_gevent(): - logger.warning("Metrics currently not supported with gevent.") + from sentry_sdk.metrics import MetricsAggregator - else: - from sentry_sdk.metrics import MetricsAggregator - - self.metrics_aggregator = MetricsAggregator( - capture_func=_capture_envelope, - enable_code_locations=bool( - experiments.get("metric_code_locations", True) - ), - ) + self.metrics_aggregator = MetricsAggregator( + capture_func=_capture_envelope, + enable_code_locations=bool( + experiments.get("metric_code_locations", True) + ), + ) max_request_body_size = ("always", "never", "small", "medium") if self.options["max_request_body_size"] not in max_request_body_size: diff --git a/sentry_sdk/metrics.py b/sentry_sdk/metrics.py index 52aa735013..8f4066c570 100644 --- a/sentry_sdk/metrics.py +++ b/sentry_sdk/metrics.py @@ -1,24 +1,25 @@ -import os import io +import os +import random import re import sys import threading -import random import time import zlib +from contextlib import contextmanager from datetime import datetime from functools import wraps, partial -from threading import Event, Lock, Thread -from contextlib import contextmanager import sentry_sdk -from sentry_sdk._compat import text_type, utc_from_timestamp, iteritems +from sentry_sdk._compat import PY2, text_type, utc_from_timestamp, iteritems from sentry_sdk.utils import ( + ContextVar, now, nanosecond_time, to_timestamp, serialize_frame, json_dumps, + is_gevent, ) from sentry_sdk.envelope import Envelope, Item from sentry_sdk.tracing import ( @@ -53,7 +54,18 @@ from sentry_sdk._types import MetricValue -_thread_local = threading.local() +try: + from gevent.monkey import get_original # type: ignore + from gevent.threadpool import ThreadPool # type: ignore +except ImportError: + import importlib + + def get_original(module, name): + # type: (str, str) -> Any + return getattr(importlib.import_module(module), name) + + +_in_metrics = ContextVar("in_metrics") _sanitize_key = partial(re.compile(r"[^a-zA-Z0-9_/.-]+").sub, "_") _sanitize_value = partial(re.compile(r"[^\w\d_:/@\.{}\[\]$-]+", re.UNICODE).sub, "_") _set = set # set is shadowed below @@ -84,15 +96,12 @@ def get_code_location(stacklevel): def recursion_protection(): # type: () -> Generator[bool, None, None] """Enters recursion protection and returns the old flag.""" + old_in_metrics = _in_metrics.get(False) + _in_metrics.set(True) try: - in_metrics = _thread_local.in_metrics - except AttributeError: - in_metrics = False - _thread_local.in_metrics = True - try: - yield in_metrics + yield old_in_metrics finally: - _thread_local.in_metrics = in_metrics + _in_metrics.set(old_in_metrics) def metrics_noop(func): @@ -411,12 +420,22 @@ def __init__( self._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]] self._buckets_total_weight = 0 self._capture_func = capture_func - self._lock = Lock() self._running = True - self._flush_event = Event() + self._lock = threading.Lock() + + if is_gevent() and PY2: + # get_original on threading.Event in Python 2 incorrectly returns + # the gevent-patched class. Luckily, threading.Event is just an alias + # for threading._Event in Python 2, and get_original on + # threading._Event correctly gets us the stdlib original. + event_cls = get_original("threading", "_Event") + else: + event_cls = get_original("threading", "Event") + self._flush_event = event_cls() # type: threading.Event + self._force_flush = False - # The aggregator shifts it's flushing by up to an entire rollup window to + # The aggregator shifts its flushing by up to an entire rollup window to # avoid multiple clients trampling on end of a 10 second window as all the # buckets are anchored to multiples of ROLLUP seconds. We randomize this # number once per aggregator boot to achieve some level of offsetting @@ -424,7 +443,7 @@ def __init__( # jittering. self._flush_shift = random.random() * self.ROLLUP_IN_SECONDS - self._flusher = None # type: Optional[Thread] + self._flusher = None # type: Optional[Union[threading.Thread, ThreadPool]] self._flusher_pid = None # type: Optional[int] self._ensure_thread() @@ -435,25 +454,35 @@ def _ensure_thread(self): """ if not self._running: return False + pid = os.getpid() if self._flusher_pid == pid: return True + with self._lock: self._flusher_pid = pid - self._flusher = Thread(target=self._flush_loop) - self._flusher.daemon = True + + if not is_gevent(): + self._flusher = threading.Thread(target=self._flush_loop) + self._flusher.daemon = True + start_flusher = self._flusher.start + else: + self._flusher = ThreadPool(1) + start_flusher = partial(self._flusher.spawn, func=self._flush_loop) + try: - self._flusher.start() + start_flusher() except RuntimeError: # Unfortunately at this point the interpreter is in a state that no # longer allows us to spawn a thread and we have to bail. self._running = False return False + return True def _flush_loop(self): # type: (...) -> None - _thread_local.in_metrics = True + _in_metrics.set(True) while self._running or self._force_flush: self._flush() if self._running: @@ -608,7 +637,6 @@ def kill(self): self._running = False self._flush_event.set() - self._flusher.join() self._flusher = None @metrics_noop diff --git a/sentry_sdk/profiler.py b/sentry_sdk/profiler.py index 8f90855b42..be954b2a2c 100644 --- a/sentry_sdk/profiler.py +++ b/sentry_sdk/profiler.py @@ -490,7 +490,7 @@ def _set_initial_sampling_decision(self, sampling_context): # type: (SamplingContext) -> None """ Sets the profile's sampling decision according to the following - precdence rules: + precedence rules: 1. If the transaction to be profiled is not sampled, that decision will be used, regardless of anything else. diff --git a/tests/test_metrics.py b/tests/test_metrics.py index f8c054c273..773d98617a 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -13,13 +13,6 @@ except ImportError: import mock # python < 3.3 -try: - import gevent -except ImportError: - gevent = None - -requires_gevent = pytest.mark.skipif(gevent is None, reason="gevent not enabled") - def parse_metrics(bytes): rv = [] @@ -52,7 +45,8 @@ def parse_metrics(bytes): return rv -def test_incr(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_incr(sentry_init, capture_envelopes, maybe_monkeypatched_threading): sentry_init( release="fun-release", environment="not-fun-env", @@ -103,7 +97,8 @@ def test_incr(sentry_init, capture_envelopes): } -def test_timing(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_timing(sentry_init, capture_envelopes, maybe_monkeypatched_threading): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -162,7 +157,10 @@ def test_timing(sentry_init, capture_envelopes): ) -def test_timing_decorator(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_timing_decorator( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -254,7 +252,8 @@ def amazing_nano(): assert line.strip() == "assert amazing() == 42" -def test_timing_basic(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_timing_basic(sentry_init, capture_envelopes, maybe_monkeypatched_threading): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -308,7 +307,8 @@ def test_timing_basic(sentry_init, capture_envelopes): } -def test_distribution(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_distribution(sentry_init, capture_envelopes, maybe_monkeypatched_threading): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -369,7 +369,8 @@ def test_distribution(sentry_init, capture_envelopes): ) -def test_set(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_set(sentry_init, capture_envelopes, maybe_monkeypatched_threading): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -421,7 +422,8 @@ def test_set(sentry_init, capture_envelopes): } -def test_gauge(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_gauge(sentry_init, capture_envelopes, maybe_monkeypatched_threading): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -453,6 +455,7 @@ def test_gauge(sentry_init, capture_envelopes): } +@pytest.mark.forked def test_multiple(sentry_init, capture_envelopes): sentry_init( release="fun-release@1.0.0", @@ -506,7 +509,10 @@ def test_multiple(sentry_init, capture_envelopes): } -def test_transaction_name(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_transaction_name( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -543,8 +549,11 @@ def test_transaction_name(sentry_init, capture_envelopes): } +@pytest.mark.forked @pytest.mark.parametrize("sample_rate", [1.0, None]) -def test_metric_summaries(sentry_init, capture_envelopes, sample_rate): +def test_metric_summaries( + sentry_init, capture_envelopes, sample_rate, maybe_monkeypatched_threading +): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -650,7 +659,10 @@ def test_metric_summaries(sentry_init, capture_envelopes, sample_rate): } -def test_metrics_summary_disabled(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_metrics_summary_disabled( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -691,7 +703,10 @@ def test_metrics_summary_disabled(sentry_init, capture_envelopes): assert "_metrics_summary" not in t["spans"][0] -def test_metrics_summary_filtered(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_metrics_summary_filtered( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): def should_summarize_metric(key, tags): return key == "foo" @@ -757,7 +772,10 @@ def should_summarize_metric(key, tags): } in t["d:foo@second"] -def test_tag_normalization(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_tag_normalization( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -801,7 +819,10 @@ def test_tag_normalization(sentry_init, capture_envelopes): # fmt: on -def test_before_emit_metric(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_before_emit_metric( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): def before_emit(key, tags): if key == "removed-metric": return False @@ -841,7 +862,10 @@ def before_emit(key, tags): } -def test_aggregator_flush(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_aggregator_flush( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): sentry_init( release="fun-release@1.0.0", environment="not-fun-env", @@ -858,7 +882,10 @@ def test_aggregator_flush(sentry_init, capture_envelopes): assert Hub.current.client.metrics_aggregator.buckets == {} -def test_tag_serialization(sentry_init, capture_envelopes): +@pytest.mark.forked +def test_tag_serialization( + sentry_init, capture_envelopes, maybe_monkeypatched_threading +): sentry_init( release="fun-release", environment="not-fun-env", @@ -895,7 +922,10 @@ def test_tag_serialization(sentry_init, capture_envelopes): } -def test_flush_recursion_protection(sentry_init, capture_envelopes, monkeypatch): +@pytest.mark.forked +def test_flush_recursion_protection( + sentry_init, capture_envelopes, monkeypatch, maybe_monkeypatched_threading +): sentry_init( release="fun-release", environment="not-fun-env", @@ -924,8 +954,9 @@ def bad_capture_envelope(*args, **kwargs): assert m[0][1] == "counter@none" +@pytest.mark.forked def test_flush_recursion_protection_background_flush( - sentry_init, capture_envelopes, monkeypatch + sentry_init, capture_envelopes, monkeypatch, maybe_monkeypatched_threading ): monkeypatch.setattr(metrics.MetricsAggregator, "FLUSHER_SLEEP_TIME", 0.1) sentry_init( @@ -954,26 +985,3 @@ def bad_capture_envelope(*args, **kwargs): m = parse_metrics(envelope.items[0].payload.get_bytes()) assert len(m) == 1 assert m[0][1] == "counter@none" - - -@pytest.mark.forked -@requires_gevent -def test_no_metrics_with_gevent(sentry_init, capture_envelopes): - from gevent import monkey - - monkey.patch_all() - - sentry_init( - release="fun-release", - environment="not-fun-env", - _experiments={"enable_metrics": True, "metric_code_locations": True}, - ) - ts = time.time() - envelopes = capture_envelopes() - - metrics.incr("foobar", 1.0, tags={"foo": "bar", "blub": "blah"}, timestamp=ts) - metrics.incr("foobar", 2.0, tags={"foo": "bar", "blub": "blah"}, timestamp=ts) - Hub.current.flush() - - assert Hub.current.client.metrics_aggregator is None - assert len(envelopes) == 0