Skip to content

Commit 21fe7bf

Browse files
committed
Add support of custom sharding func for crud methods
This commit introduces modifications in functions for fetching sharding metadata on storage and router to get sharding function. Function `sharding.key_get_bucket_id` calculates bucket_id using DDL sharding function if a sharding function exists for the specified space. Documentation, integration tests and unit tests are added as well. Closes #237
1 parent f997ae0 commit 21fe7bf

18 files changed

+845
-30
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2121
key specified with DDL schema or in `_ddl_sharding_key` space.
2222
NOTE: CRUD methods delete(), get() and update() require that sharding key
2323
must be a part of primary key.
24+
* Support calculating bucket id using sharding func specified in
25+
DDL schema or in `_ddl_sharding_func` space.
2426

2527
### Fixed
2628

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in
7777
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
7878
require that sharding key must be a part of primary key.
7979

80+
You can specify a sharding function to calculate bucket_id with
81+
sharding func definition as a part of [DDL
82+
schema](https://github.com/tarantool/ddl#input-data-format)
83+
or insert it manually into the space `_ddl_sharding_func`.
84+
85+
CRUD uses `strcrc32` as sharding function by default.
86+
The reason why using `strcrc32` is undesirable is that
87+
this sharding function is not consistent for cdata numbers.
88+
In particular, it returns 3 different values for normal Lua
89+
numbers like 123, for `unsigned long long` cdata
90+
(like `123ULL`, or `ffi.cast('unsigned long long',
91+
123)`), and for `signed long long` cdata (like `123LL`, or
92+
`ffi.cast('long long', 123)`).
93+
94+
We cannot change default sharding function `strcrc32`
95+
due to backward compatibility concerns, but please consider
96+
using better alternatives for sharding function.
97+
`mpcrc32` is one of them.
98+
8099
The table below describes which operations support a custom sharding key:
81100

82101
| CRUD method | Sharding key support |
@@ -106,6 +125,13 @@ Current limitations for using custom sharding key:
106125
- `primary_index_fieldno_map` is not cached, see
107126
[#243](https://github.com/tarantool/crud/issues/243).
108127

128+
Current limitations for using custom sharding functions:
129+
130+
- It's not possible to update sharding functions automatically when schema is
131+
updated on storages, see
132+
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
133+
to do it manually with `require('crud.common.sharding_func').update_cache(space_name)`.
134+
109135
### Insert
110136

111137
```lua

crud/common/sharding/init.lua

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,27 @@ local errors = require('errors')
44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55

66
local utils = require('crud.common.utils')
7+
local dev_checks = require('crud.common.dev_checks')
78
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
89

910
local sharding = {}
1011

11-
function sharding.key_get_bucket_id(key, specified_bucket_id)
12+
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
13+
dev_checks('string', '?', '?number|cdata')
14+
1215
if specified_bucket_id ~= nil then
1316
return specified_bucket_id
1417
end
1518

19+
local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
20+
if err ~= nil then
21+
return nil, err
22+
end
23+
24+
if sharding_func ~= nil then
25+
return sharding_func(key)
26+
end
27+
1628
return vshard.router.bucket_id_strcrc32(key)
1729
end
1830

@@ -31,7 +43,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
3143
end
3244
local key = utils.extract_key(tuple, sharding_index_parts)
3345

34-
return sharding.key_get_bucket_id(key)
46+
return sharding.key_get_bucket_id(space.name, key)
3547
end
3648

3749
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)

crud/common/sharding/sharding_func.lua

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
local errors = require('errors')
2+
local log = require('log')
23

4+
local dev_checks = require('crud.common.dev_checks')
5+
local cache = require('crud.common.sharding.sharding_metadata_cache')
36
local utils = require('crud.common.utils')
47

58
local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false})
@@ -74,6 +77,32 @@ local function as_callable_object(sharding_func_def, space_name)
7477
)
7578
end
7679

80+
function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
81+
dev_checks('table', 'string')
82+
83+
local result_err
84+
85+
cache.sharding_func_map = {}
86+
for space_name, metadata in pairs(metadata_map) do
87+
if metadata.sharding_func_def ~= nil then
88+
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
89+
space_name)
90+
if err ~= nil then
91+
if specified_space_name == space_name then
92+
result_err = err
93+
log.error(err)
94+
else
95+
log.warn(err)
96+
end
97+
end
98+
99+
cache.sharding_func_map[space_name] = sharding_func
100+
end
101+
end
102+
103+
return result_err
104+
end
105+
77106
sharding_func_module.internal = {
78107
as_callable_object = as_callable_object,
79108
is_callable = is_callable,

crud/common/sharding/sharding_metadata.lua

Lines changed: 92 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local call = require('crud.common.call')
55
local const = require('crud.common.const')
66
local dev_checks = require('crud.common.dev_checks')
77
local cache = require('crud.common.sharding.sharding_metadata_cache')
8+
local sharding_func = require('crud.common.sharding.sharding_func')
89
local sharding_key = require('crud.common.sharding.sharding_key')
910

1011
local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})
@@ -38,25 +39,58 @@ local function locked(f)
3839
end
3940
end
4041

41-
-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
42-
-- not available on storage.
42+
local function extract_sharding_func_def(tuple)
43+
if not tuple then
44+
return nil
45+
end
46+
47+
local SPACE_SHARDING_FUNC_NAME_FIELDNO = 2
48+
local SPACE_SHARDING_FUNC_BODY_FIELDNO = 3
49+
50+
if tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO] ~= nil then
51+
return {body = tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO]}
52+
end
53+
54+
if tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO] ~= nil then
55+
return tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO]
56+
end
57+
58+
return nil
59+
end
60+
61+
-- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and
62+
-- box.space._ddl_sharding_func are not available on storage.
4363
function sharding_metadata_module.fetch_on_storage()
4464
local sharding_key_space = box.space._ddl_sharding_key
45-
if sharding_key_space == nil then
65+
local sharding_func_space = box.space._ddl_sharding_func
66+
67+
if sharding_key_space == nil and sharding_func_space == nil then
4668
return nil
4769
end
4870

4971
local SPACE_NAME_FIELDNO = 1
5072
local SPACE_SHARDING_KEY_FIELDNO = 2
5173
local metadata_map = {}
52-
for _, tuple in sharding_key_space:pairs() do
53-
local space_name = tuple[SPACE_NAME_FIELDNO]
54-
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
55-
local space_format = box.space[space_name]:format()
56-
metadata_map[space_name] = {
57-
sharding_key_def = sharding_key_def,
58-
space_format = space_format,
59-
}
74+
75+
if sharding_key_space ~= nil then
76+
for _, tuple in sharding_key_space:pairs() do
77+
local space_name = tuple[SPACE_NAME_FIELDNO]
78+
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
79+
local space_format = box.space[space_name]:format()
80+
metadata_map[space_name] = {
81+
sharding_key_def = sharding_key_def,
82+
space_format = space_format,
83+
}
84+
end
85+
end
86+
87+
if sharding_func_space ~= nil then
88+
for _, tuple in sharding_func_space:pairs() do
89+
local space_name = tuple[SPACE_NAME_FIELDNO]
90+
local sharding_func_def = extract_sharding_func_def(tuple)
91+
metadata_map[space_name] = metadata_map[space_name] or {}
92+
metadata_map[space_name].sharding_func_def = sharding_func_def
93+
end
6094
end
6195

6296
return metadata_map
@@ -83,24 +117,21 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
83117
end
84118
if metadata_map == nil then
85119
cache[cache.SHARDING_KEY_MAP_NAME] = {}
120+
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
86121
return
87122
end
88123

89124
local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
90125
if err ~= nil then
91126
return err
92127
end
128+
129+
local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name)
130+
if err ~= nil then
131+
return err
132+
end
93133
end)
94134

95-
-- Get sharding index for a certain space.
96-
--
97-
-- Return:
98-
-- - sharding key as index object, when sharding key definition found on
99-
-- storage.
100-
-- - nil, when sharding key definition was not found on storage. Pay attention
101-
-- that nil without error is a successfull return value.
102-
-- - nil and error, when something goes wrong on fetching attempt.
103-
--
104135
local function fetch_on_router(space_name, metadata_map_name, timeout)
105136
if cache[metadata_map_name] ~= nil then
106137
return cache[metadata_map_name][space_name]
@@ -120,17 +151,58 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
120151
"Fetching sharding key for space '%s' is failed", space_name)
121152
end
122153

154+
-- Get sharding index for a certain space.
155+
--
156+
-- Return:
157+
-- - sharding key as index object, when sharding key definition found on
158+
-- storage.
159+
-- - nil, when sharding key definition was not found on storage. Pay attention
160+
-- that nil without error is a successful return value.
161+
-- - nil and error, when something goes wrong on fetching attempt.
162+
--
123163
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
124164
dev_checks('string', '?number')
125165

126166
return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
127167
end
128168

169+
-- Get sharding func for a certain space.
170+
--
171+
-- Return:
172+
-- - sharding func as callable object, when sharding func definition found on
173+
-- storage.
174+
-- - nil, when sharding func definition was not found on storage. Pay attention
175+
-- that nil without error is a successful return value.
176+
-- - nil and error, when something goes wrong on fetching attempt.
177+
--
178+
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
179+
dev_checks('string', '?number')
180+
181+
return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
182+
end
183+
129184
function sharding_metadata_module.update_sharding_key_cache(space_name)
130185
cache.drop_caches()
186+
187+
local _, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
188+
if err ~= nil then
189+
return nil, err
190+
end
191+
131192
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
132193
end
133194

195+
function sharding_metadata_module.update_sharding_func_cache(space_name)
196+
cache.drop_caches()
197+
198+
local _, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
199+
if err ~= nil then
200+
return nil, err
201+
end
202+
203+
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
204+
end
205+
134206
function sharding_metadata_module.init()
135207
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
136208
end

crud/common/sharding/sharding_metadata_cache.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ local fiber = require('fiber')
33
local sharding_metadata_cache = {}
44

55
sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
6+
sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
67
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
8+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
79
sharding_metadata_cache.fetch_lock = fiber.channel(1)
810
sharding_metadata_cache.is_part_of_pk = {}
911

1012
function sharding_metadata_cache.drop_caches()
1113
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
14+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
1215
if sharding_metadata_cache.fetch_lock ~= nil then
1316
sharding_metadata_cache.fetch_lock:close()
1417
end

crud/common/sharding_func.lua

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
2+
3+
local sharding_func_cache = {}
4+
5+
function sharding_func_cache.update_cache(space_name)
6+
return sharding_metadata_module.update_sharding_func_cache(space_name)
7+
end
8+
9+
return sharding_func_cache

crud/delete.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts)
7474
end
7575
end
7676

77-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
77+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
78+
if err ~= nil then
79+
return nil, err
80+
end
81+
7882
local call_opts = {
7983
mode = 'write',
8084
timeout = opts.timeout,

crud/get.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts)
7777
end
7878
end
7979

80-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
80+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
81+
if err ~= nil then
82+
return nil, err
83+
end
84+
8185
local call_opts = {
8286
mode = opts.mode or 'read',
8387
prefer_replica = opts.prefer_replica,

crud/select/compat/select.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
103103
local perform_map_reduce = opts.force_map_call == true or
104104
(opts.bucket_id == nil and plan.sharding_key == nil)
105105
if not perform_map_reduce then
106-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
106+
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
107+
if err ~= nil then
108+
return nil, err
109+
end
110+
107111
assert(bucket_id ~= nil)
108112

109113
local err

crud/select/compat/select_old.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
129129
local perform_map_reduce = opts.force_map_call == true or
130130
(opts.bucket_id == nil and plan.sharding_key == nil)
131131
if not perform_map_reduce then
132-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
132+
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
133+
if err ~= nil then
134+
return nil, err
135+
end
136+
133137
assert(bucket_id ~= nil)
134138

135139
local err

crud/update.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ local function call_update_on_router(space_name, key, user_operations, opts)
110110
end
111111
end
112112

113-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
113+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
114+
if err ~= nil then
115+
return nil, err
116+
end
117+
114118
local call_opts = {
115119
mode = 'write',
116120
timeout = opts.timeout,

deps.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,9 @@ tarantoolctl rocks install https://github.com/raw/moteus/lua-path/mas
1616
tarantoolctl rocks install https://github.com/raw/moteus/luacov-coveralls/master/rockspecs/luacov-coveralls-scm-0.rockspec
1717

1818
tarantoolctl rocks install cartridge 2.7.1
19+
20+
# cartridge depends on ddl 1.5.0-1 (version without
21+
# sharding func support), install latest version
22+
tarantoolctl rocks install ddl scm-1
23+
1924
tarantoolctl rocks make

0 commit comments

Comments
 (0)