Skip to content

Commit 98d555b

Browse files
committed
Copy the content of sharding_key.lua file to sharding_metadata.lua file
PR #181 introduced support of DDL sharding keys. Implementation of sharding keys support contains methods that are common to support sharding keys and sharding functions. That's why a separate file `sharding_metadata.lua` was created to contain common methods. In this commit content of `sharding_key.lua` file is coppied to `sharding_metadata.lua` file to simplify a reviewer's life and display the history of changes relative to PR #181 in the following commits. Part of #237
1 parent f6d00a9 commit 98d555b

File tree

11 files changed

+158
-16
lines changed

11 files changed

+158
-16
lines changed

crud.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ local select = require('crud.select')
1212
local truncate = require('crud.truncate')
1313
local len = require('crud.len')
1414
local borders = require('crud.borders')
15-
local sharding_key = require('crud.common.sharding_key')
15+
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
1616
local utils = require('crud.common.utils')
1717

1818
local crud = {}
@@ -114,7 +114,7 @@ function crud.init_storage()
114114
truncate.init()
115115
len.init()
116116
borders.init()
117-
sharding_key.init()
117+
sharding_metadata.init()
118118
end
119119

120120
function crud.init_router()

crud/common/sharding/init.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ local errors = require('errors')
44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55

66
local utils = require('crud.common.utils')
7-
local sharding_key_module = require('crud.common.sharding_key')
7+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
88

99
local sharding = {}
1010

@@ -22,7 +22,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
2222
end
2323

2424
local sharding_index_parts = space.index[0].parts
25-
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name)
25+
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space.name)
2626
if err ~= nil then
2727
return nil, err
2828
end
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
local fiber = require('fiber')
2+
local errors = require('errors')
3+
4+
local call = require('crud.common.call')
5+
local const = require('crud.common.const')
6+
local dev_checks = require('crud.common.dev_checks')
7+
local cache = require('crud.common.sharding_key_cache')
8+
9+
local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
10+
11+
local FETCH_FUNC_NAME = '_crud.fetch_on_storage'
12+
13+
local sharding_key_module = {}
14+
15+
-- Function decorator that is used to prevent _fetch_on_router() from being
16+
-- called concurrently by different fibers.
17+
local function locked(f)
18+
dev_checks('function')
19+
20+
return function(timeout, ...)
21+
local timeout_deadline = fiber.clock() + timeout
22+
local ok = cache.fetch_lock:put(true, timeout)
23+
-- channel:put() returns false in two cases: when timeout is exceeded
24+
-- or channel has been closed. However error message describes only
25+
-- first reason, I'm not sure we need to disclose to users such details
26+
-- like problems with synchronization objects.
27+
if not ok then
28+
return FetchShardingKeyError:new(
29+
"Timeout for fetching sharding key is exceeded")
30+
end
31+
local timeout = timeout_deadline - fiber.clock()
32+
local status, err = pcall(f, timeout, ...)
33+
cache.fetch_lock:get()
34+
if not status or err ~= nil then
35+
return err
36+
end
37+
end
38+
end
39+
40+
-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
41+
-- not available on storage.
42+
function sharding_key_module.fetch_on_storage()
43+
local sharding_key_space = box.space._ddl_sharding_key
44+
if sharding_key_space == nil then
45+
return nil
46+
end
47+
48+
local SPACE_NAME_FIELDNO = 1
49+
local SPACE_SHARDING_KEY_FIELDNO = 2
50+
local metadata_map = {}
51+
for _, tuple in sharding_key_space:pairs() do
52+
local space_name = tuple[SPACE_NAME_FIELDNO]
53+
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
54+
local space_format = box.space[space_name]:format()
55+
metadata_map[space_name] = {
56+
sharding_key_def = sharding_key_def,
57+
space_format = space_format,
58+
}
59+
end
60+
61+
return metadata_map
62+
end
63+
64+
-- Under high load we may get a case when more than one fiber will fetch
65+
-- metadata from storages. It is not good from performance point of view.
66+
-- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches
67+
-- a sharding metadata by a single one, other fibers will wait while
68+
-- cache.fetch_lock become unlocked during timeout passed to
69+
-- _fetch_on_router().
70+
local _fetch_on_router = locked(function(timeout)
71+
dev_checks('number')
72+
73+
if cache.sharding_key_as_index_obj_map ~= nil then
74+
return
75+
end
76+
77+
local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, {
78+
timeout = timeout
79+
})
80+
if err ~= nil then
81+
return err
82+
end
83+
if metadata_map == nil then
84+
cache.sharding_key_as_index_obj_map = {}
85+
return
86+
end
87+
88+
cache.sharding_key_as_index_obj_map = {}
89+
for space_name, metadata in pairs(metadata_map) do
90+
local sharding_key_as_index_obj, err = as_index_object(space_name,
91+
metadata.space_format,
92+
metadata.sharding_key_def)
93+
if err ~= nil then
94+
return err
95+
end
96+
cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
97+
end
98+
end)
99+
100+
-- Get sharding index for a certain space.
101+
--
102+
-- Return:
103+
-- - sharding key as index object, when sharding key definition found on
104+
-- storage.
105+
-- - nil, when sharding key definition was not found on storage. Pay attention
106+
-- that nil without error is a successfull return value.
107+
-- - nil and error, when something goes wrong on fetching attempt.
108+
--
109+
function sharding_key_module.fetch_on_router(space_name, timeout)
110+
dev_checks('string', '?number')
111+
112+
if cache.sharding_key_as_index_obj_map ~= nil then
113+
return cache.sharding_key_as_index_obj_map[space_name]
114+
end
115+
116+
local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT
117+
local err = _fetch_on_router(timeout)
118+
if err ~= nil then
119+
if cache.sharding_key_as_index_obj_map ~= nil then
120+
return cache.sharding_key_as_index_obj_map[space_name]
121+
end
122+
return nil, err
123+
end
124+
125+
if cache.sharding_key_as_index_obj_map ~= nil then
126+
return cache.sharding_key_as_index_obj_map[space_name]
127+
end
128+
129+
return nil, FetchShardingKeyError:new(
130+
"Fetching sharding key for space '%s' is failed", space_name)
131+
end
132+
133+
function sharding_key_module.update_cache(space_name)
134+
cache.drop_caches()
135+
return sharding_key_module.fetch_on_router(space_name)
136+
end
137+
138+
function sharding_key_module.init()
139+
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
140+
end
141+
142+
return sharding_key_module

