1
- local fiber = require (' fiber' )
2
1
local errors = require (' errors' )
3
2
4
- local call = require (' crud.common.call' )
5
- local const = require (' crud.common.const' )
6
3
local dev_checks = require (' crud.common.dev_checks' )
7
- local cache = require (' crud.common.sharding_key_cache ' )
4
+ local cache = require (' crud.common.sharding.sharding_metadata_cache ' )
8
5
local utils = require (' crud.common.utils' )
9
6
10
7
local ShardingKeyError = errors .new_class (" ShardingKeyError" , {capture_stack = false })
11
- local FetchShardingKeyError = errors .new_class (' FetchShardingKeyError' , {capture_stack = false })
12
8
local WrongShardingConfigurationError = errors .new_class (' WrongShardingConfigurationError' , {capture_stack = false })
13
9
14
- local FETCH_FUNC_NAME = ' _crud.fetch_on_storage'
15
-
16
10
local sharding_key_module = {}
17
11
18
- -- Function decorator that is used to prevent _fetch_on_router() from being
19
- -- called concurrently by different fibers.
20
- local function locked (f )
21
- dev_checks (' function' )
22
-
23
- return function (timeout , ...)
24
- local timeout_deadline = fiber .clock () + timeout
25
- local ok = cache .fetch_lock :put (true , timeout )
26
- -- channel:put() returns false in two cases: when timeout is exceeded
27
- -- or channel has been closed. However error message describes only
28
- -- first reason, I'm not sure we need to disclose to users such details
29
- -- like problems with synchronization objects.
30
- if not ok then
31
- return FetchShardingKeyError :new (
32
- " Timeout for fetching sharding key is exceeded" )
33
- end
34
- local timeout = timeout_deadline - fiber .clock ()
35
- local status , err = pcall (f , timeout , ... )
36
- cache .fetch_lock :get ()
37
- if not status or err ~= nil then
38
- return err
39
- end
40
- end
41
- end
42
-
43
12
-- Build a structure similar to index, but it is not a real index object,
44
13
-- it contains only parts key with fieldno's.
45
14
local function as_index_object (space_name , space_format , sharding_key_def )
@@ -59,104 +28,6 @@ local function as_index_object(space_name, space_format, sharding_key_def)
59
28
return {parts = fieldnos }
60
29
end
61
30
62
- -- Return a map with metadata or nil when space box.space._ddl_sharding_key is
63
- -- not available on storage.
64
- function sharding_key_module .fetch_on_storage ()
65
- local sharding_key_space = box .space ._ddl_sharding_key
66
- if sharding_key_space == nil then
67
- return nil
68
- end
69
-
70
- local SPACE_NAME_FIELDNO = 1
71
- local SPACE_SHARDING_KEY_FIELDNO = 2
72
- local metadata_map = {}
73
- for _ , tuple in sharding_key_space :pairs () do
74
- local space_name = tuple [SPACE_NAME_FIELDNO ]
75
- local sharding_key_def = tuple [SPACE_SHARDING_KEY_FIELDNO ]
76
- local space_format = box .space [space_name ]:format ()
77
- metadata_map [space_name ] = {
78
- sharding_key_def = sharding_key_def ,
79
- space_format = space_format ,
80
- }
81
- end
82
-
83
- return metadata_map
84
- end
85
-
86
- -- Under high load we may get a case when more than one fiber will fetch
87
- -- metadata from storages. It is not good from performance point of view.
88
- -- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches
89
- -- a sharding metadata by a single one, other fibers will wait while
90
- -- cache.fetch_lock become unlocked during timeout passed to
91
- -- _fetch_on_router().
92
- local _fetch_on_router = locked (function (timeout )
93
- dev_checks (' number' )
94
-
95
- if cache .sharding_key_as_index_obj_map ~= nil then
96
- return
97
- end
98
-
99
- local metadata_map , err = call .any (FETCH_FUNC_NAME , {}, {
100
- timeout = timeout
101
- })
102
- if err ~= nil then
103
- return err
104
- end
105
- if metadata_map == nil then
106
- cache .sharding_key_as_index_obj_map = {}
107
- return
108
- end
109
-
110
- cache .sharding_key_as_index_obj_map = {}
111
- for space_name , metadata in pairs (metadata_map ) do
112
- local sharding_key_as_index_obj , err = as_index_object (space_name ,
113
- metadata .space_format ,
114
- metadata .sharding_key_def )
115
- if err ~= nil then
116
- return err
117
- end
118
- cache .sharding_key_as_index_obj_map [space_name ] = sharding_key_as_index_obj
119
- end
120
- end )
121
-
122
- -- Get sharding index for a certain space.
123
- --
124
- -- Return:
125
- -- - sharding key as index object, when sharding key definition found on
126
- -- storage.
127
- -- - nil, when sharding key definition was not found on storage. Pay attention
128
- -- that nil without error is a successfull return value.
129
- -- - nil and error, when something goes wrong on fetching attempt.
130
- --
131
- function sharding_key_module .fetch_on_router (space_name , timeout )
132
- dev_checks (' string' , ' ?number' )
133
-
134
- if cache .sharding_key_as_index_obj_map ~= nil then
135
- return cache .sharding_key_as_index_obj_map [space_name ]
136
- end
137
-
138
- local timeout = timeout or const .FETCH_SHARDING_KEY_TIMEOUT
139
- local err = _fetch_on_router (timeout )
140
- if err ~= nil then
141
- if cache .sharding_key_as_index_obj_map ~= nil then
142
- return cache .sharding_key_as_index_obj_map [space_name ]
143
- end
144
- return nil , err
145
- end
146
-
147
- if cache .sharding_key_as_index_obj_map ~= nil then
148
- return cache .sharding_key_as_index_obj_map [space_name ]
149
- end
150
-
151
- return nil , FetchShardingKeyError :new (
152
- " Fetching sharding key for space '%s' is failed" , space_name )
153
- end
154
-
155
- function sharding_key_module .update_cache (space_name )
156
- cache .drop_caches ()
157
- return sharding_key_module .fetch_on_router (space_name )
158
- end
159
-
160
31
-- Make sure sharding key definition is a part of primary key.
161
32
local function is_part_of_pk (space_name , primary_index_parts , sharding_key_as_index_obj )
162
33
dev_checks (' string' , ' table' , ' table' )
204
75
205
76
-- Extract sharding key from pk.
206
77
-- Returns a table with sharding key or pair of nil and error.
207
- function sharding_key_module .extract_from_pk (space_name , primary_index_parts , primary_key , timeout )
208
- dev_checks (' string' , ' table' , ' ? ' , ' ?number ' )
78
+ function sharding_key_module .extract_from_pk (space_name , sharding_key_as_index_obj , primary_index_parts , primary_key )
79
+ dev_checks (' string' , ' ? table' , ' table ' , ' ?' )
209
80
210
- local sharding_key_as_index_obj , err = sharding_key_module .fetch_on_router (space_name , timeout )
211
- if err ~= nil then
212
- return nil , err
213
- end
214
81
if sharding_key_as_index_obj == nil then
215
82
return primary_key
216
83
end
@@ -229,8 +96,22 @@ function sharding_key_module.extract_from_pk(space_name, primary_index_parts, pr
229
96
return extract_from_index (primary_key , primary_index_parts , sharding_key_as_index_obj )
230
97
end
231
98
232
- function sharding_key_module .init ()
233
- _G ._crud .fetch_on_storage = sharding_key_module .fetch_on_storage
99
+ function sharding_key_module .construct_as_index_obj_cache (metadata_map )
100
+ dev_checks (' table' )
101
+
102
+ cache .sharding_key_as_index_obj_map = {}
103
+ for space_name , metadata in pairs (metadata_map ) do
104
+ if metadata .sharding_key_def ~= nil then
105
+ local sharding_key_as_index_obj , err = as_index_object (space_name ,
106
+ metadata .space_format ,
107
+ metadata .sharding_key_def )
108
+ if err ~= nil then
109
+ return err
110
+ end
111
+
112
+ cache .sharding_key_as_index_obj_map [space_name ] = sharding_key_as_index_obj
113
+ end
114
+ end
234
115
end
235
116
236
117
sharding_key_module .internal = {
0 commit comments