Skip to content

feat(integrations): add support for cluster clients from redis sdk #2394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 126 additions & 25 deletions sentry_sdk/integrations/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
)

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Dict, Sequence
from redis import Redis, RedisCluster
from redis.asyncio.cluster import (
RedisCluster as AsyncRedisCluster,
ClusterPipeline as AsyncClusterPipeline,
)
from sentry_sdk.tracing import Span

_SINGLE_KEY_COMMANDS = frozenset(
Expand Down Expand Up @@ -83,8 +89,7 @@ def _set_pipeline_data(
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)
span.set_tag("redis.transaction", is_transaction)

commands = []
for i, arg in enumerate(command_stack):
Expand Down Expand Up @@ -118,7 +123,7 @@ def _set_client_data(span, is_cluster, name, *args):
span.set_tag("redis.key", args[0])


def _set_db_data(span, connection_params):
def _set_db_data_on_span(span, connection_params):
# type: (Span, Dict[str, Any]) -> None
span.set_data(SPANDATA.DB_SYSTEM, "redis")

Expand All @@ -135,8 +140,43 @@ def _set_db_data(span, connection_params):
span.set_data(SPANDATA.SERVER_PORT, port)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
def _set_db_data(span, redis_instance):
# type: (Span, Redis[Any]) -> None
try:
_set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs)
except AttributeError:
pass # connections_kwargs may be missing in some cases


def _set_cluster_db_data(span, redis_cluster_instance):
# type: (Span, RedisCluster[Any]) -> None
default_node = redis_cluster_instance.get_default_node()
if default_node is not None:
_set_db_data_on_span(
span, {"host": default_node.host, "port": default_node.port}
)


def _set_async_cluster_db_data(span, async_redis_cluster_instance):
# type: (Span, AsyncRedisCluster[Any]) -> None
default_node = async_redis_cluster_instance.get_default_node()
if default_node is not None and default_node.connection_kwargs is not None:
_set_db_data_on_span(span, default_node.connection_kwargs)


def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance):
# type: (Span, AsyncClusterPipeline[Any]) -> None
with capture_internal_exceptions():
_set_async_cluster_db_data(
span,
# the AsyncClusterPipeline has always had a `_client` attr but it is private so potentially problematic and mypy
# does not recognize it - see https://github.com/redis/redis-py/blame/v5.0.0/redis/asyncio/cluster.py#L1386
async_redis_cluster_pipeline_instance._client, # type: ignore[attr-defined]
)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn):
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

def sentry_patched_execute(self, *args, **kwargs):
Expand All @@ -150,12 +190,12 @@ def sentry_patched_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
is_cluster,
get_command_args_fn,
self.transaction,
False if is_cluster else self.transaction,
self.command_stack,
)

Expand All @@ -164,8 +204,8 @@ def sentry_patched_execute(self, *args, **kwargs):
pipeline_cls.execute = sentry_patched_execute


def patch_redis_client(cls, is_cluster):
# type: (Any, bool) -> None
def patch_redis_client(cls, is_cluster, set_db_data_fn):
# type: (Any, bool, Callable[[Span, Any], None]) -> None
"""
This function can be used to instrument custom redis client classes or
subclasses.
Expand All @@ -189,11 +229,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
description = description[: integration.max_data_size - len("...")] + "..."

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
try:
_set_db_data(span, self.connection_pool.connection_kwargs)
except AttributeError:
pass # connections_kwargs may be missing in some cases

set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)
Expand All @@ -203,14 +239,16 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):

def _patch_redis(StrictRedis, client): # noqa: N803
# type: (Any, Any) -> None
patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data)
try:
strict_pipeline = client.StrictPipeline
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
patch_redis_pipeline(
strict_pipeline, False, _get_redis_command_args, _set_db_data
)

try:
import redis.asyncio
Expand All @@ -222,8 +260,56 @@ def _patch_redis(StrictRedis, client): # noqa: N803
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
patch_redis_async_client(
redis.asyncio.client.StrictRedis,
is_cluster=False,
set_db_data_fn=_set_db_data,
)
patch_redis_async_pipeline(
redis.asyncio.client.Pipeline,
False,
_get_redis_command_args,
set_db_data_fn=_set_db_data,
)


def _patch_redis_cluster():
# type: () -> None
"""Patches the cluster module on redis SDK (as opposed to rediscluster library)"""
try:
from redis import RedisCluster, cluster
except ImportError:
pass
else:
patch_redis_client(RedisCluster, True, _set_cluster_db_data)
patch_redis_pipeline(
cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
_set_cluster_db_data,
)

try:
from redis.asyncio import cluster as async_cluster
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(
async_cluster.RedisCluster,
is_cluster=True,
set_db_data_fn=_set_async_cluster_db_data,
)
patch_redis_async_pipeline(
async_cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
set_db_data_fn=_set_async_cluster_pipeline_db_data,
)


def _patch_rb():
Expand All @@ -233,9 +319,15 @@ def _patch_rb():
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
patch_redis_client(
rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(
rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(
rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data
)


def _patch_rediscluster():
Expand All @@ -245,7 +337,9 @@ def _patch_rediscluster():
except ImportError:
return

patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data
)

# up to v1.3.6, __version__ attribute is a tuple
# from v2.0.0, __version__ is a string and VERSION a tuple
Expand All @@ -255,11 +349,17 @@ def _patch_rediscluster():
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
if (0, 2, 0) < version < (2, 0, 0):
pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.StrictRedisCluster,
is_cluster=True,
set_db_data_fn=_set_db_data,
)
else:
pipeline_cls = rediscluster.pipeline.ClusterPipeline

patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
patch_redis_pipeline(
pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data
)


class RedisIntegration(Integration):
Expand All @@ -278,6 +378,7 @@ def setup_once():
raise DidNotEnable("Redis client not installed")

_patch_redis(StrictRedis, client)
_patch_redis_cluster()
_patch_rb()

try:
Expand Down
36 changes: 20 additions & 16 deletions sentry_sdk/integrations/redis/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@
from sentry_sdk.consts import OP
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_db_data,
_set_pipeline_data,
)
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.tracing import Span
from sentry_sdk.utils import capture_internal_exceptions

if TYPE_CHECKING:
from typing import Any
from collections.abc import Callable
from typing import Any, Union
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
def patch_redis_async_pipeline(
pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn
):
# type: (Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]], bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
Expand All @@ -32,22 +36,22 @@ async def _sentry_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
is_cluster,
get_command_args_fn,
False if is_cluster else self.is_transaction,
self._command_stack if is_cluster else self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute
pipeline_cls.execute = _sentry_execute # type: ignore[method-assign]


def patch_redis_async_client(cls):
# type: (Any) -> None
def patch_redis_async_client(cls, is_cluster, set_db_data_fn):
# type: (Union[type[StrictRedis[Any]], type[RedisCluster[Any]]], bool, Callable[[Span, Any], None]) -> None
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
Expand All @@ -60,9 +64,9 @@ async def _sentry_execute_command(self, name, *args, **kwargs):
description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_db_data(span, self.connection_pool.connection_kwargs)
_set_client_data(span, False, name, *args)
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

cls.execute_command = _sentry_execute_command
cls.execute_command = _sentry_execute_command # type: ignore[method-assign]
3 changes: 3 additions & 0 deletions tests/integrations/redis/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("redis.cluster")
Loading