Skip to content

Commit 0bbcf76

Browse files
committed
Provide an API to get storages initialization state
There is an issue with using CRUD functionality if not all storages are up. New function is added to get an information about storages state: initialized or not. So, a user can wait for storages to be initalized before making CRUD calls. Resolves #229
1 parent 1635f16 commit 0bbcf76

File tree

7 files changed

+151
-28
lines changed

7 files changed

+151
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
88
## [Unreleased]
99

1010
### Added
11+
* `crud.get_storages_state` function to get storages initialization state.
1112

1213
### Changed
1314

crud.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ crud.stats = stats.get
111111
-- @function reset_stats
112112
crud.reset_stats = stats.reset
113113

114+
-- @refer utils.get_storages_state
115+
-- @function get_storages_state
116+
crud.get_storages_state = utils.get_storages_state
117+
114118
--- Initializes crud on node
115119
--
116120
-- Exports all functions that are used for calls

crud/common/call.lua

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@ local dev_checks = require('crud.common.dev_checks')
55
local utils = require('crud.common.utils')
66
local sharding_utils = require('crud.common.sharding.utils')
77
local fiber_clock = require('fiber').clock
8+
local const = require('crud.common.const')
89

910
local CallError = errors.new_class('CallError')
1011
local NotInitializedError = errors.new_class('NotInitialized')
1112

1213
local call = {}
1314

14-
call.DEFAULT_VSHARD_CALL_TIMEOUT = 2
15-
1615
function call.get_vshard_call_name(mode, prefer_replica, balance)
1716
dev_checks('string', '?boolean', '?boolean')
1817

@@ -89,7 +88,7 @@ function call.map(func_name, func_args, opts)
8988
return nil, err
9089
end
9190

92-
local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
91+
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
9392

9493
local replicasets, err
9594
if opts.replicasets ~= nil then
@@ -144,7 +143,7 @@ function call.single(bucket_id, func_name, func_args, opts)
144143
return nil, err
145144
end
146145

147-
local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
146+
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
148147

149148
local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
150149
timeout = timeout,
@@ -166,7 +165,7 @@ function call.any(func_name, func_args, opts)
166165
timeout = '?number',
167166
})
168167

169-
local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
168+
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
170169

171170
local replicasets, err = vshard.router.routeall()
172171
if replicasets == nil then

crud/common/const.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1
77

88
const.NEED_SCHEMA_RELOAD = 0x0001000
99
const.NEED_SHARDING_RELOAD = 0x0001001
10+
const.DEFAULT_VSHARD_CALL_TIMEOUT = 2
1011

1112
return const

crud/common/state.lua

Lines changed: 0 additions & 23 deletions
This file was deleted.

