Skip to content

Aof rewrite fix and test #754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
0e372e6
Fix aof rewrite callback for model type, starting to test it (for TF)…
alonre24 May 16, 2021
09b82e8
Add changes to model rewrite
alonre24 May 18, 2021
3b4e7a2
Merge with master
alonre24 May 19, 2021
807fdde
Update aof rewrite to save minbatchtimeout as well
alonre24 May 20, 2021
4b01e40
More fixes for model rewrite + change script rewrite. Add tests for a…
alonre24 May 20, 2021
ff579e4
Merge branch 'master' into AOF_rewrite_fix_and_test
alonre24 May 20, 2021
3a1e418
PR fixes. Use patch in tests to pass when using slaves.
alonre24 May 23, 2021
8282910
Merge branch 'AOF_rewrite_fix_and_test' of https://github.com/RedisAI…
alonre24 May 23, 2021
86c6da3
Don't run tests when using slaves (may require RLTEST fix)
alonre24 May 23, 2021
a54643c
Merge branch 'master' into AOF_rewrite_fix_and_test
alonre24 May 23, 2021
e9f549c
restart only on master, not slaves
alonre24 May 24, 2021
a834f60
Merge branch 'AOF_rewrite_fix_and_test' of https://github.com/RedisAI…
alonre24 May 24, 2021
e62de80
Merge branch 'master' into AOF_rewrite_fix_and_test
alonre24 May 24, 2021
d529003
Change back tests to use `restartAndReload` after fixing RLTest
alonre24 May 25, 2021
aae1a8d
Merge branch 'master' into AOF_rewrite_fix_and_test
alonre24 May 26, 2021
7f0366d
Merge branch 'AOF_rewrite_fix_and_test' of https://github.com/RedisAI…
alonre24 May 26, 2021
be58e71
Remove leftovers from test
alonre24 May 26, 2021
c0b058d
Extend tests for every backend and config.
alonre24 May 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions src/serialization/AOF/rai_aof_rewrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,10 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value
return;
}

// AI.MODELSET model_key backend device [INPUTS name1 name2 ... OUTPUTS name1 name2 ...]
// model_blob

RedisModuleString **inputs_ = array_new(RedisModuleString *, model->ninputs);
RedisModuleString **outputs_ = array_new(RedisModuleString *, model->noutputs);

RedisModuleCtx *ctx = RedisModule_GetContextFromIO(aof);

for (size_t i = 0; i < model->ninputs; i++) {
inputs_ = array_append(
inputs_, RedisModule_CreateString(ctx, model->inputs[i], strlen(model->inputs[i])));
}

for (size_t i = 0; i < model->noutputs; i++) {
outputs_ = array_append(
outputs_, RedisModule_CreateString(ctx, model->outputs[i], strlen(model->outputs[i])));
}
// AI.MODELSTORE model_key backend device [TAG tag]
// [BATCHSIZE n [MINBATCHSIZE m [MINBATCHTIMEOUT t]]]
// [INPUTS <input_count> name1 name2 ... OUTPUTS <output_count> name1 name2 ...]
// BLOB model_blob

long long chunk_size = getModelChunkSize();
const size_t n_chunks = len / chunk_size + 1;
Expand All @@ -66,7 +53,7 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value
for (size_t i = 0; i < n_chunks; i++) {
size_t chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
buffers_ = array_append(buffers_,
RedisModule_CreateString(ctx, buffer + i * chunk_size, chunk_len));
RedisModule_CreateString(NULL, buffer + i * chunk_size, chunk_len));
}

