diff --git a/pyproject.toml b/pyproject.toml index e2cfe99..3c3b4ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ tox-poetry = "^0.3.0" bandit = "^1.7.0" pylint = "^2.8.2" vulture = "^2.3" +scikit-image = "==0.16.2" [tool.poetry.urls] url = "https://redisai.io" diff --git a/redisai/client.py b/redisai/client.py index 1411ad8..e2a6848 100644 --- a/redisai/client.py +++ b/redisai/client.py @@ -70,7 +70,12 @@ def pipeline(self, transaction: bool = True, shard_hint: bool = None) -> "Pipeli ) def dag( - self, load: Sequence = None, persist: Sequence = None, readonly: bool = False + self, + load: Sequence = None, + persist: Sequence = None, + routing: AnyStr = None, + timeout: int = None, + readonly: bool = False ) -> "Dag": """ It returns a DAG object on which other DAG-allowed operations can be called. For @@ -81,7 +86,16 @@ def dag( load : Union[AnyStr, List[AnyStr]] Load the list of given values from the keyspace to DAG scope persist : Union[AnyStr, List[AnyStr]] - Write the list of given key, values to the keyspace from DAG scope + For each tensor key in the given list, write its values to the keyspace from + DAG scope after the DAG execution is finished. + routing : AnyStr + Denotes a key to be used in the DAG or a tag that will assist in routing the dag + execution command to the right shard. Redis will verify that all potential key + accesses are done to within the target shard. + timeout : int + The max number on milisecinds that may pass before the request is prossced + (meaning that the result will not be computed after that time and TIMEDOUT + is returned in that case) readonly : bool If True, it triggers AI.DAGRUN_RO, the read only DAG which cannot write (PERSIST) to the keyspace. But since it can't write, it can execute on replicas @@ -105,9 +119,7 @@ def dag( >>> # You can even chain the operations >>> result = dag.tensorset(**akwargs).modelrun(**bkwargs).tensorget(**ckwargs).run() """ - return Dag( - load, persist, self.execute_command, readonly, self.enable_postprocess - ) + return Dag(load, persist, routing, timeout, self.execute_command, readonly) def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str: """ diff --git a/redisai/dag.py b/redisai/dag.py index 1746010..dcbf1d7 100644 --- a/redisai/dag.py +++ b/redisai/dag.py @@ -5,36 +5,57 @@ from redisai import command_builder as builder from redisai.postprocessor import Processor +from deprecated import deprecated +import warnings processor = Processor() class Dag: - def __init__(self, load, persist, executor, readonly=False, postprocess=True): + def __init__(self, load, persist, routing, timeout, executor, readonly=False): self.result_processors = [] - self.enable_postprocess = postprocess - if readonly: - if persist: - raise RuntimeError( - "READONLY requests cannot write (duh!) and should not " - "have PERSISTing values" - ) - self.commands = ["AI.DAGRUN_RO"] + self.enable_postprocess = True + self.deprecatedDagrunMode = load is None and persist is None and routing is None + self.readonly = readonly + self.executor = executor + + if readonly and persist: + raise RuntimeError( + "READONLY requests cannot write (duh!) and should not " + "have PERSISTing values" + ) + + if self.deprecatedDagrunMode: + # Throw warning about using deprecated dagrun + warnings.warn("Creating Dag without any of LOAD, PERSIST and ROUTING arguments" + "is allowed only in deprecated AI.DAGRUN or AI.DAGRUN_RO commands", DeprecationWarning) + # Use dagrun + if readonly: + self.commands = ["AI.DAGRUN_RO"] + else: + self.commands = ["AI.DAGRUN"] else: - self.commands = ["AI.DAGRUN"] - if load: + # Use dagexecute + if readonly: + self.commands = ["AI.DAGEXECUTE_RO"] + else: + self.commands = ["AI.DAGEXECUTE"] + if load is not None: if not isinstance(load, (list, tuple)): self.commands += ["LOAD", 1, load] else: self.commands += ["LOAD", len(load), *load] - if persist: + if persist is not None: if not isinstance(persist, (list, tuple)): - self.commands += ["PERSIST", 1, persist, "|>"] + self.commands += ["PERSIST", 1, persist] else: - self.commands += ["PERSIST", len(persist), *persist, "|>"] - else: - self.commands.append("|>") - self.executor = executor + self.commands += ["PERSIST", len(persist), *persist] + if routing is not None: + self.commands += ["ROUTING", routing] + if timeout is not None: + self.commands += ["TIMEOUT", timeout] + + self.commands.append("|>") def tensorset( self, @@ -69,20 +90,71 @@ def tensorget( ) return self + @deprecated(version="1.2.0", reason="Use modelexecute instead") def modelrun( + self, + key: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], + ) -> Any: + if self.deprecatedDagrunMode: + args = builder.modelrun(key, inputs, outputs) + self.commands.extend(args) + self.commands.append("|>") + self.result_processors.append(bytes.decode) + return self + else: + return self.modelexecute(key, inputs, outputs) + + def modelexecute( self, key: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]], ) -> Any: - args = builder.modelrun(key, inputs, outputs) + if self.deprecatedDagrunMode: + raise RuntimeError( + "You are using deprecated version of DAG, that does not supports MODELEXECUTE." + "The new version requires giving at least one of LOAD, PERSIST and ROUTING" + "arguments when constructing the Dag" + ) + args = builder.modelexecute(key, inputs, outputs, None) self.commands.extend(args) self.commands.append("|>") self.result_processors.append(bytes.decode) return self + def scriptexecute( + self, + key: AnyStr, + function: str, + keys: Union[AnyStr, Sequence[AnyStr]] = None, + inputs: Union[AnyStr, Sequence[AnyStr]] = None, + args: Union[AnyStr, Sequence[AnyStr]] = None, + outputs: Union[AnyStr, List[AnyStr]] = None, + ) -> Any: + if self.readonly: + raise RuntimeError( + "AI.SCRIPTEXECUTE cannot be used in readonly mode" + ) + if self.deprecatedDagrunMode: + raise RuntimeError( + "You are using deprecated version of DAG, that does not supports SCRIPTEXECUTE." + "The new version requires giving at least one of LOAD, PERSIST and ROUTING" + "arguments when constructing the Dag" + ) + args = builder.scriptexecute(key, function, keys, inputs, args, outputs, None) + self.commands.extend(args) + self.commands.append("|>") + self.result_processors.append(bytes.decode) + return self + + @deprecated(version="1.2.0", reason="Use execute instead") def run(self): - commands = self.commands[:-1] # removing the last "|> + return self.execute() + + def execute(self): + commands = self.commands[:-1] # removing the last "|>" results = self.executor(*commands) if self.enable_postprocess: out = [] diff --git a/test/test.py b/test/test.py index 4690117..1787f18 100644 --- a/test/test.py +++ b/test/test.py @@ -1,7 +1,11 @@ import os.path import sys +import warnings + from io import StringIO from unittest import TestCase +from skimage.io import imread +from skimage.transform import resize import numpy as np from ml2rt import load_model @@ -9,9 +13,11 @@ from redisai import Client + DEBUG = False tf_graph = "graph.pb" torch_graph = "pt-minimal.pt" +dog_img = "dog.jpg" class Capturing(list): @@ -67,6 +73,15 @@ def func(tensors: List[Tensor], keys: List[str], args: List[str]): return b + a """ +data_processing_script = r""" +def pre_process_3ch(tensors: List[Tensor], keys: List[str], args: List[str]): + return tensors[0].float().div(255).unsqueeze(0) + +def post_process(tensors: List[Tensor], keys: List[str], args: List[str]): + # tf model has 1001 classes, hence negative 1 + return tensors[0].max(1)[1] - 1 +""" + class RedisAITestBase(TestCase): def setUp(self): @@ -599,6 +614,16 @@ def test_debug(self): self.assertEqual(["AI.TENSORSET x FLOAT 4 VALUES 2 3 4 5"], output) +def load_image(): + image_filename = os.path.join(MODEL_DIR, dog_img) + img_height, img_width = 224, 224 + + img = imread(image_filename) + img = resize(img, (img_height, img_width), mode='constant', anti_aliasing=True) + img = img.astype(np.uint8) + return img + + class DagTestCase(RedisAITestBase): def setUp(self): super().setUp() @@ -607,66 +632,158 @@ def setUp(self): ptmodel = load_model(model_path) con.modelstore("pt_model", "torch", "cpu", ptmodel, tag="v7.0") - def test_dagrun_with_load(self): + def test_deprecated_dugrun(self): con = self.get_client() - con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") - dag = con.dag(load="a") + # test the warning of using dagrun + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("default") + dag = con.dag() + self.assertTrue(issubclass(w[-1].category, DeprecationWarning)) + self.assertEqual(str(w[-1].message), + "Creating Dag without any of LOAD, PERSIST and ROUTING arguments" + "is allowed only in deprecated AI.DAGRUN or AI.DAGRUN_RO commands") + + # test that dagrun and model run hadn't been broken + dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") dag.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") + # can't use modelexecute or scriptexecute when using DAGRUN + with self.assertRaises(RuntimeError) as e: + dag.modelexecute("pt_model", ["a", "b"], ["output"]) + self.assertEqual(str(e.exception), + "You are using deprecated version of DAG, that does not supports MODELEXECUTE." + "The new version requires giving at least one of LOAD, PERSIST and ROUTING" + "arguments when constructing the Dag") + with self.assertRaises(RuntimeError) as e: + dag.scriptexecute("myscript{1}", "bar", inputs=["a{1}", "b{1}"], outputs=["c{1}"]) + self.assertEqual(str(e.exception), + "You are using deprecated version of DAG, that does not supports SCRIPTEXECUTE." + "The new version requires giving at least one of LOAD, PERSIST and ROUTING" + "arguments when constructing the Dag") dag.modelrun("pt_model", ["a", "b"], ["output"]) dag.tensorget("output") result = dag.run() + expected = [ + "OK", + "OK", + "OK", + np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32), + ] + self.assertTrue(np.allclose(expected.pop(), result.pop())) + self.assertEqual(expected, result) + + def test_deprecated_modelrun_and_run(self): + # use modelrun&run method but perform modelexecute&dagexecute behind the scene + con = self.get_client() + + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") + dag = con.dag(load=["a", "b"], persist="output") + dag.modelrun("pt_model", ["a", "b"], ["output"]) + dag.tensorget("output") + result = dag.run() + expected = ["OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] + result_outside_dag = con.tensorget("output") + self.assertTrue(np.allclose(expected.pop(), result.pop())) + result = dag.run() + self.assertTrue(np.allclose(result_outside_dag, result.pop())) + self.assertEqual(expected, result) + + def test_dagexecute_with_scriptexecute_redis_commands(self): + con = self.get_client() + con.scriptstore("myscript{1}", "cpu", script_with_redis_commands, "func") + dag = con.dag(persist='my_output{1}', routing='{1}') + dag.tensorset("mytensor1{1}", [40], dtype="float") + dag.tensorset("mytensor2{1}", [10], dtype="float") + dag.tensorset("mytensor3{1}", [1], dtype="float") + dag.scriptexecute("myscript{1}", "func", + keys=["key{1}"], + inputs=["mytensor1{1}", "mytensor2{1}", "mytensor3{1}"], + args=["3"], + outputs=["my_output{1}"]) + dag.execute() + values = con.tensorget("my_output{1}", as_numpy=False) + self.assertTrue(np.allclose(values["values"], [54])) + + def test_dagexecute_modelexecute_with_scriptexecute(self): + con = self.get_client() + script_name = 'imagenet_script:{1}' + model_name = 'imagenet_model:{1}' + + img = load_image() + model_path = os.path.join(MODEL_DIR, "resnet50.pb") + model = load_model(model_path) + con.scriptstore(script_name, 'cpu', data_processing_script, entry_points=['post_process', 'pre_process_3ch']) + con.modelstore(model_name, 'TF', 'cpu', model, inputs='images', outputs='output') + + dag = con.dag(persist='output:{1}') + dag.tensorset('image:{1}', tensor=img, shape=(img.shape[1], img.shape[0]), dtype='UINT8') + dag.scriptexecute(script_name, 'pre_process_3ch', inputs='image:{1}', outputs='temp_key1') + dag.modelexecute(model_name, inputs='temp_key1', outputs='temp_key2') + dag.scriptexecute(script_name, 'post_process', inputs='temp_key2', outputs='output:{1}') + ret = dag.execute() + self.assertEqual(['OK', 'OK', 'OK', 'OK'], ret) + + def test_dagexecute_with_load(self): + con = self.get_client() + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + + dag = con.dag(load="a") + dag.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") + dag.modelexecute("pt_model", ["a", "b"], ["output"]) + dag.tensorget("output") + result = dag.execute() expected = ["OK", "OK", np.array( [[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) self.assertRaises(ResponseError, con.tensorget, "b") - def test_dagrun_with_persist(self): + def test_dagexecute_with_persist(self): con = self.get_client() with self.assertRaises(ResponseError): dag = con.dag(persist="wrongkey") - dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float").run() + dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float").execute() dag = con.dag(persist=["b"]) dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") dag.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") dag.tensorget("b") - result = dag.run() + result = dag.execute() b = con.tensorget("b") self.assertTrue(np.allclose(b, result[-1])) self.assertEqual(b.dtype, np.float32) self.assertEqual(len(result), 3) - def test_dagrun_calling_on_return(self): + def test_dagexecute_calling_on_return(self): con = self.get_client() con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") result = ( con.dag(load="a") .tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") - .modelrun("pt_model", ["a", "b"], ["output"]) + .modelexecute("pt_model", ["a", "b"], ["output"]) .tensorget("output") - .run() + .execute() ) expected = ["OK", "OK", np.array( [[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) - def test_dagrun_without_load_and_persist(self): + def test_dagexecute_without_load_and_persist(self): con = self.get_client() - dag = con.dag(load="wrongkey") - with self.assertRaises(ResponseError): - dag.tensorget("wrongkey").run() + with self.assertRaises(ResponseError) as e: + dag.tensorget("wrongkey").execute() + self.assertEqual(str(e.exception), "tensor key is empty or in a different shard") - dag = con.dag() + dag = con.dag(persist="output") dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") dag.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") - dag.modelrun("pt_model", ["a", "b"], ["output"]) + dag.modelexecute("pt_model", ["a", "b"], ["output"]) dag.tensorget("output") - result = dag.run() + result = dag.execute() expected = [ "OK", "OK", @@ -676,31 +793,36 @@ def test_dagrun_without_load_and_persist(self): self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) - def test_dagrun_with_load_and_persist(self): + def test_dagexecute_with_load_and_persist(self): con = self.get_client() con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") dag = con.dag(load=["a", "b"], persist="output") - dag.modelrun("pt_model", ["a", "b"], ["output"]) + dag.modelexecute("pt_model", ["a", "b"], ["output"]) dag.tensorget("output") - result = dag.run() + result = dag.execute() expected = ["OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] result_outside_dag = con.tensorget("output") self.assertTrue(np.allclose(expected.pop(), result.pop())) - result = dag.run() + result = dag.execute() self.assertTrue(np.allclose(result_outside_dag, result.pop())) self.assertEqual(expected, result) - def test_dagrunRO(self): + def test_dagexecuteRO(self): con = self.get_client() con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") with self.assertRaises(RuntimeError): con.dag(load=["a", "b"], persist="output", readonly=True) dag = con.dag(load=["a", "b"], readonly=True) - dag.modelrun("pt_model", ["a", "b"], ["output"]) + + with self.assertRaises(RuntimeError) as e: + dag.scriptexecute("myscript{1}", "bar", inputs=["a{1}", "b{1}"], outputs=["c{1}"]) + self.assertEqual(str(e.exception), "AI.SCRIPTEXECUTE cannot be used in readonly mode") + + dag.modelexecute("pt_model", ["a", "b"], ["output"]) dag.tensorget("output") - result = dag.run() + result = dag.execute() expected = ["OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) diff --git a/test/testdata/dog.jpg b/test/testdata/dog.jpg new file mode 100644 index 0000000..f100a88 Binary files /dev/null and b/test/testdata/dog.jpg differ diff --git a/test/testdata/resnet50.pb b/test/testdata/resnet50.pb new file mode 100644 index 0000000..cead792 Binary files /dev/null and b/test/testdata/resnet50.pb differ