diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 18fdf94174..37dc04fb57 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -257,6 +257,8 @@ def __init__( if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]: self.response_callbacks.update(self.__class__.RESP3_RESPONSE_CALLBACKS) + else: + self.response_callbacks.update(self.__class__.RESP2_RESPONSE_CALLBACKS) # If using a single connection client, we need to lock creation-of and use-of # the client in order to avoid race conditions such as using asyncio.gather diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 4a606ad38f..1c4222c885 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -321,6 +321,8 @@ def __init__( kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() if kwargs.get("protocol") in ["3", 3]: kwargs["response_callbacks"].update(self.__class__.RESP3_RESPONSE_CALLBACKS) + else: + kwargs["response_callbacks"].update(self.__class__.RESP2_RESPONSE_CALLBACKS) self.connection_kwargs = kwargs if startup_nodes: diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 2e24f253c2..c64e282fe0 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -355,10 +355,9 @@ async def on_connect(self) -> None: auth_args = ["default", auth_args[0]] await self.send_command("HELLO", self.protocol, "AUTH", *auth_args) response = await self.read_response() - if response.get(b"proto") not in [2, "2"] and response.get("proto") not in [ - 2, - "2", - ]: + if response.get(b"proto") != int(self.protocol) and response.get( + "proto" + ) != int(self.protocol): raise ConnectionError("Invalid RESP version") # avoid checking health here -- PING will fail if we try # to check the health prior to the AUTH @@ -379,7 +378,7 @@ async def on_connect(self) -> None: raise AuthenticationError("Invalid Username or Password") # if resp version is specified, switch to it - elif self.protocol != 2: + elif self.protocol not in [2, "2"]: if isinstance(self._parser, _AsyncRESP2Parser): self.set_parser(_AsyncRESP3Parser) # update cluster exception classes diff --git a/redis/client.py b/redis/client.py index e4e82981e9..96ed584cfc 100755 --- a/redis/client.py +++ b/redis/client.py @@ -726,101 +726,52 @@ def parse_set_result(response, **options): class AbstractRedis: RESPONSE_CALLBACKS = { - **string_keys_to_dict( - "AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT " - "HEXISTS HMSET MOVE MSETNX PERSIST " - "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX", - bool, - ), - **string_keys_to_dict( - "BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN " - "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD " - "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN " - "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM " - "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE", - int, - ), + **string_keys_to_dict("EXPIRE EXPIREAT PEXPIRE PEXPIREAT AUTH", bool), + **string_keys_to_dict("EXISTS", int), **string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float), - **string_keys_to_dict( - # these return OK, or int if redis-server is >=1.3.4 - "LPUSH RPUSH", - lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK", - ), - **string_keys_to_dict("SORT", sort_return_tuples), - **string_keys_to_dict("ZSCORE ZINCRBY GEODIST", float_or_none), - **string_keys_to_dict( - "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READONLY READWRITE " - "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ", - bool_ok, - ), - **string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None), - **string_keys_to_dict( - "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set() - ), - **string_keys_to_dict( - "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE " - "ZREVRANGE ZREVRANGEBYSCORE", - zset_score_pairs, - ), - **string_keys_to_dict( - "BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None - ), - **string_keys_to_dict("ZRANK ZREVRANK", int_or_none), - **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list), - **string_keys_to_dict("XREAD XREADGROUP", parse_xread), - **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True), - "ACL CAT": lambda r: list(map(str_if_bytes, r)), - "ACL DELUSER": int, - "ACL GENPASS": str_if_bytes, - "ACL GETUSER": parse_acl_getuser, - "ACL HELP": lambda r: list(map(str_if_bytes, r)), - "ACL LIST": lambda r: list(map(str_if_bytes, r)), - "ACL LOAD": bool_ok, - "ACL LOG": parse_acl_log, - "ACL SAVE": bool_ok, - "ACL SETUSER": bool_ok, - "ACL USERS": lambda r: list(map(str_if_bytes, r)), - "ACL WHOAMI": str_if_bytes, - "CLIENT GETNAME": str_if_bytes, + **string_keys_to_dict("READONLY", bool_ok), + "CLUSTER DELSLOTS": bool_ok, + "CLUSTER ADDSLOTS": bool_ok, + "COMMAND": parse_command, + "INFO": parse_info, + "SET": parse_set_result, "CLIENT ID": int, "CLIENT KILL": parse_client_kill, "CLIENT LIST": parse_client_list, "CLIENT INFO": parse_client_info, "CLIENT SETNAME": bool_ok, - "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False, - "CLIENT PAUSE": bool_ok, - "CLIENT GETREDIR": int, "CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)), - "CLUSTER ADDSLOTS": bool_ok, - "CLUSTER ADDSLOTSRANGE": bool_ok, + "LASTSAVE": timestamp_to_datetime, + "RESET": str_if_bytes, + "SLOWLOG GET": parse_slowlog_get, + "TIME": lambda x: (int(x[0]), int(x[1])), + **string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None), + "SCAN": parse_scan, + "CLIENT GETNAME": str_if_bytes, + "SSCAN": parse_scan, + "ACL LOG": parse_acl_log, + "ACL WHOAMI": str_if_bytes, + "ACL GENPASS": str_if_bytes, + "ACL CAT": lambda r: list(map(str_if_bytes, r)), + "HSCAN": parse_hscan, + "ZSCAN": parse_zscan, + **string_keys_to_dict( + "BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None + ), "CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x), "CLUSTER COUNTKEYSINSLOT": lambda x: int(x), - "CLUSTER DELSLOTS": bool_ok, - "CLUSTER DELSLOTSRANGE": bool_ok, "CLUSTER FAILOVER": bool_ok, "CLUSTER FORGET": bool_ok, - "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)), "CLUSTER INFO": parse_cluster_info, "CLUSTER KEYSLOT": lambda x: int(x), "CLUSTER MEET": bool_ok, "CLUSTER NODES": parse_cluster_nodes, - "CLUSTER REPLICAS": parse_cluster_nodes, "CLUSTER REPLICATE": bool_ok, "CLUSTER RESET": bool_ok, "CLUSTER SAVECONFIG": bool_ok, - "CLUSTER SET-CONFIG-EPOCH": bool_ok, "CLUSTER SETSLOT": bool_ok, "CLUSTER SLAVES": parse_cluster_nodes, - "COMMAND": parse_command, - "COMMAND COUNT": int, - "COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)), - "CONFIG GET": parse_config_get, - "CONFIG RESETSTAT": bool_ok, - "CONFIG SET": bool_ok, - "DEBUG OBJECT": parse_debug_object, - "FUNCTION DELETE": bool_ok, - "FUNCTION FLUSH": bool_ok, - "FUNCTION RESTORE": bool_ok, + **string_keys_to_dict("GEODIST", float_or_none), "GEOHASH": lambda r: list(map(str_if_bytes, r)), "GEOPOS": lambda r: list( map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r) @@ -828,60 +779,104 @@ class AbstractRedis: "GEOSEARCH": parse_geosearch_generic, "GEORADIUS": parse_geosearch_generic, "GEORADIUSBYMEMBER": parse_geosearch_generic, - "HGETALL": lambda r: r and pairs_to_dict(r) or {}, - "HSCAN": parse_hscan, - "INFO": parse_info, - "LASTSAVE": timestamp_to_datetime, - "MEMORY PURGE": bool_ok, - "MEMORY STATS": parse_memory_stats, - "MEMORY USAGE": int_or_none, - "MODULE LOAD": parse_module_result, - "MODULE UNLOAD": parse_module_result, - "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r], - "OBJECT": parse_object, + "XAUTOCLAIM": parse_xautoclaim, + "XINFO STREAM": parse_xinfo_stream, + "XPENDING": parse_xpending, + **string_keys_to_dict("XREAD XREADGROUP", parse_xread), + "COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)), + **string_keys_to_dict("SORT", sort_return_tuples), "PING": lambda r: str_if_bytes(r) == "PONG", - "QUIT": bool_ok, - "STRALGO": parse_stralgo, + "ACL SETUSER": bool_ok, "PUBSUB NUMSUB": parse_pubsub_numsub, - "PUBSUB SHARDNUMSUB": parse_pubsub_numsub, - "RANDOMKEY": lambda r: r and r or None, - "RESET": str_if_bytes, - "SCAN": parse_scan, - "SCRIPT EXISTS": lambda r: list(map(bool, r)), "SCRIPT FLUSH": bool_ok, - "SCRIPT KILL": bool_ok, "SCRIPT LOAD": str_if_bytes, - "SENTINEL CKQUORUM": bool_ok, - "SENTINEL FAILOVER": bool_ok, - "SENTINEL FLUSHCONFIG": bool_ok, - "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master, - "SENTINEL MASTER": parse_sentinel_master, - "SENTINEL MASTERS": parse_sentinel_masters, - "SENTINEL MONITOR": bool_ok, - "SENTINEL RESET": bool_ok, - "SENTINEL REMOVE": bool_ok, - "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels, - "SENTINEL SET": bool_ok, - "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels, - "SET": parse_set_result, - "SLOWLOG GET": parse_slowlog_get, - "SLOWLOG LEN": int, - "SLOWLOG RESET": bool_ok, - "SSCAN": parse_scan, - "TIME": lambda x: (int(x[0]), int(x[1])), + "ACL GETUSER": parse_acl_getuser, + "CONFIG SET": bool_ok, + **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list), "XCLAIM": parse_xclaim, - "XAUTOCLAIM": parse_xautoclaim, - "XGROUP CREATE": bool_ok, - "XGROUP DELCONSUMER": int, - "XGROUP DESTROY": bool, - "XGROUP SETID": bool_ok, - "XINFO CONSUMERS": parse_list_of_dicts, - "XINFO GROUPS": parse_list_of_dicts, - "XINFO STREAM": parse_xinfo_stream, - "XPENDING": parse_xpending, + } + + RESP2_RESPONSE_CALLBACKS = { + "CONFIG GET": parse_config_get, + **string_keys_to_dict( + "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set() + ), + **string_keys_to_dict( + "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE " + "ZREVRANGE ZREVRANGEBYSCORE", + zset_score_pairs, + ), + **string_keys_to_dict("ZSCORE ZINCRBY", float_or_none), "ZADD": parse_zadd, - "ZSCAN": parse_zscan, "ZMSCORE": parse_zmscore, + "HGETALL": lambda r: r and pairs_to_dict(r) or {}, + "MEMORY STATS": parse_memory_stats, + "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r], + # **string_keys_to_dict( + # "COPY " + # "HEXISTS HMSET MOVE MSETNX PERSIST " + # "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX", + # bool, + # ), + # **string_keys_to_dict( + # "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD " + # "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN " + # "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM " + # "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE", + # int, + # ), + # **string_keys_to_dict( + # "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READWRITE " + # "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ", + # bool_ok, + # ), + # **string_keys_to_dict("ZRANK ZREVRANK", int_or_none), + # **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True), + # "ACL HELP": lambda r: list(map(str_if_bytes, r)), + # "ACL LIST": lambda r: list(map(str_if_bytes, r)), + # "ACL LOAD": bool_ok, + # "ACL SAVE": bool_ok, + # "ACL USERS": lambda r: list(map(str_if_bytes, r)), + # "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False, + # "CLIENT PAUSE": bool_ok, + # "CLUSTER ADDSLOTSRANGE": bool_ok, + # "CLUSTER DELSLOTSRANGE": bool_ok, + # "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)), + # "CLUSTER REPLICAS": parse_cluster_nodes, + # "CLUSTER SET-CONFIG-EPOCH": bool_ok, + # "CONFIG RESETSTAT": bool_ok, + # "DEBUG OBJECT": parse_debug_object, + # "FUNCTION DELETE": bool_ok, + # "FUNCTION FLUSH": bool_ok, + # "FUNCTION RESTORE": bool_ok, + # "MEMORY PURGE": bool_ok, + # "MEMORY USAGE": int_or_none, + # "MODULE LOAD": parse_module_result, + # "MODULE UNLOAD": parse_module_result, + # "OBJECT": parse_object, + # "QUIT": bool_ok, + # "STRALGO": parse_stralgo, + # "RANDOMKEY": lambda r: r and r or None, + # "SCRIPT EXISTS": lambda r: list(map(bool, r)), + # "SCRIPT KILL": bool_ok, + # "SENTINEL CKQUORUM": bool_ok, + # "SENTINEL FAILOVER": bool_ok, + # "SENTINEL FLUSHCONFIG": bool_ok, + # "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master, + # "SENTINEL MASTER": parse_sentinel_master, + # "SENTINEL MASTERS": parse_sentinel_masters, + # "SENTINEL MONITOR": bool_ok, + # "SENTINEL RESET": bool_ok, + # "SENTINEL REMOVE": bool_ok, + # "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels, + # "SENTINEL SET": bool_ok, + # "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels, + # "SLOWLOG RESET": bool_ok, + # "XGROUP CREATE": bool_ok, + # "XGROUP DESTROY": bool, + # "XGROUP SETID": bool_ok, + "XINFO CONSUMERS": parse_list_of_dicts, + "XINFO GROUPS": parse_list_of_dicts, } RESP3_RESPONSE_CALLBACKS = { @@ -1122,6 +1117,8 @@ def __init__( if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]: self.response_callbacks.update(self.__class__.RESP3_RESPONSE_CALLBACKS) + else: + self.response_callbacks.update(self.__class__.RESP2_RESPONSE_CALLBACKS) def __repr__(self): return f"{type(self).__name__}<{repr(self.connection_pool)}>" diff --git a/redis/connection.py b/redis/connection.py index b2e6eaac83..023edd3fef 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -288,7 +288,7 @@ def on_connect(self): auth_args = cred_provider.get_credentials() # if resp version is specified and we have auth args, # we need to send them via HELLO - if auth_args and self.protocol != 2: + if auth_args and self.protocol not in [2, "2"]: if isinstance(self._parser, _RESP2Parser): self.set_parser(_RESP3Parser) # update cluster exception classes @@ -321,7 +321,7 @@ def on_connect(self): raise AuthenticationError("Invalid Username or Password") # if resp version is specified, switch to it - elif self.protocol != 2: + elif self.protocol not in [2, "2"]: if isinstance(self._parser, _RESP2Parser): self.set_parser(_RESP3Parser) # update cluster exception classes diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 78376fd0e9..b7d830e1f8 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -85,7 +85,7 @@ async def test_response_callbacks(self, r: redis.Redis): assert await r.get("a") == "static" async def test_case_insensitive_command_names(self, r: redis.Redis): - assert r.response_callbacks["del"] == r.response_callbacks["DEL"] + assert r.response_callbacks["ping"] == r.response_callbacks["PING"] class TestRedisCommands: @@ -2718,7 +2718,7 @@ async def test_xgroup_setid(self, r: redis.Redis): ] assert await r.xinfo_groups(stream) == expected - @skip_if_server_version_lt("5.0.0") + @skip_if_server_version_lt("7.2.0") async def test_xinfo_consumers(self, r: redis.Redis): stream = "stream" group = "group" @@ -2734,8 +2734,8 @@ async def test_xinfo_consumers(self, r: redis.Redis): info = await r.xinfo_consumers(stream, group) assert len(info) == 2 expected = [ - {"name": consumer1.encode(), "pending": 1}, - {"name": consumer2.encode(), "pending": 2}, + {"name": consumer1.encode(), "pending": 1, "inactive": 2}, + {"name": consumer2.encode(), "pending": 2, "inactive": 2}, ] # we can't determine the idle time, so just make sure it's an int diff --git a/tests/test_commands.py b/tests/test_commands.py index 97fbb34925..0bbdcb27db 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -68,7 +68,7 @@ def test_response_callbacks(self, r): assert r["a"] == "static" def test_case_insensitive_command_names(self, r): - assert r.response_callbacks["del"] == r.response_callbacks["DEL"] + assert r.response_callbacks["ping"] == r.response_callbacks["PING"] class TestRedisCommands: @@ -152,9 +152,8 @@ def teardown(): r.acl_setuser(username, keys=["*"], commands=["+set"]) assert r.acl_dryrun(username, "set", "key", "value") == b"OK" - assert r.acl_dryrun(username, "get", "key").startswith( - b"This user has no permissions to run the" - ) + no_permissions_message = b"user has no permissions to run the" + assert no_permissions_message in r.acl_dryrun(username, "get", "key") @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise() @@ -232,12 +231,12 @@ def teardown(): enabled=True, reset=True, passwords=["+pass1", "+pass2"], - categories=["+set", "+@hash", "-geo"], + categories=["+set", "+@hash", "-@geo"], commands=["+get", "+mget", "-hset"], keys=["cache:*", "objects:*"], ) acl = r.acl_getuser(username) - assert set(acl["categories"]) == {"-@all", "+@set", "+@hash"} + assert set(acl["categories"]) == {"-@all", "+@set", "+@hash", "-@geo"} assert set(acl["commands"]) == {"+get", "+mget", "-hset"} assert acl["enabled"] is True assert "on" in acl["flags"] @@ -315,7 +314,7 @@ def teardown(): selectors=[("+set", "%W~app*")], ) acl = r.acl_getuser(username) - assert set(acl["categories"]) == {"-@all", "+@set", "+@hash"} + assert set(acl["categories"]) == {"-@all", "+@set", "+@hash", "-@geo"} assert set(acl["commands"]) == {"+get", "+mget", "-hset"} assert acl["enabled"] is True assert "on" in acl["flags"] @@ -325,7 +324,7 @@ def teardown(): assert_resp_response( r, acl["selectors"], - ["commands", "-@all +set", "keys", "%W~app*", "channels", ""], + [["commands", "-@all +set", "keys", "%W~app*", "channels", ""]], [{"commands": "-@all +set", "keys": "%W~app*", "channels": ""}], ) @@ -4214,7 +4213,7 @@ def test_xgroup_setid(self, r): ] assert r.xinfo_groups(stream) == expected - @skip_if_server_version_lt("5.0.0") + @skip_if_server_version_lt("7.2.0") def test_xinfo_consumers(self, r): stream = "stream" group = "group" @@ -4230,8 +4229,8 @@ def test_xinfo_consumers(self, r): info = r.xinfo_consumers(stream, group) assert len(info) == 2 expected = [ - {"name": consumer1.encode(), "pending": 1}, - {"name": consumer2.encode(), "pending": 2}, + {"name": consumer1.encode(), "pending": 1, "inactive": 2}, + {"name": consumer2.encode(), "pending": 2, "inactive": 2}, ] # we can't determine the idle time, so just make sure it's an int