
Introduce AI.DAGEXECUTE command, AI.DAGRUN is now deprecated. #723

Merged

20 commits merged on May 10, 2021

Commits (20)
fecf3ab
Introduce AI.DAGEXECUTE command, AI.DAGRUN is now deprecated.
alonre24 May 6, 2021
274b7bf
Move ParseKeys to parse_utils and use it from both script and DAG par…
alonre24 May 7, 2021
7925071
Move ParseKeys to parse_utils and use it from both script and DAG par…
alonre24 May 8, 2021
108f662
PR fixes + added documentation.
alonre24 May 8, 2021
4f7a1d2
Merge branch 'DAG_execute_command' of https://github.com/RedisAI/Redi…
alonre24 May 8, 2021
ca08d21
Refactor DAG tests, separate tests scripts to another dir.
alonre24 May 9, 2021
f7efb27
Small fix - refactor error message for invalid DAG command.
alonre24 May 9, 2021
ac0eb8c
Fix paths to test_requirments.txt
alonre24 May 9, 2021
370008a
Fix typo
alonre24 May 9, 2021
9dd3775
- Fix bug in "report_keys" func for AI.DAGEXECUTE + add tests.
alonre24 May 10, 2021
3772631
linter
alonre24 May 10, 2021
2df75de
Merge branch 'master' into DAG_execute_command
alonre24 May 10, 2021
3cbfd9e
Fix for building docker gpu-test
alonre24 May 10, 2021
b4a31a2
Merge branch 'DAG_execute_command' of https://github.com/RedisAI/Redi…
alonre24 May 10, 2021
d4d4277
Fix for building docker gpu-test (take 2)
alonre24 May 10, 2021
112649b
Fix valgrind (in dag parsing error)
alonre24 May 10, 2021
c78efe4
Merge branch 'master' into DAG_execute_command
alonre24 May 10, 2021
71b49d2
Patch an error caused by the valgrind fix in dag parsing error.
alonre24 May 10, 2021
8bdbc93
Add ai.modelstore to RAMP.
alonre24 May 10, 2021
2e8ba43
Fix leak in DAG LLAPI
alonre24 May 10, 2021
2 changes: 1 addition & 1 deletion Dockerfile
@@ -29,7 +29,7 @@ WORKDIR /build
COPY --from=redis /usr/local/ /usr/local/

COPY ./opt/ opt/
COPY ./tests/flow/test_requirements.txt tests/flow/
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/

RUN FORCE=1 ./opt/readies/bin/getpy3
RUN ./opt/system-setup.py
2 changes: 1 addition & 1 deletion Dockerfile.arm
@@ -31,7 +31,7 @@ WORKDIR /build
COPY --from=redis /usr/local/ /usr/local/

COPY ./opt/ opt/
COPY ./tests/flow/test_requirements.txt tests/flow
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow

RUN ./opt/readies/bin/getpy3
RUN ./opt/system-setup.py
2 changes: 1 addition & 1 deletion Dockerfile.gpu
@@ -37,7 +37,7 @@ COPY --from=redis /usr/local/ /usr/local/
RUN echo export LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/local/cuda-11.0/lib64:$LD_LIBRARY_PATH > /etc/profile.d/cuda.sh

COPY ./opt/ opt/
COPY ./tests/flow/test_requirements.txt tests/flow/
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/

RUN FORCE=1 ./opt/readies/bin/getpy3
RUN ./opt/system-setup.py
4 changes: 2 additions & 2 deletions Dockerfile.gpu-test
@@ -30,8 +30,8 @@ COPY --from=redis /usr/local/ /usr/local/
RUN echo export LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/local/cuda-11.0/lib64:$LD_LIBRARY_PATH > /etc/profile.d/cuda.sh

COPY ./opt/ opt/
COPY ./tests/flow/test_requirements.txt tests/flow/
COPY ./tests/flow/Install_RedisGears.sh tests/flow/
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/tests_setup/
COPY ./tests/flow/tests_setup/Install_RedisGears.sh tests/flow/tests_setup/

RUN VENV=venv FORCE=1 ./opt/readies/bin/getpy3

2 changes: 1 addition & 1 deletion Dockerfile.jetson
@@ -38,7 +38,7 @@ WORKDIR /build
COPY --from=redis /usr/local/ /usr/local/

COPY ./opt/ opt/
COPY ./tests/flow/test_requirements.txt tests/flow/
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/

RUN FORCE=1 ./opt/readies/bin/getpy3
RUN ./opt/system-setup.py
107 changes: 105 additions & 2 deletions docs/commands.md
@@ -337,7 +337,7 @@ OK

## AI.MODELRUN

_This command is deprecated and will not be available in future versions. consider using AI.MODELEXECUTE command instead._
_This command is deprecated and will not be available in future versions. Consider using the `AI.MODELEXECUTE` command instead._

The **`AI.MODELRUN`** command runs a model stored as a key's value using its specified backend and device. It accepts one or more input tensors and stores output tensors.

@@ -730,7 +730,110 @@ redis> > AI._SCRIPTSCAN
2) "myscript:v0.1"
```

## AI.DAGEXECUTE

The **`AI.DAGEXECUTE`** command specifies a directed acyclic graph (DAG) of operations to run within RedisAI.

It accepts one or more operations, split by the pipe-forward operator (`|>`).

By default, the DAG execution context is local, meaning that tensor keys appearing in the DAG live only in the scope of the command. That is, setting a tensor with `TENSORSET` stores it in local memory and does not create an actual database key. That tensor can be referenced by subsequent commands within the DAG, but it is not visible outside the DAG or to other clients - no keys are opened at the database level.

Loading and persisting tensors from/to the keyspace must be done explicitly. The user specifies which tensor keys to load from the keyspace using the `LOAD` keyword, and which command outputs to persist to the keyspace using the `PERSIST` keyword. The user can also specify keys in Redis that are going to be accessed for read/write operations (for example, from within an `AI.SCRIPTEXECUTE` command) by using the `KEYS` keyword.

As an example, if `command 1` sets a tensor, it can be referenced by any further command in the chain.
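
As a minimal, hypothetical sketch of this behaviour (the key names `tmp_tensor` and `out{tag}` are made up here, and `mymodel{tag}` is assumed to have been stored beforehand with `AI.MODELSTORE`), only the key listed under `PERSIST` exists in the keyspace once the DAG completes:

```
redis> AI.DAGEXECUTE PERSIST 1 out{tag} |>
    AI.TENSORSET tmp_tensor FLOAT 1 2 VALUES 5 10 |>
    AI.MODELEXECUTE mymodel{tag} INPUTS 1 tmp_tensor OUTPUTS 1 out{tag}
1) OK
2) OK
redis> EXISTS tmp_tensor
(integer) 0
redis> EXISTS out{tag}
(integer) 1
```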

A `TIMEOUT t` argument can be specified to remove the request from the queue after it has waited there for `t` milliseconds, meaning that the client is no longer interested in a result computed after that time (`TIMEDOUT` is returned in that case). Note that individual `MODELEXECUTE` or `SCRIPTEXECUTE` commands within the DAG do not support `TIMEOUT`; it applies only to the `DAGEXECUTE` request as a whole.
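
As a hedged illustration (the key and model names are placeholders, and the request is assumed to wait in the queue longer than the timeout), a DAG request with a 100 millisecond timeout would return the `TIMEDOUT` simple string instead of the per-command reply array:

```
redis> AI.DAGEXECUTE PERSIST 1 my_out{tag} TIMEOUT 100 |>
    AI.TENSORSET my_in FLOAT 1 2 VALUES 5 10 |>
    AI.MODELEXECUTE my_model{tag} INPUTS 1 my_in OUTPUTS 1 my_out{tag}
TIMEDOUT
```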


**Redis API**

```
AI.DAGEXECUTE [[LOAD <n> <key-1> <key-2> ... <key-n>] |
[PERSIST <n> <key-1> <key-2> ... <key-n>] |
[KEYS <n> <key-1> <key-2> ... <key-n>]]+
[TIMEOUT t]
|> <command> [|> command ...]
```

_Arguments_

* **LOAD**: denotes the beginning of the list of input tensor keys, followed by the number of keys and one or more key names
* **PERSIST**: denotes the beginning of the list of output tensor keys, followed by the number of keys and one or more key names
* **KEYS**: denotes the beginning of the list of keys that are accessed within this command, followed by the number of keys and one or more key names. Alternatively, the list of key names can be replaced with a tag that all of those keys share. Redis will verify that all potential key accesses are directed to the right shard.

_While each of the LOAD, PERSIST and KEYS sections is optional (and may appear at most once in the command), the command must contain **at least one** of these three keywords._
* **TIMEOUT**: an optional argument, denotes the time (in ms) after which the client is unblocked and a `TIMEDOUT` string is returned
* **|> command**: the chaining operator, which denotes the beginning of a RedisAI command and is followed by one of RedisAI's commands. Commands are split by the presence of another `|>`. The supported commands are:
* `AI.TENSORSET`
* `AI.TENSORGET`
* `AI.MODELEXECUTE`
* `AI.SCRIPTEXECUTE`


`AI.MODELEXECUTE` and `AI.SCRIPTEXECUTE` commands within the DAG can run models or scripts that were stored on different devices. RedisAI will analyze the DAG and execute commands in parallel if they are located on different devices and their inputs are available.
Note that the `KEYS` keyword should not be specified in `AI.SCRIPTEXECUTE` commands that run as part of a DAG.
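
As a hedged sketch of such parallel execution (the models `model_cpu{tag}` and `model_gpu{tag}` are illustrative and assumed to be stored on CPU and GPU respectively, and `input{tag}` is assumed to exist in the keyspace), the two model runs below share the same input and neither depends on the other's output, so RedisAI may execute them in parallel on their respective devices:

```
redis> AI.DAGEXECUTE LOAD 1 input{tag} PERSIST 2 out_cpu{tag} out_gpu{tag} |>
    AI.MODELEXECUTE model_cpu{tag} INPUTS 1 input{tag} OUTPUTS 1 out_cpu{tag} |>
    AI.MODELEXECUTE model_gpu{tag} INPUTS 1 input{tag} OUTPUTS 1 out_gpu{tag}
1) OK
2) OK
```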

_Return_

An array with an entry per command, where each entry follows the reply format of the corresponding command.
In case the `DAGEXECUTE` request times out, a `TIMEDOUT` simple string is returned.

**Examples**

Assume we want to run the model stored at 'mymodel': we define a temporary tensor 'mytensor' and use it as input, persist only one of the two outputs - discarding 'classes' and keeping 'predictions' - and, in the same command, return the tensor value of 'predictions'. The following command does that:


```
redis> AI.DAGEXECUTE PERSIST 1 predictions{tag} |>
AI.TENSORSET mytensor FLOAT 1 2 VALUES 5 10 |>
AI.MODELEXECUTE mymodel{tag} INPUTS 1 mytensor OUTPUTS 2 classes predictions{tag} |>
AI.TENSORGET predictions{tag} VALUES
1) OK
2) OK
3) 1) FLOAT
   2) 1) (integer) 2
      2) (integer) 2
   3) "\x00\x00\x80?\x00\x00\x00@\x00\x00@@\x00\x00\x80@"
```

A common pattern is enqueuing multiple SCRIPTEXECUTE and MODELEXECUTE commands within a DAG. The following example uses ResNet-50 to classify images into 1000 object categories. Given that our input tensor represents each color channel as an 8-bit integer, and that neural networks usually expect floating-point input, we need to cast the tensor to floating-point and normalize the pixel values - for that we use the `pre_process_3ch` function.

To optimize the classification process, we can use a post-processing script that returns only the category position with the maximum classification score - for that we use the `post_process` function. Using the DAG capabilities we've removed the need to store the intermediate tensors in the keyspace. You can even run the entire process without persisting the output tensor, as follows:

```
redis> AI.DAGEXECUTE KEYS 1 {tag} |>
AI.TENSORSET image UINT8 224 224 3 BLOB b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00....' |>
AI.SCRIPTEXECUTE imagenet_script{tag} pre_process_3ch INPUTS 1 image OUTPUTS 1 temp_key1 |>
AI.MODELEXECUTE imagenet_model{tag} INPUTS 1 temp_key1 OUTPUTS 1 temp_key2 |>
AI.SCRIPTEXECUTE imagenet_script{tag} post_process INPUTS 1 temp_key2 OUTPUTS 1 output |>
AI.TENSORGET output VALUES
1) OK
2) OK
3) OK
4) OK
5) 1) 1) (integer) 111
```

As can be seen in the array reply, the label position with the highest classification score was 111.

By combining a DAG with multiple SCRIPTEXECUTE and MODELEXECUTE commands we've substantially reduced the required bandwidth and network RX (we now return a tensor with 1000 times fewer elements per classification).



!!! warning "Intermediate memory overhead"
The execution of models and scripts within the DAG may generate intermediate tensors that are not allocated by the Redis allocator, but by whatever allocator is used in the backends (which may act on main memory or GPU memory, depending on the device), thus not being limited by `maxmemory` configuration settings of Redis.

## AI.DAGEXECUTE_RO

The **`AI.DAGEXECUTE_RO`** command is a read-only variant of `AI.DAGEXECUTE`.
`AI.DAGEXECUTE` is flagged as a 'write' command in the Redis command table (as it provides the `PERSIST` option, for example). Hence, read-only cluster replicas will refuse to run the command and it will be redirected to the master even if the connection is using read-only mode.

`AI.DAGEXECUTE_RO` behaves exactly like the original command, except that the `PERSIST` option and `AI.SCRIPTEXECUTE` commands are not allowed. It is a read-only command that can safely be run with read-only replicas.
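
For illustration, a minimal hypothetical read-only DAG (assuming `mytensor{tag}` and `mymodel{tag}` already exist in the keyspace): it loads a stored tensor, runs a model, and returns the result without persisting anything, so it can be served by a read-only replica:

```
redis> AI.DAGEXECUTE_RO LOAD 1 mytensor{tag} |>
    AI.MODELEXECUTE mymodel{tag} INPUTS 1 mytensor{tag} OUTPUTS 1 temp_out |>
    AI.TENSORGET temp_out VALUES
```

The reply follows the usual per-command array format of `AI.DAGEXECUTE`.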

!!! info "Further reference"
Refer to the Redis [`READONLY` command](https://redis.io/commands/readonly) for further information about read-only cluster replicas.

## AI.DAGRUN
_This command is deprecated and will not be available in future versions. Consider using the `AI.DAGEXECUTE` command instead._

The **`AI.DAGRUN`** command specifies a directed acyclic graph of operations to run within RedisAI.

It accepts one or more operations, split by the pipe-forward operator (`|>`).
@@ -817,7 +920,7 @@ By combining DAG with multiple SCRIPTRUN and MODELRUN commands we've substantial
The execution of models and scripts within the DAG may generate intermediate tensors that are not allocated by the Redis allocator, but by whatever allocator is used in the backends (which may act on main memory or GPU memory, depending on the device), thus not being limited by `maxmemory` configuration settings of Redis.

## AI.DAGRUN_RO

_This command is deprecated and will not be available in future versions. Consider using the `AI.DAGEXECUTE_RO` command instead._
The **`AI.DAGRUN_RO`** command is a read-only variant of `AI.DAGRUN`.

Because `AI.DAGRUN` provides the `PERSIST` option it is flagged as a 'write' command in the Redis command table. However, even when `PERSIST` isn't used, read-only cluster replicas will refuse to run the command and it will be redirected to the master even if the connection is using read-only mode.
2 changes: 1 addition & 1 deletion docs/developer.md
@@ -121,7 +121,7 @@ To run all tests in a Python virtualenv, follow these steps:
$ mkdir -p .env
$ virtualenv .env
$ source .env/bin/activate
$ pip install -r tests/flow/test_requirements.txt
$ pip install -r tests/flow/tests_setup/test_requirements.txt
$ make -C opt test

**Integration tests**
10 changes: 5 additions & 5 deletions opt/Makefile
@@ -248,7 +248,7 @@ MODULE=$(realpath $(INSTALLED_TARGET)) \
CLUSTER=$(CLUSTER) \
GEN=$(GEN) AOF=$(AOF) SLAVES=$(SLAVES) \
VALGRIND=$(VALGRIND) \
$(ROOT)/tests/flow/tests.sh
$(ROOT)/tests/flow/tests_setup/tests.sh
endef

unit_tests: build
@@ -265,7 +265,7 @@ flow_tests: build
GEN=$(GEN) AOF=$(AOF) SLAVES=$(SLAVES) \
VALGRIND=$(VALGRIND) \
REDIS=$(REDIS) \
$(ROOT)/tests/flow/tests.sh
$(ROOT)/tests/flow/tests_setup/tests.sh

$(COVERAGE_COLLECT_REPORT)

@@ -283,16 +283,16 @@ test: build
GEN=$(GEN) AOF=$(AOF) SLAVES=$(SLAVES) \
VALGRIND=$(VALGRIND) \
REDIS=$(REDIS) \
$(ROOT)/tests/flow/tests.sh
$(ROOT)/tests/flow/tests_setup/tests.sh
$(COVERAGE_COLLECT_REPORT)

#----------------------------------------------------------------------------------------------

valgrind:
$(SHOW)$(ROOT)/tests/flow/valgrind.sh $(realpath $(INSTALLED_TARGET))
$(SHOW)$(ROOT)/tests/flow/tests_setup/valgrind.sh $(realpath $(INSTALLED_TARGET))

callgrind:
$(SHOW)CALLGRIND=1 $(ROOT)/tests/flow/valgrind.sh $(realpath $(INSTALLED_TARGET))
$(SHOW)CALLGRIND=1 $(ROOT)/tests/flow/tests_setup/valgrind.sh $(realpath $(INSTALLED_TARGET))

#----------------------------------------------------------------------------------------------

2 changes: 1 addition & 1 deletion opt/system-setup.py
@@ -77,7 +77,7 @@ def common_last(self):

self.run("{PYTHON} {READIES}/bin/getrmpytools".format(PYTHON=self.python, READIES=READIES))

self.pip_install("-r %s/tests/flow/test_requirements.txt" % ROOT)
self.pip_install("-r %s/tests/flow/tests_setup/test_requirements.txt" % ROOT)

self.pip_install("awscli")
self.pip_install("mkdocs mkdocs-material mkdocs-extensions")
1 change: 1 addition & 0 deletions ramp.yml
@@ -19,6 +19,7 @@ capabilities:
- backup_restore
- intershard_tls
exclude_commands:
- ai.modelstore
- ai.modelset
- ai.modeldel
- ai.scriptset
39 changes: 25 additions & 14 deletions src/execution/DAG/dag.c
@@ -668,25 +668,36 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
return REDISMODULE_OK;
}

int RedisAI_DagRun_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
for (size_t argpos = 1; argpos < argc; argpos++) {
const char *arg_string = RedisModule_StringPtrLen(argv[argpos], NULL);
if ((!strcasecmp(arg_string, "LOAD") || !strcasecmp(arg_string, "PERSIST")) &&
(argpos + 1 < argc)) {
int RedisAI_DagExecute_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx,
RedisModuleString **argv, int argc) {

size_t argpos = 1;
while (argpos < argc) {
const char *arg_string = RedisModule_StringPtrLen(argv[argpos++], NULL);
if (!strcasecmp(arg_string, "LOAD") || !strcasecmp(arg_string, "PERSIST") ||
!strcasecmp(arg_string, "KEYS")) {
if (argpos >= argc) {
return REDISMODULE_ERR;
}
long long n_keys;
argpos++;
const int retval = RedisModule_StringToLongLong(argv[argpos], &n_keys);
const int retval = RedisModule_StringToLongLong(argv[argpos++], &n_keys);
if (retval != REDISMODULE_OK) {
return REDISMODULE_ERR;
}
argpos++;
if (n_keys > 0) {
size_t last_persist_argpos = n_keys + argpos;
for (; argpos < last_persist_argpos && argpos < argc; argpos++) {
RedisModule_KeyAtPos(ctx, argpos);
}
size_t last_argpos = n_keys + argpos;
if (last_argpos >= argc) {
return REDISMODULE_ERR;
}
for (; argpos < last_argpos; argpos++) {
RedisModule_KeyAtPos(ctx, argpos);
}
} else if (!strcasecmp(arg_string, "AI.MODELEXECUTE") ||
!strcasecmp(arg_string, "AI.SCRIPTEXECUTE")) {
if (argpos >= argc) {
return REDISMODULE_ERR;
}
// After every AI.MODEL/SCRIPTEXECUTE arg comes the model/script key.
RedisModule_KeyAtPos(ctx, argpos++);
}
}
return REDISMODULE_OK;
4 changes: 2 additions & 2 deletions src/execution/DAG/dag.h
@@ -144,8 +144,8 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
* @param argc Redis command number of arguments
* @return
*/
int RedisAI_DagRun_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc);
int RedisAI_DagExecute_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx,
RedisModuleString **argv, int argc);

/**
* @brief This callback is called at the end of a DAG run and performs unblock client and reply.
8 changes: 7 additions & 1 deletion src/execution/DAG/dag_builder.c
@@ -163,14 +163,20 @@ int RAI_DAGAddOpsFromString(RAI_DAGRunCtx *run_info, const char *dag, RAI_Error
}
}

if (ParseDAGOps(rinfo, new_ops) != REDISMODULE_OK) {
if (ParseDAGExecuteOps(rinfo, new_ops, false) != REDISMODULE_OK) {
RAI_SetError(err, RAI_GetErrorCode(rinfo->err), RAI_GetError(rinfo->err));
goto cleanup;
}
rinfo->dagOpCount = array_len(rinfo->dagOps);
res = REDISMODULE_OK;

cleanup:
if (res != REDISMODULE_OK) {
// Release the ops in case of an error (otherwise the ownership is given to run_info)
for (size_t i = 0; i < array_len(new_ops); i++) {
RAI_FreeDagOp(new_ops[i]);
}
}
array_free(new_ops);
for (size_t i = 0; i < argc; i++) {
RedisModule_FreeString(NULL, argv[i]);
17 changes: 10 additions & 7 deletions src/execution/command_parser.c
@@ -35,13 +35,6 @@ int RedisAI_ExecuteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
rinfo->dagOps = array_append(rinfo->dagOps, scriptRunOp);
status = ParseScriptRunCommand(rinfo, scriptRunOp, argv, argc);
break;
case CMD_SCRIPTEXECUTE:
rinfo->single_op_dag = 1;
RAI_DagOp *scriptExecOp;
RAI_InitDagOp(&scriptExecOp);
rinfo->dagOps = array_append(rinfo->dagOps, scriptExecOp);
status = ParseScriptExecuteCommand(rinfo, scriptExecOp, argv, argc);
break;
case CMD_DAGRUN:
status = ParseDAGRunCommand(rinfo, ctx, argv, argc, ro_dag);
break;
@@ -52,6 +45,16 @@ int RedisAI_ExecuteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
rinfo->dagOps = array_append(rinfo->dagOps, modelExecuteOp);
status = ParseModelExecuteCommand(rinfo, modelExecuteOp, argv, argc);
break;
case CMD_SCRIPTEXECUTE:
rinfo->single_op_dag = 1;
RAI_DagOp *scriptExecOp;
RAI_InitDagOp(&scriptExecOp);
rinfo->dagOps = array_append(rinfo->dagOps, scriptExecOp);
status = ParseScriptExecuteCommand(rinfo, scriptExecOp, argv, argc);
break;
case CMD_DAGEXECUTE:
status = ParseDAGExecuteCommand(rinfo, ctx, argv, argc, ro_dag);
break;
default:
break;
}
3 changes: 2 additions & 1 deletion src/execution/command_parser.h
@@ -8,7 +8,8 @@ typedef enum RunCommand {
CMD_SCRIPTRUN,
CMD_DAGRUN,
CMD_MODELEXECUTE,
CMD_SCRIPTEXECUTE
CMD_SCRIPTEXECUTE,
CMD_DAGEXECUTE
} RunCommand;

/**