Skip to content

Commit eb69930

Browse files
AnaNekDifferentialOrange
authored andcommitted
sharding func: fix specifying vshard sharding funcs
Starting from 0.11.0 user can specify sharding func to calculate bucket_id with sharding func definition as a part of DDL schema or insert manually to the space `_ddl_sharding_func`. Right now ddl fails with setting schema with vshard sharding function. But even if this bug is fixed, there is also a bug on CRUD side. Inserting manually to the space `_ddl_sharding_func` showed that CRUD search vshard sharding func in `_G` but this approach doesn't work with vshard case. This patch allows to specify `vshard` sharding func inserting manually to the space `_ddl_sharding_func`. Closes #314
1 parent ddfc251 commit eb69930

File tree

6 files changed

+168
-6
lines changed

6 files changed

+168
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1414

1515
### Fixed
1616

17+
- Fix specifying `vshard` sharding funcs (#314).
18+
1719
## [0.12.1] - 21-07-22
1820

1921
### Fixed

crud/common/sharding/sharding_func.lua

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack
99

1010
local sharding_func_module = {}
1111

12+
local sharding_module_names = {
13+
['vshard'] = true,
14+
}
15+
1216
local function is_callable(object)
1317
if type(object) == 'function' then
1418
return true
@@ -43,13 +47,33 @@ end
4347
local function get_function_from_G(func_name)
4448
local chunks = string.split(func_name, '.')
4549
local sharding_func = _G
50+
local sharding_module = false
51+
local ok
52+
53+
if sharding_module_names[chunks[1]] then
54+
ok, sharding_func = pcall(require, chunks[1])
55+
if not ok then
56+
return nil
57+
end
58+
59+
sharding_module = true
60+
table.remove(chunks, 1)
61+
end
4662

4763
-- check is the each chunk an identifier
4864
for _, chunk in pairs(chunks) do
4965
if not utils.check_name_isident(chunk) or sharding_func == nil then
5066
return nil
5167
end
52-
sharding_func = rawget(sharding_func, chunk)
68+
69+
-- `vshard` store sharding functions in metatable,
70+
-- this metatable is common for all `vshard` routers.
71+
-- That's why for `vshard` case we can't use rawget.
72+
if sharding_module then
73+
sharding_func = sharding_func[chunk]
74+
else
75+
sharding_func = rawget(sharding_func, chunk)
76+
end
5377
end
5478

5579
return sharding_func

deps.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ rm "${LUACOV_COVERALLS_ROCKSPEC_FILE}"
2727
rmdir "${TMPDIR}"
2828

2929
tarantoolctl rocks install cartridge 2.7.4
30-
tarantoolctl rocks install ddl 1.6.0
30+
tarantoolctl rocks install ddl 1.6.2
3131
tarantoolctl rocks install migrations 0.4.2
3232

3333
tarantoolctl rocks make

test/entrypoint/srv_ddl.lua

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,14 @@ package.preload['customers-storage'] = function()
161161
local customers_G_func_schema = table.deepcopy(customers_id_key_schema)
162162
customers_G_func_schema.sharding_func = 'some_module.sharding_func'
163163

164+
local customers_empty_sharding_func_schema = table.deepcopy(customers_id_key_schema)
165+
166+
local customers_vshard_mpcrc32_schema = table.deepcopy(customers_id_key_schema)
167+
customers_vshard_mpcrc32_schema.sharding_func = 'vshard.router.bucket_id_mpcrc32'
168+
169+
local customers_vshard_strcrc32_schema = table.deepcopy(customers_id_key_schema)
170+
customers_vshard_strcrc32_schema.sharding_func = 'vshard.router.bucket_id_strcrc32'
171+
164172
local schema = {
165173
spaces = {
166174
customers = customers_id_schema,
@@ -173,6 +181,9 @@ package.preload['customers-storage'] = function()
173181
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
174182
customers_G_func = customers_G_func_schema,
175183
customers_body_func = customers_body_func_schema,
184+
customers_empty_sharding_func = customers_empty_sharding_func_schema,
185+
customers_vshard_mpcrc32 = customers_vshard_mpcrc32_schema,
186+
customers_vshard_strcrc32 = customers_vshard_strcrc32_schema,
176187
}
177188
}
178189

@@ -188,7 +199,9 @@ package.preload['customers-storage'] = function()
188199
box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}})
189200
end)
190201
rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def)
191-
box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}})
202+
local record = {space_name, box.NULL, box.NULL}
203+
record[fieldno_sharding_func] = sharding_func_def
204+
box.space['_ddl_sharding_func']:replace(record)
192205
end)
193206
end,
194207
}

test/integration/ddl_sharding_func_test.lua

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ local cache_group = t.group('ddl_sharding_func_cache', {
2121
{engine = 'vinyl'},
2222
})
2323

