Skip to content

Add dagexecute support #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 56 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
546aa47
Add the support in the new commands: AI.MODELEXECUTE (AI.MODELRUN is …
alonre24 Apr 26, 2021
d6f5b95
Merge with master
alonre24 Apr 26, 2021
722dfe8
merge with master
alonre24 Apr 26, 2021
05d0b71
Add deprecated to requirements
alonre24 Apr 26, 2021
5cc53b8
Update docker in CI from redisai/redisai to redislabs/redisai
alonre24 Apr 26, 2021
d16bc6d
fix redisai:edge to redisai:edge-cpu in CI
alonre24 Apr 26, 2021
08596b8
Merge branch 'master' into Add_modelstore_and_modelexecute_commands
alonre24 May 12, 2021
8793fc8
Formatting
alonre24 May 12, 2021
f1ed31a
Add formatting transformers
alonre24 May 12, 2021
fbe93d2
Remove transformers, format locally with "black"
alonre24 May 12, 2021
7431087
Merge branch 'master' into Add_modelstore_and_modelexecute_commands
alonre24 May 12, 2021
f98dfdb
More formatting locally
alonre24 May 12, 2021
5448fc3
More formatting locally (to long lines)
alonre24 May 12, 2021
37bfd9a
add support
AvitalFineRedis May 18, 2021
ba1b229
add tests
AvitalFineRedis May 18, 2021
892a42f
add tests
AvitalFineRedis May 20, 2021
f793218
merge
AvitalFineRedis May 21, 2021
f6bec38
Update postprocessor.py
AvitalFineRedis May 21, 2021
78a30f7
Update utils.py
AvitalFineRedis May 21, 2021
e40fae3
Update setup.py
AvitalFineRedis May 21, 2021
9595e72
Update client.py
AvitalFineRedis May 21, 2021
985d9a8
fix test.py
AvitalFineRedis May 23, 2021
0f7c093
Merge remote-tracking branch 'origin/Add_DAGEXECUTE_support' into Add…
AvitalFineRedis May 23, 2021
e1572a2
change from "run" to "execute"
AvitalFineRedis May 23, 2021
cd83c2d
support modelrun and run in Dag
AvitalFineRedis May 23, 2021
d008db9
support modelrun and run in Dag
AvitalFineRedis May 23, 2021
ebce97d
add scriptexecute support with test
AvitalFineRedis May 23, 2021
f17bae5
Merge remote-tracking branch 'origin/Add_DAGEXECUTE_support' into Add…
AvitalFineRedis May 23, 2021
9a4415c
Update dag.py
AvitalFineRedis May 23, 2021
30ab3c7
merge master
AvitalFineRedis Jul 18, 2021
c85e6e1
fix Dag API
AvitalFineRedis Jul 18, 2021
5af89c4
fix tests
AvitalFineRedis Jul 18, 2021
29a0bb5
fix tests
AvitalFineRedis Jul 18, 2021
195dc4a
Merge remote-tracking branch 'origin/Add_DAGEXECUTE_support' into Add…
AvitalFineRedis Jul 18, 2021
398a849
fix tests
AvitalFineRedis Jul 18, 2021
2afd793
Merge remote-tracking branch 'origin/Add_DAGEXECUTE_support' into Add…
AvitalFineRedis Jul 18, 2021
49c1812
format fixes
AvitalFineRedis Jul 18, 2021
0588013
last fix I hope
AvitalFineRedis Jul 19, 2021
2bf04a3
fix review comments
AvitalFineRedis Jul 19, 2021
5fdb65e
restore dagrun and dagrun_ro + tests
AvitalFineRedis Jul 19, 2021
b1b1a19
typo
AvitalFineRedis Jul 19, 2021
296dcc0
typo
AvitalFineRedis Jul 19, 2021
a7a96dd
try add skimage to dependencies
AvitalFineRedis Jul 19, 2021
af07b5f
try add skimage to dependencies
AvitalFineRedis Jul 19, 2021
22405ef
Merge remote-tracking branch 'origin/Add_DAGEXECUTE_support' into Add…
AvitalFineRedis Jul 19, 2021
b2d8abb
try add skimage to dependencies
AvitalFineRedis Jul 19, 2021
77696fb
try add skimage to dependencies
AvitalFineRedis Jul 19, 2021
1ec8036
add data files
AvitalFineRedis Jul 19, 2021
8764acd
fix error
AvitalFineRedis Jul 19, 2021
2cc477b
fix error
AvitalFineRedis Jul 19, 2021
199db94
check w
AvitalFineRedis Jul 19, 2021
678c735
fix warning catching
AvitalFineRedis Jul 19, 2021
faf1810
change error and warning messages
AvitalFineRedis Jul 20, 2021
3a75397
add test for scriptexecute with redis commands
AvitalFineRedis Jul 21, 2021
d56d969
error message
AvitalFineRedis Jul 21, 2021
c934940
remove redundant function in script
AvitalFineRedis Jul 21, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tox-poetry = "^0.3.0"
bandit = "^1.7.0"
pylint = "^2.8.2"
vulture = "^2.3"
scikit-image = "==0.16.2"

[tool.poetry.urls]
url = "https://redisai.io"
Expand Down
22 changes: 17 additions & 5 deletions redisai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ def pipeline(self, transaction: bool = True, shard_hint: bool = None) -> "Pipeli
)

