From 3858db92fee64cdd358c59dd06c0a1fb1f79acf8 Mon Sep 17 00:00:00 2001 From: Ilya Grishnov Date: Fri, 20 Jan 2023 15:57:35 +0300 Subject: [PATCH] feature: master presence timeout for get space Added timeout condition for the validation of master presence in replicaset and for the master connection to the `utils.get_space` method. Closes #95 --- CHANGELOG.md | 2 ++ README.md | 33 ++++++++++++++++++++---------- crud/borders.lua | 7 +++++-- crud/common/utils.lua | 34 +++++++++++++++++++++++-------- crud/count.lua | 12 +++++------ crud/delete.lua | 2 +- crud/get.lua | 2 +- crud/insert.lua | 2 +- crud/insert_many.lua | 2 +- crud/len.lua | 2 +- crud/replace.lua | 2 +- crud/replace_many.lua | 2 +- crud/select/compat/select.lua | 12 +++++------ crud/select/compat/select_old.lua | 12 +++++------ crud/stats/init.lua | 2 +- crud/update.lua | 2 +- crud/upsert.lua | 2 +- crud/upsert_many.lua | 2 +- test/helper.lua | 2 +- 19 files changed, 82 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a6b8696..526dcf75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +* Added timeout condition for the validation of master presence in + replicaset and for the master connection (#95). 
### Changed diff --git a/README.md b/README.md index c37abafc..69eebe93 100644 --- a/README.md +++ b/README.md @@ -204,7 +204,8 @@ where: * `space_name` (`string`) - name of the space to insert an object * `tuple` / `object` (`table`) - tuple/object to insert * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `bucket_id` (`?number|cdata`) - bucket ID * `fields` (`?table`) - field names for getting only a subset of fields * `vshard_router` (`?string|table`) - Cartridge vshard group name or @@ -263,7 +264,8 @@ where: * `space_name` (`string`) - name of the space to insert an object * `tuples` / `objects` (`table`) - array of tuples/objects to insert * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `fields` (`?table`) - field names for getting only a subset of fields * `stop_on_error` (`?boolean`) - stop on a first error and report error regarding the failed operation and error about what tuples were not @@ -411,7 +413,8 @@ where: * `opts`: * `fields` (`?table`) - field names for getting only a subset of fields * `bucket_id` (`?number|cdata`) - bucket ID - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `mode` (`?string`, `read` or `write`) - if `write` is specified then `get` is performed on master, default value is `read` * `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of @@ -450,7 +453,8 @@ where: * `key` (`any`) - primary key value * `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/) * `opts`: - * `timeout` (`?number`) - `vshard.call` 
timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `bucket_id` (`?number|cdata`) - bucket ID * `fields` (`?table`) - field names for getting only a subset of fields * `vshard_router` (`?string|table`) - Cartridge vshard group name or @@ -485,7 +489,8 @@ where: * `space_name` (`string`) - name of the space * `key` (`any`) - primary key value * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `bucket_id` (`?number|cdata`) - bucket ID * `fields` (`?table`) - field names for getting only a subset of fields * `vshard_router` (`?string|table`) - Cartridge vshard group name or @@ -522,7 +527,8 @@ where: * `space_name` (`string`) - name of the space * `tuple` / `object` (`table`) - tuple/object to insert or replace exist one * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `bucket_id` (`?number|cdata`) - bucket ID * `fields` (`?table`) - field names for getting only a subset of fields * `vshard_router` (`?string|table`) - Cartridge vshard group name or @@ -581,7 +587,8 @@ where: * `space_name` (`string`) - name of the space to insert/replace an object * `tuples` / `objects` (`table`) - array of tuples/objects to insert * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `fields` (`?table`) - field names for getting only a subset of fields * `stop_on_error` (`?boolean`) - stop on a first error and report error regarding the failed operation and error about what tuples were not @@ -730,7 +737,8 @@ where: * `tuple` / `object` (`table`) - tuple/object to 
insert if there is no existing tuple which matches the key fields * `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/) if there is an existing tuple which matches the key fields of tuple * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `bucket_id` (`?number|cdata`) - bucket ID * `fields` (`?table`) - field names for getting only a subset of fields * `vshard_router` (`?string|table`) - Cartridge vshard group name or @@ -785,7 +793,8 @@ where: if there is tuple with duplicate key then existing tuple will be updated with update operations * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `fields` (`?table`) - field names for getting only a subset of fields * `stop_on_error` (`?boolean`) - stop on a first error and report error regarding the failed operation and error about what tuples were not @@ -1108,7 +1117,8 @@ where: * `space_name` (`string`) - name of the space * `opts`: - * `timeout` (`?number`) - `vshard.call` timeout (in seconds) + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `vshard_router` (`?string|table`) - Cartridge vshard group name or vshard router instance. 
Set this parameter if your space is not a part of the default vshard cluster @@ -1220,7 +1230,8 @@ where: * `opts`: * `yield_every` (`?number`) - number of tuples processed to yield after, `yield_every` should be > 0, default value is 100 - * `timeout` (`?number`) - `vshard.call` timeout (in seconds), default value is 2 + * `timeout` (`?number`) - `vshard.call` timeout and vshard master + discovery timeout (in seconds), default value is 2 * `bucket_id` (`?number|cdata`) - bucket ID * `force_map_call` (`?boolean`) - if `true` then the map call is performed without any optimizations even, diff --git a/crud/borders.lua b/crud/borders.lua index ef2a4121..c716a930 100644 --- a/crud/borders.lua +++ b/crud/borders.lua @@ -73,8 +73,7 @@ local function call_get_border_on_router(vshard_router, border_name, space_name, vshard_router = '?string|table', }) - local replicasets = vshard_router:routeall() - local space, err = utils.get_space(space_name, replicasets) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, BorderError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -100,6 +99,10 @@ local function call_get_border_on_router(vshard_router, border_name, space_name, local cmp_key_parts = utils.merge_primary_key_parts(index.parts, primary_index.parts) local field_names = utils.enrich_field_names_with_cmp_key(opts.fields, cmp_key_parts, space:format()) + local replicasets, err = vshard_router:routeall() + if err ~= nil then + return nil, BorderError:new("Failed to get router replicasets: %s", err) + end local call_opts = { mode = 'read', replicasets = replicasets, diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 3f25199e..e1e3075f 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -21,7 +21,7 @@ local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = local NotInitializedError = errors.new_class('NotInitialized') local 
StorageInfoError = errors.new_class('StorageInfoError') local VshardRouterError = errors.new_class('VshardRouterError', {capture_stack = false}) -local fiber_clock = require('fiber').clock +local fiber = require('fiber') local utils = {} @@ -96,8 +96,25 @@ function utils.format_replicaset_error(replicaset_uuid, msg, ...) ) end -function utils.get_space(space_name, replicasets) - local replicaset = select(2, next(replicasets)) +function utils.get_space(space_name, vshard_router, timeout) + local replicasets, replicaset + timeout = timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT + local deadline = fiber.clock() + timeout + while ( + -- Break if the deadline condition is exceeded. + -- Handling for deadline errors are below in the code. + fiber.clock() < deadline + ) do + -- Try to get master with timeout. + fiber.yield() + replicasets = vshard_router:routeall() + replicaset = select(2, next(replicasets)) + if replicaset ~= nil and + replicaset.master ~= nil and + replicaset.master.conn.error == nil then + break + end + end if replicaset == nil then return nil, GetSpaceError:new( @@ -119,13 +136,14 @@ function utils.get_space(space_name, replicasets) replicaset.uuid, replicaset.master.conn.error) return nil, GetSpaceError:new(error_msg) end + local space = replicaset.master.conn.space[space_name] return space end -function utils.get_space_format(space_name, replicasets) - local space, err = utils.get_space(space_name, replicasets) +function utils.get_space_format(space_name, vshard_router) + local space, err = utils.get_space(space_name, vshard_router) if err ~= nil then return nil, GetSpaceFormatError:new("An error occurred during the operation: %s", err) end @@ -664,7 +682,7 @@ function utils.cut_rows(rows, metadata, field_names) end local function flatten_obj(vshard_router, space_name, obj, skip_nullability_check) - local space_format, err = utils.get_space_format(space_name, vshard_router:routeall()) + local space_format, err = utils.get_space_format(space_name, 
vshard_router) if err ~= nil then return nil, FlattenError:new("Failed to get space format: %s", err), const.NEED_SCHEMA_RELOAD end @@ -835,9 +853,9 @@ function utils.storage_info(opts) end end - local deadline = fiber_clock() + timeout + local deadline = fiber.clock() + timeout for replica_uuid, future in pairs(futures_by_replicas) do - local wait_timeout = deadline - fiber_clock() + local wait_timeout = deadline - fiber.clock() if wait_timeout < 0 then wait_timeout = 0 end diff --git a/crud/count.lua b/crud/count.lua index 70f9b714..a95657ad 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -132,12 +132,7 @@ local function call_count_on_router(vshard_router, space_name, user_conditions, return nil, CountError:new("Failed to parse conditions: %s", err) end - local replicasets, err = vshard_router:routeall() - if err ~= nil then - return nil, CountError:new("Failed to get router replicasets: %s", err) - end - - local space, err = utils.get_space(space_name, replicasets) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, CountError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -169,7 +164,10 @@ local function call_count_on_router(vshard_router, space_name, user_conditions, check_count_safety(space_name, plan, opts) -- set replicasets to count from - local replicasets_to_count = replicasets + local replicasets_to_count, err = vshard_router:routeall() + if err ~= nil then + return nil, CountError:new("Failed to get router replicasets: %s", err) + end -- Whether to call one storage replicaset or perform -- map-reduce? 
diff --git a/crud/delete.lua b/crud/delete.lua index 3151c166..5ec51e3e 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -62,7 +62,7 @@ local function call_delete_on_router(vshard_router, space_name, key, opts) vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, DeleteError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end diff --git a/crud/get.lua b/crud/get.lua index 82bd830f..46e1d871 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -65,7 +65,7 @@ local function call_get_on_router(vshard_router, space_name, key, opts) vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, GetError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end diff --git a/crud/insert.lua b/crud/insert.lua index 636ede78..7660457c 100644 --- a/crud/insert.lua +++ b/crud/insert.lua @@ -64,7 +64,7 @@ local function call_insert_on_router(vshard_router, space_name, original_tuple, vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, InsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end diff --git a/crud/insert_many.lua b/crud/insert_many.lua index 2751e8ff..f7fc54e6 100644 --- a/crud/insert_many.lua +++ b/crud/insert_many.lua @@ -131,7 +131,7 @@ local function call_insert_many_on_router(vshard_router, space_name, original_tu vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, 
vshard_router, opts.timeout) if err ~= nil then return nil, { InsertManyError:new("An error occurred during the operation: %s", err) diff --git a/crud/len.lua b/crud/len.lua index bf07e8eb..930fe5de 100644 --- a/crud/len.lua +++ b/crud/len.lua @@ -64,7 +64,7 @@ function len.call(space_name, opts) return nil, LenError:new(err) end - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, LenError:new("An error occurred during the operation: %s", err) end diff --git a/crud/replace.lua b/crud/replace.lua index 4d18df1c..3ffc0548 100644 --- a/crud/replace.lua +++ b/crud/replace.lua @@ -64,7 +64,7 @@ local function call_replace_on_router(vshard_router, space_name, original_tuple, vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, ReplaceError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end diff --git a/crud/replace_many.lua b/crud/replace_many.lua index c6c3ccb1..73f44b9a 100644 --- a/crud/replace_many.lua +++ b/crud/replace_many.lua @@ -133,7 +133,7 @@ local function call_replace_many_on_router(vshard_router, space_name, original_t vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, { ReplaceManyError:new("An error occurred during the operation: %s", err) diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 54ed3662..a7cc9473 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -42,12 +42,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, return nil, SelectError:new("Failed to parse 
conditions: %s", err) end - local replicasets, err = vshard_router:routeall() - if err ~= nil then - return nil, SelectError:new("Failed to get router replicasets: %s", err) - end - - local space, err = utils.get_space(space_name, replicasets) + local space, err = utils.get_space(space_name, vshard_router) if err ~= nil then return nil, SelectError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -84,7 +79,10 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, end -- set replicasets to select from - local replicasets_to_select = replicasets + local replicasets_to_select, err = vshard_router:routeall() + if err ~= nil then + return nil, SelectError:new("Failed to get router replicasets: %s", err) + end -- Whether to call one storage replicaset or perform -- map-reduce? diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 2c164b32..d36132e6 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -107,12 +107,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, return nil, SelectError:new("Failed to parse conditions: %s", err) end - local replicasets, err = vshard_router:routeall() - if err ~= nil then - return nil, SelectError:new("Failed to get router replicasets: %s", err) - end - - local space, err = utils.get_space(space_name, replicasets) + local space, err = utils.get_space(space_name, vshard_router) if err ~= nil then return nil, SelectError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end @@ -151,7 +146,10 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, end -- set replicasets to select from - local replicasets_to_select = replicasets + local replicasets_to_select, err = vshard_router:routeall() + if err ~= nil then + return nil, SelectError:new("Failed to get router replicasets: %s", err) + end -- See explanation of 
this logic in -- crud/select/compat/select.lua. diff --git a/crud/stats/init.lua b/crud/stats/init.lua index 41dcaf85..a8114350 100644 --- a/crud/stats/init.lua +++ b/crud/stats/init.lua @@ -264,7 +264,7 @@ local function resolve_space_name(space_id) return nil end - local space, err = utils.get_space(space_id, replicasets) + local space, err = utils.get_space(space_id, vshard_router) if err ~= nil then log.warn("An error occurred during getting space: %s", err) return nil diff --git a/crud/update.lua b/crud/update.lua index 620ffd73..cfb39431 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -84,7 +84,7 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, UpdateError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end diff --git a/crud/upsert.lua b/crud/upsert.lua index 96752f92..30a7efa0 100644 --- a/crud/upsert.lua +++ b/crud/upsert.lua @@ -62,7 +62,7 @@ local function call_upsert_on_router(vshard_router, space_name, original_tuple, vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= nil then return nil, UpsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD end diff --git a/crud/upsert_many.lua b/crud/upsert_many.lua index 958218bc..c27294de 100644 --- a/crud/upsert_many.lua +++ b/crud/upsert_many.lua @@ -129,7 +129,7 @@ local function call_upsert_many_on_router(vshard_router, space_name, original_tu vshard_router = '?string|table', }) - local space, err = utils.get_space(space_name, vshard_router:routeall()) + local space, err = utils.get_space(space_name, vshard_router, opts.timeout) if err ~= 
nil then return nil, { UpsertManyError:new("An error occurred during the operation: %s", err) diff --git a/test/helper.lua b/test/helper.lua index 6d1436ce..ededa70c 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -452,7 +452,7 @@ function helpers.is_space_exist(router, space_name) local vshard = require('vshard') local utils = require('crud.common.utils') - local space, err = utils.get_space(..., vshard.router.routeall()) + local space, err = utils.get_space(..., vshard.router) if err ~= nil then return nil, err end