Refactor batching mechanism, solve threading issue #325

Merged (1 commit, Apr 16, 2020)
src/backends.c: 8 changes (4 additions & 4 deletions)
@@ -92,7 +92,7 @@ int RAI_LoadBackend_TensorFlow(RedisModuleCtx *ctx, const char *path) {
return REDISMODULE_ERR;
}

- backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+ backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
(unsigned long) dlsym(handle, "RAI_ModelRunTF");
if (backend.model_run == NULL) {
dlclose(handle);
@@ -157,7 +157,7 @@ int RAI_LoadBackend_TFLite(RedisModuleCtx *ctx, const char *path) {
return REDISMODULE_ERR;
}

- backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+ backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
(unsigned long) dlsym(handle, "RAI_ModelRunTFLite");
if (backend.model_run == NULL) {
dlclose(handle);
@@ -222,7 +222,7 @@ int RAI_LoadBackend_Torch(RedisModuleCtx *ctx, const char *path) {
return REDISMODULE_ERR;
}

- backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+ backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
(unsigned long) dlsym(handle, "RAI_ModelRunTorch");
if (backend.model_run == NULL) {
dlclose(handle);
@@ -311,7 +311,7 @@ int RAI_LoadBackend_ONNXRuntime(RedisModuleCtx *ctx, const char *path) {
return REDISMODULE_ERR;
}

- backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+ backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
(unsigned long) dlsym(handle, "RAI_ModelRunORT");
if (backend.model_run == NULL) {
dlclose(handle);
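Note on the loader changes above: each backend entry point is resolved at runtime with dlsym and cast to the new batched signature. The sketch below is a minimal, hypothetical stand-alone version of that pattern, not RedisAI code; the library name, the symbol name, and the opaque types are assumptions. It only illustrates the cast through an integer type that backends.c uses to avoid the object-pointer/function-pointer conversion warning.

```c
/* Build with: cc demo.c -ldl */
#include <dlfcn.h>
#include <stdio.h>

typedef struct RAI_ModelRunCtx RAI_ModelRunCtx;  /* opaque stand-ins */
typedef struct RAI_Error RAI_Error;

/* The batched entry point now takes an array of run contexts. */
typedef int (*model_run_fn)(RAI_ModelRunCtx **mctxs, RAI_Error *err);

int main(void) {
    void *handle = dlopen("redisai_tensorflow.so", RTLD_NOW | RTLD_LOCAL);
    if (handle == NULL) {
        fprintf(stderr, "dlopen failed: %s\n", dlerror());
        return 1;
    }

    /* Same cast style as backends.c: go through (unsigned long) so the
     * compiler does not warn about converting void* to a function pointer. */
    model_run_fn run = (model_run_fn)(unsigned long)dlsym(handle, "RAI_ModelRunTF");
    if (run == NULL) {
        fprintf(stderr, "RAI_ModelRunTF not found\n");
        dlclose(handle);
        return 1;
    }

    printf("resolved batched model_run entry point\n");
    dlclose(handle);
    return 0;
}
```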
src/backends.h: 2 changes (1 addition & 1 deletion)
@@ -14,7 +14,7 @@ typedef struct RAI_LoadedBackend {
RAI_Model* (*model_create)(RAI_Backend, const char*, RAI_ModelOpts,
const char*, size_t, RAI_Error*);
void (*model_free)(RAI_Model*, RAI_Error*);
- int (*model_run)(RAI_ModelRunCtx*, RAI_Error*);
+ int (*model_run)(RAI_ModelRunCtx**, RAI_Error*);
int (*model_serialize)(RAI_Model*, char**, size_t*, RAI_Error*);

RAI_Script* (*script_create)(const char*, const char*, RAI_Error*);
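The signature change above is the heart of the refactor: model_run used to receive one RAI_ModelRunCtx carrying a .batches array, and now receives an array of run contexts, one per queued request, whose length the backend reads with array_len. The toy program below shows only that calling convention; the types and the length-prefixed array helper are stand-ins, not RedisAI's arr.h.

```c
#include <stdio.h>
#include <stdlib.h>

typedef struct { size_t len; } array_hdr_t;
#define array_hdr(p)  ((array_hdr_t *)((char *)(p) - sizeof(array_hdr_t)))
#define array_len(p)  ((p) ? array_hdr(p)->len : 0)

static void *array_new(size_t elem_size, size_t len) {
    array_hdr_t *h = malloc(sizeof(*h) + elem_size * len);
    h->len = len;
    return h + 1;   /* callers see only the element storage */
}

typedef struct { int request_id; } RAI_ModelRunCtx;  /* stand-in */
typedef struct { const char *msg; } RAI_Error;       /* stand-in */

/* Shape of the new entry point: one call receives every queued request. */
static int toy_model_run(RAI_ModelRunCtx **mctxs, RAI_Error *err) {
    (void)err;
    const size_t nbatches = array_len(mctxs);
    for (size_t b = 0; b < nbatches; ++b)
        printf("request %d runs as part of a batch of %zu\n",
               mctxs[b]->request_id, nbatches);
    return 0;
}

int main(void) {
    RAI_ModelRunCtx r0 = { .request_id = 1 }, r1 = { .request_id = 2 };
    RAI_ModelRunCtx **mctxs = array_new(sizeof(*mctxs), 2);
    mctxs[0] = &r0;
    mctxs[1] = &r1;
    toy_model_run(mctxs, NULL);
    free(array_hdr(mctxs));
    return 0;
}
```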
src/backends/onnxruntime.c: 18 changes (9 additions & 9 deletions)
@@ -401,28 +401,28 @@ void RAI_ModelFreeORT(RAI_Model* model, RAI_Error* error) {
model->session = NULL;
}

- int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
+ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error)
{
const OrtApi* ort = OrtGetApiBase()->GetApi(1);

- OrtSession *session = mctx->model->session;
+ OrtSession *session = mctxs[0]->model->session;

if (session == NULL) {
RAI_SetError(error, RAI_EMODELRUN, "ERR ONNXRuntime session was not allocated");
return 1;
}

- const size_t nbatches = array_len(mctx->batches);
+ const size_t nbatches = array_len(mctxs);
if (nbatches == 0) {
RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
return 1;
}

size_t batch_sizes[nbatches];
size_t batch_offsets[nbatches];
- if (array_len(mctx->batches[0].inputs) > 0) {
+ if (array_len(mctxs[0]->inputs) > 0) {
for (size_t b=0; b<nbatches; ++b) {
- batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+ batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
}
batch_offsets[0] = 0;
for (size_t b=1; b<nbatches; ++b) {
@@ -457,8 +457,8 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
OrtValue *inputs[n_input_nodes];
OrtValue *outputs[n_output_nodes];

- const size_t ninputs = array_len(mctx->batches[0].inputs);
- const size_t noutputs = array_len(mctx->batches[0].outputs);
+ const size_t ninputs = array_len(mctxs[0]->inputs);
+ const size_t noutputs = array_len(mctxs[0]->outputs);

if (ninputs != n_input_nodes) {
char msg[70];
@@ -485,7 +485,7 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)

RAI_Tensor* batched_input_tensors[nbatches];
for (size_t b=0; b<nbatches; b++) {
- batched_input_tensors[b] = mctx->batches[b].inputs[i].tensor;
+ batched_input_tensors[b] = mctxs[b]->inputs[i].tensor;
}

inputs[i] = RAI_OrtValueFromTensors(batched_input_tensors, nbatches, error);
@@ -545,7 +545,7 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
return 1;
}
if (output_tensor) {
- mctx->batches[b].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+ mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
RAI_TensorFree(output_tensor);
}
else {
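The per-request bookkeeping above is the same in every backend: each run context contributes the dim-0 size of its first input, and the offsets are the running sum, so request b owns rows [batch_offsets[b], batch_offsets[b] + batch_sizes[b]) of the concatenated tensor that reaches the backend in a single call. A small worked example in plain C, under assumed sizes:

```c
#include <stdio.h>

int main(void) {
    /* Hypothetical dim-0 sizes contributed by three queued requests. */
    size_t batch_sizes[] = {4, 1, 3};
    const size_t nbatches = sizeof(batch_sizes) / sizeof(batch_sizes[0]);

    /* Running sum, exactly as the backends compute it. */
    size_t batch_offsets[3];
    batch_offsets[0] = 0;
    for (size_t b = 1; b < nbatches; ++b)
        batch_offsets[b] = batch_offsets[b - 1] + batch_sizes[b - 1];

    for (size_t b = 0; b < nbatches; ++b)
        printf("request %zu -> rows [%zu, %zu) of the concatenated input\n",
               b, batch_offsets[b], batch_offsets[b] + batch_sizes[b]);
    /* Prints:
     *   request 0 -> rows [0, 4) of the concatenated input
     *   request 1 -> rows [4, 5) of the concatenated input
     *   request 2 -> rows [5, 8) of the concatenated input
     */
    return 0;
}
```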
src/backends/onnxruntime.h: 2 changes (1 addition & 1 deletion)
@@ -14,7 +14,7 @@ RAI_Model *RAI_ModelCreateORT(RAI_Backend backend, const char* devicestr, RAI_M

void RAI_ModelFreeORT(RAI_Model *model, RAI_Error *error);

- int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error);
+ int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error);

int RAI_ModelSerializeORT(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);

src/backends/tensorflow.c: 22 changes (11 additions & 11 deletions)
@@ -419,27 +419,27 @@ void RAI_ModelFreeTF(RAI_Model* model, RAI_Error* error) {
TF_DeleteStatus(status);
}

- int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
+ int RAI_ModelRunTF(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
TF_Status *status = TF_NewStatus();

- const size_t nbatches = array_len(mctx->batches);
+ const size_t nbatches = array_len(mctxs);
if (nbatches == 0) {
RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
return 1;
}

- const size_t ninputs = array_len(mctx->batches[0].inputs);
- const size_t noutputs = array_len(mctx->batches[0].outputs);
+ const size_t ninputs = array_len(mctxs[0]->inputs);
+ const size_t noutputs = array_len(mctxs[0]->outputs);
TF_Tensor* inputTensorsValues[ninputs];
TF_Output inputs[ninputs];
TF_Tensor* outputTensorsValues[noutputs];
TF_Output outputs[noutputs];

size_t batch_sizes[nbatches];
size_t batch_offsets[nbatches];
- if (array_len(mctx->batches[0].inputs) > 0) {
+ if (array_len(mctxs[0]->inputs) > 0) {
for (size_t b=0; b<nbatches; ++b) {
- batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+ batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
}
batch_offsets[0] = 0;
for (size_t b=1; b<nbatches; ++b) {
@@ -451,12 +451,12 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
RAI_Tensor* batched_input_tensors[nbatches];

for (size_t b=0; b<nbatches; ++b) {
- batched_input_tensors[b] = mctx->batches[b].inputs[i].tensor;
+ batched_input_tensors[b] = mctxs[b]->inputs[i].tensor;
}
// inputTensorsValues[i] = RAI_TFTensorFromTensor(mctx->inputs[i].tensor);
inputTensorsValues[i] = RAI_TFTensorFromTensors(batched_input_tensors, nbatches);
TF_Output port;
- port.oper = TF_GraphOperationByName(mctx->model->model, mctx->batches[0].inputs[i].name);
+ port.oper = TF_GraphOperationByName(mctxs[0]->model->model, mctxs[0]->inputs[i].name);
port.index = 0;
if(port.oper == NULL){
return 1;
@@ -466,15 +466,15 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {

for (size_t i=0 ; i<noutputs; ++i) {
TF_Output port;
- port.oper = TF_GraphOperationByName(mctx->model->model, mctx->batches[0].outputs[i].name);
+ port.oper = TF_GraphOperationByName(mctxs[0]->model->model, mctxs[0]->outputs[i].name);
port.index = 0;
if(port.oper == NULL){
return 1;
}
outputs[i] = port;
}

- TF_SessionRun(mctx->model->session, NULL /* run_options */,
+ TF_SessionRun(mctxs[0]->model->session, NULL /* run_options */,
inputs, inputTensorsValues, ninputs,
outputs, outputTensorsValues, noutputs,
NULL /* target_opers */, 0 /* ntargets */,
@@ -496,7 +496,7 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
for(size_t i=0; i<noutputs; ++i) {
for (size_t b=0; b<nbatches; b++) {
RAI_Tensor* output_tensor = RAI_TensorCreateFromTFTensor(outputTensorsValues[i], batch_offsets[b], batch_sizes[b]);
- mctx->batches[b].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+ mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
RAI_TensorFree(output_tensor);
}
TF_DeleteTensor(outputTensorsValues[i]);
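Putting the pieces of the TF path together: inputs from the batched requests are concatenated along dim 0 (RAI_TFTensorFromTensors), the session runs once, and each request's rows are cut back out of the batched output using the offsets and sizes computed earlier (RAI_TensorCreateFromTFTensor with an offset and size). The toy program below mimics that flow with flat float arrays and a fake "model" (y = 2x); it is an illustration of the pattern, not TensorFlow code.

```c
#include <stdio.h>
#include <string.h>

#define FEATURES 2

/* Stand-in for the real session run: y = 2 * x over the whole batch. */
static void run_model(const float *in, float *out, size_t rows) {
    for (size_t i = 0; i < rows * FEATURES; ++i)
        out[i] = 2.0f * in[i];
}

int main(void) {
    /* Two requests: 2 rows and 1 row of FEATURES columns each. */
    float req0[2 * FEATURES] = {1, 2, 3, 4};
    float req1[1 * FEATURES] = {5, 6};
    size_t batch_sizes[]   = {2, 1};
    size_t batch_offsets[] = {0, 2};

    /* Concatenate along dim 0 (RAI_TFTensorFromTensors does this for real). */
    float batched_in[3 * FEATURES];
    memcpy(batched_in, req0, sizeof(req0));
    memcpy(batched_in + batch_offsets[1] * FEATURES, req1, sizeof(req1));

    float batched_out[3 * FEATURES];
    run_model(batched_in, batched_out, 3);

    /* Slice each request's rows back out, the role played above by
     * RAI_TensorCreateFromTFTensor(outputTensorsValues[i], offset, size). */
    for (size_t b = 0; b < 2; ++b) {
        const float *slice = batched_out + batch_offsets[b] * FEATURES;
        printf("request %zu: %zu row(s), first output value %g\n",
               b, batch_sizes[b], slice[0]);
    }
    return 0;
}
```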
src/backends/tensorflow.h: 2 changes (1 addition & 1 deletion)
@@ -16,7 +16,7 @@ RAI_Model *RAI_ModelCreateTF(RAI_Backend backend, const char* devicestr, RAI_Mod

void RAI_ModelFreeTF(RAI_Model *model, RAI_Error *error);

- int RAI_ModelRunTF(RAI_ModelRunCtx *mctx, RAI_Error *error);
+ int RAI_ModelRunTF(RAI_ModelRunCtx **mctxs, RAI_Error *error);

int RAI_ModelSerializeTF(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);

src/backends/tflite.c: 22 changes (11 additions & 11 deletions)
@@ -82,16 +82,16 @@ void RAI_ModelFreeTFLite(RAI_Model* model, RAI_Error *error) {
model->model = NULL;
}

- int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
+ int RAI_ModelRunTFLite(RAI_ModelRunCtx** mctxs, RAI_Error *error) {

- const size_t nbatches = array_len(mctx->batches);
+ const size_t nbatches = array_len(mctxs);
if (nbatches == 0) {
RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
return 1;
}

- const size_t ninputs = array_len(mctx->batches[0].inputs);
- const size_t noutputs = array_len(mctx->batches[0].outputs);
+ const size_t ninputs = array_len(mctxs[0]->inputs);
+ const size_t noutputs = array_len(mctxs[0]->outputs);

RAI_Tensor* inputs[ninputs];

@@ -103,9 +103,9 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
size_t total_batch_size = 0;

if (nbatches > 1) {
- if (array_len(mctx->batches[0].inputs) > 0) {
+ if (array_len(mctxs[0]->inputs) > 0) {
for (size_t b=0; b<nbatches; ++b) {
- batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+ batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
total_batch_size += batch_sizes[b];
}
batch_offsets[0] = 0;
@@ -118,7 +118,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
RAI_Tensor* batch[nbatches];

for (size_t b=0; b<nbatches; b++) {
- batch[b] = mctx->batches[b].inputs[i].tensor;
+ batch[b] = mctxs[b]->inputs[i].tensor;
}

inputs[i] = RAI_TensorCreateByConcatenatingTensors(batch, nbatches);
@@ -127,7 +127,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
}
else {
for (size_t i=0 ; i<ninputs; ++i) {
- inputs[i] = RAI_TensorGetShallowCopy(mctx->batches[0].inputs[i].tensor);
+ inputs[i] = RAI_TensorGetShallowCopy(mctxs[0]->inputs[i].tensor);
inputs_dl[i] = &inputs[i]->tensor;
}
}
@@ -137,7 +137,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
}

char* error_descr = NULL;
- tfliteRunModel(mctx->model->model,
+ tfliteRunModel(mctxs[0]->model->model,
ninputs, inputs_dl, noutputs, outputs_dl,
&error_descr, RedisModule_Alloc);

@@ -160,11 +160,11 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
}
if (nbatches > 1) {
for (size_t b=0; b<nbatches; b++) {
- mctx->batches[b].outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
+ mctxs[b]->outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
}
}
else {
- mctx->batches[0].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+ mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
}
RAI_TensorFree(output_tensor);
RedisModule_Free(outputs_dl[i]);
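One detail worth noting in the TFLite (and Torch) path: when nbatches == 1 there is nothing to concatenate or slice, so the backend shallow-copies the caller's tensors instead, which is just a reference-count bump. The toy below illustrates that idea with a hand-rolled refcount; RAI_TensorGetShallowCopy and RAI_TensorFree are the real counterparts, and the struct here is a stand-in, not the RedisAI tensor type.

```c
#include <stdio.h>

/* Stand-in tensor with a manual reference count. */
typedef struct {
    int refcount;
    float data[4];
} ToyTensor;

static ToyTensor *shallow_copy(ToyTensor *t) {  /* cf. RAI_TensorGetShallowCopy */
    t->refcount++;
    return t;
}

static void tensor_free(ToyTensor *t) {         /* cf. RAI_TensorFree */
    if (--t->refcount == 0)
        printf("last reference dropped\n");
}

int main(void) {
    ToyTensor input = { .refcount = 1, .data = {1, 2, 3, 4} };

    /* nbatches == 1: the caller's tensor is handed through untouched,
     * no concatenation and no slicing, just a refcount bump. */
    ToyTensor *model_input = shallow_copy(&input);
    printf("model sees %g without copying any data\n", model_input->data[0]);

    tensor_free(model_input);  /* backend's reference */
    tensor_free(&input);       /* caller's reference  */
    return 0;
}
```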
src/backends/tflite.h: 2 changes (1 addition & 1 deletion)
@@ -14,7 +14,7 @@ RAI_Model *RAI_ModelCreateTFLite(RAI_Backend backend, const char* devicestr, RAI

void RAI_ModelFreeTFLite(RAI_Model *model, RAI_Error *error);

- int RAI_ModelRunTFLite(RAI_ModelRunCtx *mctx, RAI_Error *error);
+ int RAI_ModelRunTFLite(RAI_ModelRunCtx **mctxs, RAI_Error *error);

int RAI_ModelSerializeTFLite(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);

src/backends/torch.c: 22 changes (11 additions & 11 deletions)
@@ -68,15 +68,15 @@ void RAI_ModelFreeTorch(RAI_Model* model, RAI_Error *error) {
torchDeallocContext(model->model);
}

- int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
- const size_t nbatches = array_len(mctx->batches);
+ int RAI_ModelRunTorch(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
+ const size_t nbatches = array_len(mctxs);
if (nbatches == 0) {
RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
return 1;
}

- const size_t ninputs = array_len(mctx->batches[0].inputs);
- const size_t noutputs = array_len(mctx->batches[0].outputs);
+ const size_t ninputs = array_len(mctxs[0]->inputs);
+ const size_t noutputs = array_len(mctxs[0]->outputs);

RAI_Tensor* inputs[ninputs];

@@ -88,9 +88,9 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {

if (nbatches > 1) {
size_t total_batch_size = 0;
- if (array_len(mctx->batches[0].inputs) > 0) {
+ if (array_len(mctxs[0]->inputs) > 0) {
for (size_t b=0; b<nbatches; ++b) {
- batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+ batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
total_batch_size += batch_sizes[b];
}
batch_offsets[0] = 0;
@@ -103,7 +103,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
RAI_Tensor* batch[nbatches];

for (size_t b=0; b<nbatches; b++) {
- batch[b] = mctx->batches[b].inputs[i].tensor;
+ batch[b] = mctxs[b]->inputs[i].tensor;
}

inputs[i] = RAI_TensorCreateByConcatenatingTensors(batch, nbatches);
@@ -112,7 +112,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
}
else {
for (size_t i=0 ; i<ninputs; ++i) {
- inputs[i] = RAI_TensorGetShallowCopy(mctx->batches[0].inputs[i].tensor);
+ inputs[i] = RAI_TensorGetShallowCopy(mctxs[0]->inputs[i].tensor);
inputs_dl[i] = &inputs[i]->tensor;
}
}
@@ -122,7 +122,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
}

char* error_descr = NULL;
- torchRunModel(mctx->model->model,
+ torchRunModel(mctxs[0]->model->model,
ninputs, inputs_dl, noutputs, outputs_dl,
&error_descr, RedisModule_Alloc);

@@ -140,11 +140,11 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
RAI_Tensor* output_tensor = RAI_TensorCreateFromDLTensor(outputs_dl[i]);
if (nbatches > 1) {
for (size_t b=0; b<nbatches; b++) {
- mctx->batches[b].outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
+ mctxs[b]->outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
}
}
else {
- mctx->batches[0].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+ mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
}
RAI_TensorFree(output_tensor);
}
src/backends/torch.h: 2 changes (1 addition & 1 deletion)
@@ -15,7 +15,7 @@ RAI_Model *RAI_ModelCreateTorch(RAI_Backend backend, const char* devicestr, RAI_

void RAI_ModelFreeTorch(RAI_Model *model, RAI_Error *error);

- int RAI_ModelRunTorch(RAI_ModelRunCtx *mctx, RAI_Error *error);
+ int RAI_ModelRunTorch(RAI_ModelRunCtx **mctxs, RAI_Error *error);

int RAI_ModelSerializeTorch(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);
