
Commit c6c3dd0

Merge remote-tracking branch 'origin/rinfo_leaks' into sanitizer-dagrun
2 parents: 3a98d77 + 2e885a6

17 files changed: +392 −75 lines changed


docs/commands.md

Lines changed: 11 additions & 1 deletion
@@ -174,6 +174,7 @@ _Arguments_
 * **device**: the device that will execute the model can be of:
   * **CPU**: a CPU device
   * **GPU**: a GPU device
+  * **GPU:0**, ..., **GPU:n**: a specific GPU device on a multi-GPU system
 * **TAG**: an optional string for tagging the model such as a version number or any arbitrary identifier
 * **BATCHSIZE**: when provided with an `n` that is greater than 0, the engine will batch incoming requests from multiple clients that use the model with input tensors of the same shape. When `AI.MODELRUN` is called, the request queue is visited and input tensors from compatible requests are concatenated along the 0th (batch) dimension until `n` is exceeded. The model is then run for the entire batch and the results are unpacked back to the individual requests, unblocking their respective clients. If the batch size of the inputs of the first request in the queue exceeds `BATCHSIZE`, the request is served immediately (default value: 0).
 * **MINBATCHSIZE**: when provided with an `m` that is greater than 0, the engine will postpone calls to `AI.MODELRUN` until the batch's size has reached `m`. This is primarily used to force batching during testing, but it can also be used under normal operation. In this case, note that requests for which `m` is not reached will hang indefinitely (default value: 0).
@@ -220,7 +221,7 @@ An array of alternating key-value pairs as follows:
 1. **MINBATCHSIZE**: The minimum size of any batch of incoming requests.
 1. **INPUTS**: array reply with one or more names of the model's input nodes (applicable only for TensorFlow models)
 1. **OUTPUTS**: array reply with one or more names of the model's output nodes (applicable only for TensorFlow models)
-1. **BLOB**: a blob containing the serialized model (when called with the `BLOB` argument) as a String
+1. **BLOB**: a blob containing the serialized model (when called with the `BLOB` argument) as a String. If the size of the serialized model exceeds `MODEL_CHUNK_SIZE` (see the `AI.CONFIG` command), then an array of chunks is returned. The full serialized model can be obtained by concatenating the chunks.

 **Examples**

@@ -361,6 +362,7 @@ _Arguments_
 * **device**: the device that will execute the model can be of:
   * **CPU**: a CPU device
   * **GPU**: a GPU device
+  * **GPU:0**, ..., **GPU:n**: a specific GPU device on a multi-GPU system
 * **script**: a string containing [TorchScript](https://pytorch.org/docs/stable/jit.html) source code

 _Return_
@@ -719,6 +721,7 @@ _Arguments_
   * **TFLITE**: The TensorFlow Lite backend
   * **TORCH**: The PyTorch backend
   * **ONNX**: ONNXRuntime backend
+* **MODEL_CHUNK_SIZE**: Sets the size of chunks (in bytes) in which model payloads are split for serialization, replication and `MODELGET`. Default is `511 * 1024 * 1024`.

 _Return_

@@ -746,3 +749,10 @@ This loads the PyTorch backend with a full path:
 redis> AI.CONFIG LOADBACKEND TORCH /usr/lib/redis/modules/redisai/backends/redisai_torch/redisai_torch.so
 OK
 ```
+
+This sets the model chunk size to one megabyte (not recommended):
+
+```
+redis> AI.CONFIG MODEL_CHUNK_SIZE 1048576
+OK
+```
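
The `BLOB` change above means a client of `AI.MODELGET` may receive either a single bulk string or an array of chunks. A minimal C sketch of the reassembly step, assuming the chunks have already been read into ordered buffers (the helper name and argument layout are illustrative, not part of the RedisAI or client API):

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: concatenate the chunk array returned by
 * AI.MODELGET ... BLOB back into one contiguous model payload.
 * `chunks` and `chunk_lens` would come from the client library's
 * array reply, in the order the chunks were returned. */
char *assemble_model_blob(const char **chunks, const size_t *chunk_lens,
                          size_t nchunks, size_t *out_len) {
    size_t total = 0;
    for (size_t i = 0; i < nchunks; i++) {
        total += chunk_lens[i];
    }
    char *blob = malloc(total);
    if (blob == NULL) {
        return NULL;
    }
    size_t offset = 0;
    for (size_t i = 0; i < nchunks; i++) {
        memcpy(blob + offset, chunks[i], chunk_lens[i]); /* chunks are in order */
        offset += chunk_lens[i];
    }
    *out_len = total;
    return blob;
}
```

A single-chunk reply degenerates to one `memcpy`, so the same path works whether or not the model exceeds `MODEL_CHUNK_SIZE`.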

src/backends/onnxruntime.c

Lines changed: 47 additions & 5 deletions
@@ -163,7 +163,7 @@ OrtValue* RAI_OrtValueFromTensors(RAI_Tensor** ts, size_t count, RAI_Error *error
     return NULL;
 }

-RAI_Tensor* RAI_TensorCreateFromOrtValue(OrtValue* v, size_t batch_offset, size_t batch_size, RAI_Error *error) {
+RAI_Tensor* RAI_TensorCreateFromOrtValue(OrtValue* v, size_t batch_offset, long long batch_size, RAI_Error *error) {
   OrtStatus* status = NULL;
   const OrtApi* ort = OrtGetApiBase()->GetApi(1);

@@ -206,6 +206,7 @@ RAI_Tensor* RAI_TensorCreateFromOrtValue(OrtValue* v, size_t batch_offset, size_
   if (status != NULL) goto error;

   int64_t total_batch_size = dims[0];
+  total_batch_size = total_batch_size > 0 ? total_batch_size : 1;

   shape = RedisModule_Calloc(ndims, sizeof(*shape));
   strides = RedisModule_Calloc(ndims, sizeof(*strides));
@@ -214,7 +215,12 @@ RAI_Tensor* RAI_TensorCreateFromOrtValue(OrtValue* v, size_t batch_offset, size_
     shape[i] = dims[i];
     strides[i] = 1;
   }
-  shape[0] = batch_size;
+  if (batch_size != -1) {
+    shape[0] = batch_size;
+  }
+  else {
+    batch_size = total_batch_size;
+  }
   for (int64_t i = ndims - 2; i >= 0; --i)
   {
     strides[i] *= strides[i + 1] * shape[i + 1];
@@ -411,9 +417,11 @@ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error)

   size_t batch_sizes[nbatches];
   size_t batch_offsets[nbatches];
+  size_t total_batch_size = 0;
   if (array_len(mctxs[0]->inputs) > 0) {
     for (size_t b=0; b<nbatches; ++b) {
       batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
+      total_batch_size += batch_sizes[b];
     }
     batch_offsets[0] = 0;
     for (size_t b=1; b<nbatches; ++b) {
@@ -529,14 +537,48 @@ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error)
   }

   for (size_t i = 0; i < n_output_nodes; i++) {
-    for (size_t b=0; b<nbatches; b++) {
-      RAI_Tensor* output_tensor = RAI_TensorCreateFromOrtValue(outputs[i], batch_offsets[b], batch_sizes[b], error);
+    if (nbatches > 1) {
+      OrtTensorTypeAndShapeInfo* info;
+      status = ort->GetTensorTypeAndShape(outputs[i], &info);
+      if (status != NULL) goto error;
+
+      size_t ndims;
+      status = ort->GetDimensionsCount(info, &ndims);
+      if (status != NULL) goto error;
+
+      int64_t dims[ndims];
+      status = ort->GetDimensions(info, dims, ndims);
+      if (status != NULL) goto error;
+
+      if (dims[0] != total_batch_size) {
+        RAI_SetError(error, RAI_EMODELRUN, "ERR Model did not generate the expected batch size");
+        ort->ReleaseStatus(status);
+        return 1;
+      }
+
+      for (size_t b=0; b<nbatches; b++) {
+        RAI_Tensor* output_tensor = RAI_TensorCreateFromOrtValue(outputs[i], batch_offsets[b], batch_sizes[b], error);
+        if (error->code != RAI_OK) {
+          ort->ReleaseStatus(status);
+          return 1;
+        }
+        if (output_tensor) {
+          mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+          RAI_TensorFree(output_tensor);
+        }
+        else {
+          printf("ERR: non-tensor output from ONNX models, ignoring (currently unsupported)");
+        }
+      }
+    }
+    else {
+      RAI_Tensor* output_tensor = RAI_TensorCreateFromOrtValue(outputs[i], 0, -1, error);
       if (error->code != RAI_OK) {
         ort->ReleaseStatus(status);
         return 1;
       }
       if (output_tensor) {
-        mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+        mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
         RAI_TensorFree(output_tensor);
       }
       else {
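
The run path above keeps one entry of `batch_sizes[]` per queued request, builds `batch_offsets[]` as its prefix sum, and rejects any output whose leading dimension differs from the summed batch size. A standalone sketch of that bookkeeping with illustrative names (the real code slices `OrtValue`s via `RAI_TensorCreateFromOrtValue`):

```c
#include <stdio.h>
#include <stddef.h>

/* Illustrative sketch of the batching bookkeeping in RAI_ModelRunORT:
 * batch_offsets[] is the prefix sum of batch_sizes[], and the batched
 * output's 0th dimension must equal the sum of all request batch sizes. */
int slice_batched_output(const size_t *batch_sizes, size_t nbatches,
                         long long output_dim0) {
    size_t batch_offsets[nbatches];
    size_t total_batch_size = 0;
    for (size_t b = 0; b < nbatches; ++b) {
        batch_offsets[b] = total_batch_size;   /* rows consumed so far */
        total_batch_size += batch_sizes[b];
    }
    if ((long long)total_batch_size != output_dim0) {
        /* mirrors the "Model did not generate the expected batch size" error */
        return 1;
    }
    for (size_t b = 0; b < nbatches; ++b) {
        printf("request %zu: rows [%zu, %zu)\n", b, batch_offsets[b],
               batch_offsets[b] + batch_sizes[b]);
    }
    return 0;
}

int main(void) {
    size_t sizes[] = {2, 3, 1};   /* three queued requests, 6 rows total */
    return slice_batched_output(sizes, 3, 6);
}
```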

src/backends/tensorflow.c

Lines changed: 28 additions & 5 deletions
@@ -79,7 +79,7 @@ DLDataType RAI_GetDLDataTypeFromTF(TF_DataType dtype) {
     return (DLDataType){ .bits = 0 };
 }

-RAI_Tensor* RAI_TensorCreateFromTFTensor(TF_Tensor *tensor, size_t batch_offset, size_t batch_size) {
+RAI_Tensor* RAI_TensorCreateFromTFTensor(TF_Tensor *tensor, size_t batch_offset, long long batch_size) {
   RAI_Tensor* ret = RedisModule_Calloc(1, sizeof(*ret));

   DLContext ctx = (DLContext){
@@ -89,15 +89,21 @@ RAI_Tensor* RAI_TensorCreateFromTFTensor(TF_Tensor *tensor, size_t batch_offset,

   const size_t ndims = TF_NumDims(tensor);

-  const int64_t total_batch_size = TF_Dim(tensor, 0);
+  int64_t total_batch_size = TF_Dim(tensor, 0);
+  total_batch_size = total_batch_size > 0 ? total_batch_size : 1;

   int64_t* shape = RedisModule_Calloc(ndims, sizeof(*shape));
   int64_t* strides = RedisModule_Calloc(ndims, sizeof(*strides));
   for (int64_t i = 0 ; i < ndims ; ++i) {
     shape[i] = TF_Dim(tensor, i);
     strides[i] = 1;
   }
-  shape[0] = batch_size;
+  if (batch_size != -1) {
+    shape[0] = batch_size;
+  }
+  else {
+    batch_size = total_batch_size;
+  }
   for (int64_t i = ndims-2 ; i >= 0 ; --i) {
     strides[i] *= strides[i+1] * shape[i+1];
   }
@@ -475,9 +481,11 @@ int RAI_ModelRunTF(RAI_ModelRunCtx** mctxs, RAI_Error *error) {

   size_t batch_sizes[nbatches];
   size_t batch_offsets[nbatches];
+  size_t total_batch_size = 0;
   if (ninputs > 0) {
     for (size_t b=0; b<nbatches; ++b) {
       batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
+      total_batch_size += batch_sizes[b];
     }
     batch_offsets[0] = 0;
     for (size_t b=1; b<nbatches; ++b) {
@@ -531,8 +539,23 @@ int RAI_ModelRunTF(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
   }

   for(size_t i=0; i<noutputs; ++i) {
-    for (size_t b=0; b<nbatches; b++) {
-      mctxs[b]->outputs[i].tensor = RAI_TensorCreateFromTFTensor(outputTensorsValues[i], batch_offsets[b], batch_sizes[b]);
+    if (nbatches > 1) {
+      if (TF_NumDims(outputTensorsValues[i]) == 0) {
+        continue;
+      }
+      if (TF_Dim(outputTensorsValues[i], 0) != total_batch_size) {
+        TF_DeleteTensor(outputTensorsValues[i]);
+        TF_DeleteStatus(status);
+        RAI_SetError(error, RAI_EMODELRUN, "ERR Model did not generate the expected batch size");
+        return 1;
+      }
+
+      for (size_t b=0; b<nbatches; b++) {
+        mctxs[b]->outputs[i].tensor = RAI_TensorCreateFromTFTensor(outputTensorsValues[i], batch_offsets[b], batch_sizes[b]);
+      }
+    }
+    else {
+      mctxs[0]->outputs[i].tensor = RAI_TensorCreateFromTFTensor(outputTensorsValues[i], 0, -1);
     }
     TF_DeleteTensor(outputTensorsValues[i]);
   }

src/backends/torch.c

Lines changed: 5 additions & 1 deletion
@@ -93,9 +93,9 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx** mctxs, RAI_Error *error) {

   size_t batch_sizes[nbatches];
   size_t batch_offsets[nbatches];
+  size_t total_batch_size = 0;

   if (nbatches > 1) {
-    size_t total_batch_size = 0;
     if (array_len(mctxs[0]->inputs) > 0) {
       for (size_t b=0; b<nbatches; ++b) {
         batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
@@ -147,6 +147,10 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
     }
     RAI_Tensor* output_tensor = RAI_TensorCreateFromDLTensor(outputs_dl[i]);
     if (nbatches > 1) {
+      if (outputs_dl[i]->dl_tensor.shape[0] != total_batch_size) {
+        RAI_SetError(error, RAI_EMODELRUN, "ERR Model did not generate the expected batch size");
+        return 1;
+      }
       for (size_t b=0; b<nbatches; b++) {
         mctxs[b]->outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
       }
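
All three backends now pass `-1` as the `batch_size` sentinel when there is a single request, and an explicit per-request size when un-batching. A minimal sketch of what the sentinel does to the shape/stride computation, kept standalone and illustrative (the real helpers operate on `TF_Tensor`, `OrtValue` and `DLTensor` objects):

```c
#include <stdint.h>
#include <stdio.h>

/* Illustrative sketch of the batch_size sentinel introduced in this change:
 * -1 means "single request, keep the tensor's own leading dimension";
 * any other value overwrites dim 0 with that request's slice size before
 * the row-major strides are recomputed. */
void fill_shape_and_strides(const int64_t *dims, size_t ndims,
                            long long batch_size,
                            int64_t *shape, int64_t *strides) {
    for (size_t i = 0; i < ndims; ++i) {
        shape[i] = dims[i];
        strides[i] = 1;
    }
    if (batch_size != -1) {
        shape[0] = batch_size; /* one request's slice of the batched output */
    }
    for (int64_t i = (int64_t)ndims - 2; i >= 0; --i) {
        strides[i] = strides[i + 1] * shape[i + 1];
    }
}

int main(void) {
    int64_t dims[] = {6, 4};          /* batched output: 6 rows of 4 values */
    int64_t shape[2], strides[2];
    fill_shape_and_strides(dims, 2, 2, shape, strides); /* request with 2 rows */
    printf("shape=[%lld,%lld] strides=[%lld,%lld]\n",
           (long long)shape[0], (long long)shape[1],
           (long long)strides[0], (long long)strides[1]);
    return 0;
}
```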

src/config.c

Lines changed: 61 additions & 20 deletions
@@ -20,6 +20,7 @@ long long backends_intra_op_parallelism; // number of threads used within an
 long long
     backends_inter_op_parallelism; // number of threads used for parallelism
                                    // between independent operations.
+long long model_chunk_size;        // size of chunks used to break up model payloads.

 /**
  *
@@ -69,6 +70,30 @@ int setBackendsIntraOpParallelism(long long num_threads) {
     return result;
 }

+/**
+ * @return size of chunks (in bytes) in which models are split for
+ * set, get, serialization and replication.
+ */
+long long getModelChunkSize() {
+    return model_chunk_size;
+}
+
+/**
+ * Set size of chunks (in bytes) in which models are split for set,
+ * get, serialization and replication.
+ *
+ * @param size
+ * @return 0 on success, or 1 if failed
+ */
+int setModelChunkSize(long long size) {
+    int result = 1;
+    if (size > 0) {
+        model_chunk_size = size;
+        result = 0;
+    }
+    return result;
+}
+
 /**
  * Helper method for AI.CONFIG LOADBACKEND <backend_identifier>
  * <location_of_backend_library>
@@ -175,6 +200,26 @@ int RedisAI_Config_IntraOperationParallelism(
     return result;
 }

+/**
+ * Set size of chunks in which model payloads are split for set,
+ * get, serialization and replication.
+ *
+ * @param chunk_size_string string containing chunk size (in bytes)
+ * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
+ */
+int RedisAI_Config_ModelChunkSize(RedisModuleString *chunk_size_string) {
+    long long temp;
+    int result = RedisModule_StringToLongLong(chunk_size_string, &temp);
+    // make sure chunk size is a positive integer
+    // if not set the value to the default
+    if (result == REDISMODULE_OK && temp < 1) {
+        temp = REDISAI_DEFAULT_MODEL_CHUNK_SIZE;
+        result = REDISMODULE_ERR;
+    }
+    result = setModelChunkSize(temp);
+    return result;
+}
+
 /**
  *
  * @param ctx Context in which Redis modules operate
@@ -199,34 +244,30 @@ int RAI_configParamParse(RedisModuleCtx *ctx, const char *key,
     else if (strcasecmp((key), "THREADS_PER_QUEUE") == 0) {
         ret = RedisAI_Config_QueueThreads(rsval);
         if (ret == REDISMODULE_OK) {
-            char *buffer = RedisModule_Alloc(
-                (3 + strlen(REDISAI_INFOMSG_THREADS_PER_QUEUE) + strlen((val))) *
-                sizeof(*buffer));
-            sprintf(buffer, "%s: %s", REDISAI_INFOMSG_THREADS_PER_QUEUE, (val));
-            RedisModule_Log(ctx, "notice", buffer);
-            RedisModule_Free(buffer);
+            RedisModule_Log(ctx, "notice", "%s: %s",
+                            REDISAI_INFOMSG_THREADS_PER_QUEUE,
+                            (val));
         }
     } else if (strcasecmp((key), "INTRA_OP_PARALLELISM") == 0) {
         ret = RedisAI_Config_IntraOperationParallelism(rsval);
         if (ret == REDISMODULE_OK) {
-            char *buffer = RedisModule_Alloc(
-                (3 + strlen(REDISAI_INFOMSG_INTRA_OP_PARALLELISM) + strlen((val))) *
-                sizeof(*buffer));
-            sprintf(buffer, "%s: %lld", REDISAI_INFOMSG_INTRA_OP_PARALLELISM,
-                    getBackendsIntraOpParallelism());
-            RedisModule_Log(ctx, "notice", buffer);
-            RedisModule_Free(buffer);
+            RedisModule_Log(ctx, "notice", "%s: %lld",
+                            REDISAI_INFOMSG_INTRA_OP_PARALLELISM,
+                            getBackendsIntraOpParallelism());
         }
     } else if (strcasecmp((key), "INTER_OP_PARALLELISM") == 0) {
         ret = RedisAI_Config_InterOperationParallelism(rsval);
         if (ret == REDISMODULE_OK) {
-            char *buffer = RedisModule_Alloc(
-                (3 + strlen(REDISAI_INFOMSG_INTER_OP_PARALLELISM) + strlen((val))) *
-                sizeof(*buffer));
-            sprintf(buffer, "%s: %lld", REDISAI_INFOMSG_INTER_OP_PARALLELISM,
-                    getBackendsInterOpParallelism());
-            RedisModule_Log(ctx, "notice", buffer);
-            RedisModule_Free(buffer);
+            RedisModule_Log(ctx, "notice", "%s: %lld",
+                            REDISAI_INFOMSG_INTER_OP_PARALLELISM,
+                            getBackendsInterOpParallelism());
+        }
+    } else if (strcasecmp((key), "MODEL_CHUNK_SIZE") == 0) {
+        ret = RedisAI_Config_ModelChunkSize(rsval);
+        if (ret == REDISMODULE_OK) {
+            RedisModule_Log(ctx, "notice", "%s: %lld",
+                            REDISAI_INFOMSG_MODEL_CHUNK_SIZE,
+                            getModelChunkSize());
         }
     } else if (strcasecmp((key), "BACKENDSPATH") == 0) {
         // already taken care of
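
With the configuration wired up, the number of chunks a model payload produces is a ceiling division of its serialized length by `MODEL_CHUNK_SIZE`. A small sketch of that arithmetic; the constant name and its default value come from the diff, while the helper itself is illustrative:

```c
#include <stdio.h>

/* Default from the diff: 511 * 1024 * 1024 bytes. */
#define REDISAI_DEFAULT_MODEL_CHUNK_SIZE (511LL * 1024 * 1024)

/* Illustrative: how many chunks a serialized model of `model_len` bytes
 * is split into for MODELGET, serialization and replication. */
long long chunk_count(long long model_len, long long chunk_size) {
    return (model_len + chunk_size - 1) / chunk_size;  /* ceiling division */
}

int main(void) {
    long long model_len = 1300LL * 1024 * 1024;  /* a 1.3 GB model blob */
    printf("default chunks: %lld\n",
           chunk_count(model_len, REDISAI_DEFAULT_MODEL_CHUNK_SIZE));  /* 3 */
    printf("1 MB chunks:    %lld\n",
           chunk_count(model_len, 1048576));                           /* 1300 */
    return 0;
}
```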
