From 04fccf92d6ab67399de75dfa685e5e6d5b2a6780 Mon Sep 17 00:00:00 2001 From: Jake Barnwell Date: Fri, 11 Feb 2022 16:50:13 -0500 Subject: [PATCH 1/4] Add cluster support for scripting --- redis/cluster.py | 87 ++++++++++++++++++++++++++++-------- redis/commands/cluster.py | 9 +++- redis/commands/core.py | 2 +- redis/commands/parser.py | 17 ++++++- redis/utils.py | 2 +- tests/test_command_parser.py | 15 +++++++ tests/test_scripting.py | 55 ++++++++++++++++++++++- 7 files changed, 163 insertions(+), 24 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index b594d3084a..c4bd498f9c 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -289,6 +289,9 @@ class RedisCluster(RedisClusterCommands): [ "FLUSHALL", "FLUSHDB", + "SCRIPT EXISTS", + "SCRIPT FLUSH", + "SCRIPT LOAD", ], PRIMARIES, ), @@ -379,6 +382,24 @@ class RedisCluster(RedisClusterCommands): ], parse_scan_result, ), + list_keys_to_dict( + [ + "SCRIPT LOAD", + ], + lambda command, res: list(res.values()).pop(), + ), + list_keys_to_dict( + [ + "SCRIPT EXISTS", + ], + lambda command, res: [all(k) for k in zip(*res.values())], + ), + list_keys_to_dict( + [ + "SCRIPT FLUSH", + ], + lambda command, res: all(res.values()), + ), ) ERRORS_ALLOW_RETRY = ( @@ -777,40 +798,70 @@ def _get_command_keys(self, *args): """ Get the keys in the command. If the command has no keys in in, None is returned. + + NOTE: Due to a bug in redis<7.0, this function does not work properly + for EVAL or EVALSHA when the `numkeys` arg is 0. + - issue: https://github.com/redis/redis/issues/9493 + - fix: https://github.com/redis/redis/pull/9733 + + So, don't use this function with EVAL or EVALSHA. """ redis_conn = self.get_default_node().redis_connection return self.commands_parser.get_keys(redis_conn, *args) def determine_slot(self, *args): """ - Figure out what slot based on command and args + Figure out what slot to use based on args. + + Raises a RedisClusterException if there's a missing key and we can't + determine what slots to map the command to; or, if the keys don't + all map to the same key slot. """ - if self.command_flags.get(args[0]) == SLOT_ID: + command = args[0] + if self.command_flags.get(command) == SLOT_ID: # The command contains the slot ID return args[1] # Get the keys in the command - keys = self._get_command_keys(*args) - if keys is None or len(keys) == 0: - raise RedisClusterException( - "No way to dispatch this command to Redis Cluster. " - "Missing key.\nYou can execute the command by specifying " - f"target nodes.\nCommand: {args}" - ) - if len(keys) > 1: - # multi-key command, we need to make sure all keys are mapped to - # the same slot - slots = {self.keyslot(key) for key in keys} - if len(slots) != 1: + # EVAL and EVALSHA are common enough that it's wasteful to go to the + # redis server to parse the keys. Besides, there is a bug in redis<7.0 + # where `self._get_command_keys()` fails anyway. So, we special case + # EVAL/EVALSHA. + if command in ("EVAL", "EVALSHA"): + # command syntax: EVAL "script body" num_keys ... + if len(args) <= 2: + raise RedisClusterException(f"Invalid args in command: {args}") + num_actual_keys = args[2] + eval_keys = args[3 : 3 + num_actual_keys] + # if there are 0 keys, that means the script can be run on any node + # so we can just return a random slot + if len(eval_keys) == 0: + return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) + keys = eval_keys + else: + keys = self._get_command_keys(*args) + if keys is None or len(keys) == 0: raise RedisClusterException( - f"{args[0]} - all keys must map to the same key slot" + "No way to dispatch this command to Redis Cluster. " + "Missing key.\nYou can execute the command by specifying " + f"target nodes.\nCommand: {args}" ) - return slots.pop() - else: - # single key command + + # single key command + if len(keys) == 1: return self.keyslot(keys[0]) + # multi-key command; we need to make sure all keys are mapped to + # the same slot + slots = {self.keyslot(key) for key in keys} + if len(slots) != 1: + raise RedisClusterException( + f"{command} - all keys must map to the same key slot" + ) + + return slots.pop() + def reinitialize_caches(self): self.nodes_manager.initialize() diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 5d0e804628..8bdcbbadf6 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -1,7 +1,13 @@ from redis.crc import key_slot from redis.exceptions import RedisClusterException, RedisError -from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands +from .core import ( + ACLCommands, + DataAccessCommands, + ManagementCommands, + PubSubCommands, + ScriptCommands, +) from .helpers import list_or_args @@ -205,6 +211,7 @@ class RedisClusterCommands( ACLCommands, PubSubCommands, ClusterDataAccessCommands, + ScriptCommands, ): """ A class for all Redis Cluster commands diff --git a/redis/commands/core.py b/redis/commands/core.py index cef0c28e57..cc41969a2b 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -4663,7 +4663,7 @@ def __init__(self, registered_client, script): if isinstance(script, str): # We need the encoding from the client in order to generate an # accurate byte representation of the script - encoder = registered_client.connection_pool.get_encoder() + encoder = registered_client.get_encoder() script = encoder.encode(script) self.sha = hashlib.sha1(script).hexdigest() diff --git a/redis/commands/parser.py b/redis/commands/parser.py index 4cce800ec3..2bb0576910 100644 --- a/redis/commands/parser.py +++ b/redis/commands/parser.py @@ -24,7 +24,14 @@ def initialize(self, r): # https://github.com/redis/redis/pull/8324 def get_keys(self, redis_conn, *args): """ - Get the keys from the passed command + Get the keys from the passed command. + + NOTE: Due to a bug in redis<7.0, this function does not work properly + for EVAL or EVALSHA when the `numkeys` arg is 0. + - issue: https://github.com/redis/redis/issues/9493 + - fix: https://github.com/redis/redis/pull/9733 + + So, don't use this function with EVAL or EVALSHA. """ if len(args) < 2: # The command has no keys in it @@ -72,6 +79,14 @@ def get_keys(self, redis_conn, *args): return keys def _get_moveable_keys(self, redis_conn, *args): + """ + NOTE: Due to a bug in redis<7.0, this function does not work properly + for EVAL or EVALSHA when the `numkeys` arg is 0. + - issue: https://github.com/redis/redis/issues/9493 + - fix: https://github.com/redis/redis/pull/9733 + + So, don't use this function with EVAL or EVALSHA. + """ pieces = [] cmd_name = args[0] # The command name should be splitted into separate arguments, diff --git a/redis/utils.py b/redis/utils.py index 56fec49b70..9ab75f2a97 100644 --- a/redis/utils.py +++ b/redis/utils.py @@ -67,7 +67,7 @@ def merge_result(command, res): Merge all items in `res` into a list. This command is used when sending a command to multiple nodes - and they result from each node should be merged into a single list. + and the result from each node should be merged into a single list. res : 'dict' """ diff --git a/tests/test_command_parser.py b/tests/test_command_parser.py index ad29e69f37..ab050a74e7 100644 --- a/tests/test_command_parser.py +++ b/tests/test_command_parser.py @@ -2,6 +2,8 @@ from redis.commands import CommandsParser +from .conftest import skip_if_server_version_lt + class TestCommandsParser: def test_init_commands(self, r): @@ -68,6 +70,19 @@ def test_get_moveable_keys(self, r): assert commands_parser.get_keys(r, *args8) is None assert commands_parser.get_keys(r, *args9).sort() == ["key1", "key2"].sort() + # A bug in redis<7.0 causes this to fail: https://github.com/redis/redis/issues/9493 + @skip_if_server_version_lt("7.0.0") + def test_get_eval_keys_with_0_keys(self, r): + commands_parser = CommandsParser(r) + args = [ + "EVAL", + "return {ARGV[1],ARGV[2]}", + 0, + "key1", + "key2", + ] + assert commands_parser.get_keys(r, *args) == [] + def test_get_pubsub_keys(self, r): commands_parser = CommandsParser(r) args1 = ["PUBLISH", "foo", "bar"] diff --git a/tests/test_scripting.py b/tests/test_scripting.py index f4671a5f9a..bd08187fb6 100644 --- a/tests/test_scripting.py +++ b/tests/test_scripting.py @@ -21,24 +21,70 @@ """ -@pytest.mark.onlynoncluster class TestScripting: @pytest.fixture(autouse=True) def reset_scripts(self, r): r.script_flush() - def test_eval(self, r): + def test_eval_multiply(self, r): r.set("a", 2) # 2 * 3 == 6 assert r.eval(multiply_script, 1, "a", 3) == 6 # @skip_if_server_version_lt("7.0.0") turn on after redis 7 release + @pytest.mark.onlynoncluster def test_eval_ro(self, unstable_r): unstable_r.set("a", "b") assert unstable_r.eval_ro("return redis.call('GET', KEYS[1])", 1, "a") == b"b" with pytest.raises(redis.ResponseError): unstable_r.eval_ro("return redis.call('DEL', KEYS[1])", 1, "a") + def test_eval_msgpack(self, r): + msgpack_message_dumped = b"\x81\xa4name\xa3Joe" + # this is msgpack.dumps({"name": "joe"}) + assert r.eval(msgpack_hello_script, 0, msgpack_message_dumped) == b"hello Joe" + + def test_eval_same_slot(self, r): + """ + In a clustered redis, the script keys must be in the same slot. + + This test isn't very interesting for standalone redis, but it doesn't + hurt anything. + """ + r.set("A{foo}", 2) + r.set("B{foo}", 4) + # 2 * 4 == 8 + + script = """ + local value = redis.call('GET', KEYS[1]) + local value2 = redis.call('GET', KEYS[2]) + return value * value2 + """ + result = r.eval(script, 2, "A{foo}", "B{foo}") + assert result == 8 + + @pytest.mark.onlycluster + def test_eval_crossslot(self, r): + """ + In a clustered redis, the script keys must be in the same slot. + + This test should fail, because the two keys we send are in different + slots. This test assumes that {foo} and {bar} will not go to the same + server when used. In a setup with 3 primaries and 3 secondaries, this + assumption holds. + """ + r.set("A{foo}", 2) + r.set("B{bar}", 4) + # 2 * 4 == 8 + + script = """ + local value = redis.call('GET', KEYS[1]) + local value2 = redis.call('GET', KEYS[2]) + return value * value2 + """ + with pytest.raises(exceptions.RedisClusterException): + r.eval(script, 2, "A{foo}", "B{bar}") + @skip_if_server_version_lt("6.2.0") def test_script_flush_620(self, r): r.set("a", 2) @@ -75,6 +121,7 @@ def test_evalsha(self, r): assert r.evalsha(sha, 1, "a", 3) == 6 # @skip_if_server_version_lt("7.0.0") turn on after redis 7 release + @pytest.mark.onlynoncluster def test_evalsha_ro(self, unstable_r): unstable_r.set("a", "b") get_sha = unstable_r.script_load("return redis.call('GET', KEYS[1])") @@ -114,6 +161,8 @@ def test_script_object(self, r): # Test first evalsha block assert multiply(keys=["a"], args=[3]) == 6 + # Scripting is not supported in cluster pipelines + @pytest.mark.onlynoncluster def test_script_object_in_pipeline(self, r): multiply = r.register_script(multiply_script) precalculated_sha = multiply.sha @@ -142,6 +191,8 @@ def test_script_object_in_pipeline(self, r): assert pipe.execute() == [True, b"2", 6] assert r.script_exists(multiply.sha) == [True] + # Scripting is not supported in cluster pipelines + @pytest.mark.onlynoncluster def test_eval_msgpack_pipeline_error_in_lua(self, r): msgpack_hello = r.register_script(msgpack_hello_script) assert msgpack_hello.sha From bce24ed7a3064d07f818e3a2f908f9057022a605 Mon Sep 17 00:00:00 2001 From: Jake Barnwell Date: Mon, 14 Feb 2022 10:00:28 -0500 Subject: [PATCH 2/4] Fall back to connection_pool.get_encoder if necessary --- redis/commands/core.py | 20 +++++++++++++++++++- tests/test_scripting.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/redis/commands/core.py b/redis/commands/core.py index cc41969a2b..48e0707b78 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -4663,7 +4663,7 @@ def __init__(self, registered_client, script): if isinstance(script, str): # We need the encoding from the client in order to generate an # accurate byte representation of the script - encoder = registered_client.get_encoder() + encoder = self.get_encoder() script = encoder.encode(script) self.sha = hashlib.sha1(script).hexdigest() @@ -4687,6 +4687,24 @@ def __call__(self, keys=[], args=[], client=None): self.sha = client.script_load(self.script) return client.evalsha(self.sha, len(keys), *args) + def get_encoder(self): + """Get the encoder to encode string scripts into bytes.""" + try: + return self.registered_client.get_encoder() + except AttributeError: + # DEPRECATED + # In version <=4.1.2, this was the code we used to get the encoder. + # However, after 4.1.2 we added support for scripting in clustered + # redis. ClusteredRedis doesn't have a `.connection_pool` attribute + # so we changed the Script class to use + # `self.registered_client.get_encoder` (see above). + # However, that is technically a breaking change, as consumers who + # use Scripts directly might inject a `registered_client` that + # doesn't have a `.get_encoder` field. This try/except prevents us + # from breaking backward-compatibility. Ideally, it would be + # removed in the next major release. + return self.registered_client.connection_pool.get_encoder() + class BitFieldOperation: """ diff --git a/tests/test_scripting.py b/tests/test_scripting.py index bd08187fb6..d70727ed18 100644 --- a/tests/test_scripting.py +++ b/tests/test_scripting.py @@ -2,6 +2,7 @@ import redis from redis import exceptions +from redis.commands.core import Script from tests.conftest import skip_if_server_version_lt multiply_script = """ @@ -21,6 +22,39 @@ """ +class TestScript: + """ + We have a few tests to directly test the Script class. + + However, most of the behavioral tests are covered by `TestScripting`. + """ + + @pytest.fixture() + def script_str(self): + return "fake-script" + + @pytest.fixture() + def script_bytes(self): + return b"\xcf\x84o\xcf\x81\xce\xbdo\xcf\x82" + + def test_script_text(self, r, script_str, script_bytes): + assert Script(r, script_str).script == "fake-script" + assert Script(r, script_bytes).script == b"\xcf\x84o\xcf\x81\xce\xbdo\xcf\x82" + + def test_string_script_sha(self, r, script_str): + script = Script(r, script_str) + assert script.sha == "505e4245f0866b60552741b3cce9a0c3d3b66a87" + + def test_bytes_script_sha(self, r, script_bytes): + script = Script(r, script_bytes) + assert script.sha == "1329344e6bf995a35a8dc57ab1a6af8b2d54a763" + + def test_encoder(self, r, script_bytes): + encoder = Script(r, script_bytes).get_encoder() + assert encoder is not None + assert encoder.encode("fake-script") == b"fake-script" + + class TestScripting: @pytest.fixture(autouse=True) def reset_scripts(self, r): From b9787e9f3a44cda070950c36f98c0385e815a2f8 Mon Sep 17 00:00:00 2001 From: Jake Barnwell Date: Mon, 14 Feb 2022 14:17:51 -0500 Subject: [PATCH 3/4] Add documentation for cluster-based scripting --- README.md | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 867f921d99..0a212f2613 100644 --- a/README.md +++ b/README.md @@ -857,7 +857,8 @@ Monitor object to block until a command is received. redis-py supports the EVAL, EVALSHA, and SCRIPT commands. However, there are a number of edge cases that make these commands tedious to use in real world scenarios. Therefore, redis-py exposes a Script object that -makes scripting much easier to use. +makes scripting much easier to use. (RedisClusters have limited support for +scripting.) To create a Script instance, use the register_script function on a client instance passing the Lua code as the first @@ -950,7 +951,7 @@ C 3 ### Cluster Mode -redis-py is now supports cluster mode and provides a client for +redis-py now supports cluster mode and provides a client for [Redis Cluster](). The cluster client is based on Grokzen's @@ -958,6 +959,8 @@ The cluster client is based on Grokzen's fixes, and now supersedes that library. Support for these changes is thanks to his contributions. +To learn more about Redis Cluster, see +[Redis Cluster specifications](https://redis.io/topics/cluster-spec). **Create RedisCluster:** @@ -1213,10 +1216,29 @@ according to their respective destination nodes. This means that we can not turn the pipeline commands into one transaction block, because in most cases they are split up into several smaller pipelines. - -See [Redis Cluster tutorial](https://redis.io/topics/cluster-tutorial) and -[Redis Cluster specifications](https://redis.io/topics/cluster-spec) -to learn more about Redis Cluster. +**Lua Scripting in Cluster Mode** + +Cluster mode has limited support for lua scripting. + +The following commands are supported, with caveats: +- `EVAL` and `EVALSHA`: The command is sent to the relevant node, depending on +the keys (i.e., in `EVAL "