Skip to content

Dag llapi #556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ ADD_LIBRARY(redisai_obj OBJECT
config.c
DAG/dag.c
DAG/dag_parser.c
DAG/dag_builder.c
DAG/dag_execute.c
modelRun_ctx.c
backends.c
backends/util.c
Expand Down
346 changes: 137 additions & 209 deletions src/DAG/dag.c

Large diffs are not rendered by default.

188 changes: 188 additions & 0 deletions src/DAG/dag_builder.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
#include "dag_builder.h"
#include "run_info.h"
#include "dag_parser.h"
#include "string_utils.h"
#include "modelRun_ctx.h"

// Store the given arguments from the string in argv array and their amount in argc.
int _StringToRMArray(const char *dag, RedisModuleString ***argv, int *argc, RAI_Error *err) {

char dag_string[strlen(dag) + 1];
strcpy(dag_string, dag);

char *token = strtok(dag_string, " ");
if (strcmp(token, "|>") != 0) {
RAI_SetError(err, RAI_EDAGBUILDER, "DAG op should start with: '|>' ");
return REDISMODULE_ERR;
}

while (token != NULL) {
RedisModuleString *RS_token = RedisModule_CreateString(NULL, token, strlen(token));
*argv = array_append(*argv, RS_token);
(*argc)++;
token = strtok(NULL, " ");
}
return REDISMODULE_OK;
}

int RAI_DAGLoadTensor(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Tensor *tensor) {

RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
RedisModuleString *key_name = RedisModule_CreateString(NULL, t_name, strlen(t_name));
// Add the tensor under its "mangled" key name to the DAG local context dict.
char buf[16];
sprintf(buf, "%04d", 1);
RedisModule_StringAppendBuffer(NULL, key_name, buf, strlen(buf));
AI_dictAdd(rinfo->dagTensorsContext, (void *)key_name,
(void *)RAI_TensorGetShallowCopy(tensor));
RedisModule_FreeString(NULL, key_name);

return REDISMODULE_OK;
}

RAI_DAGRunCtx *RAI_DAGRunCtxCreate(void) {
RedisAI_RunInfo *rinfo;
RAI_InitRunInfo(&rinfo);
return (RAI_DAGRunCtx *)rinfo;
}

RAI_DAGRunOp *RAI_DAGCreateModelRunOp(RAI_Model *model) {
RAI_ModelRunCtx *mctx = RAI_ModelRunCtxCreate(model);
RAI_DagOp *op;
RAI_InitDagOp(&op);

op->commandType = REDISAI_DAG_CMD_MODELRUN;
op->mctx = mctx;
op->devicestr = model->devicestr;
op->runkey = RAI_HoldString(NULL, (RedisModuleString *)model->infokey);
return (RAI_DAGRunOp *)op;
}

RAI_DAGRunOp *RAI_DAGCreateScriptRunOp(RAI_Script *script, const char *func_name) {
RAI_ScriptRunCtx *sctx = RAI_ScriptRunCtxCreate(script, func_name);
RAI_DagOp *op;
RAI_InitDagOp(&op);

op->commandType = REDISAI_DAG_CMD_SCRIPTRUN;
op->sctx = sctx;
op->devicestr = script->devicestr;
op->runkey = RAI_HoldString(NULL, (RedisModuleString *)script->infokey);
return (RAI_DAGRunOp *)op;
}

int RAI_DAGRunOpAddInput(RAI_DAGRunOp *DAGOp, const char *input) {
RAI_DagOp *op = (RAI_DagOp *)DAGOp;
RedisModuleString *inkey = RedisModule_CreateString(NULL, input, strlen(input));
op->inkeys = array_append(op->inkeys, inkey);
return REDISMODULE_OK;
}

