diff --git a/src/serialization/AOF/rai_aof_rewrite.c b/src/serialization/AOF/rai_aof_rewrite.c index 813877cf2..0120ef052 100644 --- a/src/serialization/AOF/rai_aof_rewrite.c +++ b/src/serialization/AOF/rai_aof_rewrite.c @@ -41,23 +41,10 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value return; } - // AI.MODELSET model_key backend device [INPUTS name1 name2 ... OUTPUTS name1 name2 ...] - // model_blob - - RedisModuleString **inputs_ = array_new(RedisModuleString *, model->ninputs); - RedisModuleString **outputs_ = array_new(RedisModuleString *, model->noutputs); - - RedisModuleCtx *ctx = RedisModule_GetContextFromIO(aof); - - for (size_t i = 0; i < model->ninputs; i++) { - inputs_ = array_append( - inputs_, RedisModule_CreateString(ctx, model->inputs[i], strlen(model->inputs[i]))); - } - - for (size_t i = 0; i < model->noutputs; i++) { - outputs_ = array_append( - outputs_, RedisModule_CreateString(ctx, model->outputs[i], strlen(model->outputs[i]))); - } + // AI.MODELSTORE model_key backend device [TAG tag] + // [BATCHSIZE n [MINBATCHSIZE m [MINBATCHTIMEOUT t]]] + // [INPUTS name1 name2 ... OUTPUTS name1 name2 ...] + // BLOB model_blob long long chunk_size = getModelChunkSize(); const size_t n_chunks = len / chunk_size + 1; @@ -66,7 +53,7 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value for (size_t i = 0; i < n_chunks; i++) { size_t chunk_len = i < n_chunks - 1 ? 
chunk_size : len % chunk_size; buffers_ = array_append(buffers_, - RedisModule_CreateString(ctx, buffer + i * chunk_size, chunk_len)); + RedisModule_CreateString(NULL, buffer + i * chunk_size, chunk_len)); } if (buffer) { @@ -75,29 +62,54 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value const char *backendstr = RAI_BackendName(model->backend); - RedisModule_EmitAOF(aof, "AI.MODELSET", "slccclclcvcvcv", key, backendstr, model->devicestr, - model->tag, "BATCHSIZE", model->opts.batchsize, "MINBATCHSIZE", - model->opts.minbatchsize, "INPUTS", inputs_, model->ninputs, "OUTPUTS", - outputs_, model->noutputs, "BLOB", buffers_, n_chunks); + if (model->backend != RAI_BACKEND_TENSORFLOW) { + + RedisModule_EmitAOF(aof, "AI.MODELSTORE", "scccsclclclcv", key, backendstr, + model->devicestr, "TAG", model->tag, "BATCHSIZE", model->opts.batchsize, + "MINBATCHSIZE", model->opts.minbatchsize, "MINBATCHTIMEOUT", + model->opts.minbatchtimeout, "BLOB", buffers_, n_chunks); + } else { + // For TF backend, the command should contain INPUTS and OUTPUTS names. + // Create RedisModuleString* arrays from the char* arrays, so we can send a proper vector + // to RedisModule_EmitAOF. 
+ array_new_on_stack(RedisModuleString *, 5, inputs_); + array_new_on_stack(RedisModuleString *, 5, outputs_); + + for (size_t i = 0; i < model->ninputs; i++) { + inputs_ = array_append(inputs_, RedisModule_CreateString(NULL, model->inputs[i], + strlen(model->inputs[i]))); + } + for (size_t i = 0; i < model->noutputs; i++) { + outputs_ = array_append(outputs_, RedisModule_CreateString(NULL, model->outputs[i], + strlen(model->outputs[i]))); + } - for (size_t i = 0; i < model->ninputs; i++) { - RedisModule_FreeString(ctx, inputs_[i]); - } - array_free(inputs_); + RedisModule_EmitAOF(aof, "AI.MODELSTORE", "scccsclclclclvclvcv", key, backendstr, + model->devicestr, "TAG", model->tag, "BATCHSIZE", model->opts.batchsize, + "MINBATCHSIZE", model->opts.minbatchsize, "MINBATCHTIMEOUT", + model->opts.minbatchtimeout, "INPUTS", model->ninputs, inputs_, + model->ninputs, "OUTPUTS", model->noutputs, outputs_, model->noutputs, + "BLOB", buffers_, n_chunks); - for (size_t i = 0; i < model->noutputs; i++) { - RedisModule_FreeString(ctx, outputs_[i]); + for (size_t i = 0; i < model->ninputs; i++) { + RedisModule_FreeString(NULL, inputs_[i]); + } + array_free(inputs_); + + for (size_t i = 0; i < model->noutputs; i++) { + RedisModule_FreeString(NULL, outputs_[i]); + } + array_free(outputs_); } - array_free(outputs_); for (size_t i = 0; i < n_chunks; i++) { - RedisModule_FreeString(ctx, buffers_[i]); + RedisModule_FreeString(NULL, buffers_[i]); } array_free(buffers_); } void RAI_AOFRewriteScript(RedisModuleIO *aof, RedisModuleString *key, void *value) { RAI_Script *script = (RAI_Script *)value; - RedisModule_EmitAOF(aof, "AI.SCRIPTSET", "scccc", key, script->devicestr, script->tag, "SOURCE", - script->scriptdef); + RedisModule_EmitAOF(aof, "AI.SCRIPTSET", "sccscc", key, script->devicestr, "TAG", script->tag, + "SOURCE", script->scriptdef); } diff --git a/tests/flow/test_serializations.py b/tests/flow/test_serializations.py index a3baea8d9..cec3923c9 100644 --- 
a/tests/flow/test_serializations.py +++ b/tests/flow/test_serializations.py @@ -40,7 +40,8 @@ def torch_script_run(env, script_key): con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3) - con.execute_command('AI.SCRIPTRUN', script_key, 'bar', 'INPUTS', 'a{1}', 'b{1}', 'OUTPUTS', 'c{1}') + con.execute_command('AI.SCRIPTEXECUTE', script_key, 'bar', 'KEYS', 1, '{1}', 'INPUTS', 2, 'a{1}', 'b{1}', + 'OUTPUTS', 1, 'c{1}') ensureSlaveSynced(con, env) @@ -216,3 +217,160 @@ def test_v2_tensor(self): self.env.assertEqual([tensor_type, tensor_shape], [b"INT32", [2, 1]]) values = con.execute_command('AI.TENSORGET', key_name, 'VALUES') self.env.assertEqual(values, [1, 2]) + + +class TestAofRewrite: + + def __init__(self): + self.env = Env(useAof=True) + + def test_aof_rewrite_tf_model(self): + key_name = "tf_graph{1}" + con = self.env.getConnection() + tf_model = load_file_content("graph.pb") + con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH', 'batchsize', 4, 'minbatchsize', 2, + 'minbatchtimeout', 1000, 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model) + + # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF. + self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout\ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"TF", b"CPU", b"TF_GRAPH", 4, 2, 1000, [b"a", b"b"], [b"mul"]]) + tf_model_run(self.env, key_name) + + # Reinsert the model (without minbatchtimeout) + con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH1', 'batchsize', 4, 'minbatchsize', 2, + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model) + # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF. 
+ self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout\ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"TF", b"CPU", b"TF_GRAPH1", 4, 2, 0, [b"a", b"b"], [b"mul"]]) + + # Reinsert the model (without minbatch) + con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH2', 'batchsize', 4, + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model) + # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF. + self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"TF", b"CPU", b"TF_GRAPH2", 4, 0, 0, [b"a", b"b"], [b"mul"]]) + + # Reinsert the model (without batching) + con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH3', + 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model) + # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF. 
+ self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"TF", b"CPU", b"TF_GRAPH3", 0, 0, 0, [b"a", b"b"], [b"mul"]]) + + def test_aof_rewrite_torch_model(self): + key_name = "pt-minimal{1}" + con = self.env.getConnection() + torch_model = load_file_content("pt-minimal.pt") + con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL', 'batchsize', 4, 'minbatchsize', 2, + 'minbatchtimeout', 1000, 'BLOB', torch_model) + + # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF. + self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout\ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"TORCH", b"CPU", b"PT_MINIMAL", 4, 2, 1000, [b"a", b"b"], [b'']]) + torch_model_run(self.env, key_name) + + # Reinsert the model (without minbatchtimeout) + con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL1', 'batchsize', 4, 'minbatchsize', 2, + 'BLOB', torch_model) + self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"TORCH", b"CPU", b"PT_MINIMAL1", 4, 2, 0, [b"a", b"b"], [b'']]) + + # Reinsert the model (without minbatch) + con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL2', 'batchsize', 4, + 'BLOB', torch_model) + self.env.restartAndReload() + _, backend, _, device, _, 
tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ +            = con.execute_command("AI.MODELGET", key_name, "META") +        self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], +                             [b"TORCH", b"CPU", b"PT_MINIMAL2", 4, 0, 0, [b"a", b"b"], [b'']]) + +        # Reinsert the model (without batching) +        con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL3', +                            'BLOB', torch_model) +        self.env.restartAndReload() +        _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ +            = con.execute_command("AI.MODELGET", key_name, "META") +        self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], +                             [b"TORCH", b"CPU", b"PT_MINIMAL3", 0, 0, 0, [b"a", b"b"], [b'']]) + +    def test_aof_rewrite_torch_script(self): +        key_name = "torch_script{1}" +        con = self.env.getConnection() +        torch_script = load_file_content("script.txt") +        con.execute_command('AI.SCRIPTSET', key_name, 'CPU', 'TAG', 'TORCH_SCRIPT', 'SOURCE', torch_script) + +        # Redis should save the stored script by calling the AOF rewrite callback and then reload from AOF. +        self.env.restartAndReload() +        _, device, _, tag = con.execute_command("AI.SCRIPTGET", key_name, "META") +        self.env.assertEqual([device, tag], [b"CPU", b"TORCH_SCRIPT"]) +        torch_script_run(self.env, key_name) + +    def test_aof_rewrite_onnx_model(self): +        key_name = "linear_iris{1}" +        con = self.env.getConnection() +        onnx_model = load_file_content("linear_iris.onnx") +        con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS', 'batchsize', 4, 'minbatchsize', 2, +                            'minbatchtimeout', 1000, 'BLOB', onnx_model) +        # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF. 
+ self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout\ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS", 4, 2, 1000, [b'float_input'], [b'variable']]) + onnx_model_run(self.env, key_name) + + # Reinsert the model (without minbatchtimeout) + con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS1', 'batchsize', 4, + 'minbatchsize', 2, 'BLOB', onnx_model) + self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS1", 4, 2, 0, [b'float_input'], [b'variable']]) + + # Reinsert the model (without minbatch) + con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS2', 'batchsize', 4, + 'BLOB', onnx_model) + self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], + [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS2", 4, 0, 0, [b'float_input'], [b'variable']]) + + # Reinsert the model (without batching) + con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS3', + 'BLOB', onnx_model) + self.env.restartAndReload() + _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _ , inputs, _, outputs, _, minbatchtimeout \ + = con.execute_command("AI.MODELGET", key_name, "META") + self.env.assertEqual([backend, device, tag, batchsize, minbatchsize, 
minbatchtimeout, inputs, outputs], + [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS3", 0, 0, 0, [b'float_input'], [b'variable']]) + + def test_aof_rewrite_tensor(self): + key_name = "tensor{1}" + con = self.env.getConnection() + con.execute_command('AI.TENSORSET', key_name, 'INT32', 2, 1, 'VALUES', 1, 2) + # Redis should save the stored tensor by calling the AOF rewrite callback and then reload from AOF. + self.env.restartAndReload() + _, tensor_type, _, tensor_shape = con.execute_command('AI.TENSORGET', key_name, 'META') + self.env.assertEqual([tensor_type, tensor_shape], [b"INT32", [2, 1]]) + values = con.execute_command('AI.TENSORGET', key_name, 'VALUES') + self.env.assertEqual(values, [1, 2])