24+
local vshard_group = t.group('ddl_vshard_sharding_func', {
25+
{engine = 'memtx'},
26+
{engine = 'vinyl'},
27+
})
28+
2429
pgroup.before_all(function(g)
2530
g.cluster = helpers.Cluster:new({
2631
datadir = fio.tempdir(),
@@ -77,6 +82,35 @@ cache_group.before_each(function(g)
7782
helpers.truncate_space_on_cluster(g.cluster, 'customers_body_func')
7883
end)
7984

85+
vshard_group.before_all(function(g)
86+
g.cluster = helpers.Cluster:new({
87+
datadir = fio.tempdir(),
88+
server_command = helpers.entrypoint('srv_ddl'),
89+
use_vshard = true,
90+
replicasets = helpers.get_test_replicasets(),
91+
env = {
92+
['ENGINE'] = g.params.engine,
93+
},
94+
})
95+
g.cluster:start()
96+
local result, err = g.cluster.main_server.net_box:eval([[
97+
local ddl = require('ddl')
98+
99+
local ok, err = ddl.get_schema()
100+
return ok, err
101+
]])
102+
t.assert_equals(type(result), 'table')
103+
t.assert_equals(err, nil)
104+
end)
105+
106+
vshard_group.after_all(function(g) helpers.stop_cluster(g.cluster) end)
107+
108+
vshard_group.before_each(function(g)
109+
helpers.truncate_space_on_cluster(g.cluster, 'customers_vshard_mpcrc32')
110+
helpers.truncate_space_on_cluster(g.cluster, 'customers_vshard_strcrc32')
111+
helpers.truncate_space_on_cluster(g.cluster, 'customers_empty_sharding_func')
112+
end)
113+
80114
pgroup.test_insert_object = function(g)
81115
local result, err = g.cluster.main_server.net_box:call(
82116
'crud.insert_object', {g.params.space_name, {id = 158, name = 'Augustus', age = 48}})
@@ -703,7 +737,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g)
703737

704738
-- records for all spaces exist
705739
local cache_size = helpers.get_sharding_func_cache_size(g.cluster)
706-
t.assert_equals(cache_size, 2)
740+
t.assert_equals(cache_size, 4)
707741

708742
-- no error just warning
709743
local space_name = 'customers_G_func'
@@ -719,7 +753,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g)
719753
-- cache['customers_G_func'] == nil (space with incorrect func)
720754
-- other records for correct spaces exist in cache
721755
cache_size = helpers.get_sharding_func_cache_size(g.cluster)
722-
t.assert_equals(cache_size, 1)
756+
t.assert_equals(cache_size, 3)
723757

724758
-- get data from cache for space with incorrect sharding func
725759
local space_name = 'customers_G_func'
@@ -736,7 +770,7 @@ cache_group.test_update_cache_with_incorrect_func = function(g)
736770
-- cache['customers_G_func'] == nil (space with incorrect func)
737771
-- other records for correct spaces exist in cache
738772
cache_size = helpers.get_sharding_func_cache_size(g.cluster)
739-
t.assert_equals(cache_size, 1)
773+
t.assert_equals(cache_size, 3)
740774
end
741775

742776

@@ -938,3 +972,83 @@ pgroup.test_gh_278_count_with_explicit_bucket_id_and_ddl = function(g)
938972
t.assert_is_not(obj, nil)
939973
t.assert_equals(obj, 1)
940974
end
975+
976+
local vshard_cases = {
977+
mpcrc32_not_depends_on_ddl = {
978+
set_sharding_func_to_ddl_space = true,
979+
space_name = 'customers_empty_sharding_func',
980+
sharding_func_name = 'vshard.router.bucket_id_mpcrc32',
981+
bucket_id = 1614,
982+
srv_with_data = 's1-master',
983+
srv_without_data = 's2-master',
984+
},
985+
strcrc32_not_depends_on_ddl = {
986+
set_sharding_func_to_ddl_space = true,
987+
space_name = 'customers_empty_sharding_func',
988+
sharding_func_name = 'vshard.router.bucket_id_strcrc32',
989+
bucket_id = 477,
990+
srv_with_data = 's2-master',
991+
srv_without_data = 's1-master',
992+
},
993+
mpcrc32_depends_on_ddl = {
994+
space_name = 'customers_vshard_mpcrc32',
995+
sharding_func_name = 'vshard.router.bucket_id_mpcrc32',
996+
bucket_id = 1614,
997+
srv_with_data = 's1-master',
998+
srv_without_data = 's2-master',
999+
},
1000+
strcrc32_depends_on_ddl = {
1001+
space_name = 'customers_vshard_strcrc32',
1002+
sharding_func_name = 'vshard.router.bucket_id_strcrc32',
1003+
bucket_id = 477,
1004+
srv_with_data = 's2-master',
1005+
srv_without_data = 's1-master',
1006+
}
1007+
}
1008+
1009+
for name, case in pairs(vshard_cases) do
1010+
local test_name = ('test_vshard_%s'):format(name)
1011+
1012+
vshard_group[test_name] = function(g)
1013+
local space_name = case.space_name
1014+
1015+
if case.set_sharding_func_to_ddl_space then
1016+
local fieldno_sharding_func_name = 2
1017+
1018+
helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server)
1019+
server.net_box:call('set_sharding_func',
1020+
{space_name, fieldno_sharding_func_name, case.sharding_func_name})
1021+
end)
1022+
1023+
local record_exist, err = helpers.update_sharding_func_cache(g.cluster, space_name)
1024+
t.assert_equals(err, nil)
1025+
t.assert_equals(record_exist, true)
1026+
end
1027+
1028+
-- Insert a tuple.
1029+
local result, err = g.cluster.main_server.net_box:call(
1030+
'crud.insert', {space_name, {1, box.NULL, 'Ivan', 25}})
1031+
t.assert_equals(err, nil)
1032+
t.assert_equals(#result.rows, 1)
1033+
t.assert_equals(result.rows[1], {1, case.bucket_id, 'Ivan', 25})
1034+
1035+
-- There is a tuple on server that we inserted with crud.insert().
1036+
local conn_srv_with_data = g.cluster:server(case.srv_with_data).net_box
1037+
local result = conn_srv_with_data.space[space_name]:get({1})
1038+
t.assert_equals(result, {1, case.bucket_id, 'Ivan', 25})
1039+
1040+
-- There is no tuple on server that we inserted with crud.insert().
1041+
local conn_srv_without_data = g.cluster:server(case.srv_without_data).net_box
1042+
local result = conn_srv_without_data.space[space_name]:get({1})
1043+
t.assert_equals(result, nil)
1044+
1045+
local conditions = {{'==', 'id', 1}}
1046+
local result, err = g.cluster.main_server.net_box:call('crud.select', {
1047+
space_name, conditions,
1048+
})
1049+
1050+
t.assert_equals(err, nil)
1051+
t.assert_equals(#result.rows, 1)
1052+
t.assert_equals(result.rows[1], {1, case.bucket_id, 'Ivan', 25})
1053+
end
1054+
end

test/integration/ddl_sharding_key_test.lua

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -898,13 +898,16 @@ pgroup.test_update_cache_with_incorrect_key = function(g)
898898
customers = {parts = {{fieldno = 1}}},
899899
customers_G_func = {parts = {{fieldno = 1}}},
900900
customers_body_func = {parts = {{fieldno = 1}}},
901+
customers_empty_sharding_func = {parts = {{fieldno = 1}}},
901902
customers_age_key = {parts = {{fieldno = 4}}},
902903
customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}},
903904
customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}},
904905
customers_name_key = {parts = {{fieldno = 3}}},
905906
customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}},
906907
customers_name_key_uniq_index = {parts = {{fieldno = 3}}},
907908
customers_secondary_idx_name_key = {parts = {{fieldno = 3}}},
909+
customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}},
910+
customers_vshard_strcrc32 = {parts = {{fieldno = 1}}}
908911
})
909912