if (buffer) {
Expand All @@ -75,29 +62,54 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value

const char *backendstr = RAI_BackendName(model->backend);

RedisModule_EmitAOF(aof, "AI.MODELSET", "slccclclcvcvcv", key, backendstr, model->devicestr,
model->tag, "BATCHSIZE", model->opts.batchsize, "MINBATCHSIZE",
model->opts.minbatchsize, "INPUTS", inputs_, model->ninputs, "OUTPUTS",
outputs_, model->noutputs, "BLOB", buffers_, n_chunks);
if (model->backend != RAI_BACKEND_TENSORFLOW) {

RedisModule_EmitAOF(aof, "AI.MODELSTORE", "scccsclclclcv", key, backendstr,
model->devicestr, "TAG", model->tag, "BATCHSIZE", model->opts.batchsize,
"MINBATCHSIZE", model->opts.minbatchsize, "MINBATCHTIMEOUT",
model->opts.minbatchtimeout, "BLOB", buffers_, n_chunks);
} else {
// For TF backend, the command should contain INPUTS and OUTPUTS names.
// Create RedisModuleString* arrays from the char* arrays, so we can send a proper vector
// to RedisModule_EmitAOF.
array_new_on_stack(RedisModuleString *, 5, inputs_);
array_new_on_stack(RedisModuleString *, 5, outputs_);

for (size_t i = 0; i < model->ninputs; i++) {
inputs_ = array_append(inputs_, RedisModule_CreateString(NULL, model->inputs[i],
strlen(model->inputs[i])));
}
for (size_t i = 0; i < model->noutputs; i++) {
outputs_ = array_append(outputs_, RedisModule_CreateString(NULL, model->outputs[i],
strlen(model->outputs[i])));
}

for (size_t i = 0; i < model->ninputs; i++) {
RedisModule_FreeString(ctx, inputs_[i]);
}
array_free(inputs_);
RedisModule_EmitAOF(aof, "AI.MODELSTORE", "scccsclclclclvclvcv", key, backendstr,
model->devicestr, "TAG", model->tag, "BATCHSIZE", model->opts.batchsize,
"MINBATCHSIZE", model->opts.minbatchsize, "MINBATCHTIMEOUT",
model->opts.minbatchtimeout, "INPUTS", model->ninputs, inputs_,
model->ninputs, "OUTPUTS", model->noutputs, outputs_, model->noutputs,
"BLOB", buffers_, n_chunks);

for (size_t i = 0; i < model->noutputs; i++) {
RedisModule_FreeString(ctx, outputs_[i]);
for (size_t i = 0; i < model->ninputs; i++) {
RedisModule_FreeString(NULL, inputs_[i]);
}
array_free(inputs_);

for (size_t i = 0; i < model->noutputs; i++) {
RedisModule_FreeString(NULL, outputs_[i]);
}
array_free(outputs_);
}
array_free(outputs_);

for (size_t i = 0; i < n_chunks; i++) {
RedisModule_FreeString(ctx, buffers_[i]);
RedisModule_FreeString(NULL, buffers_[i]);
}
array_free(buffers_);
}

/* AOF rewrite callback for RAI_Script values.
 *
 * Re-emits a single AI.SCRIPTSET command that recreates the stored script on
 * AOF reload.  Format string "sccscc" maps to:
 *   s  key                (RedisModuleString *)
 *   c  script->devicestr  (device the script runs on)
 *   c  "TAG"              (keyword)
 *   s  script->tag        (user tag, RedisModuleString *)
 *   c  "SOURCE"           (keyword)
 *   c  script->scriptdef  (the script source text)
 * The scrape/diff residue contained both the old (tag without the TAG keyword)
 * and the new call; only the updated form is kept, so the command is emitted
 * exactly once and matches the current AI.SCRIPTSET syntax. */
void RAI_AOFRewriteScript(RedisModuleIO *aof, RedisModuleString *key, void *value) {
    RAI_Script *script = (RAI_Script *)value;
    RedisModule_EmitAOF(aof, "AI.SCRIPTSET", "sccscc", key, script->devicestr, "TAG", script->tag,
                        "SOURCE", script->scriptdef);
}
160 changes: 159 additions & 1 deletion tests/flow/test_serializations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def torch_script_run(env, script_key):

con.execute_command('AI.TENSORSET', 'b{1}', 'FLOAT', 2, 2, 'VALUES', 2, 3, 2, 3)

con.execute_command('AI.SCRIPTRUN', script_key, 'bar', 'INPUTS', 'a{1}', 'b{1}', 'OUTPUTS', 'c{1}')
con.execute_command('AI.SCRIPTEXECUTE', script_key, 'bar', 'KEYS', 1, '{1}', 'INPUTS', 2, 'a{1}', 'b{1}',
'OUTPUTS', 1, 'c{1}')

ensureSlaveSynced(con, env)

Expand Down Expand Up @@ -216,3 +217,160 @@ def test_v2_tensor(self):
self.env.assertEqual([tensor_type, tensor_shape], [b"INT32", [2, 1]])
values = con.execute_command('AI.TENSORGET', key_name, 'VALUES')
self.env.assertEqual(values, [1, 2])


class TestAofRewrite:
    """Round-trip AOF-rewrite tests for RedisAI entities (models, scripts, tensors).

    Each test stores an entity, forces Redis to rewrite the AOF and reload
    from it (env.restartAndReload()), and verifies the reloaded entity keeps
    all of its metadata — including every combination of the optional batching
    arguments (full, no MINBATCHTIMEOUT, no MINBATCHSIZE, no batching at all).
    """

    def __init__(self):
        # useAof=True starts Redis with appendonly enabled, so
        # restartAndReload() exercises the AOF rewrite callbacks.
        self.env = Env(useAof=True)

    def _assert_model_meta(self, con, key_name, expected):
        # AI.MODELGET META replies with a flat [field-name, value, ...] list;
        # unpack positionally and compare the relevant fields against
        # expected = [backend, device, tag, batchsize, minbatchsize,
        #             minbatchtimeout, inputs, outputs].
        _, backend, _, device, _, tag, _, batchsize, _, minbatchsize, _, inputs, _, outputs, _, minbatchtimeout \
            = con.execute_command("AI.MODELGET", key_name, "META")
        self.env.assertEqual(
            [backend, device, tag, batchsize, minbatchsize, minbatchtimeout, inputs, outputs], expected)

    def test_aof_rewrite_tf_model(self):
        key_name = "tf_graph{1}"
        con = self.env.getConnection()
        tf_model = load_file_content("graph.pb")
        con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH', 'batchsize', 4, 'minbatchsize', 2,
                            'minbatchtimeout', 1000, 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model)

        # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF.
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"TF", b"CPU", b"TF_GRAPH", 4, 2, 1000, [b"a", b"b"], [b"mul"]])
        tf_model_run(self.env, key_name)

        # Reinsert the model (without minbatchtimeout)
        con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH1', 'batchsize', 4, 'minbatchsize', 2,
                            'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"TF", b"CPU", b"TF_GRAPH1", 4, 2, 0, [b"a", b"b"], [b"mul"]])

        # Reinsert the model (without minbatch)
        con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH2', 'batchsize', 4,
                            'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"TF", b"CPU", b"TF_GRAPH2", 4, 0, 0, [b"a", b"b"], [b"mul"]])

        # Reinsert the model (without batching)
        con.execute_command('AI.MODELSTORE', key_name, 'TF', 'CPU', 'TAG', 'TF_GRAPH3',
                            'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', tf_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"TF", b"CPU", b"TF_GRAPH3", 0, 0, 0, [b"a", b"b"], [b"mul"]])

    def test_aof_rewrite_torch_model(self):
        key_name = "pt-minimal{1}"
        con = self.env.getConnection()
        torch_model = load_file_content("pt-minimal.pt")
        con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL', 'batchsize', 4, 'minbatchsize', 2,
                            'minbatchtimeout', 1000, 'BLOB', torch_model)

        # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF.
        self.env.restartAndReload()
        # TORCH models have no declared input/output names: inputs default to
        # [b"a", b"b"] and outputs to [b''] in the META reply.
        self._assert_model_meta(con, key_name,
                                [b"TORCH", b"CPU", b"PT_MINIMAL", 4, 2, 1000, [b"a", b"b"], [b'']])
        torch_model_run(self.env, key_name)

        # Reinsert the model (without minbatchtimeout)
        con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL1', 'batchsize', 4, 'minbatchsize', 2,
                            'BLOB', torch_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"TORCH", b"CPU", b"PT_MINIMAL1", 4, 2, 0, [b"a", b"b"], [b'']])

        # Reinsert the model (without minbatch)
        con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL2', 'batchsize', 4,
                            'BLOB', torch_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"TORCH", b"CPU", b"PT_MINIMAL2", 4, 0, 0, [b"a", b"b"], [b'']])

        # Reinsert the model (without batching)
        con.execute_command('AI.MODELSTORE', key_name, 'TORCH', 'CPU', 'TAG', 'PT_MINIMAL3',
                            'BLOB', torch_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"TORCH", b"CPU", b"PT_MINIMAL3", 0, 0, 0, [b"a", b"b"], [b'']])

    def test_aof_rewrite_torch_script(self):
        key_name = "torch_script{1}"
        con = self.env.getConnection()
        torch_script = load_file_content("script.txt")
        con.execute_command('AI.SCRIPTSET', key_name, 'CPU', 'TAG', 'TORCH_SCRIPT', 'SOURCE', torch_script)

        # Redis should save the stored script by calling the AOF rewrite callback and then reload from AOF.
        self.env.restartAndReload()
        _, device, _, tag = con.execute_command("AI.SCRIPTGET", key_name, "META")
        self.env.assertEqual([device, tag], [b"CPU", b"TORCH_SCRIPT"])
        torch_script_run(self.env, key_name)

    def test_aof_rewrite_onnx_model(self):
        key_name = "linear_iris{1}"
        con = self.env.getConnection()
        onnx_model = load_file_content("linear_iris.onnx")
        con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS', 'batchsize', 4, 'minbatchsize', 2,
                            'minbatchtimeout', 1000, 'BLOB', onnx_model)

        # Redis should save the stored model by calling the AOF rewrite callback and then reload from AOF.
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS", 4, 2, 1000, [b'float_input'], [b'variable']])
        onnx_model_run(self.env, key_name)

        # Reinsert the model (without minbatchtimeout)
        con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS1', 'batchsize', 4,
                            'minbatchsize', 2, 'BLOB', onnx_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS1", 4, 2, 0, [b'float_input'], [b'variable']])

        # Reinsert the model (without minbatch)
        con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS2', 'batchsize', 4,
                            'BLOB', onnx_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS2", 4, 0, 0, [b'float_input'], [b'variable']])

        # Reinsert the model (without batching)
        con.execute_command('AI.MODELSTORE', key_name, 'ONNX', 'CPU', 'TAG', 'ONNX_LINEAR_IRIS3',
                            'BLOB', onnx_model)
        self.env.restartAndReload()
        self._assert_model_meta(con, key_name,
                                [b"ONNX", b"CPU", b"ONNX_LINEAR_IRIS3", 0, 0, 0, [b'float_input'], [b'variable']])

    def test_aof_rewrite_tensor(self):
        key_name = "tensor{1}"
        con = self.env.getConnection()
        con.execute_command('AI.TENSORSET', key_name, 'INT32', 2, 1, 'VALUES', 1, 2)
        # Redis should save the stored tensor by calling the AOF rewrite callback and then reload from AOF.
        self.env.restartAndReload()
        _, tensor_type, _, tensor_shape = con.execute_command('AI.TENSORGET', key_name, 'META')
        self.env.assertEqual([tensor_type, tensor_shape], [b"INT32", [2, 1]])
        values = con.execute_command('AI.TENSORGET', key_name, 'VALUES')
        self.env.assertEqual(values, [1, 2])