diff --git a/src/graph.c b/src/graph.c index 509b60380..53f8ed6e1 100644 --- a/src/graph.c +++ b/src/graph.c @@ -1,7 +1,28 @@ #include "graph.h" +#include "utils/arr_rm_alloc.h" RedisModuleType *RedisDL_GraphType = NULL; +typedef struct RDL_Graph{ + TF_Graph* graph; + // TODO: use session pool? The ideal would be to use one session per client. + // If a client disconnects, we dispose the session or reuse it for + // another client. + void *session; + size_t refCount; +}RDL_Graph; + +typedef struct RDL_GraphCtxParam{ + TF_Output name; + RDL_Tensor* tensor; +}RDL_GraphCtxParam; + +typedef struct RDL_GraphRunCtx{ + RDL_Graph* graph; + RDL_GraphCtxParam* inputs; + RDL_GraphCtxParam* outputs; +}RDL_GraphRunCtx; + static void* Graph_RdbLoad(struct RedisModuleIO *io, int encver){ //todo return NULL; @@ -12,10 +33,10 @@ static void Graph_RdbSave(RedisModuleIO *rdb, void *value){ } static void Graph_DTFree(void *value){ - Graph_Free(value); + RDL_GraphFree(value); } -int Graph_Init(RedisModuleCtx* ctx){ +int RDL_GraphInit(RedisModuleCtx* ctx){ RedisModuleTypeMethods tmGraph = { .version = REDISMODULE_TYPE_METHOD_VERSION, .rdb_load = Graph_RdbLoad, @@ -30,7 +51,7 @@ int Graph_Init(RedisModuleCtx* ctx){ return RedisDL_GraphType != NULL; } -RDL_Graph* Graph_Create(const char* prefix, const char* graphdef, size_t graphlen){ +RDL_Graph* RDL_GraphCreate(const char* prefix, const char* graphdef, size_t graphlen){ TF_Graph* graph = TF_NewGraph(); TF_ImportGraphDefOptions* options = TF_NewImportGraphDefOptions(); @@ -74,7 +95,10 @@ RDL_Graph* Graph_Create(const char* prefix, const char* graphdef, size_t graphle return ret; } -void Graph_Free(RDL_Graph* graph){ +void RDL_GraphFree(RDL_Graph* graph){ + if(--graph->refCount > 0){ + return; + } TF_Status *status = TF_NewStatus(); TF_CloseSession(graph->session, status); @@ -100,3 +124,102 @@ void Graph_Free(RDL_Graph* graph){ RedisModule_Free(graph); } + +RDL_GraphRunCtx* RDL_RunCtxCreate(RDL_Graph* graph){ +#define PARAM_INITIAL_SIZE 10 + RDL_GraphRunCtx* gctx = RedisModule_Alloc(sizeof(*gctx)); + gctx->graph = RDL_GraphGetShallowCopy(graph); + gctx->inputs = array_new(RDL_GraphCtxParam, PARAM_INITIAL_SIZE); + gctx->outputs = array_new(RDL_GraphCtxParam, PARAM_INITIAL_SIZE); + return gctx; +} + +static int Graph_RunCtxAddParam(RDL_GraphRunCtx* gctx, RDL_GraphCtxParam* paramArr, const char* name, RDL_Tensor* tensor){ + TF_Output port; + port.oper = TF_GraphOperationByName(gctx->graph->graph, name); + port.index = 0; + if(port.oper == NULL){ + return 0; + } + RDL_GraphCtxParam param = { + .name = port, + .tensor = tensor ? RDL_TensorGetShallowCopy(tensor): NULL, + }; + paramArr = array_append(paramArr, param); + return 1; +} + +int RDL_RunCtxAddInput(RDL_GraphRunCtx* gctx, const char* inputName, RDL_Tensor* inputTensor){ + return Graph_RunCtxAddParam(gctx, gctx->inputs, inputName, inputTensor); +} + +int RDL_RunCtxAddOutput(RDL_GraphRunCtx* gctx, const char* outputName){ + return Graph_RunCtxAddParam(gctx, gctx->outputs, outputName, NULL); +} + +size_t RDL_RunCtxNumOutputs(RDL_GraphRunCtx* gctx){ + return array_len(gctx->outputs); +} + +RDL_Tensor* RDL_RunCtxOutputTensor(RDL_GraphRunCtx* gctx, size_t index){ + assert(RDL_RunCtxNumOutputs(gctx) > index && index >= 0); + return gctx->outputs[index].tensor; +} + +void RDL_RunCtxFree(RDL_GraphRunCtx* gctx){ + for(size_t i = 0 ; i < array_len(gctx->inputs) ; ++i){ + RDL_TensorFree(gctx->inputs[i].tensor); + } + array_free(gctx->inputs); + + for(size_t i = 0 ; i < array_len(gctx->outputs) ; ++i){ + if(gctx->outputs[i].tensor){ + RDL_TensorFree(gctx->outputs[i].tensor); + } + } + array_free(gctx->outputs); + + RDL_GraphFree(gctx->graph); +} + +int RDL_GraphRun(RDL_GraphRunCtx* gctx){ + TF_Status *status = TF_NewStatus(); + + TF_Tensor* inputTensorsValues[array_len(gctx->inputs)]; + TF_Output inputs[array_len(gctx->inputs)]; + TF_Tensor* outputTensorsValues[array_len(gctx->outputs)]; + TF_Output outputs[array_len(gctx->outputs)]; + + for(size_t i = 0 ; i < array_len(gctx->inputs) ; ++i){ + inputTensorsValues[i] = RDL_TensorGetTensor(gctx->inputs[i].tensor); + inputs[i] = gctx->inputs[i].name; + } + + for(size_t i = 0 ; i < array_len(gctx->outputs) ; ++i){ + outputs[i] = gctx->outputs[i].name; + } + + TF_SessionRun(gctx->graph->session, NULL /* run_options */, + inputs, inputTensorsValues, array_len(gctx->inputs), + outputs, outputTensorsValues, array_len(gctx->outputs), + NULL /* target_opers */, 0 /* ntargets */, + NULL /* run_Metadata */, + status); + + if (TF_GetCode(status) != TF_OK) { + TF_DeleteStatus(status); + return 0; + } + + for(size_t i = 0 ; i < array_len(gctx->outputs) ; ++i){ + gctx->outputs[i].tensor = RDL_TensorCreateFromTensor(outputTensorsValues[i]); + } + + TF_DeleteStatus(status); + return 1; +} + +RDL_Graph* RDL_GraphGetShallowCopy(RDL_Graph* graph){ + ++graph->refCount; + return graph; +} diff --git a/src/graph.h b/src/graph.h index aa87db493..86ea7a9db 100644 --- a/src/graph.h +++ b/src/graph.h @@ -10,21 +10,21 @@ #include "tensorflow/c/c_api.h" #include "redismodule.h" - -typedef struct RDL_Graph{ - TF_Graph* graph; - // TODO: use session pool? The ideal would be to use one session per client. - // If a client disconnects, we dispose the session or reuse it for - // another client. - void *session; - size_t refCount; -}RDL_Graph; +#include "tensor.h" extern RedisModuleType *RedisDL_GraphType; -int Graph_Init(RedisModuleCtx* ctx); -RDL_Graph* Graph_Create(const char* prefix, const char* graphdef, size_t graphlen); -void Graph_Free(RDL_Graph* graph); +int RDL_GraphInit(RedisModuleCtx* ctx); +RDL_Graph* RDL_GraphCreate(const char* prefix, const char* graphdef, size_t graphlen); +void RDL_GraphFree(RDL_Graph* graph); +RDL_GraphRunCtx* RDL_RunCtxCreate(RDL_Graph* graph); +int RDL_RunCtxAddInput(RDL_GraphRunCtx* gctx, const char* inputName, RDL_Tensor* inputTensor); +int RDL_RunCtxAddOutput(RDL_GraphRunCtx* gctx, const char* outputName); +size_t RDL_RunCtxNumOutputs(RDL_GraphRunCtx* gctx); +RDL_Tensor* RDL_RunCtxOutputTensor(RDL_GraphRunCtx* gctx, size_t index); +void RDL_RunCtxFree(RDL_GraphRunCtx* gctx); +int RDL_GraphRun(RDL_GraphRunCtx* gctx); +RDL_Graph* RDL_GraphGetShallowCopy(RDL_Graph* graph); diff --git a/src/redisdl.c b/src/redisdl.c index c1edb0fd0..cccbf847e 100644 --- a/src/redisdl.c +++ b/src/redisdl.c @@ -93,20 +93,6 @@ mstime_t mstime(void) { return ustime()/1000; } -TF_Tensor* RedisDL_clone(RedisModuleCtx *ctx, const TF_Tensor *tensor) { - int ndims = TF_NumDims(tensor); - long long *dims = RedisModule_PoolAlloc(ctx, ndims * sizeof(long long)); - for (int j=0; jtensor); + long long ndims = RDL_TensorNumDims(t); RedisModule_CloseKey(key); @@ -327,11 +313,11 @@ int RedisDL_TShape_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, i RDL_Tensor *t = RedisModule_ModuleTypeGetValue(key); - long long ndims = TF_NumDims(t->tensor); + long long ndims = RDL_TensorNumDims(t); RedisModule_ReplyWithArray(ctx, ndims); for (long long i=0; itensor, i); + long long dim = RDL_TensorDim(t, i); RedisModule_ReplyWithLongLong(ctx, dim); } @@ -355,7 +341,7 @@ int RedisDL_TByteSize_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv RDL_Tensor *t = RedisModule_ModuleTypeGetValue(key); - long long size = TF_TensorByteSize(t->tensor); + long long size = RDL_TensorByteSize(t); RedisModule_CloseKey(key); @@ -389,8 +375,8 @@ int RedisDL_TGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int RDL_Tensor *t = RedisModule_ModuleTypeGetValue(key); if (datafmt == REDISDL_DATA_BLOB) { - long long size = TF_TensorByteSize(t->tensor); - char *data = TF_TensorData(t->tensor); + long long size = RDL_TensorByteSize(t); + char *data = RDL_TensorData(t); int ret = RedisModule_ReplyWithStringBuffer(ctx, data, size); @@ -400,21 +386,21 @@ int RedisDL_TGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int } } else { // datafmt == REDISDL_DATA_VALUES - long long ndims = TF_NumDims(t->tensor); + long long ndims = RDL_TensorNumDims(t); long long len = 1; long long i; for (i=0; itensor, i); + len *= RDL_TensorDim(t, i); } - TF_DataType datatype = TF_TensorType(t->tensor); + TF_DataType datatype = RDL_TensorDataType(t); RedisModule_ReplyWithArray(ctx, len); if (datatype == TF_FLOAT || datatype == TF_DOUBLE) { double val; for (i=0; iinputs); - for (int i=0; ininputs; i++) { - TF_DeleteTensor(rinfo->input_values[i]); +void RedisDL_FreeRunInfo(RedisModuleCtx *ctx, struct RedisDL_RunInfo *rinfo) { + for(int i = 0 ; i < RDL_RunCtxNumOutputs(rinfo->gctx) ; ++i){ + RedisModule_FreeString(ctx, rinfo->outkeys[i]); } - RedisModule_Free(rinfo->input_values); - RedisModule_Free(rinfo->outputs); - RedisModule_Free(rinfo->output_values); RedisModule_Free(rinfo->outkeys); - TF_DeleteStatus(rinfo->status); + + RDL_RunCtxFree(rinfo->gctx); + RedisModule_Free(rinfo); } @@ -525,14 +502,8 @@ void *RedisDL_RunSession(void *arg) { mstime_t start = mstime(); - TF_Status *status = TF_NewStatus(); - TF_SessionRun(rinfo->session, NULL /* run_options */, - rinfo->inputs, rinfo->input_values, rinfo->ninputs, - rinfo->outputs, rinfo->output_values, rinfo->noutputs, - NULL /* target_opers */, 0 /* ntargets */, - NULL /* run_Metadata */, - rinfo->status); + rinfo->status = RDL_GraphRun(rinfo->gctx); mstime_t end = mstime(); @@ -540,9 +511,10 @@ void *RedisDL_RunSession(void *arg) { RedisModule_Log(ctx, "notice", "TF_SessionRun took %fs", (end - start) / 1000.0); RedisModule_ThreadSafeContextUnlock(ctx); - RedisModule_FreeThreadSafeContext(ctx); RedisModule_UnblockClient(rinfo->client, rinfo); + RedisModule_FreeThreadSafeContext(ctx); + return NULL; } @@ -557,31 +529,33 @@ int RedisDL_Run_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argc); struct RedisDL_RunInfo *rinfo = RedisModule_GetBlockedClientPrivateData(ctx); - if (TF_GetCode(rinfo->status) != TF_OK) { - int ret = RedisModule_ReplyWithError(ctx, TF_Message(rinfo->status)); - RedisDL_FreeRunInfo(rinfo); + if (!rinfo->status) { + int ret = RedisModule_ReplyWithError(ctx, "graph run failed"); + RedisDL_FreeRunInfo(ctx, rinfo); return ret; } - for (int i=0; inoutputs; i++) { + for (size_t i=0; igctx); ++i){ RedisModuleKey *outkey = RedisModule_OpenKey(ctx, rinfo->outkeys[i], REDISMODULE_READ|REDISMODULE_WRITE); int type = RedisModule_KeyType(outkey); if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(outkey) != RedisDL_TensorType) { RedisModule_CloseKey(outkey); - RedisDL_FreeRunInfo(rinfo); + RedisDL_FreeRunInfo(ctx, rinfo); return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); } - RDL_Tensor *t = Tensor_CreateFromTensor(rinfo->output_values[i]); - RedisModule_ModuleTypeSetValue(outkey, RedisDL_TensorType, t); + RDL_Tensor *t = RDL_RunCtxOutputTensor(rinfo->gctx, i); + if(t){ + RedisModule_ModuleTypeSetValue(outkey, RedisDL_TensorType, RDL_TensorGetShallowCopy(t)); + } RedisModule_CloseKey(outkey); } // FIXME This crashes Redis, we need to investigate. //RedisModule_CloseKey(rinfo->graphkey); - RedisDL_FreeRunInfo(rinfo); + RedisDL_FreeRunInfo(ctx, rinfo); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -610,9 +584,6 @@ int RedisDL_GRun_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int RDL_Graph *gto = RedisModule_ModuleTypeGetValue(key); - TF_Graph* graph = gto->graph; - TF_Session* session = gto->session; - long long ninputs; if ((RedisModule_StringToLongLong(argv[2], &ninputs) != REDISMODULE_OK) || ninputs < 0) { @@ -634,64 +605,42 @@ int RedisDL_GRun_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int return RedisModule_ReplyWithError(ctx, "ERR odd key/name pairs"); } - TF_Output *inputs = RedisModule_Alloc(ninputs*sizeof(TF_Output)); - TF_Output *outputs = RedisModule_Alloc(noutputs*sizeof(TF_Output)); - TF_Tensor **input_values = RedisModule_Alloc(ninputs*sizeof(TF_Tensor*)); - TF_Tensor **output_values = RedisModule_Alloc(noutputs*sizeof(TF_Tensor*)); + struct RedisDL_RunInfo *rinfo = RedisModule_Alloc(sizeof(struct RedisDL_RunInfo)); + rinfo->gctx = RDL_RunCtxCreate(gto); - RedisModuleString **outkeys = RedisModule_Alloc(noutputs*sizeof(RedisModuleString*)); + rinfo->outkeys = RedisModule_Alloc(noutputs*sizeof(RedisModuleString*)); for (int i=pairoffset; itensor); RedisModule_CloseKey(argkey); - const char* opname = RedisModule_StringPtrLen(argname, &namelen); - // RedisModule_Log(ctx, "warning", "%s", opname); - TF_Output port; - port.oper = TF_GraphOperationByName(graph, opname); - port.index = 0; - if (port.oper == NULL) { + const char* opname = RedisModule_StringPtrLen(argname, NULL); + if(!RDL_RunCtxAddInput(rinfo->gctx, opname, t)){ + // todo free rinfo return RedisModule_ReplyWithError(ctx, "Input key not found."); } - inputs[(i-pairoffset)/2] = port; } else { - const char* opname = RedisModule_StringPtrLen(argname, &namelen); - TF_Output port; - port.oper = TF_GraphOperationByName(graph, opname); - port.index = 0; - if (port.oper == NULL) { + const char* opname = RedisModule_StringPtrLen(argname, NULL); + if(!RDL_RunCtxAddOutput(rinfo->gctx, opname)){ + // todo free rinfo return RedisModule_ReplyWithError(ctx, "Output key not found."); } - outputs[(i-pairoffset)/2-ninputs] = port; - outkeys[(i-pairoffset)/2-ninputs] = argv[i]; + RedisModule_RetainString(ctx, argv[i]); + rinfo->outkeys[(i-pairoffset)/2-ninputs] = argv[i]; } } - RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, RedisDL_Run_Reply, NULL, NULL, 0); - - struct RedisDL_RunInfo *rinfo = RedisModule_Alloc(sizeof(struct RedisDL_RunInfo)); - rinfo->client = bc; - rinfo->session = session; - rinfo->inputs = inputs; - rinfo->input_values = input_values; - rinfo->ninputs = ninputs; - rinfo->outputs = outputs; - rinfo->output_values = output_values; - rinfo->noutputs = noutputs; - rinfo->graphkey = key; - rinfo->outkeys = outkeys; - rinfo->status = TF_NewStatus(); + rinfo->client = RedisModule_BlockClient(ctx, RedisDL_Run_Reply, NULL, NULL, 0); // RedisModule_AbortBlock(bc); // return RedisModule_ReplyWithError(ctx, "-ERR Can't start thread"); @@ -733,17 +682,62 @@ int RedisDL_StartRunThread() { return REDISMODULE_OK; } +#define EXECUTION_PLAN_FREE_MSG 100 + +#define REGISTER_API(name, registerApiCallback) \ + if(registerApiCallback("RedisDL_" #name, RDL_ ## name)){\ + printf("could not register RedisDL_" #name "\r\n");\ + return false;\ + } + +static bool RediDL_RegisterApi(int (*registerApiCallback)(const char *funcname, void *funcptr)){ + REGISTER_API(TensorCreate, registerApiCallback); + REGISTER_API(TensorCreateFromTensor, registerApiCallback); + REGISTER_API(TensorGetDataSize, registerApiCallback); + REGISTER_API(TensorDataType, registerApiCallback); + REGISTER_API(TensorFree, registerApiCallback); + REGISTER_API(TensorSetData, registerApiCallback); + REGISTER_API(TensorSetValueFromLongLong, registerApiCallback); + REGISTER_API(TensorSetValueFromDouble, registerApiCallback); + REGISTER_API(TensorGetValueAsDouble, registerApiCallback); + REGISTER_API(TensorGetValueAsLongLong, registerApiCallback); + REGISTER_API(TensorGetShallowCopy, registerApiCallback); + REGISTER_API(TensorNumDims, registerApiCallback); + REGISTER_API(TensorDim, registerApiCallback); + REGISTER_API(TensorByteSize, registerApiCallback); + REGISTER_API(TensorData, registerApiCallback); + + REGISTER_API(GraphCreate, registerApiCallback); + REGISTER_API(GraphFree, registerApiCallback); + REGISTER_API(RunCtxCreate, registerApiCallback); + REGISTER_API(RunCtxAddInput, registerApiCallback); + REGISTER_API(RunCtxAddOutput, registerApiCallback); + REGISTER_API(RunCtxNumOutputs, registerApiCallback); + REGISTER_API(RunCtxOutputTensor, registerApiCallback); + REGISTER_API(RunCtxFree, registerApiCallback); + REGISTER_API(GraphRun, registerApiCallback); + REGISTER_API(GraphGetShallowCopy, registerApiCallback); + return true; +} + +int moduleRegisterApi(const char *funcname, void *funcptr); + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (RedisModule_Init(ctx, "dl", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if(!Tensor_Init(ctx)){ + if(!RediDL_RegisterApi(moduleRegisterApi)){ + RedisModule_Log(ctx, "warning", "could not register RedisDL api\r\n"); + return REDISMODULE_ERR; + } + + if(!RDL_TensorInit(ctx)){ RedisModule_Log(ctx, "warning", "can not initialize tensor dt"); return REDISMODULE_ERR; } - if(!Graph_Init(ctx)){ + if(!RDL_GraphInit(ctx)){ RedisModule_Log(ctx, "warning", "can not initialize tensor dt"); return REDISMODULE_ERR; } diff --git a/src/redisdl.h b/src/redisdl.h new file mode 100644 index 000000000..f86cdba3a --- /dev/null +++ b/src/redisdl.h @@ -0,0 +1,86 @@ +/* + * redisdl.h + * + * Created on: 2 Dec 2018 + * Author: root + */ + +#ifndef SRC_REDISDL_H_ +#define SRC_REDISDL_H_ + +#include +#include "redismodule.h" + +#define MODULE_API_FUNC(x) (*x) + +typedef struct RDL_Tensor RDL_Tensor; + +typedef struct RDL_Graph RDL_Graph; + +typedef struct RDL_GraphRunCtx RDL_GraphRunCtx; + +RDL_Tensor* MODULE_API_FUNC(RedisDL_TensorCreate)(const char* dataTypeStr, long long* dims,int ndims); +RDL_Tensor* MODULE_API_FUNC(RedisDL_TensorCreateFromTensor)(TF_Tensor *tensor); +size_t MODULE_API_FUNC(RedisDL_TensorGetDataSize)(const char* dataTypeStr); +TF_DataType MODULE_API_FUNC(RedisDL_TensorDataType)(RDL_Tensor* t); +void MODULE_API_FUNC(RedisDL_TensorFree)(RDL_Tensor* t); +int MODULE_API_FUNC(RedisDL_TensorSetData)(RDL_Tensor* tensor, const char* data, size_t len); +int MODULE_API_FUNC(RedisDL_TensorSetValueFromLongLong)(RDL_Tensor* tensor, long long i, long long val); +int MODULE_API_FUNC(RedisDL_TensorSetValueFromDouble)(RDL_Tensor* tensor, long long i, double val); +int MODULE_API_FUNC(RedisDL_TensorGetValueAsDouble)(RDL_Tensor* t, long long i, double* val); +int MODULE_API_FUNC(RedisDL_TensorGetValueAsLongLong)(RDL_Tensor* t, long long i, long long* val); +RDL_Tensor* MODULE_API_FUNC(RedisDL_TensorGetShallowCopy)(RDL_Tensor* t); +int MODULE_API_FUNC(RedisDL_TensorNumDims)(RDL_Tensor* t); +long long MODULE_API_FUNC(RedisDL_TensorDim)(RDL_Tensor* t, int dim); +size_t MODULE_API_FUNC(RedisDL_TensorByteSize)(RDL_Tensor* t); +char* MODULE_API_FUNC(RedisDL_TensorData)(RDL_Tensor* t); + +RDL_Graph* MODULE_API_FUNC(RedisDL_GraphCreate)(const char* prefix, const char* graphdef, size_t graphlen); +void MODULE_API_FUNC(RedisDL_GraphFree)(RDL_Graph* graph); +RDL_GraphRunCtx* MODULE_API_FUNC(RedisDL_RunCtxCreate)(RDL_Graph* graph); +int MODULE_API_FUNC(RedisDL_RunCtxAddInput)(RDL_GraphRunCtx* gctx, const char* inputName, RDL_Tensor* inputTensor); +int MODULE_API_FUNC(RedisDL_RunCtxAddOutput)(RDL_GraphRunCtx* gctx, const char* outputName); +size_t MODULE_API_FUNC(RedisDL_RunCtxNumOutputs)(RDL_GraphRunCtx* gctx); +RDL_Tensor* MODULE_API_FUNC(RedisDL_RunCtxOutputTensor)(RDL_GraphRunCtx* gctx, size_t index); +void MODULE_API_FUNC(RedisDL_RunCtxFree)(RDL_GraphRunCtx* gctx); +int MODULE_API_FUNC(RedisDL_GraphRun)(RDL_GraphRunCtx* gctx); +RDL_Graph* MODULE_API_FUNC(RedisDL_GraphGetShallowCopy)(RDL_Graph* graph); + +#define REDIDL_MODULE_INIT_FUNCTION(name) \ + if (RedisModule_GetApi("RedisDL_" #name, ((void **)&RedisDL_ ## name))) { \ + printf("could not initialize RedisDL_" #name "\r\n");\ + return false; \ + } + +static bool RediDL_Initialize(){ + REDIDL_MODULE_INIT_FUNCTION(TensorCreate); + REDIDL_MODULE_INIT_FUNCTION(TensorCreateFromTensor); + REDIDL_MODULE_INIT_FUNCTION(TensorGetDataSize); + REDIDL_MODULE_INIT_FUNCTION(TensorDataType); + REDIDL_MODULE_INIT_FUNCTION(TensorFree); + REDIDL_MODULE_INIT_FUNCTION(TensorSetData); + REDIDL_MODULE_INIT_FUNCTION(TensorSetValueFromLongLong); + REDIDL_MODULE_INIT_FUNCTION(TensorSetValueFromDouble); + REDIDL_MODULE_INIT_FUNCTION(TensorGetValueAsDouble); + REDIDL_MODULE_INIT_FUNCTION(TensorGetValueAsLongLong); + REDIDL_MODULE_INIT_FUNCTION(TensorGetShallowCopy); + REDIDL_MODULE_INIT_FUNCTION(TensorNumDims); + REDIDL_MODULE_INIT_FUNCTION(TensorDim); + REDIDL_MODULE_INIT_FUNCTION(TensorByteSize); + REDIDL_MODULE_INIT_FUNCTION(TensorData); + + REDIDL_MODULE_INIT_FUNCTION(GraphCreate); + REDIDL_MODULE_INIT_FUNCTION(GraphFree); + REDIDL_MODULE_INIT_FUNCTION(RunCtxCreate); + REDIDL_MODULE_INIT_FUNCTION(RunCtxAddInput); + REDIDL_MODULE_INIT_FUNCTION(RunCtxAddOutput); + REDIDL_MODULE_INIT_FUNCTION(RunCtxNumOutputs); + REDIDL_MODULE_INIT_FUNCTION(RunCtxOutputTensor); + REDIDL_MODULE_INIT_FUNCTION(RunCtxFree); + REDIDL_MODULE_INIT_FUNCTION(GraphRun); + REDIDL_MODULE_INIT_FUNCTION(GraphGetShallowCopy); + return true; +} + + +#endif /* SRC_REDISDL_H_ */ diff --git a/src/tensor.c b/src/tensor.c index ac85ed162..792c6bebd 100644 --- a/src/tensor.c +++ b/src/tensor.c @@ -5,6 +5,11 @@ RedisModuleType *RedisDL_TensorType = NULL; +typedef struct RDL_Tensor { + TF_Tensor* tensor; + size_t refCount; +}RDL_Tensor; + static TF_DataType Tensor_GetDataType(const char* typestr){ if (strcasecmp(typestr, "FLOAT") == 0){ return TF_FLOAT; @@ -80,10 +85,10 @@ static void Tensor_RdbSave(RedisModuleIO *rdb, void *value){ } static void Tensor_DTFree(void *value){ - Tensor_Free(value); + RDL_TensorFree(value); } -int Tensor_Init(RedisModuleCtx* ctx){ +int RDL_TensorInit(RedisModuleCtx* ctx){ RedisModuleTypeMethods tmTensor = { .version = REDISMODULE_TYPE_METHOD_VERSION, .rdb_load = Tensor_RdbLoad, @@ -97,7 +102,7 @@ int Tensor_Init(RedisModuleCtx* ctx){ return RedisDL_TensorType != NULL; } -RDL_Tensor* Tensor_Create(const char* dataTypeStr, long long* dims,int ndims){ +RDL_Tensor* RDL_TensorCreate(const char* dataTypeStr, long long* dims,int ndims){ TF_DataType dtype = Tensor_GetDataType(dataTypeStr); if(!dtype){ return NULL; @@ -117,18 +122,18 @@ RDL_Tensor* Tensor_Create(const char* dataTypeStr, long long* dims,int ndims){ return ret; } -RDL_Tensor* Tensor_CreateFromTensor(TF_Tensor *tensor){ +RDL_Tensor* RDL_TensorCreateFromTensor(TF_Tensor *tensor){ RDL_Tensor* ret = RedisModule_Alloc(sizeof(*ret)); ret->tensor = tensor; ret->refCount = 1; return ret; } -TF_DataType Tensor_DataType(RDL_Tensor* t){ +TF_DataType RDL_TensorDataType(RDL_Tensor* t){ return TF_TensorType(t->tensor); } -size_t Tensor_GetDataSize(const char* dataTypeStr){ +size_t RDL_TensorGetDataSize(const char* dataTypeStr){ TF_DataType dtype = Tensor_GetDataType(dataTypeStr); if(!dtype){ return 0; @@ -136,19 +141,19 @@ size_t Tensor_GetDataSize(const char* dataTypeStr){ return Tensor_DataSize(dtype); } -void Tensor_Free(RDL_Tensor* t){ +void RDL_TensorFree(RDL_Tensor* t){ if(--t->refCount <= 0){ TF_DeleteTensor(t->tensor); RedisModule_Free(t); } } -int Tensor_SetData(RDL_Tensor* t, const char* data, size_t len){ +int RDL_TensorSetData(RDL_Tensor* t, const char* data, size_t len){ memcpy(TF_TensorData(t->tensor), data, len); return 1; } -int Tensor_SetValueFromLongLong(RDL_Tensor* t, long long i, long long val){ +int RDL_TensorSetValueFromLongLong(RDL_Tensor* t, long long i, long long val){ switch (TF_TensorType(t->tensor)) { case TF_BOOL: ((int8_t*)TF_TensorData(t->tensor))[i] = val; break; @@ -170,7 +175,7 @@ int Tensor_SetValueFromLongLong(RDL_Tensor* t, long long i, long long val){ return 1; } -int Tensor_SetValueFromDouble(RDL_Tensor* t, long long i, double val){ +int RDL_TensorSetValueFromDouble(RDL_Tensor* t, long long i, double val){ switch (TF_TensorType(t->tensor)) { case TF_FLOAT: ((float*)TF_TensorData(t->tensor))[i] = val; break; @@ -182,7 +187,7 @@ int Tensor_SetValueFromDouble(RDL_Tensor* t, long long i, double val){ return 1; } -int Tensor_GetValueAsDouble(RDL_Tensor* t, long long i, double* val) { +int RDL_TensorGetValueAsDouble(RDL_Tensor* t, long long i, double* val) { switch (TF_TensorType(t->tensor)) { case TF_FLOAT: *val = ((float*)TF_TensorData(t->tensor))[i]; break; @@ -194,7 +199,7 @@ int Tensor_GetValueAsDouble(RDL_Tensor* t, long long i, double* val) { return 1; } -int Tensor_GetValueAsLongLong(RDL_Tensor* t, long long i, long long* val) { +int RDL_TensorGetValueAsLongLong(RDL_Tensor* t, long long i, long long* val) { switch (TF_TensorType(t->tensor)) { case TF_BOOL: *val = ((int8_t*)TF_TensorData(t->tensor))[i]; break; @@ -215,3 +220,28 @@ int Tensor_GetValueAsLongLong(RDL_Tensor* t, long long i, long long* val) { } return 1; } + +RDL_Tensor* RDL_TensorGetShallowCopy(RDL_Tensor* t){ + ++t->refCount; + return t; +} + +int RDL_TensorNumDims(RDL_Tensor* t){ + return TF_NumDims(t->tensor); +} + +long long RDL_TensorDim(RDL_Tensor* t, int dim){ + return TF_Dim(t->tensor, dim); +} + +size_t RDL_TensorByteSize(RDL_Tensor* t){ + return TF_TensorByteSize(t->tensor); +} + +char* RDL_TensorData(RDL_Tensor* t){ + return TF_TensorData(t->tensor); +} + +TF_Tensor* RDL_TensorGetTensor(RDL_Tensor* t){ + return t->tensor; +} diff --git a/src/tensor.h b/src/tensor.h index d7c7394cc..7d0bb045f 100644 --- a/src/tensor.h +++ b/src/tensor.h @@ -9,26 +9,27 @@ #define SRC_TENSOR_H_ #include "tensorflow/c/c_api.h" -#include "redismodule.h" - -typedef struct RDL_Tensor { - TF_Tensor* tensor; - size_t refCount; -}RDL_Tensor; +#include "redisdl.h" extern RedisModuleType *RedisDL_TensorType; -int Tensor_Init(RedisModuleCtx* ctx); -RDL_Tensor* Tensor_Create(const char* dataTypeStr, long long* dims,int ndims); -RDL_Tensor* Tensor_CreateFromTensor(TF_Tensor *tensor); -size_t Tensor_GetDataSize(const char* dataTypeStr); -TF_DataType Tensor_DataType(RDL_Tensor* t); -void Tensor_Free(RDL_Tensor* t); -int Tensor_SetData(RDL_Tensor* tensor, const char* data, size_t len); -int Tensor_SetValueFromLongLong(RDL_Tensor* tensor, long long i, long long val); -int Tensor_SetValueFromDouble(RDL_Tensor* tensor, long long i, double val); -int Tensor_GetValueAsDouble(RDL_Tensor* t, long long i, double* val); -int Tensor_GetValueAsLongLong(RDL_Tensor* t, long long i, long long* val); +int RDL_TensorInit(RedisModuleCtx* ctx); +RDL_Tensor* RDL_TensorCreate(const char* dataTypeStr, long long* dims,int ndims); +RDL_Tensor* RDL_TensorCreateFromTensor(TF_Tensor *tensor); +size_t RDL_TensorGetDataSize(const char* dataTypeStr); +TF_DataType RDL_TensorDataType(RDL_Tensor* t); +void RDL_TensorFree(RDL_Tensor* t); +int RDL_TensorSetData(RDL_Tensor* tensor, const char* data, size_t len); +int RDL_TensorSetValueFromLongLong(RDL_Tensor* tensor, long long i, long long val); +int RDL_TensorSetValueFromDouble(RDL_Tensor* tensor, long long i, double val); +int RDL_TensorGetValueAsDouble(RDL_Tensor* t, long long i, double* val); +int RDL_TensorGetValueAsLongLong(RDL_Tensor* t, long long i, long long* val); +RDL_Tensor* RDL_TensorGetShallowCopy(RDL_Tensor* t); +int RDL_TensorNumDims(RDL_Tensor* t); +long long RDL_TensorDim(RDL_Tensor* t, int dim); +size_t RDL_TensorByteSize(RDL_Tensor* t); +char* RDL_TensorData(RDL_Tensor* t); +TF_Tensor* RDL_TensorGetTensor(RDL_Tensor* t); diff --git a/src/utils/arr.h b/src/utils/arr.h new file mode 100644 index 000000000..119fd6aa7 --- /dev/null +++ b/src/utils/arr.h @@ -0,0 +1,166 @@ +#ifndef UTIL_ARR_H_ +#define UTIL_ARR_H_ +/* arr.h - simple, easy to use dynamic array with fat pointers, + * to allow native access to members. It can accept pointers, struct literals and scalars. + * + * Example usage: + * + * int *arr = array_new(int, 8); + * // Add elements to the array + * for (int i = 0; i < 100; i++) { + * arr = array_append(arr, i); + * } + * + * // read individual elements + * for (int i = 0; i < array_len(arr); i++) { + * printf("%d\n", arr[i]); + * } + * + * array_free(arr); + * + * + * */ +#include +#include +#include +#include +#include +#include + +/* Definition of malloc & friedns that can be overridden before including arr.h. + * Alternatively you can include arr_rm_alloc.h, which wraps arr.h and sets the allcoation functions + * to those of the RM_ family + */ +#ifndef array_alloc_fn +#define array_alloc_fn malloc +#define array_realloc_fn realloc +#define array_free_fn free +#endif + +typedef struct { + uint32_t len; + // TODO: optimize memory by making cap a 16-bit delta from len, and elem_sz 16 bit as well. This + // makes the whole header fit in 64 bit + uint32_t cap; + uint32_t elem_sz; + char buf[]; +} array_hdr_t; + +typedef void *array_t; +/* Internal - calculate the array size for allocations */ +#define array_sizeof(hdr) (sizeof(array_hdr_t) + hdr->cap * hdr->elem_sz) +/* Internal - get a pointer to the array header */ +#define array_hdr(arr) ((array_hdr_t *)(((char *)arr) - sizeof(array_hdr_t))) +/* Interanl - get a pointer to an element inside the array at a given index */ +#define array_elem(arr, idx) (*((void **)((char *)arr + (idx * array_hdr(arr)->elem_sz)))) + +/* Initialize a new array with a given element size and capacity. Should not be used directly - use + * array_new instead */ +static array_t array_new_sz(uint32_t elem_sz, uint32_t cap, uint32_t len) { + array_hdr_t *hdr = array_alloc_fn(sizeof(array_hdr_t) + cap * elem_sz); + hdr->cap = cap; + hdr->elem_sz = elem_sz; + hdr->len = len; + return (array_t)(hdr->buf); +} + +/* Initialize an array for a given type T with a given capacity and zero length. The array should be + * case to a pointer to that type. e.g. + * + * int *arr = array_new(int, 4); + * + * This allows direct access to elements + * */ +#define array_new(T, cap) (array_new_sz(sizeof(T), cap, 0)) + +/* Initialize an array for a given type T with a given length. The capacity allocated is identical + * to the length + * */ +#define array_newlen(T, len) (array_new_sz(sizeof(T), len, len)) + +static inline array_t array_ensure_cap(array_t arr, uint32_t cap) { + array_hdr_t *hdr = array_hdr(arr); + if (cap > hdr->cap) { + hdr->cap = MAX(hdr->cap * 2, cap); + hdr = array_realloc_fn(hdr, array_sizeof(hdr)); + } + return (array_t)hdr->buf; +} + +/* Ensure capacity for the array to grow by one */ +static inline array_t array_grow(array_t arr) { + return array_ensure_cap(arr, ++array_hdr(arr)->len); +} + +/* get the last element in the array */ +#define array_tail(arr) (arr[array_hdr(arr)->len - 1]) + +/* Append an element to the array, returning the array which may have been reallocated */ +#define array_append(arr, x) \ + ({ \ + (arr) = array_grow((arr)); \ + array_tail((arr)) = (x); \ + (arr); \ + }) + +/* Get the length of the array */ +static inline uint32_t array_len(array_t arr) { + return arr ? array_hdr(arr)->len : 0; +} + +static inline void *array_trimm(array_t arr, uint32_t len, uint32_t cap) { + array_hdr_t *arr_hdr = array_hdr(arr); + assert(len >= 0 && "trimming len is negative"); + assert((cap == -1 || cap > 0 || len == cap) && "trimming capacity is illegal"); + assert((cap == -1 || cap >= len) && "trimming len is greater then capacity"); + assert((len <= arr_hdr->len) && "trimming len is greater then current len"); + arr_hdr->len = len; + if (cap != -1) { + arr_hdr->cap = cap; + arr_hdr = array_realloc_fn(arr_hdr, array_sizeof(arr_hdr)); + } + return arr_hdr->buf; +} + +#define array_trimm_len(arr, len) array_trimm(arr, len, -1) +#define array_trimm_cap(arr, len) array_trimm(arr, len, len) + +/* Free the array, without dealing with individual elements */ +static void array_free(array_t arr) { + array_free_fn(array_hdr(arr)); +} + +/* Repeate the code in "blk" for each element in the array, and give it the name of "as". + * e.g: + * int *arr = array_new(int, 10); + * arr = array_append(arr, 1); + * array_foreach(arr, i, printf("%d\n", i)); + */ +#define array_foreach(arr, as, blk) \ + ({ \ + for (uint32_t i = 0; i < array_len(arr); i++) { \ + typeof(*arr) as = arr[i]; \ + blk; \ + } \ + }) + +/* Free the array, freeing individual elements with free_cb */ +#define array_free_ex(arr, blk) \ + ({ \ + if (arr) { \ + for (uint32_t i = 0; i < array_len(arr); i++) { \ + void *ptr = &arr[i]; \ + { blk; } \ + } \ + array_free(arr); \ + } \ + }) + +/* Pop the top element from the array, reduce the size and return it */ +#define array_pop(arr) \ + ({ \ + assert(array_hdr(arr)->len > 0); \ + arr[--(array_hdr(arr)->len)]; \ + }) + +#endif diff --git a/src/utils/arr_rm_alloc.h b/src/utils/arr_rm_alloc.h new file mode 100644 index 000000000..c9cd41fd7 --- /dev/null +++ b/src/utils/arr_rm_alloc.h @@ -0,0 +1,17 @@ +#ifndef ARR_RM_ALLOC_H_ +#define ARR_RM_ALLOC_H_ + +/* A wrapper for arr.h that sets the allocation functions to those of the RedisModule_Alloc & + * friends. This file should not be included alongside arr.h, and should not be included from .h + * files in general */ + +#include "redismodule.h" + +/* Define the allcation functions before including arr.h */ +#define array_alloc_fn RedisModule_Alloc +#define array_realloc_fn RedisModule_Realloc +#define array_free_fn RedisModule_Free + +#include "arr.h" + +#endif