Commit 26e352b

Merge pull request #325 from RedisAI/batching_threading_fix

Refactor batching mechanism, solve threading issue

2 parents 4f2e4c0 + f7e396f

In short: every backend's model_run entry point now takes an array of RAI_ModelRunCtx pointers, one per queued client request, in place of a single context that carried its own batches array; the per-file diffs below are the mechanical fallout of that signature change.

File tree

14 files changed: +114 −195 lines

src/backends.c

Lines changed: 4 additions & 4 deletions
@@ -92,7 +92,7 @@ int RAI_LoadBackend_TensorFlow(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunTF");
   if (backend.model_run == NULL) {
     dlclose(handle);
@@ -157,7 +157,7 @@ int RAI_LoadBackend_TFLite(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunTFLite");
   if (backend.model_run == NULL) {
     dlclose(handle);
@@ -222,7 +222,7 @@ int RAI_LoadBackend_Torch(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunTorch");
   if (backend.model_run == NULL) {
     dlclose(handle);
@@ -311,7 +311,7 @@ int RAI_LoadBackend_ONNXRuntime(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunORT");
   if (backend.model_run == NULL) {
     dlclose(handle);
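The cast is the only change in all four loaders: the symbol resolved via dlsym now has the batched signature. A minimal sketch of what a caller of the new vtable entry might look like, assuming the repo's arr.h growable-array helpers (array_new/array_append) and a RAI_ModelRunCtxCreate constructor; none of these caller-side lines appear in this commit:

    /* Hypothetical caller-side sketch; not code from this commit. */
    RAI_ModelRunCtx **mctxs = array_new(RAI_ModelRunCtx *, 2);
    mctxs = array_append(mctxs, RAI_ModelRunCtxCreate(model));  /* request A */
    mctxs = array_append(mctxs, RAI_ModelRunCtxCreate(model));  /* request B */

    RAI_Error err = {0};
    if (backend.model_run(mctxs, &err) != 0) {
        /* each backend returns 1 and sets err on failure */
    }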

src/backends.h

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ typedef struct RAI_LoadedBackend {
   RAI_Model* (*model_create)(RAI_Backend, const char*, RAI_ModelOpts,
                              const char*, size_t, RAI_Error*);
   void (*model_free)(RAI_Model*, RAI_Error*);
-  int (*model_run)(RAI_ModelRunCtx*, RAI_Error*);
+  int (*model_run)(RAI_ModelRunCtx**, RAI_Error*);
   int (*model_serialize)(RAI_Model*, char**, size_t*, RAI_Error*);
 
   RAI_Script* (*script_create)(const char*, const char*, RAI_Error*);
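One line in the header, but it inverts the data layout every backend walks. A comment-style sketch of the shift, reconstructed from the call sites in the .c files below:

    /* Before: a single ctx owned the batches.
     *   mctx->batches[b].inputs[i].tensor
     * After: each queued request keeps its own ctx; the backend receives
     * the whole array and reads its length with array_len(mctxs).
     *   mctxs[b]->inputs[i].tensor
     */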

src/backends/onnxruntime.c

Lines changed: 9 additions & 9 deletions
@@ -401,28 +401,28 @@ void RAI_ModelFreeORT(RAI_Model* model, RAI_Error* error) {
   model->session = NULL;
 }
 
-int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
+int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error)
 {
   const OrtApi* ort = OrtGetApiBase()->GetApi(1);
 
-  OrtSession *session = mctx->model->session;
+  OrtSession *session = mctxs[0]->model->session;
 
   if (session == NULL) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR ONNXRuntime session was not allocated");
     return 1;
   }
 
-  const size_t nbatches = array_len(mctx->batches);
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
   }
 
   size_t batch_sizes[nbatches];
   size_t batch_offsets[nbatches];
-  if (array_len(mctx->batches[0].inputs) > 0) {
+  if (array_len(mctxs[0]->inputs) > 0) {
     for (size_t b=0; b<nbatches; ++b) {
-      batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+      batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
     }
     batch_offsets[0] = 0;
     for (size_t b=1; b<nbatches; ++b) {
@@ -457,8 +457,8 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
   OrtValue *inputs[n_input_nodes];
   OrtValue *outputs[n_output_nodes];
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
 
   if (ninputs != n_input_nodes) {
     char msg[70];
@@ -485,7 +485,7 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
 
     RAI_Tensor* batched_input_tensors[nbatches];
     for (size_t b=0; b<nbatches; b++) {
-      batched_input_tensors[b] = mctx->batches[b].inputs[i].tensor;
+      batched_input_tensors[b] = mctxs[b]->inputs[i].tensor;
     }
 
     inputs[i] = RAI_OrtValueFromTensors(batched_input_tensors, nbatches, error);
@@ -545,7 +545,7 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
       return 1;
     }
     if (output_tensor) {
-      mctx->batches[b].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+      mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
       RAI_TensorFree(output_tensor);
     }
     else {
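These hunks carry the heart of the refactor's bookkeeping: batch_sizes[b] is read from dim 0 of request b's first input tensor, and batch_offsets is its exclusive prefix sum, used later to carve the single concatenated output back into per-request slices. A standalone toy of that computation, with made-up sizes (not RedisAI code):

    #include <stdio.h>

    int main(void) {
        size_t batch_sizes[] = {4, 1, 3};  /* dim 0 of each request's first input */
        size_t nbatches = sizeof(batch_sizes) / sizeof(batch_sizes[0]);
        size_t batch_offsets[3];

        /* exclusive prefix sum, exactly as in the hunks above */
        batch_offsets[0] = 0;
        for (size_t b = 1; b < nbatches; ++b)
            batch_offsets[b] = batch_offsets[b - 1] + batch_sizes[b - 1];

        /* request b's rows occupy [offset, offset + size) in the concatenated tensor */
        for (size_t b = 0; b < nbatches; ++b)
            printf("request %zu: rows [%zu, %zu)\n",
                   b, batch_offsets[b], batch_offsets[b] + batch_sizes[b]);
        return 0;
    }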

src/backends/onnxruntime.h

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ RAI_Model *RAI_ModelCreateORT(RAI_Backend backend, const char* devicestr, RAI_M
 
 void RAI_ModelFreeORT(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeORT(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);

src/backends/tensorflow.c

Lines changed: 11 additions & 11 deletions
@@ -419,27 +419,27 @@ void RAI_ModelFreeTF(RAI_Model* model, RAI_Error* error) {
   TF_DeleteStatus(status);
 }
 
-int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
+int RAI_ModelRunTF(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
   TF_Status *status = TF_NewStatus();
 
-  const size_t nbatches = array_len(mctx->batches);
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
   }
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
   TF_Tensor* inputTensorsValues[ninputs];
   TF_Output inputs[ninputs];
   TF_Tensor* outputTensorsValues[noutputs];
   TF_Output outputs[noutputs];
 
   size_t batch_sizes[nbatches];
   size_t batch_offsets[nbatches];
-  if (array_len(mctx->batches[0].inputs) > 0) {
+  if (array_len(mctxs[0]->inputs) > 0) {
     for (size_t b=0; b<nbatches; ++b) {
-      batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+      batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
     }
     batch_offsets[0] = 0;
     for (size_t b=1; b<nbatches; ++b) {
@@ -451,12 +451,12 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
     RAI_Tensor* batched_input_tensors[nbatches];
 
     for (size_t b=0; b<nbatches; ++b) {
-      batched_input_tensors[b] = mctx->batches[b].inputs[i].tensor;
+      batched_input_tensors[b] = mctxs[b]->inputs[i].tensor;
     }
     // inputTensorsValues[i] = RAI_TFTensorFromTensor(mctx->inputs[i].tensor);
     inputTensorsValues[i] = RAI_TFTensorFromTensors(batched_input_tensors, nbatches);
     TF_Output port;
-    port.oper = TF_GraphOperationByName(mctx->model->model, mctx->batches[0].inputs[i].name);
+    port.oper = TF_GraphOperationByName(mctxs[0]->model->model, mctxs[0]->inputs[i].name);
     port.index = 0;
     if(port.oper == NULL){
       return 1;
@@ -466,15 +466,15 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
 
   for (size_t i=0 ; i<noutputs; ++i) {
     TF_Output port;
-    port.oper = TF_GraphOperationByName(mctx->model->model, mctx->batches[0].outputs[i].name);
+    port.oper = TF_GraphOperationByName(mctxs[0]->model->model, mctxs[0]->outputs[i].name);
     port.index = 0;
     if(port.oper == NULL){
       return 1;
     }
     outputs[i] = port;
   }
 
-  TF_SessionRun(mctx->model->session, NULL /* run_options */,
+  TF_SessionRun(mctxs[0]->model->session, NULL /* run_options */,
                 inputs, inputTensorsValues, ninputs,
                 outputs, outputTensorsValues, noutputs,
                 NULL /* target_opers */, 0 /* ntargets */,
@@ -496,7 +496,7 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   for(size_t i=0; i<noutputs; ++i) {
     for (size_t b=0; b<nbatches; b++) {
       RAI_Tensor* output_tensor = RAI_TensorCreateFromTFTensor(outputTensorsValues[i], batch_offsets[b], batch_sizes[b]);
-      mctx->batches[b].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+      mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
       RAI_TensorFree(output_tensor);
     }
     TF_DeleteTensor(outputTensorsValues[i]);
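Note that TF_SessionRun is now driven off mctxs[0] for both graph and session, presumably safe because only requests against the same model get batched together. The routine's overall shape is gather, run once, scatter; here is a self-contained toy of just that data movement (plain C, nothing here is RedisAI or TensorFlow code):

    #include <stdio.h>
    #include <string.h>

    int main(void) {
        float req_a[2][3] = {{1,2,3},{4,5,6}};   /* request A: batch of 2 rows */
        float req_b[1][3] = {{7,8,9}};           /* request B: batch of 1 row  */

        /* gather: stack both requests along dim 0 (offsets {0, 2}, sizes {2, 1}) */
        float stacked[3][3];
        memcpy(stacked[0], req_a, sizeof(req_a));
        memcpy(stacked[2], req_b, sizeof(req_b));

        /* ... a single model run over `stacked` would happen here ... */

        /* scatter: slice the result back per request using the same offsets */
        float *out_a = stacked[0], *out_b = stacked[2];
        printf("A row0 first=%g, B row0 first=%g\n", out_a[0], out_b[0]);
        return 0;
    }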

src/backends/tensorflow.h

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ RAI_Model *RAI_ModelCreateTF(RAI_Backend backend, const char* devicestr, RAI_Mod
 
 void RAI_ModelFreeTF(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunTF(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunTF(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeTF(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);
2222

src/backends/tflite.c

Lines changed: 11 additions & 11 deletions
@@ -82,16 +82,16 @@ void RAI_ModelFreeTFLite(RAI_Model* model, RAI_Error *error) {
   model->model = NULL;
 }
 
-int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
+int RAI_ModelRunTFLite(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
 
-  const size_t nbatches = array_len(mctx->batches);
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
   }
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
 
   RAI_Tensor* inputs[ninputs];
 
@@ -103,9 +103,9 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   size_t total_batch_size = 0;
 
   if (nbatches > 1) {
-    if (array_len(mctx->batches[0].inputs) > 0) {
+    if (array_len(mctxs[0]->inputs) > 0) {
       for (size_t b=0; b<nbatches; ++b) {
-        batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+        batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
         total_batch_size += batch_sizes[b];
       }
       batch_offsets[0] = 0;
@@ -118,7 +118,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
       RAI_Tensor* batch[nbatches];
 
       for (size_t b=0; b<nbatches; b++) {
-        batch[b] = mctx->batches[b].inputs[i].tensor;
+        batch[b] = mctxs[b]->inputs[i].tensor;
      }
 
      inputs[i] = RAI_TensorCreateByConcatenatingTensors(batch, nbatches);
@@ -127,7 +127,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
   else {
     for (size_t i=0 ; i<ninputs; ++i) {
-      inputs[i] = RAI_TensorGetShallowCopy(mctx->batches[0].inputs[i].tensor);
+      inputs[i] = RAI_TensorGetShallowCopy(mctxs[0]->inputs[i].tensor);
       inputs_dl[i] = &inputs[i]->tensor;
     }
   }
@@ -137,7 +137,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
 
   char* error_descr = NULL;
-  tfliteRunModel(mctx->model->model,
+  tfliteRunModel(mctxs[0]->model->model,
                  ninputs, inputs_dl, noutputs, outputs_dl,
                  &error_descr, RedisModule_Alloc);
 
@@ -160,11 +160,11 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
     }
     if (nbatches > 1) {
       for (size_t b=0; b<nbatches; b++) {
-        mctx->batches[b].outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
+        mctxs[b]->outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
      }
    }
    else {
-      mctx->batches[0].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+      mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
    }
    RAI_TensorFree(output_tensor);
    RedisModule_Free(outputs_dl[i]);
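Both output branches end in the same ownership dance: create output_tensor, store a shallow copy into each run ctx, then free the backend's local reference. The create/copy/free sequence suggests these helpers are refcount-based, though this commit does not show their implementation; a toy model of that contract (illustrative types, not RedisAI code):

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct { int refcount; float *data; } Tensor;

    static Tensor *tensor_new(void) {
        Tensor *t = calloc(1, sizeof(*t));
        t->refcount = 1;                    /* creator owns one reference */
        return t;
    }
    static Tensor *shallow_copy(Tensor *t) { t->refcount++; return t; }
    static void tensor_free(Tensor *t) {
        if (--t->refcount == 0) { free(t->data); free(t); }
    }

    int main(void) {
        Tensor *output_tensor = tensor_new();          /* backend's reference   */
        Tensor *stored = shallow_copy(output_tensor);  /* run ctx takes a ref   */
        tensor_free(output_tensor);                    /* backend drops its ref */
        printf("ctx still holds a live tensor: refcount=%d\n", stored->refcount);
        tensor_free(stored);                           /* ctx teardown          */
        return 0;
    }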

src/backends/tflite.h

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ RAI_Model *RAI_ModelCreateTFLite(RAI_Backend backend, const char* devicestr, RAI
 
 void RAI_ModelFreeTFLite(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunTFLite(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunTFLite(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeTFLite(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);

src/backends/torch.c

Lines changed: 11 additions & 11 deletions
@@ -68,15 +68,15 @@ void RAI_ModelFreeTorch(RAI_Model* model, RAI_Error *error) {
   torchDeallocContext(model->model);
 }
 
-int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
-  const size_t nbatches = array_len(mctx->batches);
+int RAI_ModelRunTorch(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
   }
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
 
   RAI_Tensor* inputs[ninputs];
 
@@ -88,9 +88,9 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
 
   if (nbatches > 1) {
     size_t total_batch_size = 0;
-    if (array_len(mctx->batches[0].inputs) > 0) {
+    if (array_len(mctxs[0]->inputs) > 0) {
       for (size_t b=0; b<nbatches; ++b) {
-        batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+        batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
         total_batch_size += batch_sizes[b];
       }
       batch_offsets[0] = 0;
@@ -103,7 +103,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
       RAI_Tensor* batch[nbatches];
 
       for (size_t b=0; b<nbatches; b++) {
-        batch[b] = mctx->batches[b].inputs[i].tensor;
+        batch[b] = mctxs[b]->inputs[i].tensor;
      }
 
      inputs[i] = RAI_TensorCreateByConcatenatingTensors(batch, nbatches);
@@ -112,7 +112,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
   else {
     for (size_t i=0 ; i<ninputs; ++i) {
-      inputs[i] = RAI_TensorGetShallowCopy(mctx->batches[0].inputs[i].tensor);
+      inputs[i] = RAI_TensorGetShallowCopy(mctxs[0]->inputs[i].tensor);
       inputs_dl[i] = &inputs[i]->tensor;
     }
   }
@@ -122,7 +122,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
 
   char* error_descr = NULL;
-  torchRunModel(mctx->model->model,
+  torchRunModel(mctxs[0]->model->model,
                 ninputs, inputs_dl, noutputs, outputs_dl,
                 &error_descr, RedisModule_Alloc);
 
@@ -140,11 +140,11 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
    RAI_Tensor* output_tensor = RAI_TensorCreateFromDLTensor(outputs_dl[i]);
    if (nbatches > 1) {
      for (size_t b=0; b<nbatches; b++) {
-       mctx->batches[b].outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
+       mctxs[b]->outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
      }
    }
    else {
-     mctx->batches[0].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+     mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
    }
    RAI_TensorFree(output_tensor);
 }
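torch.c receives the identical treatment to tflite.c: a fast path that shallow-copies a lone request's tensors straight through, and a concatenate/slice path for real batches. Reduced to control flow (only the branch structure comes from the diffs above; everything else is illustrative):

    #include <stdio.h>

    /* Minimal shape of the shared TFLite/Torch run loop. */
    static void run_batched(size_t nbatches) {
        if (nbatches > 1) {
            /* gather: concatenate each request's inputs along dim 0,
             * run the model once, then slice outputs back per request
             * using batch_offsets/batch_sizes */
            printf("%zu requests: concatenate, run once, slice\n", nbatches);
        } else {
            /* fast path: shallow-copy the single request's tensors through */
            printf("1 request: no concatenation needed\n");
        }
    }

    int main(void) {
        run_batched(1);
        run_batched(3);
        return 0;
    }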

src/backends/torch.h

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ RAI_Model *RAI_ModelCreateTorch(RAI_Backend backend, const char* devicestr, RAI_
 
 void RAI_ModelFreeTorch(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunTorch(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunTorch(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeTorch(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);
