Skip to content

Commit 34d43df

Browse files
Extract sharding key from conditions
PR #181 introduced support of DDL sharding keys. But if the sharding key has no separate index in the schema, a select with equality conditions on all required sharding key fields still led to map-reduce instead of a single storage call. This patch introduces improved support for sharding key extraction and fixes the issue. Closes #213
1 parent 3ac211c commit 34d43df

File tree

6 files changed

+166
-29
lines changed

6 files changed

+166
-29
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2020

2121
* Use tuple-merger backed select implementation on tarantool 2.10+ (it gives
2222
less pressure on Lua GC).
23+
* DDL sharding key now can be extracted from select conditions even if
24+
there is no separate index.
2325

2426
## [0.9.0] - 20-10-21
2527

README.md

-2
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,6 @@ Current limitations for using custom sharding key:
101101
updated on storages, see [#212](https://github.com/tarantool/crud/issues/212).
102102
However it is possible to do it manually with
103103
`require('crud.sharding_key').update_sharding_keys_cache()`.
104-
- CRUD select may lead map reduce in some cases, see
105-
[#213](https://github.com/tarantool/crud/issues/213).
106104
- No support of JSON path for sharding key, see
107105
[#219](https://github.com/tarantool/crud/issues/219).
108106

crud/select/plan.lua

+61-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
local errors = require('errors')
22

33
local compare_conditions = require('crud.compare.conditions')
4+
local sharding_key_module = require('crud.common.sharding_key')
45
local utils = require('crud.common.utils')
56
local dev_checks = require('crud.common.dev_checks')
67

@@ -48,6 +49,46 @@ local function get_index_for_condition(space_indexes, space_format, condition)
4849
end
4950
end
5051

52+
--- Try to build sharding key values from equality conditions.
--
-- Walks over `conditions` and remembers the value of every field that is
-- constrained by an EQ condition: either directly (operand is a field name,
-- the first condition value is used) or through an index (operand is an
-- index name, condition values are mapped onto index parts in order).
-- A sharding key is returned only if every part of `ddl_sharding_key`
-- received a value; otherwise nil is returned.
--
-- @tparam table conditions array of conditions, each one has `operator`,
--   `operand` and `values` fields
-- @tparam ?table ddl_sharding_key sharding key definition with `parts`
--   (each part has `fieldno`); nil means there is nothing to extract
-- @tparam table space_indexes indexes lookup by condition operand
-- @tparam table space_field_positions map from field name to field number
-- @treturn ?table array of sharding key values ordered as
--   `ddl_sharding_key.parts`, or nil if the key can't be built
local function extract_sharding_key_from_conditions(conditions, ddl_sharding_key, space_indexes, space_field_positions)
    if ddl_sharding_key == nil then
        return nil
    end

    -- If name is both valid index name and field name,
    -- it is interpreted as index name.
    local filled_fields = {}
    for _, condition in ipairs(conditions) do
        if condition.operator ~= compare_conditions.operators.EQ then
            goto continue
        end

        local index = space_indexes[condition.operand]
        if index ~= nil then
            for i, part in ipairs(index.parts) do
                -- A condition may specify only a prefix of index parts.
                -- Missing values must not erase a field that has been
                -- already filled by another condition.
                local value = condition.values[i]
                if value ~= nil then
                    filled_fields[part.fieldno] = value
                end
            end

            goto continue
        end

        local fieldno = space_field_positions[condition.operand]
        if fieldno == nil then
            -- Operand is neither an index nor a field name. Using nil as
            -- a table key would raise an error, so give up on extracting
            -- the sharding key instead of failing the whole call.
            return nil
        end
        filled_fields[fieldno] = condition.values[1]

        ::continue::
    end

    local sharding_key = {}
    for i, part in ipairs(ddl_sharding_key.parts) do
        local value = filled_fields[part.fieldno]
        if value == nil then
            return nil
        end

        sharding_key[i] = value
    end

    return sharding_key
end
91+
5192
local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
5293
if #scan_value < #sharding_index.parts then
5394
return nil
@@ -90,7 +131,7 @@ end
90131
-- and these fields are ordered by field_names + primary key + scan key
91132
-- this order can differ from the order in space format
92133
-- so we need to cast after_tuple to space format for scrolling tuples on storage
93-
local function construct_after_tuple_by_fields(space_format, field_names, tuple)
134+
local function construct_after_tuple_by_fields(space_field_positions, field_names, tuple)
94135
if tuple == nil then
95136
return nil
96137
end
@@ -99,15 +140,10 @@ local function construct_after_tuple_by_fields(space_format, field_names, tuple)
99140
return tuple
100141
end
101142

102-
local positions = {}
103143
local transformed_tuple = {}
104144

105-
for i, field in ipairs(space_format) do
106-
positions[field.name] = i
107-
end
108-
109145
for i, field_name in ipairs(field_names) do
110-
local fieldno = positions[field_name]
146+
local fieldno = space_field_positions[field_name]
111147
if fieldno == nil then
112148
return nil, FilterFieldsError:new(
113149
'Space format doesn\'t contain field named %q', field_name
@@ -145,6 +181,12 @@ function select_plan.new(space, conditions, opts)
145181
local scan_value
146182
local scan_condition_num
147183

184+
local space_field_positions = {}
185+
186+
for i, field in ipairs(space_format) do
187+
space_field_positions[field.name] = i
188+
end
189+
148190
-- search index to iterate over
149191
for i, condition in ipairs(conditions) do
150192
scan_index = get_index_for_condition(space_indexes, space_format, condition)
@@ -177,7 +219,7 @@ function select_plan.new(space, conditions, opts)
177219
-- handle opts.first
178220
local total_tuples_count
179221
local scan_after_tuple, err = construct_after_tuple_by_fields(
180-
space_format, field_names, opts.after_tuple
222+
space_field_positions, field_names, opts.after_tuple
181223
)
182224
if err ~= nil then
183225
return nil, err
@@ -235,6 +277,17 @@ function select_plan.new(space, conditions, opts)
235277
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
236278
end
237279

280+
if sharding_key == nil then
281+
-- Ignore possible errors to preserve old behavior
282+
-- since here it affects only extracting sharding_key from
283+
-- conditions and it should not be critical to select call.
284+
local ddl_sharding_key = sharding_key_module.fetch_on_router(space_name)
285+
286+
sharding_key = extract_sharding_key_from_conditions(
287+
conditions, ddl_sharding_key, space_indexes, space_field_positions
288+
)
289+
end
290+
238291
local plan = {
239292
conditions = conditions,
240293
space_name = space_name,

test/entrypoint/srv_ddl.lua

+15
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ package.preload['customers-storage'] = function()
6161
{path = 'name', is_nullable = false, type = 'string'},
6262
},
6363
}
64+
local age_index = {
65+
name = 'age',
66+
type = 'TREE',
67+
unique = false,
68+
parts = {
69+
{path = 'age', is_nullable = false, type = 'number'},
70+
},
71+
}
6472
local secondary_index = {
6573
name = 'secondary',
6674
type = 'TREE',
@@ -100,13 +108,20 @@ package.preload['customers-storage'] = function()
100108
table.insert(customers_age_key_schema.indexes, primary_index)
101109
table.insert(customers_age_key_schema.indexes, bucket_id_index)
102110

111+
local customers_name_age_key_different_indexes_schema = table.deepcopy(customers_schema)
112+
customers_name_age_key_different_indexes_schema.sharding_key = {'name', 'age'}
113+
table.insert(customers_name_age_key_different_indexes_schema.indexes, primary_index)
114+
table.insert(customers_name_age_key_different_indexes_schema.indexes, bucket_id_index)
115+
table.insert(customers_name_age_key_different_indexes_schema.indexes, age_index)
116+
103117
local schema = {
104118
spaces = {
105119
customers_name_key = customers_name_key_schema,
106120
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
107121
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
108122
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
109123
customers_age_key = customers_age_key_schema,
124+
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
110125
}
111126
}
112127

test/helpers/storage_stat.lua

+12
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,16 @@ function storage_stat.diff(a, b)
9595
return diff
9696
end
9797

98+
--- Sum up select requests over all storages.
-- Accepts collect (or diff) return value; entries without
-- a `select_requests` counter contribute nothing to the sum.
-- @tparam table stats per-storage statistics
-- @treturn number total number of select requests
function storage_stat.total(stats)
    local requests_sum = 0

    for _, storage_counters in pairs(stats) do
        local requests = storage_counters.select_requests
        if requests then
            requests_sum = requests_sum + requests
        end
    end

    return requests_sum
end
109+
98110
return storage_stat

test/integration/ddl_sharding_key_test.lua

+76-19
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pgroup.before_each(function(g)
5151
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index')
5252
helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key')
5353
helpers.truncate_space_on_cluster(g.cluster, 'customers_age_key')
54+
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_age_key_different_indexes')
5455
end)
5556

5657
pgroup.test_insert_object = function(g)
@@ -279,13 +280,7 @@ pgroup.test_select = function(g)
279280
t.assert_equals(result.rows[1], tuple)
280281
end
281282

282-
-- TODO: After enabling support of sharding keys that are not equal to primary
283-
-- keys, we should handle it differently: it is not enough to look just on scan
284-
-- value, we should traverse all conditions. Now missed cases lead to
285-
-- map-reduce. Will be resolved in #213.
286-
pgroup.test_select_wont_lead_map_reduce = function(g)
287-
local space_name = 'customers_name_key_uniq_index'
288-
283+
local prepare_data_name_sharding_key = function(g, space_name)
289284
local conn_s1 = g.cluster:server('s1-master').net_box
290285
local conn_s2 = g.cluster:server('s2-master').net_box
291286

@@ -301,12 +296,80 @@ pgroup.test_select_wont_lead_map_reduce = function(g)
301296
-- bucket_id is 1161, storage is s-2
302297
local result = conn_s2.space[space_name]:insert({4, 1161, 'James Joyce', 59})
303298
t.assert_not_equals(result, nil)
299+
end
300+
301+
-- Insert fixture tuples directly on storage masters, bypassing the router.
-- Each tuple carries an explicit bucket_id and is inserted through the
-- master connection named next to it.
local prepare_data_name_age_sharding_key = function(g, space_name)
    local masters = {
        s1 = g.cluster:server('s1-master').net_box,
        s2 = g.cluster:server('s2-master').net_box,
    }

    local fixtures = {
        -- bucket_id is 2310, storage is s-1
        {master = 's1', tuple = {1, 2310, 'Viktor Pelevin', 58}},
        -- bucket_id is 63, storage is s-2
        {master = 's2', tuple = {2, 63, 'Isaac Asimov', 72}},
        -- bucket_id is 2901, storage is s-1
        {master = 's1', tuple = {3, 2901, 'Aleksandr Solzhenitsyn', 89}},
        -- bucket_id is 1365, storage is s-2
        {master = 's2', tuple = {4, 1365, 'James Joyce', 59}},
    }

    for _, fixture in ipairs(fixtures) do
        local conn = masters[fixture.master]
        local inserted = conn.space[space_name]:insert(fixture.tuple)
        t.assert_not_equals(inserted, nil)
    end
end
318+
319+
-- Select scenarios where equality conditions cover the whole sharding key,
-- so the request is expected to be served by a single storage call.
local cases = {
    select_for_indexed_sharding_key = {
        space_name = 'customers_name_key_uniq_index',
        prepare_data = prepare_data_name_sharding_key,
        conditions = {{'==', 'name', 'Viktor Pelevin'}},
    },
    select_for_sharding_key_as_index_part = {
        space_name = 'customers_name_key',
        prepare_data = prepare_data_name_sharding_key,
        conditions = {{'==', 'name', 'Viktor Pelevin'}},
    },
    select_for_sharding_key_as_several_indexes_parts = {
        space_name = 'customers_name_age_key_different_indexes',
        prepare_data = prepare_data_name_age_sharding_key,
        conditions = {{'==', 'name', 'Viktor Pelevin'}, {'==', 'age', 58}},
    },
    select_by_index_cond_for_sharding_key_as_several_indexes_parts = {
        space_name = 'customers_name_age_key_different_indexes',
        prepare_data = prepare_data_name_age_sharding_key,
        conditions = {{'==', 'id', {1, 'Viktor Pelevin'}}, {'==', 'age', 58}},
    },
}

-- Build a test function for one case: prepare data, run crud.select on
-- the router and compare storage statistics taken before and after.
local function make_single_storage_select_test(case)
    return function(g)
        case.prepare_data(g, case.space_name)

        local stat_before = storage_stat.collect(g.cluster)

        local router = g.cluster.main_server.net_box
        local result, err = router:call('crud.select', {
            case.space_name, case.conditions
        })
        t.assert_equals(err, nil)
        t.assert_not_equals(result, nil)
        t.assert_equals(#result.rows, 1)

        local stat_after = storage_stat.collect(g.cluster)

        -- Exactly one select() request must have been made across all
        -- storages. More than one means the call turned into map-reduce.
        local delta = storage_stat.diff(stat_after, stat_before)
        t.assert_equals(storage_stat.total(delta), 1, 'Select request was not a map reduce')
    end
end

for name, case in pairs(cases) do
    local test_name = ('test_%s_wont_lead_to_map_reduce'):format(name)
    pgroup[test_name] = make_single_storage_select_test(case)
end
364+
365+
pgroup.test_select_for_part_of_sharding_key_will_lead_to_map_reduce = function(g)
366+
local space_name = 'customers_name_age_key_different_indexes'
367+
prepare_data_name_age_sharding_key(g, space_name)
304368

305369
local stat_a = storage_stat.collect(g.cluster)
306370

307-
-- Select a tuple with name 'Viktor Pelevin'.
308371
local result, err = g.cluster.main_server.net_box:call('crud.select', {
309-
space_name, {{'==', 'name', 'Viktor Pelevin'}}
372+
space_name, {{'==', 'age', 58}},
310373
})
311374
t.assert_equals(err, nil)
312375
t.assert_not_equals(result, nil)
@@ -315,16 +378,10 @@ pgroup.test_select_wont_lead_map_reduce = function(g)
315378
local stat_b = storage_stat.collect(g.cluster)
316379

317380
-- Check a number of select() requests made by CRUD on cluster's storages
318-
-- after calling select() on a router. Make sure only a single storage has
319-
-- a single select() request. Otherwise we lead map-reduce.
320-
t.assert_equals(storage_stat.diff(stat_b, stat_a), {
321-
['s-1'] = {
322-
select_requests = 0,
323-
},
324-
['s-2'] = {
325-
select_requests = 1,
326-
},
327-
})
381+
-- after calling select() on a router. Make sure it was a map-reduce
382+
-- since we do not have sharding key values in conditions.
383+
local stats = storage_stat.diff(stat_b, stat_a)
384+
t.assert_equals(storage_stat.total(stats), 2, 'Select request was a map reduce')
328385
end
329386

330387
pgroup.test_select_secondary_idx = function(g)

0 commit comments

Comments
 (0)