diff --git a/src/backends/tensorflow.c b/src/backends/tensorflow.c index 1baed43e9..997f0e5d6 100644 --- a/src/backends/tensorflow.c +++ b/src/backends/tensorflow.c @@ -15,6 +15,10 @@ int RAI_InitBackendTF(int (*get_api_fn)(const char *, void *)) { get_api_fn("RedisModule_Realloc", ((void **)&RedisModule_Realloc)); get_api_fn("RedisModule_Strdup", ((void **)&RedisModule_Strdup)); + // Set the min logging level to 3 (out of 5) - this is a workaround: if TF writes log messages + // extensively to stderr, it may cause the system to get stuck. + RedisModule_Assert(putenv("TF_CPP_MIN_LOG_LEVEL=3") == 0); + return REDISMODULE_OK; } diff --git a/src/execution/DAG/dag.c b/src/execution/DAG/dag.c index a5df92fe9..e1bbb8d5f 100644 --- a/src/execution/DAG/dag.c +++ b/src/execution/DAG/dag.c @@ -72,7 +72,7 @@ static void Dag_LoadInputsToCurrentOp(RedisAI_RunInfo *rinfo, RAI_DagOp *current } for (uint i = 0; i < n_outkeys; i++) { - RAI_ExecutionCtx_AddOuputPlaceholder(currentOp->ectx); + RAI_ExecutionCtx_AddOutputPlaceholder(currentOp->ectx); } } @@ -260,7 +260,7 @@ void RedisAI_DagRunSession_ScriptRun_Step(RedisAI_RunInfo *rinfo, RAI_DagOp *cur RAI_ExecutionCtx_AddInput(currentOp->ectx, inputTensors[i]); } for (uint i = 0; i < n_outkeys; i++) { - RAI_ExecutionCtx_AddOuputPlaceholder(currentOp->ectx); + RAI_ExecutionCtx_AddOutputPlaceholder(currentOp->ectx); } } @@ -574,44 +574,15 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc break; } - case REDISAI_DAG_CMD_MODELRUN: { - rinfo->dagReplyLength++; - struct RedisAI_RunStats *rstats = NULL; - RAI_GetRunStats(currentOp->runkey, &rstats); - if (currentOp->result == REDISMODULE_ERR) { - RAI_SafeAddDataPoint(rstats, 0, 1, 1, 0); - RedisModule_ReplyWithError(ctx, currentOp->err->detail_oneline); - dag_error = 1; - } else if (currentOp->result == -1) { - RedisModule_ReplyWithSimpleString(ctx, "NA"); - } else { - RAI_Tensor *t = NULL; - if (RAI_ExecutionCtx_NumOutputs(currentOp->ectx) > 0) { - t = RAI_ExecutionCtx_GetOutput(currentOp->ectx, 0); - } - int batch_size = 0; - if (t) { - batch_size = RAI_TensorDim(t, 0); - } - RAI_SafeAddDataPoint(rstats, currentOp->duration_us, 1, 0, batch_size); - RedisModule_ReplyWithSimpleString(ctx, "OK"); - } - break; - } - + case REDISAI_DAG_CMD_MODELRUN: case REDISAI_DAG_CMD_SCRIPTRUN: { rinfo->dagReplyLength++; - struct RedisAI_RunStats *rstats = NULL; - RAI_GetRunStats(currentOp->runkey, &rstats); if (currentOp->result == REDISMODULE_ERR) { - RAI_SafeAddDataPoint(rstats, 0, 1, 1, 0); RedisModule_ReplyWithError(ctx, currentOp->err->detail_oneline); dag_error = 1; } else if (currentOp->result == -1) { RedisModule_ReplyWithSimpleString(ctx, "NA"); } else { - int batch_size = 1; - RAI_SafeAddDataPoint(rstats, currentOp->duration_us, 1, 0, batch_size); RedisModule_ReplyWithSimpleString(ctx, "OK"); } break; diff --git a/src/execution/DAG/dag_builder.c b/src/execution/DAG/dag_builder.c index 73b72c1c5..47e5047d7 100644 --- a/src/execution/DAG/dag_builder.c +++ b/src/execution/DAG/dag_builder.c @@ -61,7 +61,6 @@ RAI_DAGRunOp *RAI_DAGCreateModelRunOp(RAI_Model *model) { op->commandType = REDISAI_DAG_CMD_MODELRUN; op->ectx = (RAI_ExecutionCtx *)mctx; op->devicestr = model->devicestr; - op->runkey = RAI_HoldString((RedisModuleString *)model->infokey); return (RAI_DAGRunOp *)op; } @@ -73,7 +72,6 @@ RAI_DAGRunOp *RAI_DAGCreateScriptRunOp(RAI_Script *script, const char *func_name op->commandType = REDISAI_DAG_CMD_SCRIPTRUN; op->ectx = (RAI_ExecutionCtx *)sctx; op->devicestr = script->devicestr;
- op->runkey = RAI_HoldString((RedisModuleString *)script->infokey); return (RAI_DAGRunOp *)op; } diff --git a/src/execution/DAG/dag_op.c b/src/execution/DAG/dag_op.c index 95d512968..89e0ea989 100644 --- a/src/execution/DAG/dag_op.c +++ b/src/execution/DAG/dag_op.c @@ -13,7 +13,6 @@ int RAI_InitDagOp(RAI_DagOp **result) { dagOp = (RAI_DagOp *)RedisModule_Calloc(1, sizeof(RAI_DagOp)); dagOp->commandType = REDISAI_DAG_CMD_NONE; - dagOp->runkey = NULL; dagOp->inkeys = (RedisModuleString **)array_new(RedisModuleString *, 1); dagOp->outkeys = (RedisModuleString **)array_new(RedisModuleString *, 1); dagOp->inkeys_indices = array_new(size_t, 1); @@ -31,13 +30,9 @@ int RAI_InitDagOp(RAI_DagOp **result) { return REDISMODULE_OK; } -void RAI_DagOpSetRunKey(RAI_DagOp *dagOp, RedisModuleString *runkey) { dagOp->runkey = runkey; } - void RAI_FreeDagOp(RAI_DagOp *dagOp) { RAI_FreeError(dagOp->err); - if (dagOp->runkey) - RedisModule_FreeString(NULL, dagOp->runkey); if (dagOp->outTensor) RAI_TensorFree(dagOp->outTensor); diff --git a/src/execution/DAG/dag_op.h b/src/execution/DAG/dag_op.h index 2ac122b49..740b41919 100644 --- a/src/execution/DAG/dag_op.h +++ b/src/execution/DAG/dag_op.h @@ -18,7 +18,6 @@ typedef enum DAGCommand { typedef struct RAI_DagOp { DAGCommand commandType; - RedisModuleString *runkey; RedisModuleString **inkeys; RedisModuleString **outkeys; size_t *inkeys_indices; @@ -48,12 +47,3 @@ int RAI_InitDagOp(RAI_DagOp **result); * @param RAI_DagOp context in which RedisAI command operates. */ void RAI_FreeDagOp(RAI_DagOp *dagOp); - -/** - * @brief Sets the key name of current dag op execution subject. The subject is either a model or a - * script. - * - * @param dagOp Current op. - * @param runkey Subject key name. - */ -void RAI_DagOpSetRunKey(RAI_DagOp *dagOp, RedisModuleString *runkey); diff --git a/src/execution/background_workers.c b/src/execution/background_workers.c index 1c7c9d3e1..dd584e60a 100644 --- a/src/execution/background_workers.c +++ b/src/execution/background_workers.c @@ -54,10 +54,40 @@ static void _BGThread_Wait(RunQueueInfo *run_queue_info) { &absTimeout); } +static void _BGThread_SaveStats(RedisAI_RunInfo *rinfo) { + for (size_t i = 0; i < rinfo->dagOpCount; i++) { + RAI_DagOp *currentOp = rinfo->dagOps[i]; + + if (currentOp->commandType == REDISAI_DAG_CMD_MODELRUN || + currentOp->commandType == REDISAI_DAG_CMD_SCRIPTRUN) { + if (currentOp->result == REDISMODULE_ERR) { + RAI_StatsAddDataPoint(RAI_ExecutionCtx_GetStats(currentOp->ectx), 0, 1, 1, 0); + } else if (currentOp->result == REDISMODULE_OK) { + unsigned long batch_size = 1; + if (currentOp->commandType == REDISAI_DAG_CMD_MODELRUN) { + RAI_Tensor *t = NULL; + if (RAI_ExecutionCtx_NumOutputs(currentOp->ectx) > 0) { + t = RAI_ExecutionCtx_GetOutput(currentOp->ectx, 0); + } + if (t) { + batch_size = RAI_TensorDim(t, 0); + } else { + batch_size = 0; + } + } + RAI_StatsAddDataPoint(RAI_ExecutionCtx_GetStats(currentOp->ectx), + currentOp->duration_us, 1, 0, batch_size); + } + } + } +} + static void _BGThread_RinfoFinish(RedisAI_RunInfo *rinfo) { RedisAI_RunInfo *orig = rinfo->orig_copy; uint dagRefCount = RAI_DagRunInfoFreeShallowCopy(rinfo); if (dagRefCount == 0) { + // Save stats for every DAG execute operation. 
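+ // The DAG's ref count reached zero, so this point is reached exactly once per DAG run, + // after all ops finished - each op's RunStats entry is therefore updated a single time here, + // off the main thread, instead of on the (blocking) reply path.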
+ _BGThread_SaveStats(orig); RedisAI_OnFinishCtx *finish_ctx = orig; orig->OnFinish(finish_ctx, orig->private_data); } diff --git a/src/execution/execution_contexts/execution_ctx.c b/src/execution/execution_contexts/execution_ctx.c index aa966315d..80ebd2d91 100644 --- a/src/execution/execution_contexts/execution_ctx.c +++ b/src/execution/execution_contexts/execution_ctx.c @@ -2,9 +2,11 @@ #include "redismodule.h" #include "util/arr.h" -void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_ExecutionCtx_Free_fn freeFn) { +void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_RunStats *run_stats, + RAI_ExecutionCtx_Free_fn freeFn) { ctx->inputs = array_new(RAI_Tensor *, 10); ctx->outputs = array_new(RAI_Tensor *, 10); + ctx->runStats = run_stats; ctx->freeFn = freeFn; } void RAI_ExecutionCtx_Free(RAI_ExecutionCtx *ctx) { @@ -21,6 +23,7 @@ void RAI_ExecutionCtx_Free(RAI_ExecutionCtx *ctx) { } inline size_t RAI_ExecutionCtx_NumInputs(RAI_ExecutionCtx *ctx) { return array_len(ctx->inputs); } + inline void RAI_ExecutionCtx_AddInput(RAI_ExecutionCtx *ctx, RAI_Tensor *t) { if (t != NULL) { t = RAI_TensorGetShallowCopy(t); @@ -35,7 +38,7 @@ inline RAI_Tensor *RAI_ExecutionCtx_GetInput(RAI_ExecutionCtx *ctx, size_t index inline size_t RAI_ExecutionCtx_NumOutputs(RAI_ExecutionCtx *ctx) { return array_len(ctx->outputs); } -inline void RAI_ExecutionCtx_AddOuputPlaceholder(RAI_ExecutionCtx *ctx) { +inline void RAI_ExecutionCtx_AddOutputPlaceholder(RAI_ExecutionCtx *ctx) { ctx->outputs = array_append(ctx->outputs, NULL); } @@ -48,3 +51,5 @@ inline RAI_Tensor *RAI_ExecutionCtx_GetOutput(RAI_ExecutionCtx *ctx, size_t inde RedisModule_Assert(index < array_len(ctx->outputs)); return ctx->outputs[index]; } + +inline RAI_RunStats *RAI_ExecutionCtx_GetStats(RAI_ExecutionCtx *ctx) { return ctx->runStats; } diff --git a/src/execution/execution_contexts/execution_ctx.h b/src/execution/execution_contexts/execution_ctx.h index 7219e8643..626add4e0 100644 --- a/src/execution/execution_contexts/execution_ctx.h +++ b/src/execution/execution_contexts/execution_ctx.h @@ -1,5 +1,6 @@ #pragma once +#include "redis_ai_objects/stats.h" #include "redis_ai_objects/tensor.h" // Pre decleration @@ -15,6 +16,7 @@ typedef void (*RAI_ExecutionCtx_Free_fn)(RAI_ExecutionCtx *ctx); typedef struct RAI_ExecutionCtx { RAI_Tensor **inputs; // DAG op input tensors. RAI_Tensor **outputs; // DAG op output tensors. + RAI_RunStats *runStats; // The underlying op's (Model/Script) stats entry. RAI_ExecutionCtx_Free_fn freeFn; // Inheriting execution context free function. } RAI_ExecutionCtx; @@ -24,7 +26,8 @@ typedef struct RAI_ExecutionCtx { * @param ctx - Execution context to initialize. * @param freeFn - Specific free function for inheriting execution contexts (script or model) */ -void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_ExecutionCtx_Free_fn freeFn); +void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_RunStats *run_stats, + RAI_ExecutionCtx_Free_fn freeFn); /** * @brief Frees the execution context internal structures. To be used from an inhereting execution @@ -72,7 +75,7 @@ size_t RAI_ExecutionCtx_NumOutputs(RAI_ExecutionCtx *ctx); * * @param ctx - Execution context. */ -void RAI_ExecutionCtx_AddOuputPlaceholder(RAI_ExecutionCtx *ctx); +void RAI_ExecutionCtx_AddOutputPlaceholder(RAI_ExecutionCtx *ctx); /** * @brief Sets an output tensor in a specfic index, populated before by a placeholder. @@ -91,3 +94,10 @@ void RAI_ExecutionCtx_SetOutput(RAI_ExecutionCtx *ctx, RAI_Tensor *t, size_t ind * @return RAI_Tensor* - Output tensor.
*/ RAI_Tensor *RAI_ExecutionCtx_GetOutput(RAI_ExecutionCtx *ctx, size_t index); + +/** + * @brief Returns the RunStats object for the underlying object. + * @param ctx - Execution context. + * @return RAI_RunStats + */ +RAI_RunStats *RAI_ExecutionCtx_GetStats(RAI_ExecutionCtx *ctx); diff --git a/src/execution/execution_contexts/modelRun_ctx.c b/src/execution/execution_contexts/modelRun_ctx.c index aa4b6bd96..ced2ef132 100644 --- a/src/execution/execution_contexts/modelRun_ctx.c +++ b/src/execution/execution_contexts/modelRun_ctx.c @@ -8,7 +8,8 @@ RAI_ModelRunCtx *RAI_ModelRunCtxCreate(RAI_Model *model) { RAI_ModelRunCtx *mctx = RedisModule_Calloc(1, sizeof(*mctx)); - RAI_ExecutionCtx_Init((RAI_ExecutionCtx *)mctx, (RAI_ExecutionCtx_Free_fn)RAI_ModelRunCtxFree); + RAI_ExecutionCtx_Init((RAI_ExecutionCtx *)mctx, model->info, + (RAI_ExecutionCtx_Free_fn)RAI_ModelRunCtxFree); mctx->model = RAI_ModelGetShallowCopy(model); return mctx; } @@ -19,7 +20,7 @@ int RAI_ModelRunCtxAddInput(RAI_ModelRunCtx *mctx, const char *inputName, RAI_Te } int RAI_ModelRunCtxAddOutput(RAI_ModelRunCtx *mctx, const char *outputName) { - RAI_ExecutionCtx_AddOuputPlaceholder((RAI_ExecutionCtx *)mctx); + RAI_ExecutionCtx_AddOutputPlaceholder((RAI_ExecutionCtx *)mctx); } inline size_t RAI_ModelRunCtxNumInputs(RAI_ModelRunCtx *mctx) { diff --git a/src/execution/execution_contexts/scriptRun_ctx.c b/src/execution/execution_contexts/scriptRun_ctx.c index fda275a80..c91ba571f 100644 --- a/src/execution/execution_contexts/scriptRun_ctx.c +++ b/src/execution/execution_contexts/scriptRun_ctx.c @@ -9,7 +9,8 @@ RAI_ScriptRunCtx *RAI_ScriptRunCtxCreate(RAI_Script *script, const char *fnname) { #define PARAM_INITIAL_SIZE 10 RAI_ScriptRunCtx *sctx = RedisModule_Calloc(1, sizeof(*sctx)); - RAI_ExecutionCtx_Init(&sctx->base, (RAI_ExecutionCtx_Free_fn)RAI_ScriptRunCtxFree); + RAI_ExecutionCtx_Init(&sctx->base, script->info, + (RAI_ExecutionCtx_Free_fn)RAI_ScriptRunCtxFree); sctx->script = RAI_ScriptGetShallowCopy(script); sctx->fnname = RedisModule_Strdup(fnname); sctx->keys = array_new(RedisModuleString *, PARAM_INITIAL_SIZE); @@ -58,7 +59,7 @@ int RAI_ScriptRunCtxAddArgInput(RAI_ScriptRunCtx *sctx, RedisModuleString *arg) } inline int RAI_ScriptRunCtxAddOutput(RAI_ScriptRunCtx *sctx) { - RAI_ExecutionCtx_AddOuputPlaceholder(&sctx->base); + RAI_ExecutionCtx_AddOutputPlaceholder(&sctx->base); return 1; } diff --git a/src/execution/parsing/deprecated.c b/src/execution/parsing/deprecated.c index 6817ce589..c11b1b493 100644 --- a/src/execution/parsing/deprecated.c +++ b/src/execution/parsing/deprecated.c @@ -18,7 +18,7 @@ static int _ModelRunCommand_ParseArgs(RedisModuleCtx *ctx, int argc, RedisModuleString **argv, RAI_Model **model, RAI_Error *error, RedisModuleString ***inkeys, RedisModuleString ***outkeys, - RedisModuleString **runkey, long long *timeout) { + long long *timeout) { if (argc < 6) { RAI_SetError(error, RAI_EMODELRUN, @@ -30,8 +30,6 @@ static int _ModelRunCommand_ParseArgs(RedisModuleCtx *ctx, int argc, RedisModule if (status == REDISMODULE_ERR) { return REDISMODULE_ERR; } - RAI_HoldString(argv[argpos]); - *runkey = argv[argpos]; const char *arg_string = RedisModule_StringPtrLen(argv[++argpos], NULL); // Parse timeout arg if given and store it in timeout @@ -87,8 +85,7 @@ int ParseModelRunCommand(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, RedisModu RAI_Model *model; long long timeout = 0; if (_ModelRunCommand_ParseArgs(ctx, argc, argv, &model, rinfo->err, &currentOp->inkeys, - &currentOp->outkeys, &currentOp->runkey, - &timeout) ==
REDISMODULE_ERR) { + &currentOp->outkeys, &timeout) == REDISMODULE_ERR) { goto cleanup; } @@ -305,7 +302,6 @@ int ModelSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return ret; } - // TODO: if backend loaded, make sure there's a queue if (!RunQueue_IsExists(devicestr)) { RunQueueInfo *run_queue_info = RunQueue_Create(devicestr); if (run_queue_info == NULL) { @@ -332,12 +328,12 @@ int ModelSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_ModuleTypeSetValue(key, RAI_ModelRedisType(), model); - model->infokey = RAI_AddStatsEntry(ctx, keystr, RAI_MODEL, backend, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(keystr, RAI_MODEL, backend, devicestr, tag); + RAI_StatsStoreEntry(keystr, stats); + model->info = stats; RedisModule_CloseKey(key); - RedisModule_ReplyWithSimpleString(ctx, "OK"); - RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; @@ -423,12 +419,12 @@ int ScriptSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_ModuleTypeSetValue(key, RAI_ScriptRedisType(), script); - script->infokey = RAI_AddStatsEntry(ctx, keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_StatsStoreEntry(keystr, stats); + script->info = stats; RedisModule_CloseKey(key); - RedisModule_ReplyWithSimpleString(ctx, "OK"); - RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; @@ -536,7 +532,6 @@ int ParseScriptRunCommand(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, RedisMod if (!script) { goto cleanup; } - RAI_DagOpSetRunKey(currentOp, RAI_HoldString(argv[1])); const char *func_name = ScriptCommand_GetFunctionName(argv[2]); if (!func_name) { diff --git a/src/execution/parsing/deprecated.h b/src/execution/parsing/deprecated.h index 4adb6cf2b..452e75009 100644 --- a/src/execution/parsing/deprecated.h +++ b/src/execution/parsing/deprecated.h @@ -6,7 +6,7 @@ /** * @brief Parse and validate MODELRUN command: create a modelRunCtx based on the model obtained * from the key space and save it in the op. The keys of the input and output tensors are stored in - * the op's inkeys and outkeys arrays, the model key is saved in op's runkey, and the given timeout + * the op's inkeys and outkeys arrays, and the given timeout * is saved as well (if given, otherwise it is zero). * @return Returns REDISMODULE_OK if the command is valid, REDISMODULE_ERR otherwise.
*/ diff --git a/src/execution/parsing/model_commands_parser.c b/src/execution/parsing/model_commands_parser.c index 0a61f4bc7..40cf990be 100644 --- a/src/execution/parsing/model_commands_parser.c +++ b/src/execution/parsing/model_commands_parser.c @@ -7,7 +7,7 @@ static int _ModelExecuteCommand_ParseArgs(RedisModuleCtx *ctx, int argc, RedisModuleString **argv, RAI_Model **model, RAI_Error *error, RedisModuleString ***inkeys, RedisModuleString ***outkeys, - RedisModuleString **runkey, long long *timeout) { + long long *timeout) { if (argc < 8) { RAI_SetError(error, RAI_EMODELRUN, @@ -15,11 +15,11 @@ static int _ModelExecuteCommand_ParseArgs(RedisModuleCtx *ctx, int argc, RedisMo return REDISMODULE_ERR; } size_t arg_pos = 1; - const int status = RAI_GetModelFromKeyspace(ctx, argv[arg_pos], model, REDISMODULE_READ, error); + const int status = + RAI_GetModelFromKeyspace(ctx, argv[arg_pos++], model, REDISMODULE_READ, error); if (status == REDISMODULE_ERR) { return REDISMODULE_ERR; } - *runkey = RAI_HoldString(argv[arg_pos++]); const char *arg_string = RedisModule_StringPtrLen(argv[arg_pos++], NULL); if (strcasecmp(arg_string, "INPUTS") != 0) { @@ -128,8 +128,7 @@ int ParseModelExecuteCommand(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, Redis RAI_Model *model; long long timeout = 0; if (_ModelExecuteCommand_ParseArgs(ctx, argc, argv, &model, rinfo->err, &currentOp->inkeys, - &currentOp->outkeys, &currentOp->runkey, - &timeout) == REDISMODULE_ERR) { + &currentOp->outkeys, &timeout) == REDISMODULE_ERR) { goto cleanup; } diff --git a/src/execution/parsing/model_commands_parser.h b/src/execution/parsing/model_commands_parser.h index c288045b4..2dcb84ba2 100644 --- a/src/execution/parsing/model_commands_parser.h +++ b/src/execution/parsing/model_commands_parser.h @@ -5,7 +5,7 @@ /** * @brief Parse and validate MODELEXECUTE command: create a modelRunCtx based on the model obtained * from the key space and save it in the op. The keys of the input and output tensors are stored in - * the op's inkeys and outkeys arrays, the model key is saved in op's runkey, and the given timeout + * the op's inkeys and outkeys arrays, and the given timeout * is saved as well (if given, otherwise it is zero). * @return Returns REDISMODULE_OK if the command is valid, REDISMODULE_ERR otherwise. */ diff --git a/src/execution/parsing/script_commands_parser.c b/src/execution/parsing/script_commands_parser.c index e5d584e72..01a764a48 100644 --- a/src/execution/parsing/script_commands_parser.c +++ b/src/execution/parsing/script_commands_parser.c @@ -280,8 +280,6 @@ int ParseScriptExecuteCommand(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, goto cleanup; } - RAI_DagOpSetRunKey(currentOp, RAI_HoldString(argv[1])); - const char *func_name = ScriptCommand_GetFunctionName(argv[2]); if (!func_name) { RAI_SetError(rinfo->err, RAI_ESCRIPTRUN, "ERR function name not specified"); diff --git a/src/execution/parsing/script_commands_parser.h b/src/execution/parsing/script_commands_parser.h index 50ace86a8..2653b3610 100644 --- a/src/execution/parsing/script_commands_parser.h +++ b/src/execution/parsing/script_commands_parser.h @@ -5,8 +5,8 @@ /** * @brief Parse and validate SCRIPTEXECUTE command: create a scriptRunCtx based on the script * obtained from the key space and the function name given, and save it in the op. The keys of the - * input and output tensors are stored in the op's inkeys and outkeys arrays, the script key is - * saved in op's runkey, and the given timeout is saved as well (if given, otherwise it is zero).
+ * input and output tensors are stored in the op's inkeys and outkeys arrays, + * and the given timeout is saved as well (if given, otherwise it is zero). * @return Returns REDISMODULE_OK if the command is valid, REDISMODULE_ERR otherwise. */ int ParseScriptExecuteCommand(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, diff --git a/src/redis_ai_objects/model.c b/src/redis_ai_objects/model.c index 753df26df..5d5f60960 100644 --- a/src/redis_ai_objects/model.c +++ b/src/redis_ai_objects/model.c @@ -103,7 +103,15 @@ void RAI_ModelFree(RAI_Model *model, RAI_Error *err) { } RedisModule_FreeString(NULL, model->tag); - RAI_RemoveStatsEntry(model->infokey); + + // If the run stats entry that is stored under this key is the same one that the model holds a + // reference to, remove the entry from the global statistics dictionary as well. Otherwise, + // the key has been overwritten - just release the old run stats. + RAI_RunStats *stats = RAI_StatsGetEntry(model->info->key); + if (stats == model->info) { + RAI_StatsRemoveEntry(stats->key); + } + RAI_StatsFree(model->info); RedisModule_Free(model); } diff --git a/src/redis_ai_objects/model_struct.h b/src/redis_ai_objects/model_struct.h index ac5b2503f..5f61246f2 100644 --- a/src/redis_ai_objects/model_struct.h +++ b/src/redis_ai_objects/model_struct.h @@ -2,6 +2,7 @@ #include "config/config.h" #include "tensor_struct.h" +#include "redis_ai_objects/stats.h" typedef struct RAI_ModelOpts { size_t batchsize; @@ -30,5 +31,5 @@ typedef struct RAI_Model { long long refCount; char *data; long long datalen; - void *infokey; + RAI_RunStats *info; } RAI_Model; diff --git a/src/redis_ai_objects/script.c b/src/redis_ai_objects/script.c index 86268f13c..8b5fd9d6a 100644 --- a/src/redis_ai_objects/script.c +++ b/src/redis_ai_objects/script.c @@ -57,7 +57,14 @@ void RAI_ScriptFree(RAI_Script *script, RAI_Error *err) { RedisModule_FreeString(NULL, script->tag); - RAI_RemoveStatsEntry(script->infokey); + // If the run stats entry that is stored under this key is the same one that the script holds a + // reference to, remove the entry from the global statistics dictionary as well. Otherwise, + // the key has been overwritten - just release the old run stats. + RAI_RunStats *stats = RAI_StatsGetEntry(script->info->key); + if (stats == script->info) { + RAI_StatsRemoveEntry(stats->key); + } + RAI_StatsFree(script->info); RAI_backends.torch.script_free(script, err); } diff --git a/src/redis_ai_objects/script_struct.h b/src/redis_ai_objects/script_struct.h index 65b3d89da..940794396 100644 --- a/src/redis_ai_objects/script_struct.h +++ b/src/redis_ai_objects/script_struct.h @@ -2,6 +2,7 @@ #include "config/config.h" #include "tensor_struct.h" +#include "redis_ai_objects/stats.h" #include "util/dict.h" typedef enum { @@ -26,6 +27,6 @@ typedef struct RAI_Script { char *devicestr; RedisModuleString *tag; long long refCount; - void *infokey; + RAI_RunStats *info; char **entryPoints; } RAI_Script; diff --git a/src/redis_ai_objects/stats.c b/src/redis_ai_objects/stats.c index eaa51de23..3cfa85a49 100644 --- a/src/redis_ai_objects/stats.c +++ b/src/redis_ai_objects/stats.c @@ -2,15 +2,19 @@ * stats.c * * Contains the helper methods to create, - * initialize, get, reset, and free run-time statics, like call count, error + * initialize, get, reset, and free run-time statistics, like call count, error * count, and aggregate durations of ModelRun and ScriptRun sessions.
* */ #include <sys/time.h> +#include <pthread.h> #include "stats.h" #include "util/string_utils.h" +// Global dictionary that stores run statistics for all models and scripts in the shard. +AI_dict *RunStats; + long long ustime(void) { struct timeval tv; long long ust; @@ -23,112 +27,92 @@ long long ustime(void) { mstime_t mstime(void) { return ustime() / 1000; } -void *RAI_AddStatsEntry(RedisModuleCtx *ctx, RedisModuleString *key, RAI_RunType runtype, - RAI_Backend backend, const char *devicestr, RedisModuleString *tag) { - struct RedisAI_RunStats *rstats = NULL; - rstats = RedisModule_Calloc(1, sizeof(struct RedisAI_RunStats)); - rstats->key = RAI_HoldString(key); - rstats->type = runtype; - rstats->backend = backend; - rstats->devicestr = RedisModule_Strdup(devicestr); - rstats->tag = RAI_HoldString(tag); +RAI_RunStats *RAI_StatsCreate(RedisModuleString *key, RAI_RunType type, RAI_Backend backend, + const char *device_str, RedisModuleString *tag) { + RAI_RunStats *r_stats = RedisModule_Calloc(1, sizeof(RAI_RunStats)); + r_stats->key = RedisModule_CreateStringFromString(NULL, key); + r_stats->type = type; + r_stats->backend = backend; + r_stats->device_str = RedisModule_Strdup(device_str); + r_stats->tag = RAI_HoldString(tag); + return r_stats; +} - AI_dictAdd(run_stats, (void *)key, (void *)rstats); +void RAI_StatsReset(RAI_RunStats *r_stats) { + RedisModule_Assert(r_stats); + __atomic_store_n(&r_stats->duration_us, 0, __ATOMIC_RELAXED); + __atomic_store_n(&r_stats->samples, 0, __ATOMIC_RELAXED); + __atomic_store_n(&r_stats->calls, 0, __ATOMIC_RELAXED); + __atomic_store_n(&r_stats->n_errors, 0, __ATOMIC_RELAXED); +} - return (void *)key; +void RAI_StatsAddDataPoint(RAI_RunStats *r_stats, unsigned long duration, unsigned long calls, + unsigned long errors, unsigned long samples) { + RedisModule_Assert(r_stats); + __atomic_add_fetch(&r_stats->duration_us, duration, __ATOMIC_RELAXED); + __atomic_add_fetch(&r_stats->calls, calls, __ATOMIC_RELAXED); + __atomic_add_fetch(&r_stats->n_errors, errors, __ATOMIC_RELAXED); + __atomic_add_fetch(&r_stats->samples, samples, __ATOMIC_RELAXED); +} + +void RAI_StatsFree(RAI_RunStats *r_stats) { + if (r_stats) { + if (r_stats->device_str) { + RedisModule_Free(r_stats->device_str); + } + if (r_stats->tag) { + RedisModule_FreeString(NULL, r_stats->tag); + } + if (r_stats->key) { + RedisModule_FreeString(NULL, r_stats->key); + } + RedisModule_Free(r_stats); + } } -void RAI_ListStatsEntries(RAI_RunType type, long long *nkeys, RedisModuleString ***keys, - RedisModuleString ***tags) { - AI_dictIterator *stats_iter = AI_dictGetSafeIterator(run_stats); +/************************************* Global RunStats dict API *********************************/ + +void RAI_StatsStoreEntry(RedisModuleString *key, RAI_RunStats *new_stats_entry) { + AI_dictReplace(RunStats, (void *)key, (void *)new_stats_entry); +} - long long stats_size = AI_dictSize(run_stats); +void RAI_StatsGetAllEntries(RAI_RunType type, long long *nkeys, RedisModuleString ***keys, + RedisModuleString ***tags) { + AI_dictIterator *stats_iter = AI_dictGetSafeIterator(RunStats); + long long stats_size = AI_dictSize(RunStats); *keys = RedisModule_Calloc(stats_size, sizeof(RedisModuleString *)); *tags = RedisModule_Calloc(stats_size, sizeof(RedisModuleString *)); - *nkeys = 0; AI_dictEntry *stats_entry = AI_dictNext(stats_iter); - struct RedisAI_RunStats *rstats = NULL; + RAI_RunStats *r_stats = NULL; while (stats_entry) { - rstats = AI_dictGetVal(stats_entry); - - if (rstats->type == type) { - (*keys)[*nkeys] = rstats->key; -
(*tags)[*nkeys] = rstats->tag; + r_stats = AI_dictGetVal(stats_entry); + if (r_stats->type == type) { + (*keys)[*nkeys] = r_stats->key; + (*tags)[*nkeys] = r_stats->tag; *nkeys += 1; } - stats_entry = AI_dictNext(stats_iter); } - AI_dictReleaseIterator(stats_iter); } -void RAI_RemoveStatsEntry(void *infokey) { - AI_dictEntry *stats_entry = AI_dictFind(run_stats, infokey); +void RAI_StatsRemoveEntry(RedisModuleString *info_key) { + AI_dictEntry *stats_entry = AI_dictFind(RunStats, info_key); if (stats_entry) { - struct RedisAI_RunStats *rstats = AI_dictGetVal(stats_entry); - AI_dictDelete(run_stats, infokey); - RAI_FreeRunStats(rstats); - } -} - -int RAI_ResetRunStats(struct RedisAI_RunStats *rstats) { - rstats->duration_us = 0; - rstats->samples = 0; - rstats->calls = 0; - rstats->nerrors = 0; - return 0; -} - -int RAI_SafeAddDataPoint(struct RedisAI_RunStats *rstats, long long duration, long long calls, - long long errors, long long samples) { - int result = 1; - if (rstats == NULL) { - return result; - } else { - rstats->duration_us += duration; - rstats->calls += calls; - rstats->nerrors += errors; - rstats->samples += samples; - result = 0; - } - return result; -} - -void RAI_FreeRunStats(struct RedisAI_RunStats *rstats) { - if (rstats) { - if (rstats->devicestr) { - RedisModule_Free(rstats->devicestr); - } - if (rstats->tag) { - RedisModule_FreeString(NULL, rstats->tag); - } - if (rstats->key) { - RedisModule_FreeString(NULL, rstats->key); - } - RedisModule_Free(rstats); + AI_dictDelete(RunStats, info_key); } } -int RAI_GetRunStats(RedisModuleString *runkey, struct RedisAI_RunStats **rstats) { - int result = 1; - if (run_stats == NULL) { - return result; - } - AI_dictEntry *entry = AI_dictFind(run_stats, runkey); - if (entry) { - *rstats = AI_dictGetVal(entry); - result = 0; +RAI_RunStats *RAI_StatsGetEntry(RedisModuleString *runkey) { + RedisModule_Assert(RunStats); + AI_dictEntry *entry = AI_dictFind(RunStats, runkey); + if (!entry) { + return NULL; } - return result; + return AI_dictGetVal(entry); } - -void RedisAI_FreeRunStats(RedisModuleCtx *ctx, struct RedisAI_RunStats *rstats) { - RedisModule_FreeString(ctx, rstats->key); - RAI_FreeRunStats(rstats); -} \ No newline at end of file diff --git a/src/redis_ai_objects/stats.h b/src/redis_ai_objects/stats.h index 27edea14e..b0a64268b 100644 --- a/src/redis_ai_objects/stats.h +++ b/src/redis_ai_objects/stats.h @@ -13,19 +13,18 @@ #include "redismodule.h" #include "util/dict.h" -struct RedisAI_RunStats { +typedef struct RAI_RunStats { RedisModuleString *key; RAI_RunType type; RAI_Backend backend; - char *devicestr; + char *device_str; RedisModuleString *tag; - long long duration_us; - long long samples; - long long calls; - long long nerrors; -}; - -AI_dict *run_stats; + unsigned long duration_us; + unsigned long samples; + unsigned long calls; + unsigned long n_errors; + unsigned long ref_count; +} RAI_RunStats; long long ustime(void); mstime_t mstime(void); @@ -34,66 +33,73 @@ mstime_t mstime(void); * Adds an entry to the ephemeral run-time statistic. The statistics are not * saved to the keyspace, and on maximum live for the duration of the DB uptime. 
* - * @param ctx Context in which Redis modules operate - * @param keyName key name to use as unique stats identifier + * @param key key name to use as unique stats identifier * @param type type of stats identifier ( one of RAI_MODEL or RAI_SCRIPT ) * @param backend backend identifier (one of RAI_BACKEND_TENSORFLOW, * RAI_BACKEND_TFLITE, RAI_BACKEND_TORCH, RAI_BACKEND_ONNXRUNTIME,) - * @param devicestr + * @param device_str device to execute the model on (CPU, GPU, ...) * @param tag optional tag of Model/Script - * @return + * @return A newly heap-allocated RAI_RunStats object with the given fields. */ -void *RAI_AddStatsEntry(RedisModuleCtx *ctx, RedisModuleString *key, RAI_RunType type, - RAI_Backend backend, const char *devicestr, RedisModuleString *tag); +RAI_RunStats *RAI_StatsCreate(RedisModuleString *key, RAI_RunType type, RAI_Backend backend, + const char *device_str, RedisModuleString *tag); /** - * Removes the statistical entry with the provided unique stats identifier - * - * @param infokey + * @brief Atomically reset the counters of a given run_stats entry of some model/script. + * @param run_stats entry to reset. */ -void RAI_RemoveStatsEntry(void *infokey); +void RAI_StatsReset(RAI_RunStats *run_stats); /** - * Returns a list of all statistical entries that match a specific RAI_RunType ( - * model or script ) - * - * @param type type of stats identifier to provide the list for ( one of - * RAI_MODEL or RAI_SCRIPT ) - * @param nkeys output variable containing the number of returned stats - * @param keys output variable containing the list of returned keys - * @param tags output variable containing the list of returned tags + * Atomically update stats counters after an execution. + * @param r_stats runStats entry that matches some model/script. + * @param duration execution runtime in us + * @param calls number of calls to the underlying model/script operation. + * @param errors number of errors that occurred. + * @param samples number of samples that the model executed (batch size) */ -void RAI_ListStatsEntries(RAI_RunType type, long long *nkeys, RedisModuleString ***keys, - RedisModuleString ***tags); +void RAI_StatsAddDataPoint(RAI_RunStats *r_stats, unsigned long duration, unsigned long calls, + unsigned long errors, unsigned long samples); /** - * - * @param rstats - * @return 0 on success, or 1 if the reset failed + * @brief Release a RunStats struct. + * @param r_stats entry to free. */ -int RAI_ResetRunStats(struct RedisAI_RunStats *rstats); +void RAI_StatsFree(RAI_RunStats *r_stats); +/************************************* Global RunStats dict API *********************************/ /** - * Safely add datapoint to the run stats. Protected against null pointer - * runstats - * @param rstats - * @param duration - * @param calls - * @param errors - * @param samples - * @return 0 on success, or 1 if the addition failed + * Adds an entry to the ephemeral run-time statistics. The statistics are not + * saved to the keyspace, and live at most for the duration of the DB uptime. + * If a run stats object already exists for this key, it will be overridden. + * + * @param key key name to use as unique stats identifier. + * @param run_stats_entry RunStats entry pointer to store.
*/ -int RAI_SafeAddDataPoint(struct RedisAI_RunStats *rstats, long long duration, long long calls, - long long errors, long long samples); +void RAI_StatsStoreEntry(RedisModuleString *key, RAI_RunStats *run_stats_entry); -void RAI_FreeRunStats(struct RedisAI_RunStats *rstats); +/** + * @brief Removes the statistical entry with the provided unique stats identifier. + * @param info_key + */ +void RAI_StatsRemoveEntry(RedisModuleString *info_key); /** - * - * @param runkey - * @param rstats - * @return 0 on success, or 1 if the the run stats with runkey does not exist + * Returns a list of all statistical entries that match a specific RAI_RunType ( + * model or script). + * @param type type of stats identifier to provide the list for (one of + * RAI_MODEL or RAI_SCRIPT). + * @param nkeys output variable containing the number of returned stats. + * @param keys output variable containing the list of returned keys. + * @param tags output variable containing the list of returned tags. */ -int RAI_GetRunStats(RedisModuleString *runkey, struct RedisAI_RunStats **rstats); +void RAI_StatsGetAllEntries(RAI_RunType type, long long *nkeys, RedisModuleString ***keys, + RedisModuleString ***tags); -void RedisAI_FreeRunStats(RedisModuleCtx *ctx, struct RedisAI_RunStats *rstats); +/** + * @brief Retrieve the run stats entry that is associated with run_key from the global RunStats + * dictionary. + * @param run_key model/script key name + * @return The RAI_RunStats object that is associated with the key, or NULL if it doesn't exist. + */ +RAI_RunStats *RAI_StatsGetEntry(RedisModuleString *run_key); diff --git a/src/redisai.c b/src/redisai.c index 918bfe330..75a299ff4 100644 --- a/src/redisai.c +++ b/src/redisai.c @@ -64,6 +64,8 @@ extern int rlecBuild; extern pthread_key_t ThreadIdKey; +extern AI_dict *RunStats; + /* ----------------------- RedisAI Module Commands ------------------------- */ /** @@ -372,13 +374,12 @@ int RedisAI_ModelStore_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg } RedisModule_ModuleTypeSetValue(key, RAI_ModelRedisType(), model); - - model->infokey = RAI_AddStatsEntry(ctx, keystr, RAI_MODEL, backend, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(keystr, RAI_MODEL, backend, devicestr, tag); + RAI_StatsStoreEntry(keystr, stats); + model->info = stats; RedisModule_CloseKey(key); - RedisModule_ReplyWithSimpleString(ctx, "OK"); - RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; @@ -536,7 +537,7 @@ int RedisAI_ModelScan_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv long long nkeys; RedisModuleString **keys; RedisModuleString **tags; - RAI_ListStatsEntries(RAI_MODEL, &nkeys, &keys, &tags); + RAI_StatsGetAllEntries(RAI_MODEL, &nkeys, &keys, &tags); RedisModule_ReplyWithArray(ctx, nkeys); @@ -801,12 +802,12 @@ int RedisAI_ScriptStore_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **ar RedisModule_ModuleTypeSetValue(key, RAI_ScriptRedisType(), script); - script->infokey = RAI_AddStatsEntry(ctx, keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_StatsStoreEntry(keystr, stats); + script->info = stats; RedisModule_CloseKey(key); - RedisModule_ReplyWithSimpleString(ctx, "OK"); - RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; @@ -825,7 +826,7 @@ int RedisAI_ScriptScan_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg long long nkeys; RedisModuleString **keys; RedisModuleString **tags; - RAI_ListStatsEntries(RAI_SCRIPT,
&nkeys, &keys, &tags); + RAI_StatsGetAllEntries(RAI_SCRIPT, &nkeys, &keys, &tags); RedisModule_ReplyWithArray(ctx, nkeys); @@ -851,15 +852,15 @@ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int return RedisModule_WrongArity(ctx); RedisModuleString *runkey = argv[1]; - struct RedisAI_RunStats *rstats = NULL; - if (RAI_GetRunStats(runkey, &rstats) == REDISMODULE_ERR) { + RAI_RunStats *rstats = RAI_StatsGetEntry(runkey); + if (!rstats) { return RedisModule_ReplyWithError(ctx, "ERR cannot find run info for key"); } if (argc == 3) { const char *subcommand = RedisModule_StringPtrLen(argv[2], NULL); if (!strcasecmp(subcommand, "RESETSTAT")) { - RAI_ResetRunStats(rstats); + RAI_StatsReset(rstats); RedisModule_ReplyWithSimpleString(ctx, "OK"); return REDISMODULE_OK; } @@ -878,7 +879,7 @@ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int RedisModule_ReplyWithCString(ctx, "backend"); RedisModule_ReplyWithCString(ctx, RAI_GetBackendName(rstats->backend)); RedisModule_ReplyWithCString(ctx, "device"); - RedisModule_ReplyWithCString(ctx, rstats->devicestr); + RedisModule_ReplyWithCString(ctx, rstats->device_str); RedisModule_ReplyWithCString(ctx, "tag"); if (rstats->tag) { RedisModule_ReplyWithString(ctx, rstats->tag); @@ -886,17 +887,17 @@ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int RedisModule_ReplyWithCString(ctx, ""); } RedisModule_ReplyWithCString(ctx, "duration"); - RedisModule_ReplyWithLongLong(ctx, rstats->duration_us); + RedisModule_ReplyWithLongLong(ctx, (long long)rstats->duration_us); RedisModule_ReplyWithCString(ctx, "samples"); if (rstats->type == RAI_MODEL) { - RedisModule_ReplyWithLongLong(ctx, rstats->samples); + RedisModule_ReplyWithLongLong(ctx, (long long)rstats->samples); } else { RedisModule_ReplyWithLongLong(ctx, -1); } RedisModule_ReplyWithCString(ctx, "calls"); - RedisModule_ReplyWithLongLong(ctx, rstats->calls); + RedisModule_ReplyWithLongLong(ctx, (long long)rstats->calls); RedisModule_ReplyWithCString(ctx, "errors"); - RedisModule_ReplyWithLongLong(ctx, rstats->nerrors); + RedisModule_ReplyWithLongLong(ctx, (long long)rstats->n_errors); return REDISMODULE_OK; } @@ -1418,7 +1419,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_Log(ctx, "warning", "RedisAI could not initialize run queue for CPU"); return REDISMODULE_ERR; } - run_stats = AI_dictCreate(&AI_dictTypeHeapRStrings, NULL); + RunStats = AI_dictCreate(&AI_dictTypeHeapRStrings, NULL); return REDISMODULE_OK; } diff --git a/src/serialization/RDB/decoder/previous/v0/decode_v0.c b/src/serialization/RDB/decoder/previous/v0/decode_v0.c index dd5ed48c2..df2cb3e05 100644 --- a/src/serialization/RDB/decoder/previous/v0/decode_v0.c +++ b/src/serialization/RDB/decoder/previous/v0/decode_v0.c @@ -124,7 +124,9 @@ void *RAI_RDBLoadModel_v0(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - model->infokey = RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + model->info = stats; for (size_t i = 0; i < ninputs; i++) { RedisModule_Free((void *)inputs[i]); @@ -212,8 +214,10 @@ void *RAI_RDBLoadScript_v0(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - 
script->infokey = - RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_RunStats *stats = + RAI_StatsCreate(stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + script->info = stats; RedisModule_FreeString(NULL, stats_keystr); RedisModule_FreeString(NULL, tag); diff --git a/src/serialization/RDB/decoder/previous/v1/decode_v1.c b/src/serialization/RDB/decoder/previous/v1/decode_v1.c index 2551595d3..ebc389397 100644 --- a/src/serialization/RDB/decoder/previous/v1/decode_v1.c +++ b/src/serialization/RDB/decoder/previous/v1/decode_v1.c @@ -100,7 +100,9 @@ void *RAI_RDBLoadModel_v1(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - model->infokey = RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + model->info = stats; for (size_t i = 0; i < ninputs; i++) { RedisModule_Free((void *)inputs[i]); @@ -185,8 +187,10 @@ void *RAI_RDBLoadScript_v1(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - script->infokey = - RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_RunStats *stats = + RAI_StatsCreate(stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + script->info = stats; RedisModule_FreeString(NULL, stats_keystr); RedisModule_FreeString(NULL, tag); diff --git a/src/serialization/RDB/decoder/previous/v2/decode_v2.c b/src/serialization/RDB/decoder/previous/v2/decode_v2.c index d51f4a559..92fc40a62 100644 --- a/src/serialization/RDB/decoder/previous/v2/decode_v2.c +++ b/src/serialization/RDB/decoder/previous/v2/decode_v2.c @@ -102,7 +102,9 @@ void *RAI_RDBLoadModel_v2(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - model->infokey = RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + model->info = stats; for (size_t i = 0; i < ninputs; i++) { RedisModule_Free((void *)inputs[i]); @@ -187,8 +189,10 @@ void *RAI_RDBLoadScript_v2(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - script->infokey = - RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_RunStats *stats = + RAI_StatsCreate(stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + script->info = stats; RedisModule_FreeString(NULL, stats_keystr); RedisModule_FreeString(NULL, tag); diff --git a/src/serialization/RDB/decoder/previous/v3/decode_v3.c b/src/serialization/RDB/decoder/previous/v3/decode_v3.c index 51a51e43e..d50348011 100644 --- a/src/serialization/RDB/decoder/previous/v3/decode_v3.c +++ b/src/serialization/RDB/decoder/previous/v3/decode_v3.c @@ -102,7 +102,9 @@ void *RAI_RDBLoadModel_v3(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - model->infokey = RAI_AddStatsEntry(stats_ctx, 
stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_RunStats *stats = RAI_StatsCreate(stats_keystr, RAI_MODEL, backend, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + model->info = stats; for (size_t i = 0; i < ninputs; i++) { RedisModule_Free((void *)inputs[i]); @@ -202,8 +204,10 @@ void *RAI_RDBLoadScript_v3(RedisModuleIO *io) { RedisModuleString *stats_keystr = RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io)); - script->infokey = - RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_RunStats *stats = + RAI_StatsCreate(stats_keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag); + RAI_StatsStoreEntry(stats_keystr, stats); + script->info = stats; RedisModule_FreeString(NULL, stats_keystr); RedisModule_FreeString(NULL, tag); diff --git a/tests/flow/includes.py b/tests/flow/includes.py index c5286d875..1c61c175b 100755 --- a/tests/flow/includes.py +++ b/tests/flow/includes.py @@ -3,7 +3,7 @@ import random import sys import time -from multiprocessing import Process +from multiprocessing import Process, Pipe import threading import redis @@ -31,6 +31,9 @@ MAX_TRANSACTIONS=100 +def get_connection(env, routing_hint): + return env.getConnectionByKey(routing_hint, 'SET') + # returns the test name and line number from which a helper function within this file was called. # For example, if an assertion fails in check_error_message function, and the caller function to check_error_message # is in tests_onnx.py line 25, this should return: "tests_onnx:py:25" @@ -188,7 +191,7 @@ def load_creditcardfraud_data(env,max_tensors=10000): return model_pb, creditcard_transactions, creditcard_referencedata -def run_mobilenet(con, img, input_var, output_var): +def run_mobilenet(con, i, img, input_var, output_var): time.sleep(0.5 * random.randint(0, 10)) con.execute_command('AI.TENSORSET', 'input{1}', 'FLOAT', 1, img.shape[1], img.shape[0], img.shape[2], @@ -201,19 +204,31 @@ def run_mobilenet(con, img, input_var, output_var): def run_test_multiproc(env, routing_hint, n_procs, fn, args=tuple()): procs = [] - def tmpfn(): - con = env.getConnectionByKey(routing_hint, 'SET') - fn(con, *args) + def tmpfn(i): + con = get_connection(env, routing_hint) + fn(con, i, *args) return 1 - for _ in range(n_procs): - p = Process(target=tmpfn) + for i in range(n_procs): + p = Process(target=tmpfn, args=(i, )) p.start() procs.append(p) [p.join() for p in procs] +def get_parent_children_pipes(num_children): + parent_end_pipes = [] + children_end_pipes = [] + + # Create a pipe for every child process, so it can report the number of successful runs. + for i in range(num_children): + parent_pipe, child_pipe = Pipe() + parent_end_pipes.append(parent_pipe) + children_end_pipes.append(child_pipe) + + return parent_end_pipes, children_end_pipes + # Load a model/script from a file located in test_data dir.
def load_file_content(file_name): test_data_path = os.path.join(os.path.dirname(__file__), 'test_data') @@ -250,7 +265,3 @@ def get_info_section(con, section): section_ind = [i for i in range(len(sections)) if sections[i] == 'ai_'+section][0] return {k.split(":")[0]: k.split(":")[1] for k in con.execute_command("INFO MODULES").decode().split("#")[section_ind+2].split()[1:]} - - -def get_connection(env, routing_hint): - return env.getConnectionByKey(routing_hint, 'SET') diff --git a/tests/flow/onnx_benchmark.py b/tests/flow/onnx_benchmark.py index f208053e6..f3a701e59 100755 --- a/tests/flow/onnx_benchmark.py +++ b/tests/flow/onnx_benchmark.py @@ -64,7 +64,7 @@ def run_benchmark(env, num_runs_mnist, num_runs_inception, num_runs_bert, num_pa ret = con.execute_command('AI.TENSORSET', 'bert_in{1}', 'INT64', 10, 100, 'BLOB', bert_in_data.tobytes()) env.assertEqual(ret, b'OK') - def run_parallel_onnx_sessions(con, model, input, num_runs): + def run_parallel_onnx_sessions(con, i, model, input, num_runs): for _ in range(num_runs): if terminate_flag == 1: return diff --git a/tests/flow/tests_commands.py b/tests/flow/tests_commands.py index 251a0c9d9..a7d1f151c 100644 --- a/tests/flow/tests_commands.py +++ b/tests/flow/tests_commands.py @@ -1,6 +1,5 @@ -import redis - from includes import * +from tests_llapi import with_test_module ''' python -m RLTest --test tests_commands.py --module path/to/redisai.so @@ -358,3 +357,182 @@ def test_pytorch_scriptexecute_variadic_errors(env): check_error(env, con, 'AI.SCRIPTEXECUTE', 'ket{$}', 'bar_variadic', 'KEYS', 1 , '{$}', 'INPUTS', 'OUTPUTS') + +def create_run_update_models_parallel(con, client_id, # these are mandatory + env, model_pb, pipes, iterations_num, multiple_keys): + my_pipe = pipes[client_id] + total_success_num = 0 + model_key_name = str(client_id)+'_m{1}' if multiple_keys else 'm{1}' + ret = con.execute_command('AI.MODELSTORE', model_key_name, 'TF', DEVICE, + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb) + env.assertEqual(ret, b'OK') + + # Sanity test to verify that the AI.INFO command can run safely in parallel with write and execution commands, + # which read and write (atomically) to the same counters of a RunStats entry, and change the global RunStats dict.
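+ # Every iteration below deletes and re-stores the model (which removes and re-inserts its RunStats + # entry in the global dict), executes it, and then validates the counters that AI.INFO reports.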
+ for i in range(iterations_num): + try: + con.execute_command('AI.MODELDEL', model_key_name) + ret = con.execute_command('AI.MODELSTORE', model_key_name, 'TF', DEVICE, + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb) + env.assertEqual(ret, b'OK') + env.assertEqual(con.execute_command('AI.MODELEXECUTE', model_key_name, + 'INPUTS', 2, 'a{1}', 'b{1}', 'OUTPUTS', 1, 'c{1}'), b'OK') + total_success_num += 1 + info = info_to_dict(con.execute_command('AI.INFO', model_key_name)) + env.assertEqual(info['key'], model_key_name) + env.assertEqual(info['device'], DEVICE) + if multiple_keys: + # The calls number can be verified only in a multiple keys scenario, since when parallel clients use + # the same key, one might delete the key before another tries to run the model + # (hence the try/except structure of the test, which is relevant only for a single key scenario) + env.assertEqual(info['calls'], 1) + con.execute_command('AI.INFO', model_key_name, 'RESETSTAT') + info = info_to_dict(con.execute_command('AI.INFO', model_key_name)) + env.assertEqual(info['calls'], 0) + except Exception as e: + env.assertEqual(type(e), redis.exceptions.ResponseError) + env.assertTrue("model key is empty" == str(e) or "cannot find run info for key" == str(e)) + my_pipe.send(total_success_num) + + +def run_model_execute_from_llapi_parallel(con, client_id, # these are mandatory + env, model_pb, pipes, iterations_num): + + my_pipe = pipes[client_id] + total_success_num = 0 + model_key_name = str(client_id)+'_m{1}' + + # Use a different device than the one used from the command API, to also test a multi-device scenario. + ret = con.execute_command('AI.MODELSTORE', model_key_name, 'TF', 'CPU:1', + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb) + env.assertEqual(ret, b'OK') + for i in range(1, iterations_num+1): + # In every call, this command runs the model twice - one run returns with an error, and the other returns OK. + env.assertEqual(con.execute_command("RAI_llapi.modelRun", model_key_name), b'Async run success') + total_success_num += 1 + info = info_to_dict(con.execute_command('AI.INFO', model_key_name)) + env.assertEqual(info['calls'], 2*i) + env.assertEqual(info['errors'], i) + my_pipe.send(total_success_num) + + +def test_ai_info_multiproc_multi_keys(env): + if not TEST_TF: + env.debugPrint("Skipping test since TF is not available", force=True) + return + con = get_connection(env, '{1}') + + # Load model protobuf and store its input tensors. + model_pb = load_file_content('graph.pb') + con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3) + con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3) + + num_parallel_clients = 20 + num_iterations_per_client = 50 + + # Create a pipe for every child process, so it can report the number of successful runs. + parent_end_pipes, children_end_pipes = get_parent_children_pipes(num_parallel_clients) + + # Run create_run_update_models_parallel where every client uses a different model key. + # In every iteration, clients update the model key - which triggers deletion and insertion of the model's stats + # to the global dict. + run_test_multiproc(env, '{1}', num_parallel_clients, create_run_update_models_parallel, + args=(env, model_pb, children_end_pipes, num_iterations_per_client, True)) + # Expect that every child will succeed in every model execution - and report it to the parent through the pipe.
+ env.assertEqual(sum([p.recv() for p in parent_end_pipes]), num_parallel_clients*num_iterations_per_client) + + # Get the list of models in the system and verify that every model appears once in the global dict. + models = con.execute_command('AI._MODELSCAN') + env.assertEqual(len(models), num_parallel_clients) + + +def test_ai_info_multiproc_single_key(env): + if not TEST_TF: + env.debugPrint("Skipping test since TF is not available", force=True) + return + con = get_connection(env, '{1}') + + # Load model protobuf and store its input tensors. + model_pb = load_file_content('graph.pb') + con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3) + con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3) + + num_parallel_clients = 20 + num_iterations_per_client = 50 + + # Run create_run_update_models_parallel, but this time over the same model key. + # Note that there may be cases where the model is deleted by some client while other clients + # try to access the model key. + ret = con.execute_command('AI.MODELSTORE', 'm{1}', 'TF', DEVICE, + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb) + env.assertEqual(ret, b'OK') + parent_end_pipes, children_end_pipes = get_parent_children_pipes(num_parallel_clients) + run_test_multiproc(env, '{1}', num_parallel_clients, create_run_update_models_parallel, + args=(env, model_pb, children_end_pipes, num_iterations_per_client, False)) + + # Expect a minimal number of successes (one per client on average) + # in running the models (without having it deleted by another client). + num_success = sum([p.recv() for p in parent_end_pipes]) + env.assertGreaterEqual(num_parallel_clients*num_iterations_per_client, num_success) + # Valgrind impacts the timings, so the number of successes may be lower. + env.assertGreaterEqual(num_success, 1 if VALGRIND or DEVICE == "GPU" else num_parallel_clients) + + # At the end, expect that every client ran the last created model at most once. + info = info_to_dict(con.execute_command('AI.INFO', 'm{1}')) + env.assertGreaterEqual(info['calls'], 0) + env.assertGreaterEqual(num_parallel_clients, info['calls']) + + +@with_test_module +def test_ai_info_multiproc_with_llapi(env): + if not TEST_TF: + env.debugPrint("Skipping test since TF is not available", force=True) + return + con = get_connection(env, '{1}') + + # Load model protobuf and store its input tensors. + model_pb = load_file_content('graph.pb') + con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3) + con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3) + + num_parallel_clients = 20 + num_iterations_per_client = 50 + + # This is just a wrapper that triggers the multi-proc test + def run_model_execute_from_llapi(): + parent_end_pipes_llapi, children_end_pipes_llapi = get_parent_children_pipes(num_parallel_clients) + run_test_multiproc(env, '{1}', num_parallel_clients, run_model_execute_from_llapi_parallel, + args=(env, model_pb, children_end_pipes_llapi, num_iterations_per_client)) + # Expect that every child will succeed in every model execution - and report it to the parent through the pipe. + env.assertEqual(sum([p.recv() for p in parent_end_pipes_llapi]), + num_parallel_clients*num_iterations_per_client) + + # Run models both from low-level API and from command. + t = threading.Thread(target=run_model_execute_from_llapi) + t.start() + + # Run with a single model key.
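+ # All command-API clients now share the key 'm{1}', so a MODELDEL issued by one client can race + # with another client's MODELEXECUTE or AI.INFO - those races are caught by the try/except in + # create_run_update_models_parallel.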
+ ret = con.execute_command('AI.MODELSTORE', 'm{1}', 'TF', DEVICE, + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb) + env.assertEqual(ret, b'OK') + parent_end_pipes, children_end_pipes = get_parent_children_pipes(num_parallel_clients) + run_test_multiproc(env, '{1}', num_parallel_clients, create_run_update_models_parallel, + args=(env, model_pb, children_end_pipes, num_iterations_per_client, False)) + + # Expect a minimal number of successes (one per five clients on average) + # in running the models (without having it deleted by another client). + num_success = sum([p.recv() for p in parent_end_pipes]) + env.assertGreaterEqual(num_parallel_clients*num_iterations_per_client, num_success) + # Valgrind impacts the timings, so the number of successes may be lower. + env.assertGreaterEqual(num_success, 1 if VALGRIND or DEVICE == "GPU" else num_parallel_clients/5) + + t.join() + # Get the list of models in the system and verify that every model appears once in the global dict. + models = con.execute_command('AI._MODELSCAN') + # In the LLAPI runs, every client used a distinct model key, while all clients that ran via the command API + # shared a single key. + env.assertEqual(len(models), num_parallel_clients + 1) + + # At the end, expect that every client (that didn't use the LLAPI) ran the last created model at most once. + info = info_to_dict(con.execute_command('AI.INFO', 'm{1}')) + env.assertGreaterEqual(info['calls'], 0) + env.assertGreaterEqual(num_parallel_clients, info['calls']) diff --git a/tests/flow/tests_common.py b/tests/flow/tests_common.py index 445fc1679..717e4f6a4 100644 --- a/tests/flow/tests_common.py +++ b/tests/flow/tests_common.py @@ -224,7 +224,7 @@ def test_common_tensorget_error_replies(env): def test_common_tensorset_multiproc(env): run_test_multiproc(env, 'x', 10, - lambda env: env.execute_command('AI.TENSORSET', 'x', 'FLOAT', 2, 'VALUES', 2, 3)) + lambda env, i: env.execute_command('AI.TENSORSET', 'x', 'FLOAT', 2, 'VALUES', 2, 3)) con = get_connection(env, 'x') ensureSlaveSynced(con, env) @@ -258,7 +258,7 @@ def funcname(env, blob, repetitions, same_key): tensor_blob = tested_datatypes_map["FLOAT"] t = time.time() run_test_multiproc(env, '{0}', 10, - lambda env: funcname(env,tensor_blob,MAX_TRANSACTIONS,10) ) + lambda env, i: funcname(env,tensor_blob,MAX_TRANSACTIONS,10) ) elapsed_time = time.time() - t avg_ops_sec = 100000*10/elapsed_time # env.debugPrint("AI.TENSORSET elapsed time(sec) {:6.2f}\tAvg. ops/sec {:10.2f}".format(elapsed_time, avg_ops_sec), True) diff --git a/tests/flow/tests_dag.py b/tests/flow/tests_dag.py index d9a5fb465..095ee80ba 100644 --- a/tests/flow/tests_dag.py +++ b/tests/flow/tests_dag.py @@ -311,14 +311,14 @@ def run(): elif command[3][0] == b"AI.DAGEXECUTE": # Found second command. add the slower time to total_time. if first > command[2]: total_time += first - env.assertTrue((end - start)*1000000 >= first) + env.assertGreaterEqual((end - start)*1000000, first) else: total_time += command[2] - env.assertTrue((end - start)*1000000 >= command[2]) + env.assertGreaterEqual((end - start)*1000000, command[2]) break elif command[3][0] == b"SLOWLOG": # The "SLOWLOG" is used as a mark for the previus iteration. total_time += first # Try adding 'first'. The next assert test if first was not zero.
-                env.assertTrue((end - start)*1000000 >= first)
+                env.assertGreaterEqual((end - start)*1000000, first)
                 break
         env.assertNotEqual(total_time, prev_total)
         # if somehow we didn't find any "AI.DAGEXECUTE" command, assert
diff --git a/tests/flow/tests_gears_llapi.py b/tests/flow/tests_gears_llapi.py
index f988449b5..cf16abe78 100644
--- a/tests/flow/tests_gears_llapi.py
+++ b/tests/flow/tests_gears_llapi.py
@@ -99,34 +99,53 @@ async def ModelRun_AsyncRunError(record):
         con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
         con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
+        self.info = info_to_dict(con.execute_command('AI.INFO', 'm{1}'))  # Runtime stats reported by AI.INFO.
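+        # A freshly stored model is expected to start with all of its runtime stats at zero.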
+        for stats_item in ['duration', 'samples', 'calls', 'errors']:
+            self.env.assertEqual(self.info[stats_item], 0)
 
-    def test_old_api(self):
+    def test1_old_api(self):
         con = get_connection(self.env, '{1}')
         ret = con.execute_command('rg.trigger', 'ModelRun_oldAPI_test1')
         self.env.assertEqual(ret[0], b'ModelRun_oldAPI_OK')
         values = con.execute_command('AI.TENSORGET', 'c{1}', 'VALUES')
         self.env.assertEqual(values, [b'4', b'9', b'4', b'9'])
 
-    def test_async_run(self):
+    def test2_async_run(self):
         con = get_connection(self.env, '{1}')
         ret = con.execute_command('rg.trigger', 'ModelRun_Async_test2')
         self.env.assertEqual(ret[0], b'ModelRun_Async_OK')
         values = con.execute_command('AI.TENSORGET', 'c_1{1}', 'VALUES')
         self.env.assertEqual(values, [b'4', b'9', b'4', b'9'])
 
-    def test_tf_ignore_inputs_names(self):
+        # Check that statistics were saved properly. Note that in test1 (old API) stats are not saved.
+        self.info = info_to_dict(con.execute_command('AI.INFO', 'm{1}'))
+        self.env.assertEqual(self.info['key'], 'm{1}')
+        self.env.assertEqual(self.info['type'], 'MODEL')
+        self.env.assertEqual(self.info['backend'], 'TF')
+        self.env.assertEqual(self.info['device'], DEVICE)
+        self.env.assertGreater(self.info['duration'], 0)
+        self.env.assertEqual(self.info['samples'], 2)
+        self.env.assertEqual(self.info['calls'], 1)
+        self.env.assertEqual(self.info['errors'], 0)
+
+    def test3_tf_ignore_inputs_names(self):
         con = get_connection(self.env, '{1}')
         ret = con.execute_command('rg.trigger', 'ModelRun_Async_test3')
         self.env.assertEqual(ret[0], b'ModelRun_Async_OK')
         values = con.execute_command('AI.TENSORGET', 'c_2{1}', 'VALUES')
         self.env.assertEqual(values, [b'4', b'9', b'4', b'9'])
 
-    def test_runtime_error(self):
+    def test4_runtime_error(self):
         con = get_connection(self.env, '{1}')
         ret = con.execute_command('rg.trigger', 'ModelRun_AsyncRunError_test4')
         # This should raise an exception
         self.env.assertEqual(str(ret[0]), "b'Must specify at least one target to fetch or execute.'")
+        info = info_to_dict(con.execute_command('AI.INFO', 'm{1}'))
+        self.env.assertEqual(info['calls'], 3)
+        self.env.assertEqual(info['errors'], 1)
+        self.env.assertGreater(info['duration'], self.info['duration'])
+
 
 class TestScriptExecuteFromGears:
 
@@ -190,27 +209,46 @@ async def ScriptRun_AsyncRunError(record):
         self.env.assertEqual(ret, b'OK')
         ret = con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
         self.env.assertEqual(ret, b'OK')
+        self.info = info_to_dict(con.execute_command('AI.INFO', 'myscript{1}'))  # Runtime stats reported by AI.INFO.
+        # Unlike models, scripts have no 'samples' stat; the remaining stats should start at zero.
+        for stats_item in ['duration', 'calls', 'errors']:
+            self.env.assertEqual(self.info[stats_item], 0)
 
-    def test_old_api(self):
+    def test1_old_api(self):
         con = get_connection(self.env, '{1}')
         ret = con.execute_command('rg.trigger', 'ScriptRun_oldAPI_test1')
         self.env.assertEqual(ret[0], b'ScriptRun_oldAPI_OK')
         values = con.execute_command('AI.TENSORGET', 'c{1}', 'VALUES')
         self.env.assertEqual(values, [b'4', b'6', b'4', b'6'])
 
-    def test_async_execution(self):
+    def test2_async_execution(self):
         con = get_connection(self.env, '{1}')
         ret = con.execute_command('rg.trigger', 'ScriptRun_Async_test2')
         self.env.assertEqual(ret[0], b'ScriptRun_Async_OK')
         values = con.execute_command('AI.TENSORGET', 'c_1{1}', 'VALUES')
         self.env.assertEqual(values, [b'4', b'6', b'4', b'6'])
 
-    def test_runtime_error(self):
+        # Check that statistics were saved properly. Note that in test1 (old API) stats are not saved.
+        self.info = info_to_dict(con.execute_command('AI.INFO', 'myscript{1}'))
+        self.env.assertEqual(self.info['key'], 'myscript{1}')
+        self.env.assertEqual(self.info['type'], 'SCRIPT')
+        self.env.assertEqual(self.info['backend'], 'TORCH')
+        self.env.assertEqual(self.info['tag'], 'version1')
+        self.env.assertEqual(self.info['device'], DEVICE)
+        self.env.assertGreater(self.info['duration'], 0)
+        self.env.assertEqual(self.info['calls'], 1)
+        self.env.assertEqual(self.info['errors'], 0)
+
+    def test3_runtime_error(self):
         con = get_connection(self.env, '{1}')
         ret = con.execute_command('rg.trigger', 'ScriptRun_AsyncRunError_test3')
         # This should raise an exception
         self.env.assertTrue(str(ret[0]).startswith("b'Function does not exist:"))
+        info = info_to_dict(con.execute_command('AI.INFO', 'myscript{1}'))
+        self.env.assertEqual(info['calls'], 2)
+        self.env.assertEqual(info['errors'], 1)
+        self.env.assertEqual(info['duration'], self.info['duration'])
+
 
 class TestDAGRunExecution:
 
@@ -315,12 +353,12 @@ def test_modelset_modelget_ops(self):
         self.env.assertEqual(values, [b'2', b'3', b'2', b'3'])
 
     def test_modelexecute_op(self):
-        executions_num = 500
+        executions_num = 100
         if VALGRIND:
             executions_num = 10
 
-        def multiple_executions(con):
+        def multiple_executions(con, i):
             ret = con.execute_command('rg.trigger', 'DAGRun_test2')
             self.env.assertEqual(ret[0], b'test2_OK')
             values = con.execute_command('AI.TENSORGET', 'test2_res{1}', 'VALUES')
diff --git a/tests/flow/tests_llapi.py b/tests/flow/tests_llapi.py
index b7ef44938..f454342c4 100644
--- a/tests/flow/tests_llapi.py
+++ b/tests/flow/tests_llapi.py
@@ -3,6 +3,7 @@
 from includes import *
 import os
 from functools import wraps
+from RLTest import Env
 
 '''
 python -m RLTest --test tests_llapi.py --module path/to/redisai.so
@@ -11,6 +12,7 @@ def with_test_module(f):
     @wraps(f)
     def wrapper(env, *args, **kwargs):
+        env = Env(moduleArgs='THREADS_PER_QUEUE 8')
         con = get_connection(env, '{1}')
         modules = con.execute_command("MODULE", "LIST")
         if b'RAI_llapi' in [module[1] for module in modules]:
@@ -49,19 +51,28 @@ def test_model_run_async(env):
     env.assertEqual(ret, b'OK')
     con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
     con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
+
+    # This command in the test module runs the model twice - one run returns with an error and the second succeeds.
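+    # Hence AI.INFO below is expected to report calls=2 and errors=1, with 'samples' counting
+    # only the successful run (a batch of 2).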
     ret = con.execute_command("RAI_llapi.modelRun")
     env.assertEqual(ret, b'Async run success')
 
+    # Check that statistics were saved properly.
+    info = info_to_dict(con.execute_command('AI.INFO', 'm{1}'))
+    env.assertEqual(info['key'], 'm{1}')
+    env.assertEqual(info['type'], 'MODEL')
+    env.assertEqual(info['backend'], 'TF')
+    env.assertEqual(info['device'], DEVICE)
+    env.assertGreater(info['duration'], 0)
+    env.assertEqual(info['samples'], 2)
+    env.assertEqual(info['calls'], 2)
+    env.assertEqual(info['errors'], 1)
+
 
 @with_test_module
 def test_script_run_async(env):
     con = get_connection(env, '{1}')
-    test_data_path = os.path.join(os.path.dirname(__file__), 'test_data')
-    script_filename = os.path.join(test_data_path, 'script.txt')
-
-    with open(script_filename, 'rb') as f:
-        script = f.read()
+    script = load_file_content('script.txt')
 
     ret = con.execute_command('AI.SCRIPTSTORE', 'myscript{1}', DEVICE, 'TAG', 'version1',
                               'ENTRY_POINTS', 2, 'bar', 'bar_variadic', 'SOURCE', script)
     env.assertEqual(ret, b'OK')
@@ -71,46 +82,71 @@ def test_script_run_async(env):
     ret = con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
     env.assertEqual(ret, b'OK')
 
+    # This command in the test module runs the script twice - one run returns with an error and the second succeeds.
     ret = con.execute_command("RAI_llapi.scriptRun")
     env.assertEqual(ret, b'Async run success')
 
+    # Check that statistics were saved properly.
+    info = info_to_dict(con.execute_command('AI.INFO', 'myscript{1}'))
+    env.assertEqual(info['key'], 'myscript{1}')
+    env.assertEqual(info['type'], 'SCRIPT')
+    env.assertEqual(info['backend'], 'TORCH')
+    env.assertEqual(info['device'], DEVICE)
+    env.assertGreater(info['duration'], 0)
+    env.assertEqual(info['calls'], 2)
+    env.assertEqual(info['errors'], 1)
+
 
 @with_test_module
 def test_dag_build_and_run(env):
     con = get_connection(env, '{1}')
-    con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT',
-                        2, 2, 'VALUES', 2, 3, 2, 3)
-    con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT',
-                        2, 2, 'VALUES', 2, 3, 2, 3)
-    test_data_path = os.path.join(os.path.dirname(__file__), 'test_data')
-    model_filename = os.path.join(test_data_path, 'graph.pb')
+    con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
+    con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)
 
-    with open(model_filename, 'rb') as f:
-        model_pb = f.read()
+    model_pb = load_file_content('graph.pb')
     ret = con.execute_command('AI.MODELSTORE', 'm{1}', 'TF', DEVICE,
                               'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb)
     env.assertEqual(ret, b'OK')
 
-    script_filename = os.path.join(test_data_path, 'script.txt')
-    with open(script_filename, 'rb') as f:
-        script = f.read()
+    script = load_file_content('script.txt')
     ret = con.execute_command('AI.SCRIPTSTORE', 'myscript{1}', DEVICE, 'TAG', 'version1',
                               'ENTRY_POINTS', 2, 'bar', 'bar_variadic', 'SOURCE', script)
     env.assertEqual(ret, b'OK')
 
     ret = con.execute_command("RAI_llapi.DAGrun")
     env.assertEqual(ret, b'DAG run success')
 
-    # Run the DAG LLAPI test again with multi process test to ensure that there are no dead-locks
-    executions_num = 500
-    if VALGRIND:
-        executions_num = 10
+    # Run the DAG LLAPI test again as a multiprocess test to ensure that there are no deadlocks
+    executions_num = 100 if not VALGRIND else 10
 
-    def run_dag_llapi(con):
+    def run_dag_llapi(con, i):
         con.execute_command("RAI_llapi.DAGrun")
 
     run_test_multiproc(env, '{1}', executions_num, run_dag_llapi)
 
+    # Check that statistics were saved properly - in every execution
+    # the model ran (successfully) twice.
+    # Every run is over two samples (the dim[0] of the input tensors).
+    info = info_to_dict(con.execute_command('AI.INFO', 'm{1}'))
+    env.assertEqual(info['key'], 'm{1}')
+    env.assertEqual(info['type'], 'MODEL')
+    env.assertEqual(info['backend'], 'TF')
+    env.assertEqual(info['device'], DEVICE)
+    env.assertGreater(info['duration'], executions_num)
+    env.assertEqual(info['samples'], 4*(executions_num + 1))
+    env.assertEqual(info['calls'], 2*(executions_num + 1))
+    env.assertEqual(info['errors'], 0)
+
+    # Check that statistics were saved properly - in every execution the script ran twice - once successfully
+    # and once with an error.
+    info = info_to_dict(con.execute_command('AI.INFO', 'myscript{1}'))
+    env.assertEqual(info['key'], 'myscript{1}')
+    env.assertEqual(info['type'], 'SCRIPT')
+    env.assertEqual(info['backend'], 'TORCH')
+    env.assertEqual(info['device'], DEVICE)
+    env.assertGreater(info['duration'], executions_num)
+    env.assertEqual(info['calls'], 2*(executions_num + 1))
+    env.assertEqual(info['errors'], executions_num + 1)
+
 
 @with_test_module
 def test_dagrun_multidevice_resnet(env):
@@ -119,15 +155,9 @@ def test_dagrun_multidevice_resnet(env):
     model_name_0 = 'imagenet_model1:{1}'
     model_name_1 = 'imagenet_model2:{1}'
     script_name_0 = 'imagenet_script1:{1}'
-    script_name_1 = 'imagenet_script2:{1}'
-    inputvar = 'images'
-    outputvar = 'output'
+    input_var = 'images'
+    output_var = 'output'
     image_key = 'image:{1}'
-    temp_key1 = 'temp_key1:{1}'
-    temp_key2_0 = 'temp_key2_0'
-    temp_key2_1 = 'temp_key2_1'
-    class_key_0 = 'output0:{1}'
-    class_key_1 = 'output1:{1}'
 
     model_pb, script, labels, img = load_resnet_test_data()
 
@@ -135,19 +165,18 @@ def test_dagrun_multidevice_resnet(env):
     device_1 = DEVICE
 
     ret = con.execute_command('AI.MODELSTORE', model_name_0, 'TF', device_0,
-                              'INPUTS', 1, inputvar,
-                              'OUTPUTS', 1, outputvar,
+                              'INPUTS', 1, input_var,
+                              'OUTPUTS', 1, output_var,
                               'BLOB', model_pb)
     env.assertEqual(ret, b'OK')
 
     ret = con.execute_command('AI.MODELSTORE', model_name_1, 'TF', device_1,
-                              'INPUTS', 1, inputvar,
-                              'OUTPUTS', 1, outputvar,
+                              'INPUTS', 1, input_var,
+                              'OUTPUTS', 1, output_var,
                               'BLOB', model_pb)
     env.assertEqual(ret, b'OK')
 
-    ret = con.execute_command('AI.SCRIPTSTORE', script_name_0, device_0, 'ENTRY_POINTS', 4, 'pre_process_3ch', 'pre_process_4ch', 'post_process', 'ensemble', 'SOURCE', script)
-    env.assertEqual(ret, b'OK')
-    ret = con.execute_command('AI.SCRIPTSTORE', script_name_1, device_1, 'ENTRY_POINTS', 4, 'pre_process_3ch', 'pre_process_4ch', 'post_process', 'ensemble', 'SOURCE', script)
+    ret = con.execute_command('AI.SCRIPTSTORE', script_name_0, DEVICE, 'ENTRY_POINTS', 4, 'pre_process_3ch',
+                              'pre_process_4ch', 'post_process', 'ensemble', 'SOURCE', script)
     env.assertEqual(ret, b'OK')
     ret = con.execute_command('AI.TENSORSET', image_key, 'UINT8', img.shape[1], img.shape[0], 3, 'BLOB', img.tobytes())
     env.assertEqual(ret, b'OK')
@@ -155,6 +184,37 @@ def test_dagrun_multidevice_resnet(env):
     ret = con.execute_command("RAI_llapi.DAG_resnet")
     env.assertEqual(ret, b'DAG resnet success')
 
+    # Check that statistics were saved properly - in every execution both models ran once, each on a different
+    # device, and the script ran 3 times.
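+    # Both models were stored from the same graph (model_pb), so each is expected to report
+    # exactly one call over a single sample.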
+    info = info_to_dict(con.execute_command('AI.INFO', model_name_0))
+    env.assertEqual(info['key'], model_name_0)
+    env.assertEqual(info['type'], 'MODEL')
+    env.assertEqual(info['backend'], 'TF')
+    env.assertEqual(info['device'], device_0)
+    env.assertGreater(info['duration'], 0)
+    env.assertEqual(info['samples'], 1)
+    env.assertEqual(info['calls'], 1)
+    env.assertEqual(info['errors'], 0)
+
+    info = info_to_dict(con.execute_command('AI.INFO', model_name_1))
+    env.assertEqual(info['key'], model_name_1)
+    env.assertEqual(info['type'], 'MODEL')
+    env.assertEqual(info['backend'], 'TF')
+    env.assertEqual(info['device'], device_1)
+    env.assertGreater(info['duration'], 0)
+    env.assertEqual(info['samples'], 1)
+    env.assertEqual(info['calls'], 1)
+    env.assertEqual(info['errors'], 0)
+
+    info = info_to_dict(con.execute_command('AI.INFO', script_name_0))
+    env.assertEqual(info['key'], script_name_0)
+    env.assertEqual(info['type'], 'SCRIPT')
+    env.assertEqual(info['backend'], 'TORCH')
+    env.assertEqual(info['device'], DEVICE)
+    env.assertGreater(info['duration'], 0)
+    env.assertEqual(info['calls'], 3)
+    env.assertEqual(info['errors'], 0)
+
 
 @with_test_module
 def test_tensor_create(env):
diff --git a/tests/flow/tests_onnx.py b/tests/flow/tests_onnx.py
index f79bb3e9b..7a0a575b9 100644
--- a/tests/flow/tests_onnx.py
+++ b/tests/flow/tests_onnx.py
@@ -479,7 +479,7 @@ def test_3_memory_limit(self):
                             'AI.MODELEXECUTE', 'mnist_0{1}', 'INPUTS', 1, 'a{1}', 'OUTPUTS', 1, 'b{1}',
                             error_msg_is_substr=True)
 
-        def run_parallel_onnx_sessions(con):
+        def run_parallel_onnx_sessions(con, i):
             check_error_message(self.env, con, "Onnxruntime memory limit exceeded, memory allocation failed.",
                                 'AI.MODELEXECUTE', 'mnist_0{1}', 'INPUTS', 1, 'a{1}', 'OUTPUTS', 1, 'b{1}',
                                 error_msg_is_substr=True)
@@ -523,7 +523,7 @@ def test_multiple_working_threads(self):
         self.env.assertEqual(ret, b'OK')
         con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 1, 1, 28, 28, 'BLOB', sample_raw)
 
-        def run_parallel_onnx_sessions(con):
+        def run_parallel_onnx_sessions(con, i):
             ret = con.execute_command('AI.MODELEXECUTE', 'mnist{1}', 'INPUTS', 1, 'a{1}', 'OUTPUTS', 1, 'b{1}')
             self.env.assertEqual(ret, b'OK')
             check_error_message(self.env, con, "Exiting due to terminate flag being set to true",
diff --git a/tests/flow/tests_tensorflow.py b/tests/flow/tests_tensorflow.py
index 8ba8ef1ca..b02c7d26c 100644
--- a/tests/flow/tests_tensorflow.py
+++ b/tests/flow/tests_tensorflow.py
@@ -630,7 +630,7 @@ def functor_financialNet(env, key_max, repetitions):
     t = time.time()
     run_test_multiproc(env, '{1}', 10,
-                       lambda env: functor_financialNet(env, MAX_TRANSACTIONS, 100) )
+                       lambda env, i: functor_financialNet(env, MAX_TRANSACTIONS, 100) )
     elapsed_time = time.time() - t
     total_ops = len(transaction_tensor)*100
     avg_ops_sec = total_ops/elapsed_time
diff --git a/tests/module/LLAPI.c b/tests/module/LLAPI.c
index 64db4a863..d02005122 100644
--- a/tests/module/LLAPI.c
+++ b/tests/module/LLAPI.c
@@ -142,14 +142,21 @@ static int _ExecuteScriptRunAsync(RedisModuleCtx *ctx, RAI_ScriptRunCtx *sctx) {
 
 int RAI_llapi_modelRun(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
     REDISMODULE_NOT_USED(argv);
-    if (argc > 1) {
+    if (argc > 2) {
         RedisModule_WrongArity(ctx);
         return REDISMODULE_OK;
    }
-    // The model m{1} should exist in key space.
+    // The model should exist in key space. The default name is `m{1}`, but it can also be
+    // overridden with argv[1].
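+    // Passing an explicit key name lets tests exercise the LLAPI against models stored under
+    // arbitrary keys.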
     const char *keyNameStr = "m{1}";
+    if (argc == 2) {
+        keyNameStr = RedisModule_StringPtrLen(argv[1], NULL);
+    }
     RedisModuleString *keyRedisStr = RedisModule_CreateString(ctx, keyNameStr, strlen(keyNameStr));
     RedisModuleKey *key = RedisModule_OpenKey(ctx, keyRedisStr, REDISMODULE_READ);
+    if (!key) {
+        return RedisModule_ReplyWithError(ctx, "ERR model key is empty");
+    }
     RAI_Model *model = RedisModule_ModuleTypeGetValue(key);
     RAI_ModelRunCtx *mctx = RedisAI_ModelRunCtxCreate(model);
     RedisModule_FreeString(ctx, keyRedisStr);