def dag(
self, load: Sequence = None, persist: Sequence = None, readonly: bool = False
self,
load: Sequence = None,
persist: Sequence = None,
routing: AnyStr = None,
timeout: int = None,
readonly: bool = False
) -> "Dag":
"""
It returns a DAG object on which other DAG-allowed operations can be called. For
Expand All @@ -81,7 +86,16 @@ def dag(
load : Union[AnyStr, List[AnyStr]]
Load the list of given values from the keyspace to DAG scope
persist : Union[AnyStr, List[AnyStr]]
Write the list of given key, values to the keyspace from DAG scope
For each tensor key in the given list, write its values to the keyspace from
DAG scope after the DAG execution is finished.
routing : AnyStr
Denotes a key to be used in the DAG or a tag that will assist in routing the dag
execution command to the right shard. Redis will verify that all potential key
accesses are done to within the target shard.
timeout : int
The max number on milisecinds that may pass before the request is prossced
(meaning that the result will not be computed after that time and TIMEDOUT
is returned in that case)
readonly : bool
If True, it triggers AI.DAGRUN_RO, the read only DAG which cannot write (PERSIST) to
the keyspace. But since it can't write, it can execute on replicas
Expand All @@ -105,9 +119,7 @@ def dag(
>>> # You can even chain the operations
>>> result = dag.tensorset(**akwargs).modelrun(**bkwargs).tensorget(**ckwargs).run()
"""
return Dag(
load, persist, self.execute_command, readonly, self.enable_postprocess
)
return Dag(load, persist, routing, timeout, self.execute_command, readonly)

