Skip to content

Enable saving run stats from low-level API #904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
662298b
Refactor RAI_RunStats object and API + save stats entry in model/scri…
alonre24 Mar 15, 2022
adc3b03
Add tests with low-level API
alonre24 Mar 15, 2022
0bd424b
Add multi-proc test + some refactor in run_test_multiproc - allows ev…
alonre24 Mar 16, 2022
f6ad289
Refactor - separate between RunStats struct API to RunStats global di…
alonre24 Mar 16, 2022
7c75008
Refactor - separate between RunStats struct API to RunStats global di…
alonre24 Mar 16, 2022
6f30421
Merge branch 'master' into save_stats_in_LLAPI
alonre24 Mar 16, 2022
d1e4d77
PR comment
alonre24 Mar 16, 2022
86c2c9e
Merge branch 'save_stats_in_LLAPI' of https://github.com/RedisAI/Redi…
alonre24 Mar 16, 2022
c8ce9b2
Change test slightly to test case where system has multiple models
alonre24 Mar 17, 2022
4dd6dd2
Refactor test (WIP)
alonre24 Mar 20, 2022
378f814
Add ref count mechanism for the RunStats struct + refactor synchronis…
alonre24 Mar 26, 2022
ca7c28c
formatting
alonre24 Mar 26, 2022
7c0299c
Update test to pass valgrind + extend timeout on tests with GPU in CI
alonre24 Mar 27, 2022
65cca2d
Refactor test + do not use ref count for stats
alonre24 Mar 27, 2022
7959f8e
Change logging level in TF - to avoid "freeze" upon extensive writing…
alonre24 Apr 5, 2022
8246df5
Code review changes
alonre24 Apr 6, 2022
95f0d6e
Remove leftover
alonre24 Apr 6, 2022
345df8b
Change test in GPU - decrease the minimal number of expected success
alonre24 Apr 6, 2022
7be5f49
Relax demand in min num success
alonre24 Apr 6, 2022
6923e86
Merge branch 'master' into save_stats_in_LLAPI
alonre24 Apr 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/backends/tensorflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ int RAI_InitBackendTF(int (*get_api_fn)(const char *, void *)) {
get_api_fn("RedisModule_Realloc", ((void **)&RedisModule_Realloc));
get_api_fn("RedisModule_Strdup", ((void **)&RedisModule_Strdup));

// Set min logging level to 3 (out of 5) - this is a workaround: if TF writes log messages
// extensively to stderr, it may cause the system to get stuck.
RedisModule_Assert(putenv("TF_CPP_MIN_LOG_LEVEL=3") == 0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not 5?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that it's working with 3 as well, so I left it like that (not really matters...)


return REDISMODULE_OK;
}

Expand Down
35 changes: 3 additions & 32 deletions src/execution/DAG/dag.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static void Dag_LoadInputsToCurrentOp(RedisAI_RunInfo *rinfo, RAI_DagOp *current
}

for (uint i = 0; i < n_outkeys; i++) {
RAI_ExecutionCtx_AddOuputPlaceholder(currentOp->ectx);
RAI_ExecutionCtx_AddOutputPlaceholder(currentOp->ectx);
}
}

Expand Down Expand Up @@ -260,7 +260,7 @@ void RedisAI_DagRunSession_ScriptRun_Step(RedisAI_RunInfo *rinfo, RAI_DagOp *cur
RAI_ExecutionCtx_AddInput(currentOp->ectx, inputTensors[i]);
}
for (uint i = 0; i < n_outkeys; i++) {
RAI_ExecutionCtx_AddOuputPlaceholder(currentOp->ectx);
RAI_ExecutionCtx_AddOutputPlaceholder(currentOp->ectx);
}
}

Expand Down Expand Up @@ -574,44 +574,15 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
break;
}

