8 changes: 4 additions & 4 deletions docs/commands.md
@@ -179,10 +179,10 @@ _Arguments_
* **BATCHSIZE**: when provided with an `n` that is greater than 0, the engine will batch incoming requests from multiple clients that use the model, provided their input tensors have the same shape. When `AI.MODELEXECUTE` (or `AI.MODELRUN`) is called, the request queue is visited and input tensors from compatible requests are concatenated along the 0th (batch) dimension until `n` is exceeded. The model is then run for the entire batch and the results are unpacked back to the individual requests, unblocking their respective clients. If the batch size of the inputs of the first request in the queue exceeds `BATCHSIZE`, the request is served immediately (default value: 0).
* **MINBATCHSIZE**: when provided with an `m` that is greater than 0, the engine will postpone calls to `AI.MODELEXECUTE` until the batch's size has reached `m`. Note that requests for which `m` is not reached will hang indefinitely unless `MINBATCHTIMEOUT` is also provided (default value: 0).
* **MINBATCHTIMEOUT**: when provided with a `t` (expressed in milliseconds) that is greater than 0, the engine will trigger a run after `t` milliseconds from the time a `MODELEXECUTE` (or the enclosing `DAGRUN`) is enqueued, even if `MINBATCHSIZE` has not been reached. This only applies when both `BATCHSIZE` and `MINBATCHSIZE` are greater than 0 (see the sketch after this file's diff).
* **INPUTS**: denotes that one or more names of the model's input nodes are following (applicable only for TensorFlow models)
* **input_count**: a positive number that indicates the number of following input nodes.
* **OUTPUTS**: denotes that one or more names of the model's output nodes are following (applicable only for TensorFlow models)
* **output_count**: a positive number that indicates the number of following input nodes.
* **INPUTS**: denotes that one or more names of the model's input nodes are following, applicable only for TensorFlow models (specifying INPUTS for other backends will cause an error)
* **input_count**: a positive number that indicates the number of following input nodes (also applicable only for TensorFlow)
* **OUTPUTS**: denotes that one or more names of the model's output nodes are following, applicable only for TensorFlow models (specifying OUTPUTS for other backends will cause an error)
* **output_count**: a positive number that indicates the number of following output nodes (also applicable only for TensorFlow)
* **model**: the Protobuf-serialized model. Since Redis supports strings up to 512MB, blobs for very large models need to be chunked, e.g. `BLOB chunk1 chunk2 ...`.

_Return_
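To make the batching arguments above concrete, here is a minimal sketch of storing a TensorFlow model with batching enabled and then executing it. It borrows the `graph.pb` fixture and its node names (`a`, `b`, `mul`) from this PR's flow tests; the redis-py client, the server address, and the fixture path are assumptions, not part of the command reference.

```python
import redis

# Assumes a RedisAI-enabled Redis server on localhost:6379 (redis-py client).
con = redis.Redis()

# Fixture path is an assumption, borrowed from tests/flow in this repo.
with open('tests/flow/test_data/graph.pb', 'rb') as f:
    model_pb = f.read()

# BATCHSIZE 4, MINBATCHSIZE 2, MINBATCHTIMEOUT 1000: compatible requests are
# concatenated along dim 0 until 4 is exceeded; the engine waits for at least
# 2 requests, but no longer than 1000 ms, before running the batch.
con.execute_command('AI.MODELSTORE', 'mymodel', 'TF', 'CPU',
                    'BATCHSIZE', 4, 'MINBATCHSIZE', 2, 'MINBATCHTIMEOUT', 1000,
                    'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb)

con.execute_command('AI.TENSORSET', 'tA', 'FLOAT', 2, 'VALUES', 2, 3)
con.execute_command('AI.TENSORSET', 'tB', 'FLOAT', 2, 'VALUES', 3, 5)
con.execute_command('AI.MODELEXECUTE', 'mymodel',
                    'INPUTS', 2, 'tA', 'tB', 'OUTPUTS', 1, 'tResult')
print(con.execute_command('AI.TENSORGET', 'tResult', 'VALUES'))
```

With `MINBATCHSIZE` 2, a single request like this one only runs after the 1000 ms `MINBATCHTIMEOUT` elapses; a second concurrent client with same-shaped inputs would trigger the run earlier.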
2 changes: 1 addition & 1 deletion docs/intro.md
@@ -246,7 +246,7 @@ AI.TENSORSET tB FLOAT 2 VALUES 3 5
The model can now be run with the [`AI.MODELEXECUTE` command](commands.md#aimodelexecute) as follows:

```
AI.MODELEXECUTE mymodel INPUTS tA tB OUTPUTS tResult
AI.MODELEXECUTE mymodel INPUTS 2 tA tB OUTPUTS 1 tResult
```

!!! example "Example: running a model"
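For completeness, the same updated syntax driven from a client. This is a sketch assuming `mymodel` and the tensors from the surrounding intro section; redis-py and a local RedisAI server are assumptions:

```python
import redis

con = redis.Redis()  # assumes a RedisAI-enabled server on localhost:6379

con.execute_command('AI.TENSORSET', 'tA', 'FLOAT', 2, 'VALUES', 2, 3)
con.execute_command('AI.TENSORSET', 'tB', 'FLOAT', 2, 'VALUES', 3, 5)

# AI.MODELEXECUTE requires explicit counts: 2 input keys and 1 output key,
# unlike the deprecated AI.MODELRUN form shown on the removed line above.
con.execute_command('AI.MODELEXECUTE', 'mymodel',
                    'INPUTS', 2, 'tA', 'tB', 'OUTPUTS', 1, 'tResult')
print(con.execute_command('AI.TENSORGET', 'tResult', 'VALUES'))
```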
102 changes: 62 additions & 40 deletions src/backends/onnxruntime.c
@@ -179,7 +179,8 @@ DLDataType RAI_GetDLDataTypeFromORT(ONNXTensorElementDataType dtype) {
}
}

OrtValue *RAI_OrtValueFromTensors(RAI_Tensor **ts, size_t count, RAI_Error *error) {
int RAI_OrtValueFromTensors(RAI_Tensor **ts, size_t count, OrtValue **input,
OrtStatus **status_ptr) {
OrtStatus *status = NULL;
const OrtApi *ort = OrtGetApiBase()->GetApi(1);

@@ -218,12 +219,12 @@ OrtValue *RAI_OrtValueFromTensors(RAI_Tensor **ts, size_t count, RAI_Error *erro
RAI_TensorByteSize(t0), t0->tensor.dl_tensor.shape, t0->tensor.dl_tensor.ndim,
RAI_GetOrtDataTypeFromDL(t0->tensor.dl_tensor.dtype), &out))
}
return out;
*input = out;
return REDISMODULE_OK;

error:
RAI_SetError(error, RAI_EMODELRUN, ort->GetErrorMessage(status));
ort->ReleaseStatus(status);
return NULL;
*status_ptr = status;
return REDISMODULE_ERR;
}

RAI_Tensor *RAI_TensorCreateFromOrtValue(OrtValue *v, size_t batch_offset, long long batch_size,
@@ -233,6 +234,7 @@ RAI_Tensor *RAI_TensorCreateFromOrtValue(OrtValue *v, size_t batch_offset, long
RAI_Tensor *ret = NULL;
int64_t *shape = NULL;
int64_t *strides = NULL;
OrtTensorTypeAndShapeInfo *info = NULL;

int is_tensor;
ONNX_VALIDATE_STATUS(ort->IsTensor(v, &is_tensor))
@@ -245,7 +247,6 @@ RAI_Tensor *RAI_TensorCreateFromOrtValue(OrtValue *v, size_t batch_offset, long
ret = RAI_TensorNew();
DLDevice device = (DLDevice){.device_type = kDLCPU, .device_id = 0};

OrtTensorTypeAndShapeInfo *info;
ONNX_VALIDATE_STATUS(ort->GetTensorTypeAndShape(v, &info))

{
@@ -323,6 +324,9 @@ RAI_Tensor *RAI_TensorCreateFromOrtValue(OrtValue *v, size_t batch_offset, long
if (ret != NULL) {
RedisModule_Free(ret);
}
if (info != NULL) {
ort->ReleaseTensorTypeAndShapeInfo(info);
}
return NULL;
}

@@ -497,19 +501,16 @@ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error) {
}

OrtStatus *status = NULL;
size_t n_input_nodes;
ONNX_VALIDATE_STATUS(ort->SessionGetInputCount(session, &n_input_nodes))

size_t n_output_nodes;
ONNX_VALIDATE_STATUS(ort->SessionGetOutputCount(session, &n_output_nodes)) {
const char *input_names[n_input_nodes];
const char *output_names[n_output_nodes];

OrtValue *inputs[n_input_nodes];
OrtValue *outputs[n_output_nodes];

const size_t ninputs = array_len(mctxs[0]->inputs);
const size_t noutputs = array_len(mctxs[0]->outputs);
const size_t ninputs = array_len(mctxs[0]->inputs);
const size_t noutputs = array_len(mctxs[0]->outputs);
array_new_on_stack(const char *, 5, input_names)
array_new_on_stack(const char *, 5, output_names) array_new_on_stack(OrtValue *, 5, inputs)
array_new_on_stack(OrtValue *, 5, outputs) OrtTensorTypeAndShapeInfo *info = NULL;

Review comment with suggested change:

```
array_new_on_stack(const char *, 5, input_names)
array_new_on_stack(const char *, 5, output_names) array_new_on_stack(OrtValue *, 5, inputs)
array_new_on_stack(OrtValue *, 5, outputs) OrtTensorTypeAndShapeInfo *info = NULL;
```

becomes:

```
int stack_arr_size = 5;
array_new_on_stack(const char *, stack_arr_size, input_names);
array_new_on_stack(const char *, stack_arr_size, output_names);
array_new_on_stack(OrtValue *, stack_arr_size, inputs);
array_new_on_stack(OrtValue *, stack_arr_size, outputs);
OrtTensorTypeAndShapeInfo *info = NULL;
```

You also need to free `inputs` and `outputs` in the end.

{
size_t n_input_nodes;
size_t n_output_nodes;
ONNX_VALIDATE_STATUS(ort->SessionGetInputCount(session, &n_input_nodes))
ONNX_VALIDATE_STATUS(ort->SessionGetOutputCount(session, &n_output_nodes))

if (ninputs != n_input_nodes) {
char msg[70];
@@ -529,26 +530,26 @@ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error) {
char *input_name;
ONNX_VALIDATE_STATUS(
ort->SessionGetInputName(session, i, global_allocator, &input_name))
input_names[i] = input_name;
input_names = array_append(input_names, input_name);

RAI_Tensor *batched_input_tensors[nbatches];
for (size_t b = 0; b < nbatches; b++) {
batched_input_tensors[b] = mctxs[b]->inputs[i].tensor;
}

inputs[i] = RAI_OrtValueFromTensors(batched_input_tensors, nbatches, error);
if (error->code != RAI_OK) {
ort->ReleaseStatus(status);
return REDISMODULE_ERR;
OrtValue *input;
if (RAI_OrtValueFromTensors(batched_input_tensors, nbatches, &input, &status) !=
REDISMODULE_OK) {
goto error;
}
inputs = array_append(inputs, input);
}

for (size_t i = 0; i < n_output_nodes; i++) {
char *output_name;
ONNX_VALIDATE_STATUS(
ort->SessionGetOutputName(session, i, global_allocator, &output_name))
output_names[i] = output_name;
outputs[i] = NULL;
output_names = array_append(output_names, output_name);
outputs = array_append(outputs, NULL);
}

OrtRunOptions *run_options = NULL;
@@ -559,13 +560,14 @@ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error) {
for (uint32_t i = 0; i < ninputs; i++) {
status = ort->AllocatorFree(global_allocator, (void *)input_names[i]);
}
array_free(input_names);
for (uint32_t i = 0; i < noutputs; i++) {
status = ort->AllocatorFree(global_allocator, (void *)output_names[i]);
}
array_free(output_names);

for (size_t i = 0; i < n_output_nodes; i++) {
if (nbatches > 1) {
OrtTensorTypeAndShapeInfo *info;
ONNX_VALIDATE_STATUS(ort->GetTensorTypeAndShape(outputs[i], &info))
size_t ndims;
ONNX_VALIDATE_STATUS(ort->GetDimensionsCount(info, &ndims))
@@ -575,37 +577,36 @@ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error) {
if (dims[0] != total_batch_size) {
RAI_SetError(error, RAI_EMODELRUN,
"ERR Model did not generate the expected batch size");
ort->ReleaseStatus(status);
return REDISMODULE_ERR;
goto error;
}

for (size_t b = 0; b < nbatches; b++) {
RAI_Tensor *output_tensor = RAI_TensorCreateFromOrtValue(
outputs[i], batch_offsets[b], batch_sizes[b], error);
if (error->code != RAI_OK) {
ort->ReleaseStatus(status);
return REDISMODULE_ERR;
goto error;
}
if (output_tensor) {
mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
RAI_TensorFree(output_tensor);
} else {
printf("ERR: non-tensor output from ONNX models, ignoring (currently "
"unsupported)");
RedisModule_Log(NULL, "warning",
"non-tensor output from ONNX models, ignoring (currently "
"unsupported)");
}
}
} else {
RAI_Tensor *output_tensor = RAI_TensorCreateFromOrtValue(outputs[i], 0, -1, error);
if (error->code != RAI_OK) {
ort->ReleaseStatus(status);
return REDISMODULE_ERR;
goto error;
}
if (output_tensor) {
mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
RAI_TensorFree(output_tensor);
} else {
printf("ERR: non-tensor output from ONNX models, ignoring (currently "
"unsupported)");
RedisModule_Log(NULL, "warning",
"non-tensor output from ONNX models, ignoring (currently "
"unsupported)");
}
}
ort->ReleaseValue(outputs[i]);
@@ -617,8 +618,29 @@ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error) {
}

error:
RAI_SetError(error, RAI_EMODELRUN, ort->GetErrorMessage(status));
ort->ReleaseStatus(status);
if (status) {
RAI_SetError(error, RAI_EMODELRUN, ort->GetErrorMessage(status));
ort->ReleaseStatus(status);
}
for (uint32_t i = 0; i < array_len(input_names); i++) {
status = ort->AllocatorFree(global_allocator, (void *)input_names[i]);
}
array_free(input_names);
for (uint32_t i = 0; i < array_len(output_names); i++) {
status = ort->AllocatorFree(global_allocator, (void *)output_names[i]);
}
array_free(output_names);
for (size_t i = 0; i < array_len(inputs); i++) {
ort->ReleaseValue(inputs[i]);
}
array_free(inputs);
for (size_t i = 0; i < array_len(outputs); i++) {
ort->ReleaseValue(outputs[i]);
}
array_free(outputs);
if (info) {
ort->ReleaseTensorTypeAndShapeInfo(info);
}
return REDISMODULE_ERR;
}

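The reworked error path above now frees `input_names`, `output_names`, `inputs`, and `outputs` (plus any pending `OrtStatus` and shape info) before returning, addressing the review note about leaks. A flow-test-style sketch of one way to drive that path, assuming the `mnist.onnx` fixture used elsewhere in this PR; the exact error text is illustrative and deliberately not asserted:

```python
import redis

con = redis.Redis()  # assumes a RedisAI-enabled server on localhost:6379

# Fixture path is an assumption, borrowed from tests/flow in this repo.
with open('tests/flow/test_data/mnist.onnx', 'rb') as f:
    model_pb = f.read()

con.execute_command('AI.MODELSTORE', 'mnist', 'ONNX', 'CPU', 'BLOB', model_pb)

# mnist expects a single input node; passing two inputs should drive
# RAI_ModelRunORT into its error label, which now releases the OrtValues
# and frees the name arrays instead of returning early.
con.execute_command('AI.TENSORSET', 'in0', 'FLOAT', 1, 1, 28, 28)
con.execute_command('AI.TENSORSET', 'in1', 'FLOAT', 1, 1, 28, 28)
try:
    con.execute_command('AI.MODELEXECUTE', 'mnist',
                        'INPUTS', 2, 'in0', 'in1', 'OUTPUTS', 1, 'out')
except redis.exceptions.ResponseError as err:
    print('rejected as expected:', err)
```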
2 changes: 1 addition & 1 deletion tests/flow/includes.py
@@ -187,7 +187,7 @@ def tmpfn():


# Load a model/script from a file located in test_data dir.
def load_from_file(file_name):
def load_file_content(file_name):
test_data_path = os.path.join(os.path.dirname(__file__), 'test_data')
filename = os.path.join(test_data_path, file_name)
with open(filename, 'rb') as f:
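For reference, a sketch of the renamed helper in full, with an illustrative call site. The diff elides the function's tail, which is assumed here to simply return the file's bytes:

```python
import os

# Load a model/script from a file located in the test_data dir and return
# its raw bytes. Body reconstructed from the partial context above; the
# final return is an assumption.
def load_file_content(file_name):
    test_data_path = os.path.join(os.path.dirname(__file__), 'test_data')
    filename = os.path.join(test_data_path, file_name)
    with open(filename, 'rb') as f:
        return f.read()

# Illustrative call site, mirroring the renamed usages in the tests below:
# model_pb = load_file_content('graph.pb')
```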
4 changes: 2 additions & 2 deletions tests/flow/tests_commands.py
@@ -17,7 +17,7 @@ def test_modelstore_errors(env):
return

con = env.getConnection()
model_pb = load_from_file('pt-minimal.pt')
model_pb = load_file_content('pt-minimal.pt')

# Check that the basic arguments are valid (model's key, device, backend, blob)
check_error_message(env, con, "wrong number of arguments for 'AI.MODELSTORE' command",
@@ -79,7 +79,7 @@ def test_modelexecute_errors(env):
return
con = env.getConnection()

model_pb = load_from_file('graph.pb')
model_pb = load_file_content('graph.pb')
ret = con.execute_command('AI.MODELSTORE', 'm{1}', 'TF', DEVICE,
'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb)
env.assertEqual(ret, b'OK')
18 changes: 9 additions & 9 deletions tests/flow/tests_deprecated_commands.py
@@ -12,7 +12,7 @@ def test_modelset_errors(env):
return

con = env.getConnection()
model_pb = load_from_file('pt-minimal.pt')
model_pb = load_file_content('pt-minimal.pt')

# test validity of backend and device args.
check_error_message(env, con, "wrong number of arguments for 'AI.MODELSET' command",
@@ -45,7 +45,7 @@ def test_modelset_errors(env):
'AI.MODELSET', 'm{1}', 'TORCH', DEVICE, 'BATCHSIZE', 2, 'BLOB')

# test INPUTS and OUTPUTS args for TF backend
model_pb = load_from_file('graph.pb')
model_pb = load_file_content('graph.pb')
check_error_message(env, con, "Insufficient arguments, INPUTS and OUTPUTS not specified",
'AI.MODELSET', 'm_1{1}', 'TF', DEVICE, 'BLOB', model_pb)
check_error_message(env, con, "INPUTS not specified",
@@ -62,7 +62,7 @@ def test_modelrun_errors(env):
return
con = env.getConnection()

model_pb = load_from_file('graph.pb')
model_pb = load_file_content('graph.pb')
ret = con.execute_command('AI.MODELSET', 'm{1}', 'TF', DEVICE,
'INPUTS', 'a', 'b', 'OUTPUTS', 'mul', 'BLOB', model_pb)
env.assertEqual(ret, b'OK')
@@ -94,7 +94,7 @@ def test_modelset_modelrun_tf(env):
return
con = env.getConnection()

model_pb = load_from_file('graph.pb')
model_pb = load_file_content('graph.pb')
ret = con.execute_command('AI.MODELSET', 'm{1}', 'TF', DEVICE, 'TAG', 'version:1',
'INPUTS', 'a', 'b', 'OUTPUTS', 'mul', 'BLOB', model_pb)
env.assertEqual(ret, b'OK')
@@ -123,8 +123,8 @@ def test_modelset_modelrun_tflite(env):
return

con = env.getConnection()
model_pb = load_from_file('mnist_model_quant.tflite')
sample_raw = load_from_file('one.raw')
model_pb = load_file_content('mnist_model_quant.tflite')
sample_raw = load_file_content('one.raw')

ret = con.execute_command('AI.MODELSET', 'm{1}', 'TFLITE', 'CPU', 'TAG', 'asdf', 'BLOB', model_pb)
env.assertEqual(ret, b'OK')
@@ -153,7 +153,7 @@ def test_modelset_modelrun_pytorch(env):
return

con = env.getConnection()
model_pb = load_from_file('pt-minimal.pt')
model_pb = load_file_content('pt-minimal.pt')

ret = con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
env.assertEqual(ret, b'OK')
@@ -187,8 +187,8 @@ def test_modelset_modelrun_onnx(env):
return

con = env.getConnection()
model_pb = load_from_file('mnist.onnx')
sample_raw = load_from_file('one.raw')
model_pb = load_file_content('mnist.onnx')
sample_raw = load_file_content('one.raw')

ret = con.execute_command('AI.MODELSET', 'm{1}', 'ONNX', DEVICE, 'TAG', 'version:2', 'BLOB', model_pb)
env.assertEqual(ret, b'OK')