Skip to content

Commit 1e7bf0b

Browse files
committed
Add support of custom sharding func for crud methods
This commit introduces modifications in functions for fetching sharding metadata on storage and router to get sharding function. Function `sharding.key_get_bucket_id` calculates bucket_id using DDL sharding function if sharding function exist for specified space. Description in documentation, integration and unit tests are added as well. Closes #237
1 parent 676ea4c commit 1e7bf0b

17 files changed

+794
-30
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2121
key specified with DDL schema or in `_ddl_sharding_key` space.
2222
NOTE: CRUD methods delete(), get() and update() requires that sharding key
2323
must be a part of primary key.
24+
* Support bucket id calculating using sharding func specified in
25+
DDL schema or in `_ddl_sharding_func` space.
2426

2527
### Fixed
2628

README.md

+19
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in
7777
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
7878
requires that sharding key must be a part of primary key.
7979

80+
You can specify sharding function to calculate bucket_id with
81+
sharding func definition as a part of [DDL
82+
schema](https://github.com/tarantool/ddl#input-data-format)
83+
or insert manually to the space `_ddl_sharding_func`.
84+
85+
CRUD uses `strcrc32` as sharding function by default.
86+
The reason why using of `strcrc32` is undesirable is that
87+
this sharding function is not consistent for cdata numbers.
88+
In particular, it returns 3 different values for normal Lua
89+
numbers like 123, for `unsigned long long` cdata
90+
(like `123ULL`, or `ffi.cast('unsigned long long',
91+
123)`), and for `signed long long` cdata (like `123LL`, or
92+
`ffi.cast('long long', 123)`).
93+
94+
We cannot change default sharding function `strcrc32`
95+
due to backward compatibility concerns, but please consider
96+
using better alternatives for sharding function.
97+
`mpcrc32` is one of them.
98+
8099
Table below describe what operations supports custom sharding key:
81100

82101
| CRUD method | Sharding key support |

crud/common/sharding/sharding.lua

+14-2
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,27 @@ local errors = require('errors')
44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55

66
local utils = require('crud.common.utils')
7+
local dev_checks = require('crud.common.dev_checks')
78
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
89

910
local sharding = {}
1011

11-
function sharding.key_get_bucket_id(key, specified_bucket_id)
12+
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
13+
dev_checks('string', '?', '?number|cdata')
14+
1215
if specified_bucket_id ~= nil then
1316
return specified_bucket_id
1417
end
1518

19+
local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
20+
if err ~= nil then
21+
return nil, err
22+
end
23+
24+
if sharding_func ~= nil then
25+
return sharding_func(key)
26+
end
27+
1628
return vshard.router.bucket_id_strcrc32(key)
1729
end
1830

@@ -31,7 +43,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
3143
end
3244
local key = utils.extract_key(tuple, sharding_index_parts)
3345

34-
return sharding.key_get_bucket_id(key)
46+
return sharding.key_get_bucket_id(space.name, key)
3547
end
3648

3749
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)

crud/common/sharding/sharding_func.lua

+25
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,31 @@ function sharding_func_module.extract_function_def(tuple)
9393
return nil
9494
end
9595

96+
function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
97+
dev_checks('table', 'string')
98+
99+
local result_err
100+
101+
cache.sharding_func_map = {}
102+
for space_name, metadata in pairs(metadata_map) do
103+
if metadata.sharding_func_def ~= nil then
104+
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
105+
space_name)
106+
if err ~= nil then
107+
if specified_space_name == space_name then
108+
result_err = err
109+
else
110+
log.warn(err)
111+
end
112+
end
113+
114+
cache.sharding_func_map[space_name] = sharding_func
115+
end
116+
end
117+
118+
return result_err
119+
end
120+
96121
sharding_func_module.internal = {
97122
as_callable_object = as_callable_object,
98123
is_callable = is_callable,

crud/common/sharding/sharding_metadata.lua

+61-20
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local call = require('crud.common.call')
55
local const = require('crud.common.const')
66
local dev_checks = require('crud.common.dev_checks')
77
local cache = require('crud.common.sharding.sharding_metadata_cache')
8+
local sharding_func = require('crud.common.sharding.sharding_func')
89
local sharding_key = require('crud.common.sharding.sharding_key')
910

1011
local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})
@@ -38,25 +39,39 @@ local function locked(f)
3839
end
3940
end
4041