crud/common/utils.lua

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ local ParseOperationsError = errors.new_class('ParseOperationsError', {capture_s
1414
local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
1515
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
1616
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
17+
local GetReplicaStateError = errors.new_class('GetStorageStateError')
18+
local fiber_clock = require('fiber').clock
1719

1820
local utils = {}
1921

@@ -692,4 +694,54 @@ function utils.check_name_isident(name)
692694
return true
693695
end
694696

697+
--- Polls replicas for storage state
698+
--
699+
-- @function get_storages_state
700+
--
701+
-- @tparam ?number opts.timeout
702+
-- Function call timeout
703+
--
704+
-- @return a table of storage states by replica uuid.
705+
function utils.get_storages_state(opts)
706+
local replicasets, err = vshard.router.routeall()
707+
if replicasets == nil then
708+
return nil, GetReplicaStateError:new("Failed to get all replicasets: %s", err.err)
709+
end
710+
711+
opts = opts or {}
712+
713+
local futures_by_replicas = {}
714+
local replicas_state = {}
715+
local async_opts = {is_async = true}
716+
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
717+
718+
for _, replicasets in pairs(replicasets) do
719+
for replica_uuid, replica in pairs(replicasets.replicas) do
720+
replicas_state[replica_uuid] = {name = replica.name, storage_initialized = false}
721+
local ok, future = pcall(replica.conn.eval, replica.conn, [[return rawget(_G, "_crud") ~= nil]],
722+
{}, async_opts)
723+
if ok then
724+
futures_by_replicas[replica_uuid] = future
725+
end
726+
end
727+
end
728+
729+
local deadline = fiber_clock() + timeout
730+
for replica_uuid, future in pairs(futures_by_replicas) do
731+
local wait_timeout = deadline - fiber_clock()
732+
if wait_timeout < 0 then
733+
wait_timeout = 0
734+
end
735+
736+
local result = future:wait_result(wait_timeout)
737+
if result then
738+
replicas_state[replica_uuid].storage_initialized = result[1]
739+
else
740+
future:discard()
741+
end
742+
end
743+
744+
return replicas_state
745+
end
746+
695747
return utils
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
local fio = require('fio')
2+
3+
local t = require('luatest')
4+
5+
local helpers = require('test.helper')
6+
7+
local fiber = require("fiber")
8+
9+
local pgroup = t.group('replicas_state', {
10+
{engine = 'memtx'}
11+
})
12+
13+
local all_storages_initialized = false
14+
15+
local function wait_storages_init(g)
16+
local storages_initialized = false
17+
local attempts_left = 5
18+
local wait_for_init_timeout = 1
19+
while (attempts_left > 0 and not storages_initialized) do
20+
local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {})
21+
t.assert_equals(err, nil, "Error getting storags states")
22+
storages_initialized = true
23+
for _,v in pairs(results) do
24+
if not v.storage_initialized then
25+
storages_initialized = false
26+
end
27+
end
28+
if not storages_initialized then
29+
fiber.sleep(wait_for_init_timeout)
30+
attempts_left = attempts_left-1
31+
end
32+
end
33+
return storages_initialized
34+
end
35+
36+
pgroup.before_all(function(g)
37+
g.cluster = helpers.Cluster:new({
38+
datadir = fio.tempdir(),
39+
server_command = helpers.entrypoint('srv_select'),
40+
use_vshard = true,
41+
replicasets = helpers.get_test_replicasets(),
42+
env = {
43+
['ENGINE'] = g.params.engine,
44+
},
45+
})
46+
g.cluster:start()
47+
48+
-- wait for storages to initialize
49+
all_storages_initialized = wait_storages_init(g)
50+
end)
51+
52+
pgroup.after_all(function(g)
53+
helpers.stop_cluster(g.cluster)
54+
fio.rmtree(g.cluster.datadir)
55+
end)
56+
57+
pgroup.test_crud_storage_status_of_stopped_servers = function(g)
58+
t.assert_equals(all_storages_initialized, true)
59+
60+
g.cluster:server("s2-replica"):stop()
61+
62+
local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {})
63+
t.assert_equals(err, nil, "Error getting storags states")
64+
65+
t.assert_equals(results[helpers.uuid('b', 1)].storage_initialized, true)
66+
t.assert_equals(results[helpers.uuid('c', 1)].storage_initialized, true)
67+
t.assert_equals(results[helpers.uuid('c', 2)].storage_initialized, false)
68+
end
69+
70+
pgroup.test_disabled_storage_role = function(g)
71+
t.assert_equals(all_storages_initialized, true)
72+
73+
-- stop crud storage role on one replica
74+
local server = g.cluster:server("s1-replica")
75+
local results = server.net_box:eval([[
76+
local serviceregistry = require("cartridge.service-registry")
77+
serviceregistry.get("crud-storage").stop()
78+
return true
79+
]])
80+
81+
t.assert_not_equals(results, nil, "Fail to disable storage role")
82+
83+
local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {})
84+
t.assert_equals(err, nil, "Error getting storags states")
85+
86+
t.assert_equals(results[helpers.uuid('b', 1)].storage_initialized, true)
87+
t.assert_equals(results[helpers.uuid('b', 2)].storage_initialized, false)
88+
t.assert_equals(results[helpers.uuid('c', 1)].storage_initialized, true)
89+
end

0 commit comments

Comments
 (0)