Skip to content

Commit fc54ef4

Browse files
committed
Support sharding by custom bucket_id
1 parent aaf4106 commit fc54ef4

File tree

11 files changed

+601
-35
lines changed

11 files changed

+601
-35
lines changed

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ across the cluster.
1515
**Notes:**
1616

1717
* A space should have a format.
18-
* `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
18+
* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
1919
where `key` is the primary key value.
20+
Custom bucket ID can be specified as `opts.bucket_id` for each operation.
2021

2122
### Insert
2223

@@ -33,6 +34,7 @@ where:
3334
* `tuple` / `object` (`table`) - tuple/object to insert
3435
* `opts`:
3536
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
37+
* `bucket_id` (`?number|cdata`) - bucket ID
3638

3739
Returns metadata and array contains one inserted row, error.
3840

@@ -75,6 +77,7 @@ where:
7577
* `key` (`any`) - primary key value
7678
* `opts`:
7779
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
80+
* `bucket_id` (`?number|cdata`) - bucket ID
7881

7982
Returns metadata and array contains one row, error.
8083

@@ -106,6 +109,7 @@ where:
106109
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update)
107110
* `opts`:
108111
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
112+
* `bucket_id` (`?number|cdata`) - bucket ID
109113

110114
Returns metadata and array contains one updated row, error.
111115

@@ -136,6 +140,7 @@ where:
136140
* `key` (`any`) - primary key value
137141
* `opts`:
138142
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
143+
* `bucket_id` (`?number|cdata`) - bucket ID
139144

140145
Returns metadata and array contains one deleted row (empty for vinyl), error.
141146

@@ -168,6 +173,7 @@ where:
168173
* `tuple` / `object` (`table`) - tuple/object to insert or replace exist one
169174
* `opts`:
170175
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
176+
* `bucket_id` (`?number|cdata`) - bucket ID
171177

172178
Returns inserted or replaced rows and metadata or nil with error.
173179

@@ -214,6 +220,7 @@ where:
214220
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple
215221
* `opts`:
216222
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
223+
* `bucket_id` (`?number|cdata`) - bucket ID
217224

218225
Returns metadata and empty array of rows or nil, error.
219226

@@ -267,6 +274,8 @@ where:
267274
* `after` (`?table`) - tuple after which objects should be selected
268275
* `batch_size` (`?number`) - number of tuples to process per one request to storage
269276
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
277+
* `bucket_id` (`?number|cdata`) - bucket ID
278+
(is used when select by full primary key is performed)
270279

271280
Returns metadata and array of rows, error.
272281

crud/common/sharding.lua

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
local vshard = require('vshard')
2+
3+
local utils = require('crud.common.utils')
4+
5+
local sharding = {}
6+
7+
function sharding.get_bucket_id_by_key(key, _)
8+
return vshard.router.bucket_id_strcrc32(key)
9+
end
10+
11+
function sharding.get_bucket_id_by_tuple(tuple, space)
12+
local key = utils.extract_key(tuple, space.index[0].parts)
13+
return sharding.get_bucket_id_by_key(key)
14+
end
15+
16+
return sharding

crud/delete.lua

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local DeleteError = errors.new_class('Delete', {capture_stack = false})
@@ -51,11 +52,11 @@ end
5152
function delete.call(space_name, key, opts)
5253
checks('string', '?', {
5354
timeout = '?number',
55+
bucket_id = '?number|cdata',
5456
})
5557

5658
opts = opts or {}
5759

58-
5960
local space = utils.get_space(space_name, vshard.router.routeall())
6061
if space == nil then
6162
return nil, DeleteError:new("Space %q doesn't exist", space_name)
@@ -65,7 +66,7 @@ function delete.call(space_name, key, opts)
6566
key = key:totable()
6667
end
6768

68-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
69+
local bucket_id = opts.bucket_id or sharding.get_bucket_id_by_key(key, space)
6970
local replicaset, err = vshard.router.route(bucket_id)
7071
if replicaset == nil then
7172
return nil, DeleteError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)

crud/get.lua

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local GetError = errors.new_class('Get', {capture_stack = false})
@@ -51,6 +52,7 @@ end
5152
function get.call(space_name, key, opts)
5253
checks('string', '?', {
5354
timeout = '?number',
55+
bucket_id = '?number|cdata',
5456
})
5557

5658
opts = opts or {}
@@ -64,8 +66,7 @@ function get.call(space_name, key, opts)
6466
key = key:totable()
6567
end
6668

67-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
68-
69+
local bucket_id = opts.bucket_id or sharding.get_bucket_id_by_key(key, space)
6970
local replicaset, err = vshard.router.route(bucket_id)
7071
if replicaset == nil then
7172
return nil, GetError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)

crud/insert.lua

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local InsertError = errors.new_class('Insert', {capture_stack = false})
@@ -50,6 +51,7 @@ end
5051
function insert.tuple(space_name, tuple, opts)
5152
checks('string', 'table', {
5253
timeout = '?number',
54+
bucket_id = '?number|cdata',
5355
})
5456

5557
opts = opts or {}
@@ -60,8 +62,7 @@ function insert.tuple(space_name, tuple, opts)
6062
end
6163
local space_format = space:format()
6264

63-
local key = utils.extract_key(tuple, space.index[0].parts)
64-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
65+
local bucket_id = opts.bucket_id or sharding.get_bucket_id_by_tuple(tuple, space)
6566
local replicaset, err = vshard.router.route(bucket_id)
6667
if replicaset == nil then
6768
return nil, InsertError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -104,17 +105,15 @@ end
104105
-- @param table obj
105106
-- Object
106107
--
107-
-- @tparam ?number opts.timeout
108-
-- Function call timeout
108+
-- @tparam ?table opts
109+
-- Options of insert.tuple
109110
--
110111
-- @return[1] object
111112
-- @treturn[2] nil
112113
-- @treturn[2] table Error description
113114
--
114115
function insert.object(space_name, obj, opts)
115-
checks('string', 'table', {
116-
timeout = '?number',
117-
})
116+
checks('string', 'table', '?table')
118117

119118
opts = opts or {}
120119

crud/replace.lua

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local ReplaceError = errors.new_class('Replace', { capture_stack = false })
@@ -50,6 +51,7 @@ end
5051
function replace.tuple(space_name, tuple, opts)
5152
checks('string', 'table', {
5253
timeout = '?number',
54+
bucket_id = '?number|cdata',
5355
})
5456

5557
opts = opts or {}
@@ -59,9 +61,7 @@ function replace.tuple(space_name, tuple, opts)
5961
return nil, ReplaceError:new("Space %q doesn't exist", space_name)
6062
end
6163

62-
local key = utils.extract_key(tuple, space.index[0].parts)
63-
64-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
64+
local bucket_id = opts.bucket_id or sharding.get_bucket_id_by_tuple(tuple, space)
6565
local replicaset, err = vshard.router.route(bucket_id)
6666
if replicaset == nil then
6767
return nil, ReplaceError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -104,17 +104,15 @@ end
104104
-- @param table obj
105105
-- Object
106106
--
107-
-- @tparam ?number opts.timeout
108-
-- Function call timeout
107+
-- @tparam ?table opts
108+
-- Options of replace.tuple
109109
--
110110
-- @return[1] object
111111
-- @treturn[2] nil
112112
-- @treturn[2] table Error description
113113
--
114114
function replace.object(space_name, obj, opts)
115-
checks('string', 'table', {
116-
timeout = '?number',
117-
})
115+
checks('string', 'table', '?table')
118116

119117
opts = opts or {}
120118

crud/select.lua

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local select_conditions = require('crud.select.conditions')
@@ -104,8 +105,7 @@ local function select_iteration(space_name, plan, opts)
104105
return results
105106
end
106107

107-
local function get_replicasets_by_sharding_key(sharding_key)
108-
local bucket_id = vshard.router.bucket_id_strcrc32(sharding_key)
108+
local function get_replicasets_by_sharding_key(bucket_id)
109109
local replicaset, err = vshard.router.route(bucket_id)
110110
if replicaset == nil then
111111
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -122,6 +122,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
122122
first = '?number',
123123
timeout = '?number',
124124
batch_size = '?number',
125+
bucket_id = '?number|cdata',
125126
})
126127

127128
opts = opts or {}
@@ -163,7 +164,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
163164
local replicasets_to_select = replicasets
164165

165166
if plan.sharding_key ~= nil then
166-
replicasets_to_select = get_replicasets_by_sharding_key(plan.sharding_key)
167+
local bucket_id = opts.bucket_id or sharding.get_bucket_id_by_key(plan.sharding_key, space)
168+
replicasets_to_select = get_replicasets_by_sharding_key(bucket_id)
167169
end
168170

169171
-- generate tuples comparator
@@ -202,6 +204,7 @@ function select_module.pairs(space_name, user_conditions, opts)
202204
timeout = '?number',
203205
batch_size = '?number',
204206
use_tomap = '?boolean',
207+
bucket_id = '?number|cdata',
205208
})
206209

207210
opts = opts or {}
@@ -215,6 +218,7 @@ function select_module.pairs(space_name, user_conditions, opts)
215218
first = opts.first,
216219
timeout = opts.timeout,
217220
batch_size = opts.batch_size,
221+
bucket_id = opts.bucket_id,
218222
})
219223

220224
if err ~= nil then
@@ -251,6 +255,7 @@ function select_module.call(space_name, user_conditions, opts)
251255
first = '?number',
252256
timeout = '?number',
253257
batch_size = '?number',
258+
bucket_id = '?number|cdata',
254259
})
255260

256261
opts = opts or {}
@@ -266,6 +271,7 @@ function select_module.call(space_name, user_conditions, opts)
266271
first = opts.first,
267272
timeout = opts.timeout,
268273
batch_size = opts.batch_size,
274+
bucket_id = opts.bucket_id,
269275
})
270276

271277
if err ~= nil then

crud/update.lua

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local UpdateError = errors.new_class('Update', {capture_stack = false})
@@ -55,6 +56,7 @@ end
5556
function update.call(space_name, key, user_operations, opts)
5657
checks('string', '?', 'table', {
5758
timeout = '?number',
59+
bucket_id = '?number|cdata',
5860
})
5961

6062
opts = opts or {}
@@ -74,7 +76,7 @@ function update.call(space_name, key, user_operations, opts)
7476
return nil, UpdateError:new("Wrong operations are specified: %s", err)
7577
end
7678

77-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
79+
local bucket_id = opts.bucket_id or sharding.get_bucket_id_by_key(key, space)
7880
local replicaset, err = vshard.router.route(bucket_id)
7981
if replicaset == nil then
8082
return nil, UpdateError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)

crud/upsert.lua

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local registry = require('crud.common.registry')
77
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
89
local dev_checks = require('crud.common.dev_checks')
910

1011
local UpsertError = errors.new_class('UpsertError', { capture_stack = false})
@@ -54,6 +55,7 @@ end
5455
function upsert.tuple(space_name, tuple, user_operations, opts)
5556
checks('string', '?', 'table', {
5657
timeout = '?number',
58+
bucket_id = '?number|cdata',
5759
})
5860

5961
opts = opts or {}
@@ -69,9 +71,7 @@ function upsert.tuple(space_name, tuple, user_operations, opts)
6971
return nil, UpsertError:new("Wrong operations are specified: %s", err)
7072
end
7173

72-
local key = utils.extract_key(tuple, space.index[0].parts)
73-
74-
local bucket_id = vshard.router.bucket_id_strcrc32(key)
74+
local bucket_id = opts.bucket_id or sharding.get_bucket_id_by_tuple(tuple, space)
7575
local replicaset, err = vshard.router.route(bucket_id)
7676
if replicaset == nil then
7777
return nil, UpsertError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -118,17 +118,15 @@ end
118118
-- user_operations to be performed.
119119
-- See `space:upsert()` operations in Tarantool doc
120120
--
121-
-- @tparam ?number opts.timeout
122-
-- Function call timeout
121+
-- @tparam ?table opts
122+
-- Options of upsert.tuple
123123
--
124124
-- @return[1] object
125125
-- @treturn[2] nil
126126
-- @treturn[2] table Error description
127127
--
128128
function upsert.object(space_name, obj, user_operations, opts)
129-
checks('string', '?', 'table', {
130-
timeout = '?number',
131-
})
129+
checks('string', 'table', 'table', '?table')
132130

133131
opts = opts or {}
134132

0 commit comments

Comments
 (0)