Skip to content

Commit 6d491c4

Browse files
committed
Allow to specify bucket_id
* Bucket ID can be specified using `opts.bucket_id` for all operations * For `insert`/`insert_object` bucket ID can be specified as tuple/object field as well as `opts.bucket_id` value
1 parent aaf4106 commit 6d491c4

File tree

11 files changed

+798
-43
lines changed

11 files changed

+798
-43
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ across the cluster.
1515
**Notes:**
1616

1717
* A space should have a format.
18-
* `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
18+
* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
1919
where `key` is the primary key value.
20+
Custom bucket ID can be specified as `opts.bucket_id` for each operation.
21+
For `insert`/`insert_object` bucket ID can be specified as
22+
tuple/object field as well as `opts.bucket_id` value.
2023

2124
### Insert
2225

@@ -33,6 +36,7 @@ where:
3336
* `tuple` / `object` (`table`) - tuple/object to insert
3437
* `opts`:
3538
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
39+
* `bucket_id` (`?number|cdata`) - bucket ID
3640

3741
Returns metadata and array contains one inserted row, error.
3842

@@ -75,6 +79,7 @@ where:
7579
* `key` (`any`) - primary key value
7680
* `opts`:
7781
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
82+
* `bucket_id` (`?number|cdata`) - bucket ID
7883

7984
Returns metadata and array contains one row, error.
8085

@@ -106,6 +111,7 @@ where:
106111
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update)
107112
* `opts`:
108113
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
114+
* `bucket_id` (`?number|cdata`) - bucket ID
109115

110116
Returns metadata and array contains one updated row, error.
111117

@@ -136,6 +142,7 @@ where:
136142
* `key` (`any`) - primary key value
137143
* `opts`:
138144
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
145+
* `bucket_id` (`?number|cdata`) - bucket ID
139146

140147
Returns metadata and array contains one deleted row (empty for vinyl), error.
141148

@@ -168,6 +175,7 @@ where:
168175
* `tuple` / `object` (`table`) - tuple/object to insert or replace exist one
169176
* `opts`:
170177
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
178+
* `bucket_id` (`?number|cdata`) - bucket ID
171179

172180
Returns inserted or replaced rows and metadata or nil with error.
173181

@@ -214,6 +222,7 @@ where:
214222
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple
215223
* `opts`:
216224
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
225+
* `bucket_id` (`?number|cdata`) - bucket ID
217226

218227
Returns metadata and empty array of rows or nil, error.
219228

@@ -267,6 +276,8 @@ where:
267276
* `after` (`?table`) - tuple after which objects should be selected
268277
* `batch_size` (`?number`) - number of tuples to process per one request to storage
269278
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
279+
* `bucket_id` (`?number|cdata`) - bucket ID
280+
(is used when select by full primary key is performed)
270281

271282
Returns metadata and array of rows, error.
272283

crud/common/sharding.lua

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
local vshard = require('vshard')
2+
3+
local utils = require('crud.common.utils')
4+
5+
local sharding = {}
6+
7+
function sharding.get_bucket_id_by_key(key, specified_bucket_id)
8+
if specified_bucket_id ~= nil then
9+
return specified_bucket_id
10+
end
11+
12+
return vshard.router.bucket_id_strcrc32(key)
13+
end
14+
15+
function sharding.get_bucket_id_by_tuple(tuple, space, specified_bucket_id)
16+
if specified_bucket_id ~= nil then
17+
return specified_bucket_id
18+
end
19+
20+
local key = utils.extract_key(tuple, space.index[0].parts)
21+
return sharding.get_bucket_id_by_key(key)
22+
end
23+
24+
return sharding

crud/delete.lua

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local DeleteError = errors.new_class('Delete', {capture_stack = false})
@@ -44,18 +45,22 @@ end
4445
-- @tparam ?number opts.timeout
4546
-- Function call timeout
4647
--
48+
-- @tparam ?number opts.bucket_id
49+
-- Bucket ID
50+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
51+
--
4752
-- @return[1] object
4853
-- @treturn[2] nil
4954
-- @treturn[2] table Error description
5055
--
5156
function delete.call(space_name, key, opts)
5257
checks('string', '?', {
5358
timeout = '?number',
59+
bucket_id = '?number|cdata',
5460
})
5561

5662
opts = opts or {}
5763

58-
5964
local space = utils.get_space(space_name, vshard.router.routeall())
6065
if space == nil then
6166
return nil, DeleteError:new("Space %q doesn't exist", space_name)
@@ -65,7 +70,7 @@ function delete.call(space_name, key, opts)
6570
key = key:totable()
6671
end
6772

68-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
73+
local bucket_id = sharding.get_bucket_id_by_key(key, opts.bucket_id)
6974
local replicaset, err = vshard.router.route(bucket_id)
7075
if replicaset == nil then
7176
return nil, DeleteError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)

crud/get.lua

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local GetError = errors.new_class('Get', {capture_stack = false})
@@ -44,13 +45,18 @@ end
4445
-- @tparam ?number opts.timeout
4546
-- Function call timeout
4647
--
48+
-- @tparam ?number opts.bucket_id
49+
-- Bucket ID
50+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
51+
--
4752
-- @return[1] object
4853
-- @treturn[2] nil
4954
-- @treturn[2] table Error description
5055
--
5156
function get.call(space_name, key, opts)
5257
checks('string', '?', {
5358
timeout = '?number',
59+
bucket_id = '?number|cdata',
5460
})
5561

5662
opts = opts or {}
@@ -64,8 +70,7 @@ function get.call(space_name, key, opts)
6470
key = key:totable()
6571
end
6672

67-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
68-
73+
local bucket_id = sharding.get_bucket_id_by_key(key, opts.bucket_id)
6974
local replicaset, err = vshard.router.route(bucket_id)
7075
if replicaset == nil then
7176
return nil, GetError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)

crud/insert.lua

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local InsertError = errors.new_class('Insert', {capture_stack = false})
@@ -43,13 +44,18 @@ end
4344
-- @tparam ?number opts.timeout
4445
-- Function call timeout
4546
--
47+
-- @tparam ?number opts.bucket_id
48+
-- Bucket ID
49+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
50+
--
4651
-- @return[1] tuple
4752
-- @treturn[2] nil
4853
-- @treturn[2] table Error description
4954
--
5055
function insert.tuple(space_name, tuple, opts)
5156
checks('string', 'table', {
5257
timeout = '?number',
58+
bucket_id = '?number|cdata',
5359
})
5460

5561
opts = opts or {}
@@ -60,21 +66,32 @@ function insert.tuple(space_name, tuple, opts)
6066
end
6167
local space_format = space:format()
6268

63-
local key = utils.extract_key(tuple, space.index[0].parts)
64-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
65-
local replicaset, err = vshard.router.route(bucket_id)
66-
if replicaset == nil then
67-
return nil, InsertError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
68-
end
69-
7069
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
7170
if err ~= nil then
7271
return nil, err
7372
end
7473

75-
if tuple[bucket_id_fieldno] ~= nil then
76-
return nil, InsertError:new("Unexpected value (%s) at field %s (bucket_id)",
77-
tuple[bucket_id_fieldno], bucket_id_fieldno)
74+
if opts.bucket_id ~= nil then
75+
if tuple[bucket_id_fieldno] == nil then
76+
tuple[bucket_id_fieldno] = opts.bucket_id
77+
else
78+
if tuple[bucket_id_fieldno] ~= opts.bucket_id then
79+
return nil, InsertError:new(
80+
"Tuple and opts.bucket_id contain different bucket_id values: %s and %s",
81+
tuple[bucket_id_fieldno], opts.bucket_id
82+
)
83+
end
84+
end
85+
end
86+
87+
if tuple[bucket_id_fieldno] == nil then
88+
tuple[bucket_id_fieldno] = sharding.get_bucket_id_by_tuple(tuple, space)
89+
end
90+
91+
local bucket_id = tuple[bucket_id_fieldno]
92+
local replicaset, err = vshard.router.route(bucket_id)
93+
if replicaset == nil then
94+
return nil, InsertError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
7895
end
7996

8097
tuple[bucket_id_fieldno] = bucket_id
@@ -104,17 +121,15 @@ end
104121
-- @param table obj
105122
-- Object
106123
--
107-
-- @tparam ?number opts.timeout
108-
-- Function call timeout
124+
-- @tparam ?table opts
125+
-- Options of insert.tuple
109126
--
110127
-- @return[1] object
111128
-- @treturn[2] nil
112129
-- @treturn[2] table Error description
113130
--
114131
function insert.object(space_name, obj, opts)
115-
checks('string', 'table', {
116-
timeout = '?number',
117-
})
132+
checks('string', 'table', '?table')
118133

119134
opts = opts or {}
120135

crud/replace.lua

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local ReplaceError = errors.new_class('Replace', { capture_stack = false })
@@ -43,13 +44,18 @@ end
4344
-- @tparam ?number opts.timeout
4445
-- Function call timeout
4546
--
47+
-- @tparam ?number opts.bucket_id
48+
-- Bucket ID
49+
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
50+
--
4651
-- @return[1] object
4752
-- @treturn[2] nil
4853
-- @treturn[2] table Error description
4954
--
5055
function replace.tuple(space_name, tuple, opts)
5156
checks('string', 'table', {
5257
timeout = '?number',
58+
bucket_id = '?number|cdata',
5359
})
5460

5561
opts = opts or {}
@@ -59,9 +65,7 @@ function replace.tuple(space_name, tuple, opts)
5965
return nil, ReplaceError:new("Space %q doesn't exist", space_name)
6066
end
6167

62-
local key = utils.extract_key(tuple, space.index[0].parts)
63-
64-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
68+
local bucket_id = sharding.get_bucket_id_by_tuple(tuple, space, opts.bucket_id)
6569
local replicaset, err = vshard.router.route(bucket_id)
6670
if replicaset == nil then
6771
return nil, ReplaceError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -104,17 +108,15 @@ end
104108
-- @param table obj
105109
-- Object
106110
--
107-
-- @tparam ?number opts.timeout
108-
-- Function call timeout
111+
-- @tparam ?table opts
112+
-- Options of replace.tuple
109113
--
110114
-- @return[1] object
111115
-- @treturn[2] nil
112116
-- @treturn[2] table Error description
113117
--
114118
function replace.object(space_name, obj, opts)
115-
checks('string', 'table', {
116-
timeout = '?number',
117-
})
119+
checks('string', 'table', '?table')
118120

119121
opts = opts or {}
120122

crud/select.lua

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local select_conditions = require('crud.select.conditions')
@@ -104,8 +105,7 @@ local function select_iteration(space_name, plan, opts)
104105
return results
105106
end
106107

107-
local function get_replicasets_by_sharding_key(sharding_key)
108-
local bucket_id = vshard.router.bucket_id_strcrc32(sharding_key)
108+
local function get_replicasets_by_sharding_key(bucket_id)
109109
local replicaset, err = vshard.router.route(bucket_id)
110110
if replicaset == nil then
111111
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -122,6 +122,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
122122
first = '?number',
123123
timeout = '?number',
124124
batch_size = '?number',
125+
bucket_id = '?number|cdata',
125126
})
126127

127128
opts = opts or {}
@@ -163,7 +164,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
163164
local replicasets_to_select = replicasets
164165

165166
if plan.sharding_key ~= nil then
166-
replicasets_to_select = get_replicasets_by_sharding_key(plan.sharding_key)
167+
local bucket_id = sharding.get_bucket_id_by_key(plan.sharding_key, opts.bucket_id)
168+
replicasets_to_select = get_replicasets_by_sharding_key(bucket_id)
167169
end
168170

169171
-- generate tuples comparator
@@ -202,6 +204,7 @@ function select_module.pairs(space_name, user_conditions, opts)
202204
timeout = '?number',
203205
batch_size = '?number',
204206
use_tomap = '?boolean',
207+
bucket_id = '?number|cdata',
205208
})
206209

207210
opts = opts or {}
@@ -215,6 +218,7 @@ function select_module.pairs(space_name, user_conditions, opts)
215218
first = opts.first,
216219
timeout = opts.timeout,
217220
batch_size = opts.batch_size,
221+
bucket_id = opts.bucket_id,
218222
})
219223

220224
if err ~= nil then
@@ -251,6 +255,7 @@ function select_module.call(space_name, user_conditions, opts)
251255
first = '?number',
252256
timeout = '?number',
253257
batch_size = '?number',
258+
bucket_id = '?number|cdata',
254259
})
255260

256261
opts = opts or {}
@@ -266,6 +271,7 @@ function select_module.call(space_name, user_conditions, opts)
266271
first = opts.first,
267272
timeout = opts.timeout,
268273
batch_size = opts.batch_size,
274+
bucket_id = opts.bucket_id,
269275
})
270276

271277
if err ~= nil then

0 commit comments

Comments
 (0)