case REDISAI_DAG_CMD_MODELRUN: {
rinfo->dagReplyLength++;
struct RedisAI_RunStats *rstats = NULL;
RAI_GetRunStats(currentOp->runkey, &rstats);
if (currentOp->result == REDISMODULE_ERR) {
RAI_SafeAddDataPoint(rstats, 0, 1, 1, 0);
RedisModule_ReplyWithError(ctx, currentOp->err->detail_oneline);
dag_error = 1;
} else if (currentOp->result == -1) {
RedisModule_ReplyWithSimpleString(ctx, "NA");
} else {
RAI_Tensor *t = NULL;
if (RAI_ExecutionCtx_NumOutputs(currentOp->ectx) > 0) {
t = RAI_ExecutionCtx_GetOutput(currentOp->ectx, 0);
}
int batch_size = 0;
if (t) {
batch_size = RAI_TensorDim(t, 0);
}
RAI_SafeAddDataPoint(rstats, currentOp->duration_us, 1, 0, batch_size);
RedisModule_ReplyWithSimpleString(ctx, "OK");
}
break;
}

case REDISAI_DAG_CMD_MODELRUN:
case REDISAI_DAG_CMD_SCRIPTRUN: {
rinfo->dagReplyLength++;
struct RedisAI_RunStats *rstats = NULL;
RAI_GetRunStats(currentOp->runkey, &rstats);
if (currentOp->result == REDISMODULE_ERR) {
RAI_SafeAddDataPoint(rstats, 0, 1, 1, 0);
RedisModule_ReplyWithError(ctx, currentOp->err->detail_oneline);
dag_error = 1;
} else if (currentOp->result == -1) {
RedisModule_ReplyWithSimpleString(ctx, "NA");
} else {
int batch_size = 1;
RAI_SafeAddDataPoint(rstats, currentOp->duration_us, 1, 0, batch_size);
RedisModule_ReplyWithSimpleString(ctx, "OK");
}
break;
Expand Down
2 changes: 0 additions & 2 deletions src/execution/DAG/dag_builder.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ RAI_DAGRunOp *RAI_DAGCreateModelRunOp(RAI_Model *model) {
op->commandType = REDISAI_DAG_CMD_MODELRUN;
op->ectx = (RAI_ExecutionCtx *)mctx;
op->devicestr = model->devicestr;
op->runkey = RAI_HoldString((RedisModuleString *)model->infokey);
return (RAI_DAGRunOp *)op;
}

Expand All @@ -73,7 +72,6 @@ RAI_DAGRunOp *RAI_DAGCreateScriptRunOp(RAI_Script *script, const char *func_name
op->commandType = REDISAI_DAG_CMD_SCRIPTRUN;
op->ectx = (RAI_ExecutionCtx *)sctx;
op->devicestr = script->devicestr;
op->runkey = RAI_HoldString((RedisModuleString *)script->infokey);
return (RAI_DAGRunOp *)op;
}

Expand Down
5 changes: 0 additions & 5 deletions src/execution/DAG/dag_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ int RAI_InitDagOp(RAI_DagOp **result) {
dagOp = (RAI_DagOp *)RedisModule_Calloc(1, sizeof(RAI_DagOp));

dagOp->commandType = REDISAI_DAG_CMD_NONE;
dagOp->runkey = NULL;
dagOp->inkeys = (RedisModuleString **)array_new(RedisModuleString *, 1);
dagOp->outkeys = (RedisModuleString **)array_new(RedisModuleString *, 1);
dagOp->inkeys_indices = array_new(size_t, 1);
Expand All @@ -31,13 +30,9 @@ int RAI_InitDagOp(RAI_DagOp **result) {
return REDISMODULE_OK;
}

void RAI_DagOpSetRunKey(RAI_DagOp *dagOp, RedisModuleString *runkey) { dagOp->runkey = runkey; }

void RAI_FreeDagOp(RAI_DagOp *dagOp) {

RAI_FreeError(dagOp->err);
if (dagOp->runkey)
RedisModule_FreeString(NULL, dagOp->runkey);

if (dagOp->outTensor)
RAI_TensorFree(dagOp->outTensor);
Expand Down
10 changes: 0 additions & 10 deletions src/execution/DAG/dag_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ typedef enum DAGCommand {

typedef struct RAI_DagOp {
DAGCommand commandType;
RedisModuleString *runkey;
RedisModuleString **inkeys;
RedisModuleString **outkeys;
size_t *inkeys_indices;
Expand Down Expand Up @@ -48,12 +47,3 @@ int RAI_InitDagOp(RAI_DagOp **result);
* @param RAI_DagOp context in which RedisAI command operates.
*/
void RAI_FreeDagOp(RAI_DagOp *dagOp);

/**
* @brief Sets the key name of current dag op execution subject. The subject is either a model or a
* script.
*
* @param dagOp Current op.
* @param runkey Subject key name.
*/
void RAI_DagOpSetRunKey(RAI_DagOp *dagOp, RedisModuleString *runkey);
30 changes: 30 additions & 0 deletions src/execution/background_workers.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,40 @@ static void _BGThread_Wait(RunQueueInfo *run_queue_info) {
&absTimeout);
}

/**
 * Records one run-stats data point for every MODELRUN / SCRIPTRUN op of a
 * finished DAG, using the stats entry attached to the op's execution context.
 *
 * - Failed op (result == REDISMODULE_ERR): a data point with zero duration and
 *   the error argument set to 1.
 * - Successful op (result == REDISMODULE_OK): a data point with the op's
 *   duration and a batch size (see below).
 * - Any other result value: nothing is recorded for that op.
 *
 * NOTE(review): the argument order of RAI_StatsAddDataPoint is presumably
 * (stats, duration, calls, errors, samples) - confirm against stats.h.
 *
 * @param rinfo Run info whose dagOps array (dagOpCount entries) is scanned.
 */
static void _BGThread_SaveStats(RedisAI_RunInfo *rinfo) {
    for (size_t op_idx = 0; op_idx < rinfo->dagOpCount; op_idx++) {
        RAI_DagOp *op = rinfo->dagOps[op_idx];

        const int is_model_run = (op->commandType == REDISAI_DAG_CMD_MODELRUN);
        const int is_script_run = (op->commandType == REDISAI_DAG_CMD_SCRIPTRUN);
        if (!is_model_run && !is_script_run) {
            // Only model/script executions carry run statistics.
            continue;
        }

        if (op->result == REDISMODULE_ERR) {
            RAI_StatsAddDataPoint(RAI_ExecutionCtx_GetStats(op->ectx), 0, 1, 1, 0);
            continue;
        }
        if (op->result != REDISMODULE_OK) {
            // Neither failed nor succeeded: skip without recording.
            continue;
        }

        // A script run always counts as a batch of one. For a model run the
        // batch size is dimension 0 of the first output tensor, or 0 when
        // there is no (non-NULL) first output.
        unsigned long batch_size = 1;
        if (is_model_run) {
            batch_size = 0;
            if (RAI_ExecutionCtx_NumOutputs(op->ectx) > 0) {
                RAI_Tensor *first_output = RAI_ExecutionCtx_GetOutput(op->ectx, 0);
                if (first_output) {
                    batch_size = RAI_TensorDim(first_output, 0);
                }
            }
        }

        RAI_StatsAddDataPoint(RAI_ExecutionCtx_GetStats(op->ectx), op->duration_us, 1, 0,
                              batch_size);
    }
}

static void _BGThread_RinfoFinish(RedisAI_RunInfo *rinfo) {
RedisAI_RunInfo *orig = rinfo->orig_copy;
uint dagRefCount = RAI_DagRunInfoFreeShallowCopy(rinfo);
if (dagRefCount == 0) {
// Save stats for every DAG execute operation.
_BGThread_SaveStats(orig);
RedisAI_OnFinishCtx *finish_ctx = orig;
orig->OnFinish(finish_ctx, orig->private_data);
}
Expand Down
9 changes: 7 additions & 2 deletions src/execution/execution_contexts/execution_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#include "redismodule.h"
#include "util/arr.h"

void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_ExecutionCtx_Free_fn freeFn) {
void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_RunStats *run_stats,
RAI_ExecutionCtx_Free_fn freeFn) {
ctx->inputs = array_new(RAI_Tensor *, 10);
ctx->outputs = array_new(RAI_Tensor *, 10);
ctx->runStats = run_stats;
ctx->freeFn = freeFn;
}
void RAI_ExecutionCtx_Free(RAI_ExecutionCtx *ctx) {
Expand All @@ -21,6 +23,7 @@ void RAI_ExecutionCtx_Free(RAI_ExecutionCtx *ctx) {
}

inline size_t RAI_ExecutionCtx_NumInputs(RAI_ExecutionCtx *ctx) { return array_len(ctx->inputs); }

inline void RAI_ExecutionCtx_AddInput(RAI_ExecutionCtx *ctx, RAI_Tensor *t) {
if (t != NULL) {
t = RAI_TensorGetShallowCopy(t);
Expand All @@ -35,7 +38,7 @@ inline RAI_Tensor *RAI_ExecutionCtx_GetInput(RAI_ExecutionCtx *ctx, size_t index

inline size_t RAI_ExecutionCtx_NumOutputs(RAI_ExecutionCtx *ctx) { return array_len(ctx->outputs); }

inline void RAI_ExecutionCtx_AddOuputPlaceholder(RAI_ExecutionCtx *ctx) {
inline void RAI_ExecutionCtx_AddOutputPlaceholder(RAI_ExecutionCtx *ctx) {
ctx->outputs = array_append(ctx->outputs, NULL);
}

Expand All @@ -48,3 +51,5 @@ inline RAI_Tensor *RAI_ExecutionCtx_GetOutput(RAI_ExecutionCtx *ctx, size_t inde
RedisModule_Assert(index < array_len(ctx->outputs));
return ctx->outputs[index];
}

// Accessor for the run-stats entry stored in the execution context.
inline RAI_RunStats *RAI_ExecutionCtx_GetStats(RAI_ExecutionCtx *ctx) {
    return ctx->runStats;
}
14 changes: 12 additions & 2 deletions src/execution/execution_contexts/execution_ctx.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <redis_ai_objects/stats.h>
#include "redis_ai_objects/tensor.h"

// Forward declaration
Expand All @@ -15,6 +16,7 @@ typedef void (*RAI_ExecutionCtx_Free_fn)(RAI_ExecutionCtx *ctx);
// Base execution context; model-run and script-run contexts are passed to this API as a
// RAI_ExecutionCtx (via cast or an embedded base member).
typedef struct RAI_ExecutionCtx {
RAI_Tensor **inputs; // DAG op input tensors.
RAI_Tensor **outputs; // DAG op output tensors (slots may hold NULL placeholders).
RAI_RunStats *runStats; // The underlying op's (Model/Script) stats entry.
RAI_ExecutionCtx_Free_fn freeFn; // Inheriting execution context free function.
} RAI_ExecutionCtx;

Expand All @@ -24,7 +26,8 @@ typedef struct RAI_ExecutionCtx {
* @param ctx - Execution context to initialize.
* @param run_stats - Stats entry of the underlying model/script; stored in the context as is.
* @param freeFn - Specific free function for inheriting execution contexts (script or model)
*/
void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_ExecutionCtx_Free_fn freeFn);
void RAI_ExecutionCtx_Init(RAI_ExecutionCtx *ctx, RAI_RunStats *run_stats,
RAI_ExecutionCtx_Free_fn freeFn);

/**
* @brief Frees the execution context internal structures. To be used from an inheriting execution
Expand Down Expand Up @@ -72,7 +75,7 @@ size_t RAI_ExecutionCtx_NumOutputs(RAI_ExecutionCtx *ctx);
*
* @param ctx - Execution context.
*/
void RAI_ExecutionCtx_AddOuputPlaceholder(RAI_ExecutionCtx *ctx);
void RAI_ExecutionCtx_AddOutputPlaceholder(RAI_ExecutionCtx *ctx);

/**
* @brief Sets an output tensor at a specific index, previously populated by a placeholder.
Expand All @@ -91,3 +94,10 @@ void RAI_ExecutionCtx_SetOutput(RAI_ExecutionCtx *ctx, RAI_Tensor *t, size_t ind
* @return RAI_Tensor* - Output tensor.
*/
RAI_Tensor *RAI_ExecutionCtx_GetOutput(RAI_ExecutionCtx *ctx, size_t index);

/**
 * @brief Returns the run-stats entry stored in the execution context, i.e. the stats of the
 * underlying object (model or script) that the context executes.
 * @param ctx - Execution context.
 * @return RAI_RunStats* - The stats pointer that was passed to RAI_ExecutionCtx_Init.
 */
RAI_RunStats *RAI_ExecutionCtx_GetStats(RAI_ExecutionCtx *ctx);
5 changes: 3 additions & 2 deletions src/execution/execution_contexts/modelRun_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

RAI_ModelRunCtx *RAI_ModelRunCtxCreate(RAI_Model *model) {
RAI_ModelRunCtx *mctx = RedisModule_Calloc(1, sizeof(*mctx));
RAI_ExecutionCtx_Init((RAI_ExecutionCtx *)mctx, (RAI_ExecutionCtx_Free_fn)RAI_ModelRunCtxFree);
RAI_ExecutionCtx_Init((RAI_ExecutionCtx *)mctx, model->info,
(RAI_ExecutionCtx_Free_fn)RAI_ModelRunCtxFree);
mctx->model = RAI_ModelGetShallowCopy(model);
return mctx;
}
Expand All @@ -19,7 +20,7 @@ int RAI_ModelRunCtxAddInput(RAI_ModelRunCtx *mctx, const char *inputName, RAI_Te
}

/**
 * Reserves one output slot in the model run context by appending an output
 * placeholder to the underlying execution context.
 *
 * @param mctx       Model run context (used as its base RAI_ExecutionCtx).
 * @param outputName Output name; not used by this function.
 * @return Always 1, the same value RAI_ScriptRunCtxAddOutput returns. The
 *         function previously fell off the end without returning a value,
 *         which is undefined behavior for any caller that reads the result.
 */
int RAI_ModelRunCtxAddOutput(RAI_ModelRunCtx *mctx, const char *outputName) {
    RAI_ExecutionCtx_AddOutputPlaceholder((RAI_ExecutionCtx *)mctx);
    return 1;
}

inline size_t RAI_ModelRunCtxNumInputs(RAI_ModelRunCtx *mctx) {
Expand Down
5 changes: 3 additions & 2 deletions src/execution/execution_contexts/scriptRun_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
RAI_ScriptRunCtx *RAI_ScriptRunCtxCreate(RAI_Script *script, const char *fnname) {
#define PARAM_INITIAL_SIZE 10
RAI_ScriptRunCtx *sctx = RedisModule_Calloc(1, sizeof(*sctx));
RAI_ExecutionCtx_Init(&sctx->base, (RAI_ExecutionCtx_Free_fn)RAI_ScriptRunCtxFree);
RAI_ExecutionCtx_Init(&sctx->base, script->info,
(RAI_ExecutionCtx_Free_fn)RAI_ScriptRunCtxFree);
sctx->script = RAI_ScriptGetShallowCopy(script);
sctx->fnname = RedisModule_Strdup(fnname);
sctx->keys = array_new(RedisModuleString *, PARAM_INITIAL_SIZE);
Expand Down Expand Up @@ -58,7 +59,7 @@ int RAI_ScriptRunCtxAddArgInput(RAI_ScriptRunCtx *sctx, RedisModuleString *arg)
}

inline int RAI_ScriptRunCtxAddOutput(RAI_ScriptRunCtx *sctx) {
RAI_ExecutionCtx_AddOuputPlaceholder(&sctx->base);
RAI_ExecutionCtx_AddOutputPlaceholder(&sctx->base);
return 1;
}

Expand Down
21 changes: 8 additions & 13 deletions src/execution/parsing/deprecated.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
static int _ModelRunCommand_ParseArgs(RedisModuleCtx *ctx, int argc, RedisModuleString **argv,
RAI_Model **model, RAI_Error *error,
RedisModuleString ***inkeys, RedisModuleString ***outkeys,
RedisModuleString **runkey, long long *timeout) {
long long *timeout) {

if (argc < 6) {
RAI_SetError(error, RAI_EMODELRUN,
Expand All @@ -30,8 +30,6 @@ static int _ModelRunCommand_ParseArgs(RedisModuleCtx *ctx, int argc, RedisModule
if (status == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
RAI_HoldString(argv[argpos]);
*runkey = argv[argpos];
const char *arg_string = RedisModule_StringPtrLen(argv[++argpos], NULL);

// Parse timeout arg if given and store it in timeout
Expand Down Expand Up @@ -87,8 +85,7 @@ int ParseModelRunCommand(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, RedisModu
RAI_Model *model;
long long timeout = 0;
if (_ModelRunCommand_ParseArgs(ctx, argc, argv, &model, rinfo->err, &currentOp->inkeys,
&currentOp->outkeys, &currentOp->runkey,
&timeout) == REDISMODULE_ERR) {
&currentOp->outkeys, &timeout) == REDISMODULE_ERR) {
goto cleanup;
}

Expand Down Expand Up @@ -305,7 +302,6 @@ int ModelSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return ret;
}

// TODO: if backend loaded, make sure there's a queue
if (!RunQueue_IsExists(devicestr)) {
RunQueueInfo *run_queue_info = RunQueue_Create(devicestr);
if (run_queue_info == NULL) {
Expand All @@ -332,12 +328,12 @@ int ModelSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {

RedisModule_ModuleTypeSetValue(key, RAI_ModelRedisType(), model);

model->infokey = RAI_AddStatsEntry(ctx, keystr, RAI_MODEL, backend, devicestr, tag);
RAI_RunStats *stats = RAI_StatsCreate(keystr, RAI_MODEL, backend, devicestr, tag);
RAI_StatsStoreEntry(keystr, stats);
model->info = stats;

RedisModule_CloseKey(key);

RedisModule_ReplyWithSimpleString(ctx, "OK");

RedisModule_ReplicateVerbatim(ctx);

return REDISMODULE_OK;
Expand Down Expand Up @@ -423,12 +419,12 @@ int ScriptSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {

RedisModule_ModuleTypeSetValue(key, RAI_ScriptRedisType(), script);

script->infokey = RAI_AddStatsEntry(ctx, keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag);
RAI_RunStats *stats = RAI_StatsCreate(keystr, RAI_SCRIPT, RAI_BACKEND_TORCH, devicestr, tag);
RAI_StatsStoreEntry(keystr, stats);
script->info = stats;

RedisModule_CloseKey(key);

RedisModule_ReplyWithSimpleString(ctx, "OK");

RedisModule_ReplicateVerbatim(ctx);

return REDISMODULE_OK;
Expand Down Expand Up @@ -536,7 +532,6 @@ int ParseScriptRunCommand(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, RedisMod
if (!script) {
goto cleanup;
}
RAI_DagOpSetRunKey(currentOp, RAI_HoldString(argv[1]));

const char *func_name = ScriptCommand_GetFunctionName(argv[2]);
if (!func_name) {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/parsing/deprecated.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/**
* @brief Parse and validate MODELRUN command: create a modelRunCtx based on the model obtained
* from the key space and save it in the op. The keys of the input and output tensors are stored in
* the op's inkeys and outkeys arrays, the model key is saved in op's runkey, and the given timeout
* the op's inkeys and outkeys arrays, and the given timeout
* is saved as well (if given, otherwise it is zero).
* @return Returns REDISMODULE_OK if the command is valid, REDISMODULE_ERR otherwise.
*/
Expand Down
Loading