def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str:
"""
Expand Down
110 changes: 91 additions & 19 deletions redisai/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,57 @@

from redisai import command_builder as builder
from redisai.postprocessor import Processor
from deprecated import deprecated
import warnings

processor = Processor()


class Dag:
def __init__(self, load, persist, executor, readonly=False, postprocess=True):
def __init__(self, load, persist, routing, timeout, executor, readonly=False):
self.result_processors = []
self.enable_postprocess = postprocess
if readonly:
if persist:
raise RuntimeError(
"READONLY requests cannot write (duh!) and should not "
"have PERSISTing values"
)
self.commands = ["AI.DAGRUN_RO"]
self.enable_postprocess = True
self.deprecatedDagrunMode = load is None and persist is None and routing is None
self.readonly = readonly
self.executor = executor

if readonly and persist:
raise RuntimeError(
"READONLY requests cannot write (duh!) and should not "
"have PERSISTing values"
)

if self.deprecatedDagrunMode:
# Throw warning about using deprecated dagrun
warnings.warn("Creating Dag without any of LOAD, PERSIST and ROUTING arguments"
"is allowed only in deprecated AI.DAGRUN or AI.DAGRUN_RO commands", DeprecationWarning)
# Use dagrun
if readonly:
self.commands = ["AI.DAGRUN_RO"]
else:
self.commands = ["AI.DAGRUN"]
else:
self.commands = ["AI.DAGRUN"]
if load:
# Use dagexecute
if readonly:
self.commands = ["AI.DAGEXECUTE_RO"]
else:
self.commands = ["AI.DAGEXECUTE"]
if load is not None:
if not isinstance(load, (list, tuple)):
self.commands += ["LOAD", 1, load]
else:
self.commands += ["LOAD", len(load), *load]
if persist:
if persist is not None:
if not isinstance(persist, (list, tuple)):
self.commands += ["PERSIST", 1, persist, "|>"]
self.commands += ["PERSIST", 1, persist]
else:
self.commands += ["PERSIST", len(persist), *persist, "|>"]
else:
self.commands.append("|>")
self.executor = executor
self.commands += ["PERSIST", len(persist), *persist]
if routing is not None:
self.commands += ["ROUTING", routing]
if timeout is not None:
self.commands += ["TIMEOUT", timeout]

self.commands.append("|>")

def tensorset(
self,
Expand Down Expand Up @@ -69,20 +90,71 @@ def tensorget(
)
return self

@deprecated(version="1.2.0", reason="Use modelexecute instead")
def modelrun(
self,
key: AnyStr,
inputs: Union[AnyStr, List[AnyStr]],
outputs: Union[AnyStr, List[AnyStr]],
) -> Any:
if self.deprecatedDagrunMode:
args = builder.modelrun(key, inputs, outputs)
self.commands.extend(args)
self.commands.append("|>")
self.result_processors.append(bytes.decode)
return self
else:
return self.modelexecute(key, inputs, outputs)

def modelexecute(
self,
key: AnyStr,
inputs: Union[AnyStr, List[AnyStr]],
outputs: Union[AnyStr, List[AnyStr]],
) -> Any:
args = builder.modelrun(key, inputs, outputs)
if self.deprecatedDagrunMode:
raise RuntimeError(
"You are using deprecated version of DAG, that does not supports MODELEXECUTE."
"The new version requires giving at least one of LOAD, PERSIST and ROUTING"
"arguments when constructing the Dag"
)
args = builder.modelexecute(key, inputs, outputs, None)
self.commands.extend(args)
self.commands.append("|>")
self.result_processors.append(bytes.decode)
return self

def scriptexecute(
self,
key: AnyStr,
function: str,
keys: Union[AnyStr, Sequence[AnyStr]] = None,
inputs: Union[AnyStr, Sequence[AnyStr]] = None,
args: Union[AnyStr, Sequence[AnyStr]] = None,
outputs: Union[AnyStr, List[AnyStr]] = None,
) -> Any:
if self.readonly:
raise RuntimeError(
"AI.SCRIPTEXECUTE cannot be used in readonly mode"
)
if self.deprecatedDagrunMode:
raise RuntimeError(
"You are using deprecated version of DAG, that does not supports SCRIPTEXECUTE."
"The new version requires giving at least one of LOAD, PERSIST and ROUTING"
"arguments when constructing the Dag"
)
args = builder.scriptexecute(key, function, keys, inputs, args, outputs, None)
self.commands.extend(args)
self.commands.append("|>")
self.result_processors.append(bytes.decode)
return self

@deprecated(version="1.2.0", reason="Use execute instead")
def run(self):
commands = self.commands[:-1] # removing the last "|>
return self.execute()

def execute(self):
commands = self.commands[:-1] # removing the last "|>"
results = self.executor(*commands)
if self.enable_postprocess:
out = []
Expand Down
Loading