Skip to content

Commit 45f9a7d

Browse files
committed
Add support of custom sharding func for crud methods
This commit introduces modifications in functions for fetching sharding metadata on storage and router to get sharding function. Function `sharding.key_get_bucket_id` calculates bucket_id using DDL sharding function if sharding function exist for specified space. Description in documentation, integration and unit tests are added as well. Closes #237
1 parent ef1bc5b commit 45f9a7d

18 files changed

+823
-30
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2121
key specified with DDL schema or in `_ddl_sharding_key` space.
2222
NOTE: CRUD methods delete(), get() and update() requires that sharding key
2323
must be a part of primary key.
24+
* Support bucket id calculating using sharding func specified in
25+
DDL schema or in `_ddl_sharding_func` space.
2426

2527
### Fixed
2628

README.md

+26
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in
7777
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
7878
requires that sharding key must be a part of primary key.
7979

80+
You can specify sharding function to calculate bucket_id with
81+
sharding func definition as a part of [DDL
82+
schema](https://github.com/tarantool/ddl#input-data-format)
83+
or insert manually to the space `_ddl_sharding_func`.
84+
85+
CRUD uses `strcrc32` as sharding function by default.
86+
The reason why using of `strcrc32` is undesirable is that
87+
this sharding function is not consistent for cdata numbers.
88+
In particular, it returns 3 different values for normal Lua
89+
numbers like 123, for `unsigned long long` cdata
90+
(like `123ULL`, or `ffi.cast('unsigned long long',
91+
123)`), and for `signed long long` cdata (like `123LL`, or
92+
`ffi.cast('long long', 123)`).
93+
94+
We cannot change default sharding function `strcrc32`
95+
due to backward compatibility concerns, but please consider
96+
using better alternatives for sharding function.
97+
`mpcrc32` is one of them.
98+
8099
Table below describe what operations supports custom sharding key:
81100

82101
| CRUD method | Sharding key support |
@@ -106,6 +125,13 @@ Current limitations for using custom sharding key:
106125
- `primary_index_fieldno_map` is not cached, see
107126
[#243](https://github.com/tarantool/crud/issues/243).
108127

128+
Current limitations for using custom sharding functions:
129+
130+
- It's not possible to update sharding functions automatically when schema is
131+
updated on storages, see
132+
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
133+
to do it manually with `require('crud.common.sharding_func').update_cache()`.
134+
109135
### Insert
110136

111137
```lua

crud/common/sharding/init.lua

+14-2
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,27 @@ local errors = require('errors')
44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55

66
local utils = require('crud.common.utils')
7+
local dev_checks = require('crud.common.dev_checks')
78
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
89

910
local sharding = {}
1011

11-
function sharding.key_get_bucket_id(key, specified_bucket_id)
12+
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
13+
dev_checks('string', '?', '?number|cdata')
14+
1215
if specified_bucket_id ~= nil then
1316
return specified_bucket_id
1417
end
1518

19+
local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
20+
if err ~= nil then
21+
return nil, err
22+
end
23+
24+
if sharding_func ~= nil then
25+
return sharding_func(key)
26+
end
27+
1628
return vshard.router.bucket_id_strcrc32(key)
1729
end
1830

@@ -31,7 +43,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
3143
end
3244
local key = utils.extract_key(tuple, sharding_index_parts)
3345

34-
return sharding.key_get_bucket_id(key)
46+
return sharding.key_get_bucket_id(space.name, key)
3547
end
3648

3749
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)

crud/common/sharding/sharding_func.lua

+26
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,32 @@ local function as_callable_object(sharding_func_def, space_name)
7777
)
7878
end
7979

80+
function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
81+
dev_checks('table', 'string')
82+
83+
local result_err
84+
85+
cache.sharding_func_map = {}
86+
for space_name, metadata in pairs(metadata_map) do
87+
if metadata.sharding_func_def ~= nil then
88+
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
89+
space_name)
90+
if err ~= nil then
91+
if specified_space_name == space_name then
92+
result_err = err
93+
log.error(err)
94+
else
95+
log.warn(err)
96+
end
97+
end
98+
99+
cache.sharding_func_map[space_name] = sharding_func
100+
end
101+
end
102+
103+
return result_err
104+
end
105+
80106
sharding_func_module.internal = {
81107
as_callable_object = as_callable_object,
82108
is_callable = is_callable,

crud/common/sharding/sharding_metadata.lua

+73-20
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local call = require('crud.common.call')
55
local const = require('crud.common.const')
66
local dev_checks = require('crud.common.dev_checks')
77
local cache = require('crud.common.sharding.sharding_metadata_cache')
8+
local sharding_func = require('crud.common.sharding.sharding_func')
89
local sharding_key = require('crud.common.sharding.sharding_key')
910

1011
local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})
@@ -57,25 +58,39 @@ local function extract_sharding_func_def(tuple)
5758
return nil
5859
end
5960

60-
-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
61-
-- not available on storage.
61+
-- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and
62+
-- box.space._ddl_sharding_func are not available on storage.
6263
function sharding_metadata_module.fetch_on_storage()
6364
local sharding_key_space = box.space._ddl_sharding_key
64-
if sharding_key_space == nil then
65+
local sharding_func_space = box.space._ddl_sharding_func
66+
67+
if sharding_key_space == nil and sharding_func_space == nil then
6568
return nil
6669
end
6770

6871
local SPACE_NAME_FIELDNO = 1
6972
local SPACE_SHARDING_KEY_FIELDNO = 2
7073
local metadata_map = {}
71-
for _, tuple in sharding_key_space:pairs() do
72-
local space_name = tuple[SPACE_NAME_FIELDNO]
73-
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
74-
local space_format = box.space[space_name]:format()
75-
metadata_map[space_name] = {
76-
sharding_key_def = sharding_key_def,
77-
space_format = space_format,
78-
}
74+
75+
if sharding_key_space ~= nil then
76+
for _, tuple in sharding_key_space:pairs() do
77+
local space_name = tuple[SPACE_NAME_FIELDNO]
78+
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
79+
local space_format = box.space[space_name]:format()
80+
metadata_map[space_name] = {
81+
sharding_key_def = sharding_key_def,
82+
space_format = space_format,
83+
}
84+
end
85+
end
86+
87+
if sharding_func_space ~= nil then
88+
for _, tuple in sharding_func_space:pairs() do
89+
local space_name = tuple[SPACE_NAME_FIELDNO]
90+
local sharding_func_def = extract_sharding_func_def(tuple)
91+
metadata_map[space_name] = metadata_map[space_name] or {}
92+
metadata_map[space_name].sharding_func_def = sharding_func_def
93+
end
7994
end
8095

8196
return metadata_map
@@ -102,24 +117,21 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
102117
end
103118
if metadata_map == nil then
104119
cache[cache.SHARDING_KEY_MAP_NAME] = {}
120+
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
105121
return
106122
end
107123

108124
local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
109125
if err ~= nil then
110126
return err
111127
end
128+
129+
local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name)
130+
if err ~= nil then
131+
return err
132+
end
112133
end)
113134

114-
-- Get sharding index for a certain space.
115-
--
116-
-- Return:
117-
-- - sharding key as index object, when sharding key definition found on
118-
-- storage.
119-
-- - nil, when sharding key definition was not found on storage. Pay attention
120-
-- that nil without error is a successfull return value.
121-
-- - nil and error, when something goes wrong on fetching attempt.
122-
--
123135
local function fetch_on_router(space_name, metadata_map_name, timeout)
124136
if cache[metadata_map_name] ~= nil then
125137
return cache[metadata_map_name][space_name]
@@ -139,17 +151,58 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
139151
"Fetching sharding key for space '%s' is failed", space_name)
140152
end
141153

154+
-- Get sharding index for a certain space.
155+
--
156+
-- Return:
157+
-- - sharding key as index object, when sharding key definition found on
158+
-- storage.
159+
-- - nil, when sharding key definition was not found on storage. Pay attention
160+
-- that nil without error is a successfull return value.
161+
-- - nil and error, when something goes wrong on fetching attempt.
162+
--
142163
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
143164
dev_checks('string', '?number')
144165

145166
return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
146167
end
147168

169+
-- Get sharding func for a certain space.
170+
--
171+
-- Return:
172+
-- - sharding func as callable object, when sharding func definition found on
173+
-- storage.
174+
-- - nil, when sharding func definition was not found on storage. Pay attention
175+
-- that nil without error is a successfull return value.
176+
-- - nil and error, when something goes wrong on fetching attempt.
177+
--
178+
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
179+
dev_checks('string', '?number')
180+
181+
return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
182+
end
183+
148184
function sharding_metadata_module.update_sharding_key_cache(space_name)
149185
cache.drop_caches()
186+
187+
local _, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
188+
if err ~= nil then
189+
return nil, err
190+
end
191+
150192
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
151193
end
152194

195+
function sharding_metadata_module.update_sharding_func_cache(space_name)
196+
cache.drop_caches()
197+
198+
local _, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
199+
if err ~= nil then
200+
return nil, err
201+
end
202+
203+
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
204+
end
205+
153206
function sharding_metadata_module.init()
154207
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
155208
end

crud/common/sharding/sharding_metadata_cache.lua

+3
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ local fiber = require('fiber')
33
local sharding_metadata_cache = {}
44

55
sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
6+
sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
67
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
8+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
79
sharding_metadata_cache.fetch_lock = fiber.channel(1)
810
sharding_metadata_cache.is_part_of_pk = {}
911

1012
function sharding_metadata_cache.drop_caches()
1113
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
14+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
1215
if sharding_metadata_cache.fetch_lock ~= nil then
1316
sharding_metadata_cache.fetch_lock:close()
1417
end

crud/common/sharding_func.lua

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
2+
3+
local sharding_func_cache = {}
4+
5+
function sharding_func_cache.update_cache(space_name)
6+
return sharding_metadata_module.update_sharding_func_cache(space_name)
7+
end
8+
9+
return sharding_func_cache

crud/delete.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts)
7474
end
7575
end
7676

77-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
77+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
78+
if err ~= nil then
79+
return nil, err
80+
end
81+
7882
local call_opts = {
7983
mode = 'write',
8084
timeout = opts.timeout,

crud/get.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts)
7777
end
7878
end
7979

80-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
80+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
81+
if err ~= nil then
82+
return nil, err
83+
end
84+
8185
local call_opts = {
8286
mode = opts.mode or 'read',
8387
prefer_replica = opts.prefer_replica,

crud/select/compat/select.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
103103
local perform_map_reduce = opts.force_map_call == true or
104104
(opts.bucket_id == nil and plan.sharding_key == nil)
105105
if not perform_map_reduce then
106-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
106+
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
107+
if err ~= nil then
108+
return nil, err
109+
end
110+
107111
assert(bucket_id ~= nil)
108112

109113
local err

crud/select/compat/select_old.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
129129
local perform_map_reduce = opts.force_map_call == true or
130130
(opts.bucket_id == nil and plan.sharding_key == nil)
131131
if not perform_map_reduce then
132-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
132+
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
133+
if err ~= nil then
134+
return nil, err
135+
end
136+
133137
assert(bucket_id ~= nil)
134138

135139
local err

crud/update.lua

+5-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ local function call_update_on_router(space_name, key, user_operations, opts)
110110
end
111111
end
112112

113-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
113+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
114+
if err ~= nil then
115+
return nil, err
116+
end
117+
114118
local call_opts = {
115119
mode = 'write',
116120
timeout = opts.timeout,

deps.sh

+5
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,9 @@ tarantoolctl rocks install https://github.com/raw/moteus/lua-path/mas
1616
tarantoolctl rocks install https://github.com/raw/moteus/luacov-coveralls/master/rockspecs/luacov-coveralls-scm-0.rockspec
1717

1818
tarantoolctl rocks install cartridge 2.7.1
19+
20+
# cartridge depends on ddl 1.5.0-1 (version without
21+
# sharding func support), install latest version
22+
tarantoolctl rocks install ddl scm-1
23+
1924
tarantoolctl rocks make

0 commit comments

Comments
 (0)