crud/delete.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding.init')
8-
local sharding_key_module = require('crud.common.sharding_key')
8+
local sharding_key_module = require('crud.common.sharding.sharding_key')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
1111

crud/get.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding.init')
8-
local sharding_key_module = require('crud.common.sharding_key')
8+
local sharding_key_module = require('crud.common.sharding.sharding_key')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
1111

crud/select/compat/select.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ local sharding = require('crud.common.sharding.init')
77
local dev_checks = require('crud.common.dev_checks')
88
local common = require('crud.select.compat.common')
99
local schema = require('crud.common.schema')
10-
local sharding_key_module = require('crud.common.sharding_key')
10+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1111

1212
local compare_conditions = require('crud.compare.conditions')
1313
local select_plan = require('crud.select.plan')
@@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
5151
return nil, SelectError:new("Space %q doesn't exist", space_name), true
5252
end
5353
local space_format = space:format()
54-
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
54+
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
5555
if err ~= nil then
5656
return nil, err
5757
end

crud/select/compat/select_old.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ local utils = require('crud.common.utils')
88
local sharding = require('crud.common.sharding.init')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
11-
local sharding_key_module = require('crud.common.sharding_key')
11+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1212

1313
local compare_conditions = require('crud.compare.conditions')
1414
local select_plan = require('crud.select.plan')
@@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
103103
return nil, SelectError:new("Space %q doesn't exist", space_name), true
104104
end
105105
local space_format = space:format()
106-
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
106+
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
107107
if err ~= nil then
108108
return nil, err
109109
end

crud/update.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding.init')
8-
local sharding_key_module = require('crud.common.sharding_key')
8+
local sharding_key_module = require('crud.common.sharding.sharding_key')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
1111

test/helper.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,10 @@ end
325325

326326
function helpers.update_cache(cluster, space_name)
327327
return cluster.main_server.net_box:eval([[
328-
local sharding_key = require('crud.common.sharding_key')
328+
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
329329
330330
local space_name = ...
331-
return sharding_key.update_cache(space_name)
331+
return sharding_metadata.update_cache(space_name)
332332
]], {space_name})
333333
end
334334

test/unit/sharding_key_test.lua renamed to test/unit/sharding_metadata_test.lua

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
local t = require('luatest')
2-
local sharding_key_module = require('crud.common.sharding_key')
3-
local cache = require('crud.common.sharding_key_cache')
2+
local sharding_key_module = require('crud.common.sharding.sharding_metadata')
3+
local cache = require('crud.common.sharding.sharding_metadata_cache')
44
local utils = require('crud.common.utils')
55

66
local helpers = require('test.helper')
77

8-
local g = t.group('sharding_key')
8+
local g = t.group('sharding_metadata')
99

1010
g.before_each(function()
1111
local sharding_key_format = {

0 commit comments

Comments
 (0)