41-
-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
42-
-- not available on storage.
42+
-- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and
43+
-- box.space._ddl_sharding_func are not available on storage.
4344
function sharding_metadata_module.fetch_on_storage()
4445
local sharding_key_space = box.space._ddl_sharding_key
45-
if sharding_key_space == nil then
46+
local sharding_func_space = box.space._ddl_sharding_func
47+
48+
if sharding_key_space == nil and sharding_func_space == nil then
4649
return nil
4750
end
4851

4952
local SPACE_NAME_FIELDNO = 1
5053
local SPACE_SHARDING_KEY_FIELDNO = 2
5154
local metadata_map = {}
52-
for _, tuple in sharding_key_space:pairs() do
53-
local space_name = tuple[SPACE_NAME_FIELDNO]
54-
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
55-
local space_format = box.space[space_name]:format()
56-
metadata_map[space_name] = {
57-
sharding_key_def = sharding_key_def,
58-
space_format = space_format,
59-
}
55+
56+
if sharding_key_space ~= nil then
57+
for _, tuple in sharding_key_space:pairs() do
58+
local space_name = tuple[SPACE_NAME_FIELDNO]
59+
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
60+
local space_format = box.space[space_name]:format()
61+
metadata_map[space_name] = {
62+
sharding_key_def = sharding_key_def,
63+
space_format = space_format,
64+
}
65+
end
66+
end
67+
68+
if sharding_func_space ~= nil then
69+
for _, tuple in sharding_func_space:pairs() do
70+
local space_name = tuple[SPACE_NAME_FIELDNO]
71+
local sharding_func_def = sharding_func.extract_function_def(tuple)
72+
metadata_map[space_name] = metadata_map[space_name] or {}
73+
metadata_map[space_name].sharding_func_def = sharding_func_def
74+
end
6075
end
6176

6277
return metadata_map
@@ -83,24 +98,21 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
8398
end
8499
if metadata_map == nil then
85100
cache[cache.SHARDING_KEY_MAP_NAME] = {}
101+
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
86102
return
87103
end
88104

89105
local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
90106
if err ~= nil then
91107
return err
92108
end
109+
110+
local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name)
111+
if err ~= nil then
112+
return err
113+
end
93114
end)
94115

95-
-- Get sharding index for a certain space.
96-
--
97-
-- Return:
98-
-- - sharding key as index object, when sharding key definition found on
99-
-- storage.
100-
-- - nil, when sharding key definition was not found on storage. Pay attention
101-
-- that nil without error is a successfull return value.
102-
-- - nil and error, when something goes wrong on fetching attempt.
103-
--
104116
local function fetch_on_router(space_name, metadata_map_name, timeout)
105117
if cache[metadata_map_name] ~= nil then
106118
return cache[metadata_map_name][space_name]
@@ -120,17 +132,46 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
120132
"Fetching sharding key for space '%s' is failed", space_name)
121133
end
122134

135+
-- Get sharding index for a certain space.
136+
--
137+
-- Return:
138+
-- - sharding key as index object, when sharding key definition found on
139+
-- storage.
140+
-- - nil, when sharding key definition was not found on storage. Pay attention
141+
-- that nil without error is a successfull return value.
142+
-- - nil and error, when something goes wrong on fetching attempt.
143+
--
123144
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
124145
dev_checks('string', '?number')
125146

126147
return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
127148
end
128149

150+
-- Get sharding func for a certain space.
151+
--
152+
-- Return:
153+
-- - sharding func as callable object, when sharding func definition found on
154+
-- storage.
155+
-- - nil, when sharding func definition was not found on storage. Pay attention
156+
-- that nil without error is a successfull return value.
157+
-- - nil and error, when something goes wrong on fetching attempt.
158+
--
159+
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
160+
dev_checks('string', '?number')
161+
162+
return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
163+
end
164+
129165
function sharding_metadata_module.update_sharding_key_cache(space_name)
130166
cache.drop_caches()
131167
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
132168
end
133169

170+
function sharding_metadata_module.update_sharding_func_cache(space_name)
171+
cache.drop_caches()
172+
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
173+
end
174+
134175
function sharding_metadata_module.init()
135176
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
136177
end

crud/common/sharding/sharding_metadata_cache.lua

+3
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ local fiber = require('fiber')
33
local sharding_metadata_cache = {}
44

55
sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
6+
sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
67
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
8+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
79
sharding_metadata_cache.fetch_lock = fiber.channel(1)
810
sharding_metadata_cache.is_part_of_pk = {}
911

