From f7e396f3ef935e05214d6eb25e609b4165c1f9d3 Mon Sep 17 00:00:00 2001
From: Luca Antiga
Date: Thu, 16 Apr 2020 19:06:13 +0200
Subject: [PATCH] Refactor batching mechanism, solve threading issue

---
 src/backends.c             |   8 +--
 src/backends.h             |   2 +-
 src/backends/onnxruntime.c |  18 +++---
 src/backends/onnxruntime.h |   2 +-
 src/backends/tensorflow.c  |  22 ++++----
 src/backends/tensorflow.h  |   2 +-
 src/backends/tflite.c      |  22 ++++----
 src/backends/tflite.h      |   2 +-
 src/backends/torch.c       |  22 ++++----
 src/backends/torch.h       |   2 +-
 src/model.c                | 109 +++++++++++--------------------
 src/model.h                |  15 ++---
 src/model_struct.h         |   8 +--
 src/redisai.c              |  75 ++++++++----------------
 14 files changed, 114 insertions(+), 195 deletions(-)

diff --git a/src/backends.c b/src/backends.c
index a45044cc1..b6bd6306c 100644
--- a/src/backends.c
+++ b/src/backends.c
@@ -92,7 +92,7 @@ int RAI_LoadBackend_TensorFlow(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunTF");
   if (backend.model_run == NULL) {
     dlclose(handle);
@@ -157,7 +157,7 @@ int RAI_LoadBackend_TFLite(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunTFLite");
   if (backend.model_run == NULL) {
     dlclose(handle);
@@ -222,7 +222,7 @@ int RAI_LoadBackend_Torch(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunTorch");
   if (backend.model_run == NULL) {
     dlclose(handle);
@@ -311,7 +311,7 @@ int RAI_LoadBackend_ONNXRuntime(RedisModuleCtx *ctx, const char *path) {
     return REDISMODULE_ERR;
   }
 
-  backend.model_run = (int (*)(RAI_ModelRunCtx*, RAI_Error*))
+  backend.model_run = (int (*)(RAI_ModelRunCtx**, RAI_Error*))
     (unsigned long) dlsym(handle, "RAI_ModelRunORT");
   if (backend.model_run == NULL) {
    dlclose(handle);
diff --git a/src/backends.h b/src/backends.h
index 6ae6871a0..e94e5e338 100644
--- a/src/backends.h
+++ b/src/backends.h
@@ -14,7 +14,7 @@ typedef struct RAI_LoadedBackend {
   RAI_Model* (*model_create)(RAI_Backend, const char*, RAI_ModelOpts,
                              const char*, size_t, RAI_Error*);
   void (*model_free)(RAI_Model*, RAI_Error*);
-  int (*model_run)(RAI_ModelRunCtx*, RAI_Error*);
+  int (*model_run)(RAI_ModelRunCtx**, RAI_Error*);
   int (*model_serialize)(RAI_Model*, char**, size_t*, RAI_Error*);
 
   RAI_Script* (*script_create)(const char*, const char*, RAI_Error*);
diff --git a/src/backends/onnxruntime.c b/src/backends/onnxruntime.c
index c740733a5..7281e419e 100644
--- a/src/backends/onnxruntime.c
+++ b/src/backends/onnxruntime.c
@@ -401,18 +401,18 @@ void RAI_ModelFreeORT(RAI_Model* model, RAI_Error* error) {
   model->session = NULL;
 }
 
-int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
+int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error)
 {
   const OrtApi* ort = OrtGetApiBase()->GetApi(1);
 
-  OrtSession *session = mctx->model->session;
+  OrtSession *session = mctxs[0]->model->session;
 
   if (session == NULL) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR ONNXRuntime session was not allocated");
     return 1;
   }
 
-  const size_t nbatches = array_len(mctx->batches);
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
@@ -420,9 +420,9 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
   size_t batch_sizes[nbatches];
   size_t batch_offsets[nbatches];
-  if (array_len(mctx->batches[0].inputs) > 0) {
+  if (array_len(mctxs[0]->inputs) > 0) {
     for (size_t b=0; b<nbatches; b++) {
-      batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+      batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
     }
     batch_offsets[0] = 0;
     for (size_t b=1; b<nbatches; b++) {
       batch_offsets[b] = batch_offsets[b-1] + batch_sizes[b-1];
     }
   }
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
 
   if (ninputs != n_input_nodes) {
     char msg[70];
@@ -485,7 +485,7 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
     RAI_Tensor* batched_input_tensors[nbatches];
     for (size_t b=0; b<nbatches; b++) {
-      batched_input_tensors[b] = mctx->batches[b].inputs[i].tensor;
+      batched_input_tensors[b] = mctxs[b]->inputs[i].tensor;
     }
 
     inputs[i] = RAI_OrtValueFromTensors(batched_input_tensors, nbatches, error);
@@ -545,7 +545,7 @@ int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error)
       return 1;
     }
     if (output_tensor) {
-      mctx->batches[b].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+      mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
       RAI_TensorFree(output_tensor);
     }
     else {
diff --git a/src/backends/onnxruntime.h b/src/backends/onnxruntime.h
index 86b83633d..a1b4184d5 100644
--- a/src/backends/onnxruntime.h
+++ b/src/backends/onnxruntime.h
@@ -14,7 +14,7 @@ RAI_Model *RAI_ModelCreateORT(RAI_Backend backend, const char* devicestr, RAI_M
 
 void RAI_ModelFreeORT(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunORT(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunORT(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeORT(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);
 
diff --git a/src/backends/tensorflow.c b/src/backends/tensorflow.c
index e8c7d4222..8dd9405e5 100644
--- a/src/backends/tensorflow.c
+++ b/src/backends/tensorflow.c
@@ -419,17 +419,17 @@ void RAI_ModelFreeTF(RAI_Model* model, RAI_Error* error) {
   TF_DeleteStatus(status);
 }
 
-int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
+int RAI_ModelRunTF(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
   TF_Status *status = TF_NewStatus();
 
-  const size_t nbatches = array_len(mctx->batches);
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
   }
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
   TF_Tensor* inputTensorsValues[ninputs];
   TF_Output inputs[ninputs];
   TF_Tensor* outputTensorsValues[noutputs];
@@ -437,9 +437,9 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   size_t batch_sizes[nbatches];
   size_t batch_offsets[nbatches];
-  if (array_len(mctx->batches[0].inputs) > 0) {
+  if (array_len(mctxs[0]->inputs) > 0) {
     for (size_t b=0; b<nbatches; b++) {
-      batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+      batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
     }
     batch_offsets[0] = 0;
     for (size_t b=1; b<nbatches; b++) {
       batch_offsets[b] = batch_offsets[b-1] + batch_sizes[b-1];
     }
   }
 
     RAI_Tensor* batched_input_tensors[nbatches];
     for (size_t b=0; b<nbatches; b++) {
-      batched_input_tensors[b] = mctx->batches[b].inputs[i].tensor;
+      batched_input_tensors[b] = mctxs[b]->inputs[i].tensor;
     }
 
     // inputTensorsValues[i] = RAI_TFTensorFromTensor(mctx->inputs[i].tensor);
     inputTensorsValues[i] = RAI_TFTensorFromTensors(batched_input_tensors, nbatches);
     TF_Output port;
-    port.oper = TF_GraphOperationByName(mctx->model->model, mctx->batches[0].inputs[i].name);
+    port.oper = TF_GraphOperationByName(mctxs[0]->model->model, mctxs[0]->inputs[i].name);
     port.index = 0;
     if(port.oper == NULL){
       return 1;
@@ -466,7 +466,7 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   for (size_t i=0 ; i<noutputs ; i++){
     TF_Output port;
-    port.oper = TF_GraphOperationByName(mctx->model->model, mctx->batches[0].outputs[i].name);
+    port.oper = TF_GraphOperationByName(mctxs[0]->model->model, mctxs[0]->outputs[i].name);
     port.index = 0;
     if(port.oper == NULL){
       return 1;
@@ -474,7 +474,7 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
     outputs[i] = port;
   }
 
-  TF_SessionRun(mctx->model->session, NULL /* run_options */,
+  TF_SessionRun(mctxs[0]->model->session, NULL /* run_options */,
                 inputs, inputTensorsValues, ninputs,
                 outputs, outputTensorsValues, noutputs,
                 NULL /* target_opers */, 0 /* ntargets */,
@@ -496,7 +496,7 @@ int RAI_ModelRunTF(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   for(size_t i=0; i<noutputs; ++i) {
     for (size_t b=0; b<nbatches; b++) {
-      mctx->batches[b].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+      mctxs[b]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
       RAI_TensorFree(output_tensor);
     }
     TF_DeleteTensor(outputTensorsValues[i]);
diff --git a/src/backends/tensorflow.h b/src/backends/tensorflow.h
index b59cfdde7..2f28099d8 100644
--- a/src/backends/tensorflow.h
+++ b/src/backends/tensorflow.h
@@ -16,7 +16,7 @@ RAI_Model *RAI_ModelCreateTF(RAI_Backend backend, const char* devicestr, RAI_Mod
 
 void RAI_ModelFreeTF(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunTF(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunTF(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeTF(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);
 
diff --git a/src/backends/tflite.c b/src/backends/tflite.c
index c448f93df..78f83e294 100644
--- a/src/backends/tflite.c
+++ b/src/backends/tflite.c
@@ -82,16 +82,16 @@ void RAI_ModelFreeTFLite(RAI_Model* model, RAI_Error *error) {
   model->model = NULL;
 }
 
-int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
+int RAI_ModelRunTFLite(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
 
-  const size_t nbatches = array_len(mctx->batches);
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
   }
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
 
   RAI_Tensor* inputs[ninputs];
 
@@ -103,9 +103,9 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   size_t total_batch_size = 0;
   if (nbatches > 1) {
-    if (array_len(mctx->batches[0].inputs) > 0) {
+    if (array_len(mctxs[0]->inputs) > 0) {
      for (size_t b=0; b<nbatches; b++) {
-        batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+        batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
        total_batch_size += batch_sizes[b];
      }
      batch_offsets[0] = 0;
@@ -118,7 +118,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
       RAI_Tensor* batch[nbatches];
       for (size_t b=0; b<nbatches; b++) {
-        batch[b] = mctx->batches[b].inputs[i].tensor;
+        batch[b] = mctxs[b]->inputs[i].tensor;
       }
 
       inputs[i] = RAI_TensorCreateByConcatenatingTensors(batch, nbatches);
@@ -127,7 +127,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
   else {
     for (size_t i=0 ; i<ninputs ; i++){
-      inputs[i] = RAI_TensorGetShallowCopy(mctx->batches[0].inputs[i].tensor);
+      inputs[i] = RAI_TensorGetShallowCopy(mctxs[0]->inputs[i].tensor);
       inputs_dl[i] = &inputs[i]->tensor;
     }
   }
@@ -137,7 +137,7 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
 
   char* error_descr = NULL;
-  tfliteRunModel(mctx->model->model,
+  tfliteRunModel(mctxs[0]->model->model,
                  ninputs, inputs_dl, noutputs, outputs_dl,
                  &error_descr, RedisModule_Alloc);
@@ -160,11 +160,11 @@ int RAI_ModelRunTFLite(RAI_ModelRunCtx* mctx, RAI_Error *error) {
     }
     if (nbatches > 1) {
       for (size_t b=0; b<nbatches; b++) {
-        mctx->batches[b].outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
+        mctxs[b]->outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
       }
     }
     else {
-      mctx->batches[0].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+      mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
     }
     RAI_TensorFree(output_tensor);
     RedisModule_Free(outputs_dl[i]);
diff --git a/src/backends/tflite.h b/src/backends/tflite.h
index 50c48e370..a7af0d523 100644
--- a/src/backends/tflite.h
+++ b/src/backends/tflite.h
@@ -14,7 +14,7 @@ RAI_Model *RAI_ModelCreateTFLite(RAI_Backend backend, const char* devicestr, RAI
 
 void RAI_ModelFreeTFLite(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunTFLite(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunTFLite(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeTFLite(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);
 
diff --git a/src/backends/torch.c b/src/backends/torch.c
index b350333a7..671192a65 100644
--- a/src/backends/torch.c
+++ b/src/backends/torch.c
@@ -68,15 +68,15 @@ void RAI_ModelFreeTorch(RAI_Model* model, RAI_Error *error) {
   torchDeallocContext(model->model);
 }
 
-int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
-  const size_t nbatches = array_len(mctx->batches);
+int RAI_ModelRunTorch(RAI_ModelRunCtx** mctxs, RAI_Error *error) {
+  const size_t nbatches = array_len(mctxs);
   if (nbatches == 0) {
     RAI_SetError(error, RAI_EMODELRUN, "ERR No batches to run");
     return 1;
   }
 
-  const size_t ninputs = array_len(mctx->batches[0].inputs);
-  const size_t noutputs = array_len(mctx->batches[0].outputs);
+  const size_t ninputs = array_len(mctxs[0]->inputs);
+  const size_t noutputs = array_len(mctxs[0]->outputs);
 
   RAI_Tensor* inputs[ninputs];
 
@@ -88,9 +88,9 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
 
   if (nbatches > 1) {
     size_t total_batch_size = 0;
-    if (array_len(mctx->batches[0].inputs) > 0) {
+    if (array_len(mctxs[0]->inputs) > 0) {
       for (size_t b=0; b<nbatches; b++) {
-        batch_sizes[b] = RAI_TensorDim(mctx->batches[b].inputs[0].tensor, 0);
+        batch_sizes[b] = RAI_TensorDim(mctxs[b]->inputs[0].tensor, 0);
         total_batch_size += batch_sizes[b];
       }
       batch_offsets[0] = 0;
@@ -103,7 +103,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
       RAI_Tensor* batch[nbatches];
       for (size_t b=0; b<nbatches; b++) {
-        batch[b] = mctx->batches[b].inputs[i].tensor;
+        batch[b] = mctxs[b]->inputs[i].tensor;
       }
 
       inputs[i] = RAI_TensorCreateByConcatenatingTensors(batch, nbatches);
@@ -112,7 +112,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
   else {
     for (size_t i=0 ; i<ninputs ; i++){
-      inputs[i] = RAI_TensorGetShallowCopy(mctx->batches[0].inputs[i].tensor);
+      inputs[i] = RAI_TensorGetShallowCopy(mctxs[0]->inputs[i].tensor);
       inputs_dl[i] = &inputs[i]->tensor;
     }
   }
@@ -122,7 +122,7 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
   }
 
   char* error_descr = NULL;
-  torchRunModel(mctx->model->model,
+  torchRunModel(mctxs[0]->model->model,
                 ninputs, inputs_dl, noutputs, outputs_dl,
                 &error_descr, RedisModule_Alloc);
@@ -140,11 +140,11 @@ int RAI_ModelRunTorch(RAI_ModelRunCtx* mctx, RAI_Error *error) {
     RAI_Tensor* output_tensor = RAI_TensorCreateFromDLTensor(outputs_dl[i]);
     if (nbatches > 1) {
       for (size_t b=0; b<nbatches; b++) {
-        mctx->batches[b].outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
+        mctxs[b]->outputs[i].tensor = RAI_TensorCreateBySlicingTensor(output_tensor, batch_offsets[b], batch_sizes[b]);
       }
     }
     else {
-      mctx->batches[0].outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
+      mctxs[0]->outputs[i].tensor = RAI_TensorGetShallowCopy(output_tensor);
     }
     RAI_TensorFree(output_tensor);
   }
diff --git a/src/backends/torch.h b/src/backends/torch.h
index ef245c829..9b4749b30 100644
--- a/src/backends/torch.h
+++ b/src/backends/torch.h
@@ -15,7 +15,7 @@ RAI_Model *RAI_ModelCreateTorch(RAI_Backend backend, const char* devicestr, RAI_
 
 void RAI_ModelFreeTorch(RAI_Model *model, RAI_Error *error);
 
-int RAI_ModelRunTorch(RAI_ModelRunCtx *mctx, RAI_Error *error);
+int RAI_ModelRunTorch(RAI_ModelRunCtx **mctxs, RAI_Error *error);
 
 int RAI_ModelSerializeTorch(RAI_Model *model, char **buffer, size_t *len, RAI_Error *error);
 
diff --git a/src/model.c b/src/model.c
index 144ae60f1..3028d5ec6 100644
--- a/src/model.c
+++ b/src/model.c
@@ -304,12 +304,13 @@ void RAI_ModelFree(RAI_Model* model, RAI_Error* err) {
 }
 
 RAI_ModelRunCtx* RAI_ModelRunCtxCreate(RAI_Model* model) {
-#define BATCH_INITIAL_SIZE 10
+#define PARAM_INITIAL_SIZE 10
   RAI_ModelRunCtx* mctx = RedisModule_Calloc(1, sizeof(*mctx));
   mctx->model = RAI_ModelGetShallowCopy(model);
-  mctx->batches = array_new(RAI_ModelCtxBatch, BATCH_INITIAL_SIZE);
-#undef BATCH_INITIAL_SIZE
+  mctx->inputs = array_new(RAI_ModelCtxParam, PARAM_INITIAL_SIZE);
+  mctx->outputs = array_new(RAI_ModelCtxParam, PARAM_INITIAL_SIZE);
   return mctx;
+#undef PARAM_INITIAL_SIZE
 }
 
 static int Model_RunCtxAddParam(RAI_ModelRunCtx* mctx, RAI_ModelCtxParam** paramArr,
@@ -323,93 +324,44 @@ static int Model_RunCtxAddParam(RAI_ModelRunCtx* mctx, RAI_ModelCtxParam** param
   return 1;
 }
 
-int RAI_ModelRunCtxAddInput(RAI_ModelRunCtx* mctx, size_t id, const char* inputName, RAI_Tensor* inputTensor) {
-  if (id >= RAI_ModelRunCtxNumBatches(mctx)) {
-    // TODO error
-    return 0;
-  }
-  return Model_RunCtxAddParam(mctx, &mctx->batches[id].inputs, inputName, inputTensor);
+int RAI_ModelRunCtxAddInput(RAI_ModelRunCtx* mctx, const char* inputName, RAI_Tensor* inputTensor) {
+  return Model_RunCtxAddParam(mctx, &mctx->inputs, inputName, inputTensor);
 }
 
-int RAI_ModelRunCtxAddOutput(RAI_ModelRunCtx* mctx, size_t id, const char* outputName) {
-  if (id >= RAI_ModelRunCtxNumBatches(mctx)) {
-    // TODO error
-    return 0;
-  }
-  return Model_RunCtxAddParam(mctx, &mctx->batches[id].outputs, outputName, NULL);
+int RAI_ModelRunCtxAddOutput(RAI_ModelRunCtx* mctx, const char* outputName) {
+  return Model_RunCtxAddParam(mctx, &mctx->outputs, outputName, NULL);
 }
 
 size_t RAI_ModelRunCtxNumInputs(RAI_ModelRunCtx* mctx) {
-  if (RAI_ModelRunCtxNumBatches(mctx) == 0) {
-    return 0;
-  }
-  // Here we assume batch is well-formed (i.e. number of outputs is equal in all batches)
-  return array_len(mctx->batches[0].inputs);
+  return array_len(mctx->inputs);
 }
 
 size_t RAI_ModelRunCtxNumOutputs(RAI_ModelRunCtx* mctx) {
-  if (RAI_ModelRunCtxNumBatches(mctx) == 0) {
-    return 0;
-  }
-  // Here we assume batch is well-formed (i.e. number of outputs is equal in all batches)
-  return array_len(mctx->batches[0].outputs);
-}
-
-int RAI_ModelRunCtxAddBatch(RAI_ModelRunCtx* mctx) {
-#define PARAM_INITIAL_SIZE 10
-  RAI_ModelCtxBatch batch = {
-    .inputs = array_new(RAI_ModelCtxParam, PARAM_INITIAL_SIZE),
-    .outputs = array_new(RAI_ModelCtxParam, PARAM_INITIAL_SIZE)
-  };
-#undef PARAM_INITIAL_SIZE
-  array_append(mctx->batches, batch);
-  return array_len(mctx->batches)-1;
-}
-
-size_t RAI_ModelRunCtxNumBatches(RAI_ModelRunCtx* mctx) {
-  return array_len(mctx->batches);
+  return array_len(mctx->outputs);
 }
 
-void RAI_ModelRunCtxCopyBatch(RAI_ModelRunCtx* dest, size_t id_dest, RAI_ModelRunCtx* src, size_t id_src) {
-  size_t ninputs = array_len(src->batches[id_src].inputs);
-  for (size_t i=0; i<ninputs; i++) {
-    RAI_ModelCtxParam param = src->batches[id_src].inputs[i];
-    RAI_ModelRunCtxAddInput(dest, id_dest, param.name, param.tensor);
-  }
-
-  size_t noutputs = array_len(src->batches[id_src].outputs);
-  for (size_t i=0; i<noutputs; i++) {
-    RAI_ModelCtxParam param = src->batches[id_src].outputs[i];
-    RAI_ModelRunCtxAddOutput(dest, id_dest, param.name);
-  }
-}
-
-RAI_Tensor* RAI_ModelRunCtxInputTensor(RAI_ModelRunCtx* mctx, size_t id, size_t index) {
-  // TODO: add method to collect from batches?
+RAI_Tensor* RAI_ModelRunCtxInputTensor(RAI_ModelRunCtx* mctx, size_t index) {
   assert(RAI_ModelRunCtxNumInputs(mctx) > index && index >= 0);
-  return mctx->batches[id].inputs[index].tensor;
+  return mctx->inputs[index].tensor;
 }
 
-RAI_Tensor* RAI_ModelRunCtxOutputTensor(RAI_ModelRunCtx* mctx, size_t id, size_t index) {
-  // TODO: add method to collect from batches?
+RAI_Tensor* RAI_ModelRunCtxOutputTensor(RAI_ModelRunCtx* mctx, size_t index) {
   assert(RAI_ModelRunCtxNumOutputs(mctx) > index && index >= 0);
-  return mctx->batches[id].outputs[index].tensor;
+  return mctx->outputs[index].tensor;
 }
 
 void RAI_ModelRunCtxFree(RAI_ModelRunCtx* mctx) {
-  for (size_t b=0; b<array_len(mctx->batches); ++b) {
-    for (size_t i=0; i<array_len(mctx->batches[b].inputs); ++i) {
-      RAI_TensorFree(mctx->batches[b].inputs[i].tensor);
-    }
-    array_free(mctx->batches[b].inputs);
+  for (size_t i=0; i<array_len(mctx->inputs); ++i) {
+    RAI_TensorFree(mctx->inputs[i].tensor);
+  }
+  array_free(mctx->inputs);
 
-    for (size_t i = 0 ; i < array_len(mctx->batches[b].outputs) ; ++i) {
-      if (mctx->batches[b].outputs[i].tensor) {
-        RAI_TensorFree(mctx->batches[b].outputs[i].tensor);
-      }
+  for (size_t i = 0 ; i < array_len(mctx->outputs) ; ++i) {
+    if (mctx->outputs[i].tensor) {
+      RAI_TensorFree(mctx->outputs[i].tensor);
     }
-    array_free(mctx->batches[b].outputs);
   }
+  array_free(mctx->outputs);
 
   RAI_Error err = {0};
   RAI_ModelFree(mctx->model, &err);
@@ -422,37 +374,42 @@ void RAI_ModelRunCtxFree(RAI_ModelRunCtx* mctx) {
   RedisModule_Free(mctx);
 }
 
-int RAI_ModelRun(RAI_ModelRunCtx* mctx, RAI_Error* err) {
+int RAI_ModelRun(RAI_ModelRunCtx** mctxs, RAI_Error* err) {
   int ret;
 
-  switch (mctx->model->backend) {
+  if (array_len(mctxs) == 0) {
+    RAI_SetError(err, RAI_EBACKENDNOTLOADED, "ERR Nothing to run");
+    return REDISMODULE_ERR;
+  }
+
+  switch (mctxs[0]->model->backend) {
     case RAI_BACKEND_TENSORFLOW:
       if (!RAI_backends.tf.model_run) {
         RAI_SetError(err, RAI_EBACKENDNOTLOADED, "ERR Backend not loaded: TF");
         return REDISMODULE_ERR;
       }
-      ret = RAI_backends.tf.model_run(mctx, err);
+      ret = RAI_backends.tf.model_run(mctxs, err);
       break;
     case RAI_BACKEND_TFLITE:
       if (!RAI_backends.tflite.model_run) {
         RAI_SetError(err, RAI_EBACKENDNOTLOADED, "ERR Backend not loaded: TFLITE");
         return REDISMODULE_ERR;
       }
-      ret = RAI_backends.tflite.model_run(mctx, err);
+      ret = RAI_backends.tflite.model_run(mctxs, err);
      break;
    case RAI_BACKEND_TORCH:
       if (!RAI_backends.torch.model_run) {
         RAI_SetError(err, RAI_EBACKENDNOTLOADED, "ERR Backend not loaded: TORCH");
         return REDISMODULE_ERR;
       }
-      ret = RAI_backends.torch.model_run(mctx, err);
+      ret = RAI_backends.torch.model_run(mctxs, err);
       break;
     case RAI_BACKEND_ONNXRUNTIME:
       if (!RAI_backends.onnx.model_run) {
         RAI_SetError(err, RAI_EBACKENDNOTLOADED, "ERR Backend not loaded: ONNX");
         return REDISMODULE_ERR;
       }
-      ret = RAI_backends.onnx.model_run(mctx, err);
+      ret = RAI_backends.onnx.model_run(mctxs, err);
       break;
     default:
       RAI_SetError(err, RAI_EUNSUPPORTEDBACKEND, "ERR Unsupported backend");
diff --git a/src/model.h b/src/model.h
index 2c127278a..87d8d0b3b 100644
--- a/src/model.h
+++ b/src/model.h
@@ -17,19 +17,16 @@ RAI_Model *RAI_ModelCreate(RAI_Backend backend, const char* devicestr, const cha
 void RAI_ModelFree(RAI_Model* model, RAI_Error* err);
 
 RAI_ModelRunCtx* RAI_ModelRunCtxCreate(RAI_Model* model);
+void RAI_ModelRunCtxFree(RAI_ModelRunCtx* mctx);
 
-int RAI_ModelRunCtxAddBatch(RAI_ModelRunCtx* mctx);
-size_t RAI_ModelRunCtxNumBatches(RAI_ModelRunCtx* mctx);
-void RAI_ModelRunCtxCopyBatch(RAI_ModelRunCtx* dest, size_t id_dest, RAI_ModelRunCtx* src, size_t id_src);
-int RAI_ModelRunCtxAddInput(RAI_ModelRunCtx* mctx, size_t id, const char* inputName, RAI_Tensor* inputTensor);
-int RAI_ModelRunCtxAddOutput(RAI_ModelRunCtx* mctx, size_t id, const char* outputName);
+int RAI_ModelRunCtxAddInput(RAI_ModelRunCtx* mctx, const char* inputName, RAI_Tensor* inputTensor);
+int RAI_ModelRunCtxAddOutput(RAI_ModelRunCtx* mctx, const char* outputName);
 size_t RAI_ModelRunCtxNumInputs(RAI_ModelRunCtx* mctx);
 size_t RAI_ModelRunCtxNumOutputs(RAI_ModelRunCtx* mctx);
-RAI_Tensor* RAI_ModelRunCtxInputTensor(RAI_ModelRunCtx* mctx, size_t id, size_t index);
-RAI_Tensor* RAI_ModelRunCtxOutputTensor(RAI_ModelRunCtx* mctx, size_t id, size_t index);
-void RAI_ModelRunCtxFree(RAI_ModelRunCtx* mctx);
+RAI_Tensor* RAI_ModelRunCtxInputTensor(RAI_ModelRunCtx* mctx, size_t index);
+RAI_Tensor* RAI_ModelRunCtxOutputTensor(RAI_ModelRunCtx* mctx, size_t index);
 
-int RAI_ModelRun(RAI_ModelRunCtx* mctx, RAI_Error* err);
+int RAI_ModelRun(RAI_ModelRunCtx** mctxs, RAI_Error* err);
 
 RAI_Model* RAI_ModelGetShallowCopy(RAI_Model* model);
 int RAI_ModelSerialize(RAI_Model *model, char **buffer, size_t *len, RAI_Error *err);
diff --git a/src/model_struct.h b/src/model_struct.h
index 3a7a38abc..074785e41 100644
--- a/src/model_struct.h
+++ b/src/model_struct.h
@@ -33,15 +33,11 @@ typedef struct RAI_ModelCtxParam {
   RAI_Tensor* tensor;
 } RAI_ModelCtxParam;
 
-typedef struct RAI_ModelCtxBatch {
-  RAI_ModelCtxParam* inputs;
-  RAI_ModelCtxParam* outputs;
-} RAI_ModelCtxBatch;
-
 typedef struct RAI_ModelRunCtx {
   size_t ctxtype;
   RAI_Model* model;
-  RAI_ModelCtxBatch* batches;
+  RAI_ModelCtxParam* inputs;
+  RAI_ModelCtxParam* outputs;
 } RAI_ModelRunCtx;
 
 #endif /* SRC_MODEL_STRUCT_H_ */
diff --git a/src/redisai.c b/src/redisai.c
index f04563c90..5bc8876ab 100644
--- a/src/redisai.c
+++ b/src/redisai.c
@@ -212,31 +212,25 @@ void *RedisAI_RunSession(struct RedisAI_RunInfo **batch_rinfo) {
     return NULL;
   }
 
+  RAI_ModelRunCtx** mctxs = NULL;
+  RAI_ScriptRunCtx* sctx = NULL;
+
   RAI_Error* err = RedisModule_Calloc(1, sizeof(RAI_Error));
   long long rtime;
   int status;
-  RAI_ModelRunCtx* mctx = NULL;
-  RAI_ScriptRunCtx* sctx = NULL;
   if (batch_rinfo[0]->mctx) {
-    if (batch_size > 1) {
-      mctx = RAI_ModelRunCtxCreate(batch_rinfo[0]->mctx->model);
-      for (long long i=0; i<batch_size; i++) {
-        int id = RAI_ModelRunCtxAddBatch(mctx);
-        RAI_ModelRunCtxCopyBatch(mctx, id, batch_rinfo[i]->mctx, 0);
-      }
-    }
-    else {
-      mctx = batch_rinfo[0]->mctx;
+    mctxs = array_new(RAI_ModelRunCtx*, batch_size);
+    for (long long i=0; i<batch_size; i++) {
+      mctxs = array_append(mctxs, batch_rinfo[i]->mctx);
     }
   }
   else if (batch_rinfo[0]->sctx) {
-    // No batching for scripts for now
     sctx = batch_rinfo[0]->sctx;
   }
 
   const long long start = ustime();
-  if (mctx) {
-    status = RAI_ModelRun(mctx, err);
+  if (mctxs) {
+    status = RAI_ModelRun(mctxs, err);
   }
   else if (sctx) {
     status = RAI_ScriptRun(sctx, err);
@@ -245,26 +239,6 @@ void *RedisAI_RunSession(struct RedisAI_RunInfo **batch_rinfo) {
   for (long long i=0; i<batch_size; i++) {
     struct RedisAI_RunInfo *rinfo = batch_rinfo[i];
-    if (mctx) {
-      if (batch_size > 1) {
-        size_t noutputs = RAI_ModelRunCtxNumOutputs(mctx);
-        for (long long o=0; o<noutputs; o++) {
-          RAI_Tensor* tensor = mctx->batches[i].outputs[o].tensor;
-          if (tensor) {
-            rinfo->mctx->batches[0].outputs[o].tensor = RAI_TensorGetShallowCopy(tensor);
-          }
-          else {
-            rinfo->mctx->batches[0].outputs[o].tensor = NULL;
-          }
-        }
-      }
-      else {
-        // Do nothing if no batching
-      }
-    }
-    else if (sctx) {
-      // No batching for scripts for now
-    }
 
     rinfo->status = status;
     rinfo->err = RedisModule_Calloc(1, sizeof(RAI_Error));
@@ -282,13 +256,8 @@ void *RedisAI_RunSession(struct RedisAI_RunInfo **batch_rinfo) {
     }
   }
 
-  if (mctx) {
-    if (batch_size > 1) {
-      RAI_ModelRunCtxFree(mctx);
-    }
-    else {
-      // Do nothing if no batching
-    }
+  if (mctxs) {
+    array_free(mctxs);
   }
   else if (sctx) {
     // No batching for scripts for now
@@ -378,7 +347,7 @@ int RedisAI_Run_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
     }
     RAI_Tensor *t = NULL;
     if (rinfo->mctx) {
-      t = RAI_ModelRunCtxOutputTensor(rinfo->mctx, 0, i);
+      t = RAI_ModelRunCtxOutputTensor(rinfo->mctx, i);
       if (t && batch_size == 0) {
         batch_size = RAI_TensorDim(t, 0);
       }
@@ -427,7 +396,7 @@ size_t RAI_RunInfoBatchSize(struct RedisAI_RunInfo* rinfo) {
   }
 
   for (size_t i=0; i<ninputs; i++) {
-    RAI_Tensor* input = RAI_ModelRunCtxInputTensor(rinfo->mctx, 0, i);
+    RAI_Tensor* input = RAI_ModelRunCtxInputTensor(rinfo->mctx, i);
 
     if (i == 0) {
       batchsize = RAI_TensorDim(input, 0);
@@ -460,8 +429,8 @@ int RAI_RunInfoBatchable(struct RedisAI_RunInfo* rinfo1, struct RedisAI_RunInfo*
   }
 
   for (int i=0; i<ninputs1; i++) {
-    RAI_Tensor* input1 = RAI_ModelRunCtxInputTensor(rinfo1->mctx, 0, i);
-    RAI_Tensor* input2 = RAI_ModelRunCtxInputTensor(rinfo2->mctx, 0, i);
+    RAI_Tensor* input1 = RAI_ModelRunCtxInputTensor(rinfo1->mctx, i);
+    RAI_Tensor* input2 = RAI_ModelRunCtxInputTensor(rinfo2->mctx, i);
 
     int ndims1 = RAI_TensorNumDims(input1);
     int ndims2 = RAI_TensorNumDims(input2);
@@ -512,8 +481,8 @@ void *RedisAI_Run_ThreadMain(void *arg) {
       evicted_items = array_new(queueItem *, run_queue_len);
       batch_rinfo = array_new(struct RedisAI_RunInfo *, run_queue_len);
 
-      array_append(evicted_items, item);
-      array_append(batch_rinfo, rinfo);
+      evicted_items = array_append(evicted_items, item);
+      batch_rinfo = array_append(batch_rinfo, rinfo);
 
       if (rinfo->sctx) {
         break;
@@ -548,8 +517,8 @@ void *RedisAI_Run_ThreadMain(void *arg) {
           break;
         }
 
-        array_append(evicted_items, next_item);
-        array_append(batch_rinfo, next_rinfo);
+        evicted_items = array_append(evicted_items, next_item);
+        batch_rinfo = array_append(batch_rinfo, next_rinfo);
 
         current_batchsize += next_batchsize;
         next_item = queueNext(next_item);
@@ -1182,12 +1151,12 @@ int RedisAI_ModelRun_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
   RedisModule_RetainString(NULL, argv[1]);
   rinfo->runkey = argv[1];
   rinfo->mctx = RAI_ModelRunCtxCreate(mto);
+  // rinfo->mctxs = array_new(RAI_ModelRunCtx, 10);
+  // rinfo->mctxs = array_append(rinfo->mctxs, );
   rinfo->sctx = NULL;
   rinfo->outkeys = NULL;
   rinfo->err = NULL;
 
-  RAI_ModelRunCtxAddBatch(rinfo->mctx);
-
   // parsing aux vars
   int is_input = 0;
   size_t ninputs = 0;
@@ -1222,7 +1191,7 @@ int RedisAI_ModelRun_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
       if (mto->inputs) {
         opname = mto->inputs[ninputs];
       }
-      if (!RAI_ModelRunCtxAddInput(rinfo->mctx, 0, opname, inputTensor)) {
+      if (!RAI_ModelRunCtxAddInput(rinfo->mctx, opname, inputTensor)) {
         // todo free rinfo
         return RedisModule_ReplyWithError(ctx, "ERR Input key not found");
       }
@@ -1233,7 +1202,7 @@ int RedisAI_ModelRun_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
       if (mto->outputs) {
         opname = mto->outputs[noutputs];
       }
-      if (!RAI_ModelRunCtxAddOutput(rinfo->mctx, 0, opname)) {
+      if (!RAI_ModelRunCtxAddOutput(rinfo->mctx, opname)) {
        // todo free rinfo
        return RedisModule_ReplyWithError(ctx, "ERR Output key not found");
      }
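Note (illustrative, not part of the diff): with this refactor a RAI_ModelRunCtx describes a single request with flat inputs/outputs, and batching is expressed by handing RAI_ModelRun an array of contexts. The sketch below shows how a caller might drive the new signatures; the include paths, the t0/t1 tensors and the "input"/"output" node names are hypothetical placeholders, while array_new/array_append/array_free are the arr helpers already used elsewhere in this patch.

    #include "model.h"
    #include "tensor.h"
    // Assumes the project's arr helpers (array_new/array_append/array_free) are in scope.

    // Build two single-request contexts and run them as one batched call.
    static int run_batched(RAI_Model *model, RAI_Tensor *t0, RAI_Tensor *t1, RAI_Error *err) {
      RAI_ModelRunCtx *a = RAI_ModelRunCtxCreate(model);
      RAI_ModelRunCtx *b = RAI_ModelRunCtxCreate(model);

      // Inputs/outputs now go directly on the context; there is no batch id argument anymore.
      RAI_ModelRunCtxAddInput(a, "input", t0);
      RAI_ModelRunCtxAddOutput(a, "output");
      RAI_ModelRunCtxAddInput(b, "input", t1);
      RAI_ModelRunCtxAddOutput(b, "output");

      // RAI_ModelRun takes an array of contexts: the backend concatenates the
      // inputs along dimension 0 and writes each context's slice of the outputs back.
      RAI_ModelRunCtx **mctxs = array_new(RAI_ModelRunCtx*, 2);
      mctxs = array_append(mctxs, a);
      mctxs = array_append(mctxs, b);

      int status = RAI_ModelRun(mctxs, err);

      // Each context ends up holding its own output tensors.
      RAI_Tensor *out_a = RAI_ModelRunCtxOutputTensor(a, 0);
      RAI_Tensor *out_b = RAI_ModelRunCtxOutputTensor(b, 0);
      (void)out_a; (void)out_b;

      array_free(mctxs);          // releases only the array, not the contexts
      RAI_ModelRunCtxFree(a);
      RAI_ModelRunCtxFree(b);
      return status;
    }

This mirrors what RedisAI_RunSession does after the refactor: it appends each queued request's existing context to the array, runs them together, and then frees only the array, leaving each request to consume and free its own context.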