Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added

* `bucket_id` option for all operations to specify custom bucket ID.
For operations that accepts tuple/object bucket ID can be specified as
tuple/object field as well as `opts.bucket_id` value.

### Fixed

* Select by primary index name
Expand Down
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ to make `crud` functions callable via `net.box`.
**Notes:**

* A space should have a format.
* `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
where `key` is the primary key value.
Custom bucket ID can be specified as `opts.bucket_id` for each operation.
For operations that accepts tuple/object bucket ID can be specified as
tuple/object field as well as `opts.bucket_id` value.

### Insert

Expand All @@ -35,6 +38,7 @@ where:
* `tuple` / `object` (`table`) - tuple/object to insert
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID

Returns metadata and array contains one inserted row, error.

Expand Down Expand Up @@ -77,6 +81,7 @@ where:
* `key` (`any`) - primary key value
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID

Returns metadata and array contains one row, error.

Expand Down Expand Up @@ -108,6 +113,7 @@ where:
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update)
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID

Returns metadata and array contains one updated row, error.

Expand Down Expand Up @@ -138,6 +144,7 @@ where:
* `key` (`any`) - primary key value
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID

Returns metadata and array contains one deleted row (empty for vinyl), error.

Expand Down Expand Up @@ -170,6 +177,7 @@ where:
* `tuple` / `object` (`table`) - tuple/object to insert or replace exist one
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID

Returns inserted or replaced rows and metadata or nil with error.

Expand Down Expand Up @@ -216,6 +224,7 @@ where:
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID

Returns metadata and empty array of rows or nil, error.

Expand Down Expand Up @@ -269,6 +278,8 @@ where:
* `after` (`?table`) - tuple after which objects should be selected
* `batch_size` (`?number`) - number of tuples to process per one request to storage
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
(is used when select by full primary key is performed)

Returns metadata and array of rows, error.

Expand Down
54 changes: 54 additions & 0 deletions crud/common/sharding.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
local vshard = require('vshard')
local errors = require('errors')

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})

local utils = require('crud.common.utils')

local sharding = {}

function sharding.key_get_bucket_id(key, specified_bucket_id)
if specified_bucket_id ~= nil then
return specified_bucket_id
end

return vshard.router.bucket_id_strcrc32(key)
end

function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
if specified_bucket_id ~= nil then
return specified_bucket_id
end

local key = utils.extract_key(tuple, space.index[0].parts)
return sharding.key_get_bucket_id(key)
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
if err ~= nil then
return nil, BucketIDError:new("Failed to get bucket ID fielno:", err)
end

if specified_bucket_id ~= nil then
if tuple[bucket_id_fieldno] == nil then
tuple[bucket_id_fieldno] = specified_bucket_id
else
if tuple[bucket_id_fieldno] ~= specified_bucket_id then
return nil, BucketIDError:new(
"Tuple and opts.bucket_id contain different bucket_id values: %s and %s",
tuple[bucket_id_fieldno], specified_bucket_id
)
end
end
end

if tuple[bucket_id_fieldno] == nil then
tuple[bucket_id_fieldno] = sharding.tuple_get_bucket_id(tuple, space)
end

local bucket_id = tuple[bucket_id_fieldno]
return bucket_id
end

return sharding
9 changes: 7 additions & 2 deletions crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local vshard = require('vshard')

local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')

local DeleteError = errors.new_class('Delete', {capture_stack = false})
Expand Down Expand Up @@ -41,18 +42,22 @@ end
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function delete.call(space_name, key, opts)
checks('string', '?', {
timeout = '?number',
bucket_id = '?number|cdata',
})

opts = opts or {}


local space = utils.get_space(space_name, vshard.router.routeall())
if space == nil then
return nil, DeleteError:new("Space %q doesn't exist", space_name)
Expand All @@ -62,7 +67,7 @@ function delete.call(space_name, key, opts)
key = key:totable()
end

local bucket_id = vshard.router.bucket_id_strcrc32(key)
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local result, err = call.rw_single(
bucket_id, DELETE_FUNC_NAME,
{space_name, key}, {timeout = opts.timeout}
Expand Down
8 changes: 7 additions & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local vshard = require('vshard')

local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')

local GetError = errors.new_class('Get', {capture_stack = false})
Expand Down Expand Up @@ -41,13 +42,18 @@ end
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function get.call(space_name, key, opts)
checks('string', '?', {
timeout = '?number',
bucket_id = '?number|cdata',
})

opts = opts or {}
Expand All @@ -61,7 +67,7 @@ function get.call(space_name, key, opts)
key = key:totable()
end

local bucket_id = vshard.router.bucket_id_strcrc32(key)
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
-- We don't use callro() here, because if the replication is
-- async, there could be a lag between master and replica, so a
-- connector which sequentially calls put() and then get() may get
Expand Down
26 changes: 11 additions & 15 deletions crud/insert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local vshard = require('vshard')

local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')

local InsertError = errors.new_class('Insert', {capture_stack = false})
Expand Down Expand Up @@ -40,13 +41,18 @@ end
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
--
-- @return[1] tuple
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function insert.tuple(space_name, tuple, opts)
checks('string', 'table', {
timeout = '?number',
bucket_id = '?number|cdata',
})

opts = opts or {}
Expand All @@ -57,19 +63,11 @@ function insert.tuple(space_name, tuple, opts)
end
local space_format = space:format()

local key = utils.extract_key(tuple, space.index[0].parts)
local bucket_id = vshard.router.bucket_id_strcrc32(key)
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id)
if err ~= nil then
return nil, err
end

if tuple[bucket_id_fieldno] ~= nil then
return nil, InsertError:new("Unexpected value (%s) at field %s (bucket_id)",
tuple[bucket_id_fieldno], bucket_id_fieldno)
return nil, InsertError:new("Failed to get bucket ID: %s", err)
end

tuple[bucket_id_fieldno] = bucket_id
local result, err = call.rw_single(
bucket_id, INSERT_FUNC_NAME,
{space_name, tuple}, {timeout=opts.timeout})
Expand All @@ -94,17 +92,15 @@ end
-- @param table obj
-- Object
--
-- @tparam ?number opts.timeout
-- Function call timeout
-- @tparam ?table opts
-- Options of insert.tuple
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function insert.object(space_name, obj, opts)
checks('string', 'table', {
timeout = '?number',
})
checks('string', 'table', '?table')

opts = opts or {}

Expand Down
27 changes: 11 additions & 16 deletions crud/replace.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local vshard = require('vshard')

local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')

local ReplaceError = errors.new_class('Replace', { capture_stack = false })
Expand Down Expand Up @@ -40,13 +41,18 @@ end
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function replace.tuple(space_name, tuple, opts)
checks('string', 'table', {
timeout = '?number',
bucket_id = '?number|cdata',
})

opts = opts or {}
Expand All @@ -56,20 +62,11 @@ function replace.tuple(space_name, tuple, opts)
return nil, ReplaceError:new("Space %q doesn't exist", space_name)
end

local key = utils.extract_key(tuple, space.index[0].parts)

local bucket_id = vshard.router.bucket_id_strcrc32(key)
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id)
if err ~= nil then
return nil, err
return nil, ReplaceError:new("Failed to get bucket ID: %s", err)
end

if tuple[bucket_id_fieldno] ~= nil then
return nil, ReplaceError:new("Unexpected value (%s) at field %s (bucket_id)",
tuple[bucket_id_fieldno], bucket_id_fieldno)
end

tuple[bucket_id_fieldno] = bucket_id
local result, err = call.rw_single(
bucket_id, REPLACE_FUNC_NAME,
{space_name, tuple}, {timeout=opts.timeout})
Expand All @@ -95,17 +92,15 @@ end
-- @param table obj
-- Object
--
-- @tparam ?number opts.timeout
-- Function call timeout
-- @tparam ?table opts
-- Options of replace.tuple
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function replace.object(space_name, obj, opts)
checks('string', 'table', {
timeout = '?number',
})
checks('string', 'table', '?table')

opts = opts or {}

Expand Down
Loading