int RAI_DAGRunOpAddOutput(RAI_DAGRunOp *DAGOp, const char *output) {
RAI_DagOp *op = (RAI_DagOp *)DAGOp;
RedisModuleString *outkey = RedisModule_CreateString(NULL, output, strlen(output));
op->outkeys = array_append(op->outkeys, outkey);
return REDISMODULE_OK;
}

int RAI_DAGAddRunOp(RAI_DAGRunCtx *run_info, RAI_DAGRunOp *DAGop, RAI_Error *err) {

RAI_DagOp *op = (RAI_DagOp *)DAGop;
RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
if (op->mctx) {
RAI_Model *model = op->mctx->model;
if (ModelGetNumInputs(model) != array_len(op->inkeys)) {
RAI_SetError(err, RAI_EDAGBUILDER,
"Number of keys given as INPUTS does not match model definition");
return REDISMODULE_ERR;
}
if (ModelGetNumOutputs(model) != array_len(op->outkeys)) {
RAI_SetError(err, RAI_EDAGBUILDER,
"Number of keys given as OUTPUTS does not match model definition");
return REDISMODULE_ERR;
}
}
rinfo->dagOps = array_append(rinfo->dagOps, op);

return REDISMODULE_OK;
}

int RAI_DAGAddTensorGet(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Error *err) {

RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
RAI_DagOp *op;
RAI_InitDagOp(&op);
rinfo->dagOps = array_append(rinfo->dagOps, op);
op->commandType = REDISAI_DAG_CMD_TENSORGET;
op->devicestr = "CPU";
RedisModuleString *name = RedisModule_CreateString(NULL, t_name, strlen(t_name));
op->inkeys = array_append(op->inkeys, name);
return REDISMODULE_OK;
}

int RAI_DAGAddTensorSet(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Tensor *tensor) {

RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
RAI_DagOp *op;
RAI_InitDagOp(&op);
rinfo->dagOps = array_append(rinfo->dagOps, op);
op->commandType = REDISAI_DAG_CMD_TENSORSET;
op->devicestr = "CPU";
RedisModuleString *name = RedisModule_CreateString(NULL, t_name, strlen(t_name));
op->outkeys = array_append(op->outkeys, name);
op->outTensor = RAI_TensorGetShallowCopy(tensor);
return REDISMODULE_OK;
}

int RAI_DAGAddOpsFromString(RAI_DAGRunCtx *run_info, const char *dag, RAI_Error *err) {

int res = REDISMODULE_ERR;
RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
array_new_on_stack(RAI_DagOp *, 10, new_ops);
array_new_on_stack(RedisModuleString *, 100, argv);
int argc = 0;
if (_StringToRMArray(dag, &argv, &argc, err) != REDISMODULE_OK) {
goto cleanup;
}

RAI_DagOp *op;
for (size_t i = 0; i < argc; i++) {
const char *arg_string = RedisModule_StringPtrLen(argv[i], NULL);
if (strcmp(arg_string, "|>") == 0 && i < argc - 1) {
RAI_InitDagOp(&op);
new_ops = array_append(new_ops, op);
op->argv = &argv[i + 1];
} else {
op->argc++;
}
}

if (ParseDAGOps(rinfo, new_ops) != REDISMODULE_OK) {
RAI_SetError(err, RAI_GetErrorCode(rinfo->err), RAI_GetError(rinfo->err));
goto cleanup;
}
rinfo->dagOpCount = array_len(rinfo->dagOps);
res = REDISMODULE_OK;

cleanup:
array_free(new_ops);
for (size_t i = 0; i < argc; i++) {
RedisModule_FreeString(NULL, argv[i]);
}
array_free(argv);
return res;
}

size_t RAI_DAGNumOps(RAI_DAGRunCtx *run_info) {
RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
return array_len(rinfo->dagOps);
}

void RAI_DAGRunOpFree(RAI_DAGRunOp *dagOp) {
RAI_DagOp *op = (RAI_DagOp *)dagOp;
RAI_FreeDagOp(op);
}