910913
-- no error just warning
@@ -925,12 +928,15 @@ pgroup.test_update_cache_with_incorrect_key = function(g)
925928
customers = {parts = {{fieldno = 1}}},
926929
customers_G_func = {parts = {{fieldno = 1}}},
927930
customers_body_func = {parts = {{fieldno = 1}}},
931+
customers_empty_sharding_func = {parts = {{fieldno = 1}}},
928932
customers_age_key = {parts = {{fieldno = 4}}},
929933
customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}},
930934
customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}},
931935
customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}},
932936
customers_name_key_uniq_index = {parts = {{fieldno = 3}}},
933937
customers_secondary_idx_name_key = {parts = {{fieldno = 3}}},
938+
customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}},
939+
customers_vshard_strcrc32 = {parts = {{fieldno = 1}}}
934940
})
935941

936942
-- get data from cache for space with incorrect sharding key
@@ -951,12 +957,15 @@ pgroup.test_update_cache_with_incorrect_key = function(g)
951957
customers = {parts = {{fieldno = 1}}},
952958
customers_G_func = {parts = {{fieldno = 1}}},
953959
customers_body_func = {parts = {{fieldno = 1}}},
960+
customers_empty_sharding_func = {parts = {{fieldno = 1}}},
954961
customers_age_key = {parts = {{fieldno = 4}}},
955962
customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}},
956963
customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}},
957964
customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}},
958965
customers_name_key_uniq_index = {parts = {{fieldno = 3}}},
959966
customers_secondary_idx_name_key = {parts = {{fieldno = 3}}},
967+
customers_vshard_mpcrc32 = {parts = {{fieldno = 1}}},
968+
customers_vshard_strcrc32 = {parts = {{fieldno = 1}}}
960969
})
961970
end
962971

0 commit comments

Comments
 (0)