Skip to content

Commit 8ff2ec3

Browse files
committed
Allow to specify bucket_id
* Bucket ID can be specified using `opts.bucket_id` for all operations * For operations that accepts tuple/object bucket ID can be specified as tuple/object field as well as `opts.bucket_id` value
1 parent bea9a83 commit 8ff2ec3

File tree

11 files changed

+776
-58
lines changed

11 files changed

+776
-58
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ to make `crud` functions callable via `net.box`.
1717
**Notes:**
1818

1919
* A space should have a format.
20-
* `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
20+
* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
2121
where `key` is the primary key value.
22+
Custom bucket ID can be specified as `opts.bucket_id` for each operation.
23+
For operations that accepts tuple/object bucket ID can be specified as
24+
tuple/object field as well as `opts.bucket_id` value.
2225

2326
### Insert
2427

@@ -35,6 +38,7 @@ where:
3538
* `tuple` / `object` (`table`) - tuple/object to insert
3639
* `opts`:
3740
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
41+
* `bucket_id` (`?number|cdata`) - bucket ID
3842

3943
Returns metadata and array contains one inserted row, error.
4044

@@ -77,6 +81,7 @@ where:
7781
* `key` (`any`) - primary key value
7882
* `opts`:
7983
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
84+
* `bucket_id` (`?number|cdata`) - bucket ID
8085

8186
Returns metadata and array contains one row, error.
8287

@@ -108,6 +113,7 @@ where:
108113
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update)
109114
* `opts`:
110115
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
116+
* `bucket_id` (`?number|cdata`) - bucket ID
111117

112118
Returns metadata and array contains one updated row, error.
113119

@@ -138,6 +144,7 @@ where:
138144
* `key` (`any`) - primary key value
139145
* `opts`:
140146
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
147+
* `bucket_id` (`?number|cdata`) - bucket ID
141148

142149
Returns metadata and array contains one deleted row (empty for vinyl), error.
143150

@@ -170,6 +177,7 @@ where:
170177
* `tuple` / `object` (`table`) - tuple/object to insert or replace exist one
171178
* `opts`:
172179
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
180+
* `bucket_id` (`?number|cdata`) - bucket ID
173181

174182
Returns inserted or replaced rows and metadata or nil with error.
175183

@@ -216,6 +224,7 @@ where:
216224
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple
217225
* `opts`:
218226
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
227+
* `bucket_id` (`?number|cdata`) - bucket ID
219228

220229
Returns metadata and empty array of rows or nil, error.
221230

@@ -269,6 +278,8 @@ where:
269278
* `after` (`?table`) - tuple after which objects should be selected
270279
* `batch_size` (`?number`) - number of tuples to process per one request to storage
271280
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
281+
* `bucket_id` (`?number|cdata`) - bucket ID
282+
(is used when select by full primary key is performed)
272283

273284
Returns metadata and array of rows, error.
274285

crud/common/sharding.lua

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
local vshard = require('vshard')
2+
local errors = require('errors')
3+
4+
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
5+
6+
local utils = require('crud.common.utils')
7+
8+
local sharding = {}
9+
10+
function sharding.key_get_bucket_id(key, specified_bucket_id)
11+
if specified_bucket_id ~= nil then
12+
return specified_bucket_id
13+
end
14+
15+
return vshard.router.bucket_id_strcrc32(key)
16+
end
17+
18+
function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
19+
if specified_bucket_id ~= nil then
20+
return specified_bucket_id
21+
end
22+
23+
local key = utils.extract_key(tuple, space.index[0].parts)
24+
return sharding.key_get_bucket_id(key)
25+
end
26+
27+
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
28+
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
29+
if err ~= nil then
30+
return nil, BucketIDError:new("Failed to get bucket ID fielno:", err)
31+
end
32+
33+
if specified_bucket_id ~= nil then
34+
if tuple[bucket_id_fieldno] == nil then
35+
tuple[bucket_id_fieldno] = specified_bucket_id
36+
else
37+
if tuple[bucket_id_fieldno] ~= specified_bucket_id then
38+
return nil, BucketIDError:new(
39+
"Tuple and opts.bucket_id contain different bucket_id values: %s and %s",
40+
tuple[bucket_id_fieldno], specified_bucket_id
41+
)
42+
end
43+
end
44+
end
45+
46+
if tuple[bucket_id_fieldno] == nil then
47+
tuple[bucket_id_fieldno] = sharding.tuple_get_bucket_id(tuple, space)
48+
end
49+
50+
local bucket_id = tuple[bucket_id_fieldno]
51+
return bucket_id
52+
end
53+
54+
return sharding

crud/delete.lua

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local vshard = require('vshard')
44

55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
7+
local sharding = require('crud.common.sharding')
78
local dev_checks = require('crud.common.dev_checks')
89

910
local DeleteError = errors.new_class('Delete', {capture_stack = false})
@@ -41,18 +42,22 @@ end
4142
-- @tparam ?number opts.timeout
4243
-- Function call timeout
4344
--
45+
-- @tparam ?number opts.bucket_id
46+
-- Bucket ID
47+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
48+
--
4449
-- @return[1] object
4550
-- @treturn[2] nil
4651
-- @treturn[2] table Error description
4752
--
4853
function delete.call(space_name, key, opts)
4954
checks('string', '?', {
5055
timeout = '?number',
56+
bucket_id = '?number|cdata',
5157
})
5258

5359
opts = opts or {}
5460

55-
5661
local space = utils.get_space(space_name, vshard.router.routeall())
5762
if space == nil then
5863
return nil, DeleteError:new("Space %q doesn't exist", space_name)
@@ -62,7 +67,7 @@ function delete.call(space_name, key, opts)
6267
key = key:totable()
6368
end
6469

65-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
70+
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
6671
local result, err = call.rw_single(
6772
bucket_id, DELETE_FUNC_NAME,
6873
{space_name, key}, {timeout = opts.timeout}

crud/get.lua

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local vshard = require('vshard')
44

55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
7+
local sharding = require('crud.common.sharding')
78
local dev_checks = require('crud.common.dev_checks')
89

910
local GetError = errors.new_class('Get', {capture_stack = false})
@@ -41,13 +42,18 @@ end
4142
-- @tparam ?number opts.timeout
4243
-- Function call timeout
4344
--
45+
-- @tparam ?number opts.bucket_id
46+
-- Bucket ID
47+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
48+
--
4449
-- @return[1] object
4550
-- @treturn[2] nil
4651
-- @treturn[2] table Error description
4752
--
4853
function get.call(space_name, key, opts)
4954
checks('string', '?', {
5055
timeout = '?number',
56+
bucket_id = '?number|cdata',
5157
})
5258

5359
opts = opts or {}
@@ -61,7 +67,7 @@ function get.call(space_name, key, opts)
6167
key = key:totable()
6268
end
6369

64-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
70+
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
6571
-- We don't use callro() here, because if the replication is
6672
-- async, there could be a lag between master and replica, so a
6773
-- connector which sequentially calls put() and then get() may get

crud/insert.lua

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local vshard = require('vshard')
44

55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
7+
local sharding = require('crud.common.sharding')
78
local dev_checks = require('crud.common.dev_checks')
89

910
local InsertError = errors.new_class('Insert', {capture_stack = false})
@@ -40,13 +41,18 @@ end
4041
-- @tparam ?number opts.timeout
4142
-- Function call timeout
4243
--
44+
-- @tparam ?number opts.bucket_id
45+
-- Bucket ID
46+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
47+
--
4348
-- @return[1] tuple
4449
-- @treturn[2] nil
4550
-- @treturn[2] table Error description
4651
--
4752
function insert.tuple(space_name, tuple, opts)
4853
checks('string', 'table', {
4954
timeout = '?number',
55+
bucket_id = '?number|cdata',
5056
})
5157

5258
opts = opts or {}
@@ -57,19 +63,11 @@ function insert.tuple(space_name, tuple, opts)
5763
end
5864
local space_format = space:format()
5965

60-
local key = utils.extract_key(tuple, space.index[0].parts)
61-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
62-
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
66+
local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id)
6367
if err ~= nil then
64-
return nil, err
65-
end
66-
67-
if tuple[bucket_id_fieldno] ~= nil then
68-
return nil, InsertError:new("Unexpected value (%s) at field %s (bucket_id)",
69-
tuple[bucket_id_fieldno], bucket_id_fieldno)
68+
return nil, InsertError:new("Failed to get bucket ID: %s", err)
7069
end
7170

72-
tuple[bucket_id_fieldno] = bucket_id
7371
local result, err = call.rw_single(
7472
bucket_id, INSERT_FUNC_NAME,
7573
{space_name, tuple}, {timeout=opts.timeout})
@@ -94,17 +92,15 @@ end
9492
-- @param table obj
9593
-- Object
9694
--
97-
-- @tparam ?number opts.timeout
98-
-- Function call timeout
95+
-- @tparam ?table opts
96+
-- Options of insert.tuple
9997
--
10098
-- @return[1] object
10199
-- @treturn[2] nil
102100
-- @treturn[2] table Error description
103101
--
104102
function insert.object(space_name, obj, opts)
105-
checks('string', 'table', {
106-
timeout = '?number',
107-
})
103+
checks('string', 'table', '?table')
108104

109105
opts = opts or {}
110106

crud/replace.lua

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local vshard = require('vshard')
44

55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
7+
local sharding = require('crud.common.sharding')
78
local dev_checks = require('crud.common.dev_checks')
89

910
local ReplaceError = errors.new_class('Replace', { capture_stack = false })
@@ -40,13 +41,18 @@ end
4041
-- @tparam ?number opts.timeout
4142
-- Function call timeout
4243
--
44+
-- @tparam ?number opts.bucket_id
45+
-- Bucket ID
46+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
47+
--
4348
-- @return[1] object
4449
-- @treturn[2] nil
4550
-- @treturn[2] table Error description
4651
--
4752
function replace.tuple(space_name, tuple, opts)
4853
checks('string', 'table', {
4954
timeout = '?number',
55+
bucket_id = '?number|cdata',
5056
})
5157

5258
opts = opts or {}
@@ -56,20 +62,11 @@ function replace.tuple(space_name, tuple, opts)
5662
return nil, ReplaceError:new("Space %q doesn't exist", space_name)
5763
end
5864

59-
local key = utils.extract_key(tuple, space.index[0].parts)
60-
61-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
62-
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
65+
local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id)
6366
if err ~= nil then
64-
return nil, err
67+
return nil, ReplaceError:new("Failed to get bucket ID: %s", err)
6568
end
6669

67-
if tuple[bucket_id_fieldno] ~= nil then
68-
return nil, ReplaceError:new("Unexpected value (%s) at field %s (bucket_id)",
69-
tuple[bucket_id_fieldno], bucket_id_fieldno)
70-
end
71-
72-
tuple[bucket_id_fieldno] = bucket_id
7370
local result, err = call.rw_single(
7471
bucket_id, REPLACE_FUNC_NAME,
7572
{space_name, tuple}, {timeout=opts.timeout})
@@ -95,17 +92,15 @@ end
9592
-- @param table obj
9693
-- Object
9794
--
98-
-- @tparam ?number opts.timeout
99-
-- Function call timeout
95+
-- @tparam ?table opts
96+
-- Options of replace.tuple
10097
--
10198
-- @return[1] object
10299
-- @treturn[2] nil
103100
-- @treturn[2] table Error description
104101
--
105102
function replace.object(space_name, obj, opts)
106-
checks('string', 'table', {
107-
timeout = '?number',
108-
})
103+
checks('string', 'table', '?table')
109104

110105
opts = opts or {}
111106

0 commit comments

Comments
 (0)