void RAI_DAGFree(RAI_DAGRunCtx *run_info) {
RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
RAI_FreeRunInfo(rinfo);
}
92 changes: 92 additions & 0 deletions src/DAG/dag_builder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#pragma once

#include "redisai.h"

/**
* @brief Create a new empty DAG runInfo object.
*/
RAI_DAGRunCtx *RAI_DAGRunCtxCreate(void);

/**
* @brief Create a new MODELRUN op for a DAG.
* @param model The model to run.
*/
RAI_DAGRunOp *RAI_DAGCreateModelRunOp(RAI_Model *model);

/**
* @brief Create a new SCRIPTRUN op for a DAG.
* @param script The script to run.
* @param func_name The specific function to run in the given script.
*/
RAI_DAGRunOp *RAI_DAGCreateScriptRunOp(RAI_Script *script, const char *func_name);

/**
* @brief Add an input key to a DAG run op (before inserting it to the DAG).
* @param DAGop The DAG run op (MODELRUN / SCRIPTRUN).
* @param input The tensor input name (this name should appear in a previous op of the DAG).
*/
int RAI_DAGRunOpAddInput(RAI_DAGRunOp *DAGOp, const char *input);

/**
* @brief Add an output key to a DAG run op (before inserting it to the DAG).
* @param DAGop The DAG run op (MODELRUN / SCRIPTRUN).
* @param output The tensor output name (this name may appear in one of the following ops of the
* DAG).
*/
int RAI_DAGRunOpAddOutput(RAI_DAGRunOp *DAGOp, const char *output);

/**
* @brief Add a run op (MODELRUN/SCRIPTRUN) to a DAG.
* @param runInfo The DAG to insert the op to.
* @param DAGop The DAG run op (MODELRUN / SCRIPTRUN).
* @param err Error is returned in case of a MODELRUN op if the number of inputs and outputs
* given to the op does not match to the number of inputs and outputs in the model definition.
*/
int RAI_DAGAddRunOp(RAI_DAGRunCtx *run_info, RAI_DAGRunOp *DAGop, RAI_Error *err);

/**
* @brief Load a given tensor to the DAG local context.
* @param runInfo The DAG to load the tensor into.
* @param tname The tensor key.
* @param tensor The tensor to load to the DAG (we load a shallow copy).
*/
int RAI_DAGLoadTensor(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Tensor *tensor);

/**
* @brief Append a TENSORSET op to a DAG (can use to load an intermediate tensors)
* @param runInfo The DAG to append this op into.
* @param tensor The tensor to set.
*/
int RAI_DAGAddTensorSet(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Tensor *tensor);

/**
* @brief Append a TENSORGET op to a DAG (can use to output intermediate and final tensors)
* @param runInfo The DAG to append this op into.
* @param tensor The tensor to set.
*/
int RAI_DAGAddTensorGet(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Error *err);

/**
* @brief Add ops to a DAG from string (according to the command syntax). In case of a valid
* string, the ops are added to the DAG run info, and otherwise all the ops are discarded.
* @param runInfo The DAG to insert the ops into.
* @param dag The string representing the DAG ops to add.
* @param err Error is returned in case of a MODELRUN op if the number of inputs and outputs
* given to the op does not match to the number of inputs and outputs in the model definition.
*/
int RAI_DAGAddOpsFromString(RAI_DAGRunCtx *run_info, const char *dag, RAI_Error *err);

/**
* @brief Returns the number of ops in a DAG.
*/
size_t RAI_DAGNumOps(RAI_DAGRunCtx *run_info);

/**
* @brief Free DAG's runInfo and all its internal ops.
*/
void RAI_DAGFree(RAI_DAGRunCtx *run_info);

/**
* @brief Free a specific DAG run op (MODELRUN/SCRIPTRUN).
*/
void RAI_DAGRunOpFree(RAI_DAGRunOp *dagOp);
Loading