Skip to content

Commit ece8896

Browse files
authored
Merge pull request #723 from RedisAI/DAG_execute_command
Introduce AI.DAGEXECUTE command, AI.DAGRUN is now deprecated.
2 parents dc94901 + 2e8ba43 commit ece8896

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1583
-1095
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ WORKDIR /build
2929
COPY --from=redis /usr/local/ /usr/local/
3030

3131
COPY ./opt/ opt/
32-
COPY ./tests/flow/test_requirements.txt tests/flow/
32+
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/
3333

3434
RUN FORCE=1 ./opt/readies/bin/getpy3
3535
RUN ./opt/system-setup.py

Dockerfile.arm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ WORKDIR /build
3131
COPY --from=redis /usr/local/ /usr/local/
3232

3333
COPY ./opt/ opt/
34-
COPY ./tests/flow/test_requirements.txt tests/flow
34+
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow
3535

3636
RUN ./opt/readies/bin/getpy3
3737
RUN ./opt/system-setup.py

Dockerfile.gpu

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ COPY --from=redis /usr/local/ /usr/local/
3737
RUN echo export LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/local/cuda-11.0/lib64:$LD_LIBRARY_PATH > /etc/profile.d/cuda.sh
3838

3939
COPY ./opt/ opt/
40-
COPY ./tests/flow/test_requirements.txt tests/flow/
40+
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/
4141

4242
RUN FORCE=1 ./opt/readies/bin/getpy3
4343
RUN ./opt/system-setup.py

Dockerfile.gpu-test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ COPY --from=redis /usr/local/ /usr/local/
3030
RUN echo export LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/local/cuda-11.0/lib64:$LD_LIBRARY_PATH > /etc/profile.d/cuda.sh
3131

3232
COPY ./opt/ opt/
33-
COPY ./tests/flow/test_requirements.txt tests/flow/
34-
COPY ./tests/flow/Install_RedisGears.sh tests/flow/
33+
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/tests_setup/
34+
COPY ./tests/flow/tests_setup/Install_RedisGears.sh tests/flow/tests_setup/
3535

3636
RUN VENV=venv FORCE=1 ./opt/readies/bin/getpy3
3737

Dockerfile.jetson

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ WORKDIR /build
3838
COPY --from=redis /usr/local/ /usr/local/
3939

4040
COPY ./opt/ opt/
41-
COPY ./tests/flow/test_requirements.txt tests/flow/
41+
COPY ./tests/flow/tests_setup/test_requirements.txt tests/flow/
4242

4343
RUN FORCE=1 ./opt/readies/bin/getpy3
4444
RUN ./opt/system-setup.py

docs/commands.md

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ OK
337337

338338
## AI.MODELRUN
339339

340-
_This command is deprecated and will not be available in future versions. consider using AI.MODELEXECUTE command instead._
340+
_This command is deprecated and will not be available in future versions. Consider using the `AI.MODELEXECUTE` command instead._
341341

342342
The **`AI.MODELRUN`** command runs a model stored as a key's value using its specified backend and device. It accepts one or more input tensors and store output tensors.
343343