1012
function sharding_metadata_cache.drop_caches()
1113
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
14+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
1215
if sharding_metadata_cache.fetch_lock ~= nil then
1316
sharding_metadata_cache.fetch_lock:close()
1417
end

crud/delete.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts)
7474
end
7575
end
7676

77-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
77+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
78+
if err ~= nil then
79+
return nil, err
80+
end
81+
7882
local call_opts = {
7983
mode = 'write',
8084
timeout = opts.timeout,

crud/get.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts)
7777
end
7878
end
7979

80-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
80+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
81+
if err ~= nil then
82+
return nil, err
83+
end
84+
8185
local call_opts = {
8286
mode = opts.mode or 'read',
8387
prefer_replica = opts.prefer_replica,

crud/select/compat/select.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
103103
local perform_map_reduce = opts.force_map_call == true or
104104
(opts.bucket_id == nil and plan.sharding_key == nil)
105105
if not perform_map_reduce then
106-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
106+
local bucket_id = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
107+
if err ~= nil then
108+
return nil, err
109+
end
110+
107111
assert(bucket_id ~= nil)
108112

109113
local err

crud/select/compat/select_old.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
129129
local perform_map_reduce = opts.force_map_call == true or
130130
(opts.bucket_id == nil and plan.sharding_key == nil)
131131
if not perform_map_reduce then
132-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
132+
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
133+
if err ~= nil then
134+
return nil, err
135+
end
136+
133137
assert(bucket_id ~= nil)
134138

135139
local err

crud/update.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ local function call_update_on_router(space_name, key, user_operations, opts)
110110
end
111111
end
112112

113-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
113+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
114+
if err ~= nil then
115+
return nil, err
116+
end
117+
114118
local call_opts = {
115119
mode = 'write',
116120
timeout = opts.timeout,

deps.sh

+5
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,9 @@ tarantoolctl rocks install https://github.com/raw/moteus/lua-path/mas
1616
tarantoolctl rocks install https://github.com/raw/moteus/luacov-coveralls/master/rockspecs/luacov-coveralls-scm-0.rockspec
1717

1818
tarantoolctl rocks install cartridge 2.7.1
19+
20+
# cartridge depends on ddl 1.5.0-1 (version without
21+
# sharding func support), install latest version
22+
tarantoolctl rocks install ddl scm-1
23+
1924
tarantoolctl rocks make

test/entrypoint/srv_ddl.lua

+29
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,18 @@ local cartridge = require('cartridge')
99
local ddl = require('ddl')
1010

1111
package.preload['customers-storage'] = function()
12+
-- set sharding func in dot.notation
13+
-- in _G for sharding func tests
14+
local some_module = {
15+
sharding_func =
16+
function(key)
17+
if key ~= nil and key[1] ~= nil then
18+
return key[1] % 10
19+
end
20+
end
21+
}
22+
rawset(_G, 'some_module', some_module)
23+
1224
return {
1325
role_name = 'customers-storage',
1426
init = function()
@@ -131,6 +143,18 @@ package.preload['customers-storage'] = function()
131143
table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index)
132144
table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index)
133145

146+
local customers_id_key_schema = table.deepcopy(customers_schema)
147+
customers_id_key_schema.sharding_key = {'id'}
148+
table.insert(customers_id_key_schema.indexes, primary_index)
149+
table.insert(customers_id_key_schema.indexes, bucket_id_index)
150+
table.insert(customers_id_key_schema.indexes, name_index)
151+
152+
local customers_body_func_schema = table.deepcopy(customers_id_key_schema)
153+
customers_body_func_schema.sharding_func = { body = 'function(key) return key[1] % 10 end' }
154+
155+
local customers_G_func_schema = table.deepcopy(customers_id_key_schema)
156+
customers_G_func_schema.sharding_func = 'some_module.sharding_func'
157+
134158
local schema = {
135159
spaces = {
136160
customers_name_key = customers_name_key_schema,
@@ -140,6 +164,8 @@ package.preload['customers-storage'] = function()
140164
customers_age_key = customers_age_key_schema,
141165
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
142166
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
167+
customers_G_func = customers_G_func_schema,
168+
customers_body_func = customers_body_func_schema,
143169
}
144170
}
145171

@@ -154,6 +180,9 @@ package.preload['customers-storage'] = function()
154180
local fieldno_sharding_key = 2
155181
box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}})
156182
end)
183+
rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def)
184+
box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}})
185+
end)
157186
end,
158187
}
159188
end

0 commit comments

Comments
 (0)