@@ -730,7 +730,110 @@ redis> > AI._SCRIPTSCAN
730730
2) "myscript:v0.1"
731731
```
732732

733+
## AI.DAGEXECUTE
734+
The **`AI.DAGEXECUTE`** command specifies a directed acyclic graph of operations to run within RedisAI.
735+
736+
It accepts one or more operations, split by the pipe-forward operator (`|>`).
737+
738+
By default, the DAG execution context is local, meaning that tensor keys appearing in the DAG only live in the scope of the command. That is, setting a tensor with `TENSORSET` will store it in local memory and not set it to an actual database key. One can refer to that key in subsequent commands within the DAG, but that key won't be visible outside the DAG or to other clients - no keys are open at the database level.
739+
740+
Loading and persisting tensors from/to keyspace should be done explicitly. The user should specify which key tensors to load from keyspace using the `LOAD` keyword, and which command outputs to persist to the keyspace using the `PERSIST` keyword. The user can also specify keys in Redis that are going to be accessed for read/write operations (for example, from within `AI.SCRIPTEXECUTE` command), by using the keyword `KEYS`.
741+
742+
As an example, if `command 1` sets a tensor, it can be referenced by any further command on the chaining.
743+
744+
A `TIMEOUT t` argument can be specified to cause a request to be removed from the queue after it sits there `t` milliseconds, meaning that the client won't be interested in the result being computed after that time (`TIMEDOUT` is returned in that case). Note that individual `MODELEXECUTE` or `SCRIPTEXECUTE` commands within the DAG do not support `TIMEOUT`. `TIMEOUT` only applies to the `DAGEXECUTE` request as a whole.
745+
746+
747+
**Redis API**
748+
749+
```
750+
AI.DAGEXECUTE [[LOAD <n> <key-1> <key-2> ... <key-n>] |
751+
[PERSIST <n> <key-1> <key-2> ... <key-n>] |
752+
[KEYS <n> <key-1> <key-2> ... <key-n>]]+
753+
[TIMEOUT t]
754+
|> <command> [|> command ...]
755+
```
756+
757+
_Arguments_
758+
759+
* **LOAD**: denotes the beginning of the input tensors keys' list, followed by the number of keys, and one or more key names
760+
* **PERSIST**: denotes the beginning of the output tensors keys' list, followed by the number of keys, and one or more key names
761+
* **KEYS**: denotes the beginning of the list of keys which are used within this command, followed by the number of keys, and one or more key names. Alternatively, the list of key names can be replaced with a tag which all of those keys share. Redis will verify that all potential key accesses are done to the right shard.
762+
763+
_While each of the LOAD, PERSIST and KEYS sections is optional (and may appear at most once in the command), the command must contain **at least one** of these 3 keywords._
764+
* **TIMEOUT**: an optional argument, denotes the time (in ms) after which the client is unblocked and a `TIMEDOUT` string is returned
765+
* **|> command**: the chaining operator, that denotes the beginning of a RedisAI command, followed by one of RedisAI's commands. Command splitting is done by the presence of another `|>`. The supported commands are:
766+
* `AI.TENSORSET`
767+
* `AI.TENSORGET`
768+
* `AI.MODELEXECUTE`
769+
* `AI.SCRIPTEXECUTE`
770+
771+
772+
`AI.MODELEXECUTE` and `AI.SCRIPTEXECUTE` commands can run on models or scripts that were set on different devices. RedisAI will analyze the DAG and execute commands in parallel if they are located on different devices and their inputs are available.
773+
Note that KEYS should not be specified in `AI.SCRIPTEXECUTE` commands of the DAG.
774+
775+
_Return_
776+
777+
An array with an entry per command's reply. Each entry format respects the specified command reply.
778+
In case the `DAGEXECUTE` request times out, a `TIMEDOUT` simple string is returned.
779+
780+
**Examples**
781+
782+
Assuming that a model is stored at 'mymodel', we define a temporary tensor 'mytensor', use it as the model's input, and persist only one of the two outputs - discarding 'classes' and persisting 'predictions'. In the same command we return the tensor value of 'predictions'. The following command does that:
783+
784+
785+
```
786+
redis> AI.DAGEXECUTE PERSIST 1 predictions{tag} |>
787+
AI.TENSORSET mytensor FLOAT 1 2 VALUES 5 10 |>
788+
AI.MODELEXECUTE mymodel{tag} INPUTS 1 mytensor OUTPUTS 2 classes predictions{tag} |>
789+
AI.TENSORGET predictions{tag} VALUES
790+
1) OK
791+
2) OK
792+
3) 1) FLOAT
793+
2) 1) (integer) 2
794+
2) (integer) 2
795+
3) "\x00\x00\x80?\x00\x00\x00@\x00\x00@@\x00\x00\x80@"
796+
```
797+
798+
A common pattern is enqueuing multiple SCRIPTEXECUTE and MODELEXECUTE commands within a DAG. The following example uses ResNet-50 to classify images into 1000 object categories. Given that our input tensor contains each color represented as an 8-bit integer and that neural networks usually work with floating-point tensors as their input, we need to cast a tensor to floating-point and normalize the values of the pixels - for that we will use `pre_process_3ch` function.
799+
800+
To optimize the classification process we can use a post process script to return only the category position with the maximum classification - for that we will use `post_process` script. Using the DAG capabilities we've removed the necessity of storing the intermediate tensors in the keyspace. You can even run the entire process without storing the output tensor, as follows:
801+
802+
```
803+
redis> AI.DAGEXECUTE KEYS 1 {tag} |>
804+
AI.TENSORSET image UINT8 224 224 3 BLOB b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00....' |>
805+
AI.SCRIPTEXECUTE imagenet_script{tag} pre_process_3ch INPUTS 1 image OUTPUTS 1 temp_key1 |>
806+
AI.MODELEXECUTE imagenet_model{tag} INPUTS 1 temp_key1 OUTPUTS 1 temp_key2 |>
807+
AI.SCRIPTEXECUTE imagenet_script{tag} post_process INPUTS 1 temp_key2 OUTPUTS 1 output |>
808+
AI.TENSORGET output VALUES
809+
1) OK
810+
2) OK
811+
3) OK
812+
4) OK
813+
5) 1) 1) (integer) 111
814+
```
815+
816+
As visible in the array reply, the label position with the highest classification was 111.
817+
818+
By combining DAG with multiple SCRIPTEXECUTE and MODELEXECUTE commands we've substantially reduced the overall required bandwidth and network RX (we're now returning a tensor with 1000 times fewer elements per classification).
819+
820+
821+
822+
!!! warning "Intermediate memory overhead"
823+
The execution of models and scripts within the DAG may generate intermediate tensors that are not allocated by the Redis allocator, but by whatever allocator is used in the backends (which may act on main memory or GPU memory, depending on the device), thus not being limited by `maxmemory` configuration settings of Redis.
824+
825+
## AI.DAGEXECUTE_RO
826+
827+
The **`AI.DAGEXECUTE_RO`** command is a read-only variant of `AI.DAGEXECUTE`.
828+
`AI.DAGEXECUTE` is flagged as a 'write' command in the Redis command table (as it provides the `PERSIST` option, for example). Hence, read-only cluster replicas will refuse to run the command and it will be redirected to the master even if the connection is using read-only mode.
829+
830+
`AI.DAGEXECUTE_RO` behaves exactly like the original command, excluding the `PERSIST` option and `AI.SCRIPTEXECUTE` commands. It is a read-only command that can safely be used with read-only replicas.
831+
832+
!!! info "Further reference"
833+
Refer to the Redis [`READONLY` command](https://redis.io/commands/readonly) for further information about read-only cluster replicas.
834+
733835
## AI.DAGRUN
836+
_This command is deprecated and will not be available in future versions. Consider using the `AI.DAGEXECUTE` command instead._
734837
The **`AI.DAGRUN`** command specifies a directed acyclic graph of operations to run within RedisAI.
735838

736839
It accepts one or more operations, split by the pipe-forward operator (`|>`).
@@ -817,7 +920,7 @@ By combining DAG with multiple SCRIPTRUN and MODELRUN commands we've substantial
817920
The execution of models and scripts within the DAG may generate intermediate tensors that are not allocated by the Redis allocator, but by whatever allocator is used in the backends (which may act on main memory or GPU memory, depending on the device), thus not being limited by `maxmemory` configuration settings of Redis.
818921

819922
## AI.DAGRUN_RO
820-
923+
_This command is deprecated and will not be available in future versions. Consider using the `AI.DAGEXECUTE_RO` command instead._
821924
The **`AI.DAGRUN_RO`** command is a read-only variant of `AI.DAGRUN`.
822925

823926
Because `AI.DAGRUN` provides the `PERSIST` option it is flagged as a 'write' command in the Redis command table. However, even when `PERSIST` isn't used, read-only cluster replicas will refuse to run the command and it will be redirected to the master even if the connection is using read-only mode.

docs/developer.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ To run all tests in a Python virtualenv, follow these steps:
121121
$ mkdir -p .env
122122
$ virtualenv .env
123123
$ source .env/bin/activate
124-
$ pip install -r tests/flow/test_requirements.txt
124+
$ pip install -r tests/flow/tests_setup/test_requirements.txt
125125
$ make -C opt test
126126

127127
**Integration tests**

opt/Makefile

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ MODULE=$(realpath $(INSTALLED_TARGET)) \
248248
CLUSTER=$(CLUSTER) \
249249
GEN=$(GEN) AOF=$(AOF) SLAVES=$(SLAVES) \
250250
VALGRIND=$(VALGRIND) \
251-
$(ROOT)/tests/flow/tests.sh
251+
$(ROOT)/tests/flow/tests_setup/tests.sh
252252
endef
253253

254254
unit_tests: build
@@ -265,7 +265,7 @@ flow_tests: build
265265
GEN=$(GEN) AOF=$(AOF) SLAVES=$(SLAVES) \
266266
VALGRIND=$(VALGRIND) \
267267
REDIS=$(REDIS) \
268-
$(ROOT)/tests/flow/tests.sh
268+
$(ROOT)/tests/flow/tests_setup/tests.sh
269269

270270
$(COVERAGE_COLLECT_REPORT)
271271

@@ -283,16 +283,16 @@ test: build
283283
GEN=$(GEN) AOF=$(AOF) SLAVES=$(SLAVES) \
284284
VALGRIND=$(VALGRIND) \
285285
REDIS=$(REDIS) \
286-
$(ROOT)/tests/flow/tests.sh
286+
$(ROOT)/tests/flow/tests_setup/tests.sh
287287
$(COVERAGE_COLLECT_REPORT)
288288

289289
#----------------------------------------------------------------------------------------------
290290

291291
valgrind:
292-
$(SHOW)$(ROOT)/tests/flow/valgrind.sh $(realpath $(INSTALLED_TARGET))
292+
$(SHOW)$(ROOT)/tests/flow/tests_setup/valgrind.sh $(realpath $(INSTALLED_TARGET))
293293

294294
callgrind:
295-
$(SHOW)CALLGRIND=1 $(ROOT)/tests/flow/valgrind.sh $(realpath $(INSTALLED_TARGET))
295+
$(SHOW)CALLGRIND=1 $(ROOT)/tests/flow/tests_setup/valgrind.sh $(realpath $(INSTALLED_TARGET))
296296

297297
#----------------------------------------------------------------------------------------------
298298

opt/system-setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def common_last(self):
7777

7878
self.run("{PYTHON} {READIES}/bin/getrmpytools".format(PYTHON=self.python, READIES=READIES))
7979

80-
self.pip_install("-r %s/tests/flow/test_requirements.txt" % ROOT)
80+
self.pip_install("-r %s/tests/flow/tests_setup/test_requirements.txt" % ROOT)
8181

8282
self.pip_install("awscli")
8383
self.pip_install("mkdocs mkdocs-material mkdocs-extensions")

ramp.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ capabilities:
1919
- backup_restore
2020
- intershard_tls
2121
exclude_commands:
22+
- ai.modelstore
2223
- ai.modelset
2324
- ai.modeldel
2425
- ai.scriptset

src/execution/DAG/dag.c

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -668,25 +668,36 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
668668
return REDISMODULE_OK;
669669
}
670670

671-
int RedisAI_DagRun_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx, RedisModuleString **argv,
672-
int argc) {
673-
for (size_t argpos = 1; argpos < argc; argpos++) {
674-
const char *arg_string = RedisModule_StringPtrLen(argv[argpos], NULL);
675-
if ((!strcasecmp(arg_string, "LOAD") || !strcasecmp(arg_string, "PERSIST")) &&
676-
(argpos + 1 < argc)) {
671+
int RedisAI_DagExecute_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx,
672+
RedisModuleString **argv, int argc) {
673+
674+
size_t argpos = 1;
675+
while (argpos < argc) {
676+
const char *arg_string = RedisModule_StringPtrLen(argv[argpos++], NULL);
677+
if (!strcasecmp(arg_string, "LOAD") || !strcasecmp(arg_string, "PERSIST") ||
678+
!strcasecmp(arg_string, "KEYS")) {
679+
if (argpos >= argc) {
680+
return REDISMODULE_ERR;
681+
}
677682
long long n_keys;
678-
argpos++;
679-
const int retval = RedisModule_StringToLongLong(argv[argpos], &n_keys);
683+
const int retval = RedisModule_StringToLongLong(argv[argpos++], &n_keys);
680684
if (retval != REDISMODULE_OK) {
681685
return REDISMODULE_ERR;
682686
}
683-
argpos++;
684-
if (n_keys > 0) {
685-
size_t last_persist_argpos = n_keys + argpos;
686-
for (; argpos < last_persist_argpos && argpos < argc; argpos++) {
687-
RedisModule_KeyAtPos(ctx, argpos);
688-
}
687+
size_t last_argpos = n_keys + argpos;
688+
if (last_argpos >= argc) {
689+
return REDISMODULE_ERR;
690+
}
691+
for (; argpos < last_argpos; argpos++) {
692+
RedisModule_KeyAtPos(ctx, argpos);
693+
}
694+
} else if (!strcasecmp(arg_string, "AI.MODELEXECUTE") ||
695+
!strcasecmp(arg_string, "AI.SCRIPTEXECUTE")) {
696+
if (argpos >= argc) {
697+
return REDISMODULE_ERR;
689698
}
699+
// After every AI.MODEL/SCRIPTEXECUTE arg comes the model/script key.
700+
RedisModule_KeyAtPos(ctx, argpos++);
690701
}
691702
}
692703
return REDISMODULE_OK;

src/execution/DAG/dag.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
144144
* @param argc Redis command number of arguments
145145
* @return
146146
*/
147-
int RedisAI_DagRun_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx, RedisModuleString **argv,
148-
int argc);
147+
int RedisAI_DagExecute_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx,
148+
RedisModuleString **argv, int argc);
149149

150150
/**
151151
* @brief This callback is called at the end of a DAG run and performs unblock client and reply.

src/execution/DAG/dag_builder.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,20 @@ int RAI_DAGAddOpsFromString(RAI_DAGRunCtx *run_info, const char *dag, RAI_Error
163163
}
164164
}
165165

166-
if (ParseDAGOps(rinfo, new_ops) != REDISMODULE_OK) {
166+
if (ParseDAGExecuteOps(rinfo, new_ops, false) != REDISMODULE_OK) {
167167
RAI_SetError(err, RAI_GetErrorCode(rinfo->err), RAI_GetError(rinfo->err));
168168
goto cleanup;
169169
}
170170
rinfo->dagOpCount = array_len(rinfo->dagOps);
171171
res = REDISMODULE_OK;
172172

173173
cleanup:
174+
if (res != REDISMODULE_OK) {
175+
// Release the ops in case of an error (otherwise the ownership is given to run_info)
176+
for (size_t i = 0; i < array_len(new_ops); i++) {
177+
RAI_FreeDagOp(new_ops[i]);
178+
}
179+
}
174180
array_free(new_ops);
175181
for (size_t i = 0; i < argc; i++) {
176182
RedisModule_FreeString(NULL, argv[i]);

src/execution/command_parser.c

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,6 @@ int RedisAI_ExecuteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
3535
rinfo->dagOps = array_append(rinfo->dagOps, scriptRunOp);
3636
status = ParseScriptRunCommand(rinfo, scriptRunOp, argv, argc);
3737
break;
38-
case CMD_SCRIPTEXECUTE:
39-
rinfo->single_op_dag = 1;
40-
RAI_DagOp *scriptExecOp;
41-
RAI_InitDagOp(&scriptExecOp);
42-
rinfo->dagOps = array_append(rinfo->dagOps, scriptExecOp);
43-
status = ParseScriptExecuteCommand(rinfo, scriptExecOp, argv, argc);
44-
break;
4538
case CMD_DAGRUN:
4639
status = ParseDAGRunCommand(rinfo, ctx, argv, argc, ro_dag);
4740
break;
@@ -52,6 +45,16 @@ int RedisAI_ExecuteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
5245
rinfo->dagOps = array_append(rinfo->dagOps, modelExecuteOp);
5346
status = ParseModelExecuteCommand(rinfo, modelExecuteOp, argv, argc);
5447
break;
48+
case CMD_SCRIPTEXECUTE:
49+
rinfo->single_op_dag = 1;
50+
RAI_DagOp *scriptExecOp;
51+
RAI_InitDagOp(&scriptExecOp);
52+
rinfo->dagOps = array_append(rinfo->dagOps, scriptExecOp);
53+
status = ParseScriptExecuteCommand(rinfo, scriptExecOp, argv, argc);
54+
break;
55+
case CMD_DAGEXECUTE:
56+
status = ParseDAGExecuteCommand(rinfo, ctx, argv, argc, ro_dag);
57+
break;
5558
default:
5659
break;
5760
}

src/execution/command_parser.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ typedef enum RunCommand {
88
CMD_SCRIPTRUN,
99
CMD_DAGRUN,
1010
CMD_MODELEXECUTE,
11-
CMD_SCRIPTEXECUTE
11+
CMD_SCRIPTEXECUTE,
12+
CMD_DAGEXECUTE
1213
} RunCommand;
1314

1415
/**

0 commit comments

Comments
 (0)