Skip to content

Commit 5388986

Browse files
committed
select: first dirty merger integraion attempt
In scope of #33
1 parent c5fc183 commit 5388986

File tree

4 files changed

+152
-504
lines changed

4 files changed

+152
-504
lines changed

crud/common/heap.lua

-118
This file was deleted.

crud/select.lua

+57-86
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ local checks = require('checks')
22
local errors = require('errors')
33
local vshard = require('vshard')
44

5-
local call = require('crud.common.call')
65
local utils = require('crud.common.utils')
76
local sharding = require('crud.common.sharding')
87
local dev_checks = require('crud.common.dev_checks')
98

109
local select_conditions = require('crud.select.conditions')
1110
local select_plan = require('crud.select.plan')
1211
local select_executor = require('crud.select.executor')
13-
local select_comparators = require('crud.select.comparators')
1412
local select_filters = require('crud.select.filters')
1513

1614
local Iterator = require('crud.select.iterator')
@@ -20,10 +18,20 @@ local GetReplicasetsError = errors.new_class('GetReplicasetsError')
2018

2119
local select_module = {}
2220

23-
local SELECT_FUNC_NAME = '_crud.select_on_storage'
24-
2521
local DEFAULT_BATCH_SIZE = 100
2622

23+
local function make_cursor(opts, data)
24+
local last_tuple = data[#data]
25+
26+
return {
27+
scan_value = opts.scan_value,
28+
after_tuple = last_tuple,
29+
iter = opts.iter,
30+
limit = opts.limit,
31+
scan_condition_num = opts.scan_condition_num,
32+
}
33+
end
34+
2735
local function select_on_storage(space_name, index_id, conditions, opts)
2836
dev_checks('string', 'number', '?table', {
2937
scan_value = 'table',
@@ -62,46 +70,20 @@ local function select_on_storage(space_name, index_id, conditions, opts)
6270
return nil, SelectError:new("Failed to execute select: %s", err)
6371
end
6472

65-
return tuples
73+
local cursor
74+
if #tuples < opts.limit or opts.limit == 0 then
75+
cursor = {is_end = true}
76+
else
77+
cursor = make_cursor(opts, tuples)
78+
end
79+
80+
return cursor, tuples
6681
end
6782

6883
function select_module.init()
6984
_G._crud.select_on_storage = select_on_storage
7085
end
7186

72-
local function select_iteration(space_name, plan, opts)
73-
dev_checks('string', '?table', {
74-
after_tuple = '?table',
75-
replicasets = 'table',
76-
timeout = '?number',
77-
limit = 'number',
78-
})
79-
80-
-- call select on storages
81-
local storage_select_opts = {
82-
scan_value = plan.scan_value,
83-
after_tuple = opts.after_tuple,
84-
iter = plan.iter,
85-
limit = opts.limit,
86-
scan_condition_num = plan.scan_condition_num,
87-
}
88-
89-
local storage_select_args = {
90-
space_name, plan.index_id, plan.conditions, storage_select_opts,
91-
}
92-
93-
local results, err = call.ro(SELECT_FUNC_NAME, storage_select_args, {
94-
replicasets = opts.replicasets,
95-
timeout = opts.timeout,
96-
})
97-
98-
if err ~= nil then
99-
return nil, err
100-
end
101-
102-
return results
103-
end
104-
10587
local function get_replicasets_by_sharding_key(bucket_id)
10688
local replicaset, err = vshard.router.route(bucket_id)
10789
if replicaset == nil then
@@ -128,8 +110,6 @@ local function build_select_iterator(space_name, user_conditions, opts)
128110
return nil, SelectError:new("batch_size should be > 0")
129111
end
130112

131-
local batch_size = opts.batch_size or DEFAULT_BATCH_SIZE
132-
133113
-- check conditions
134114
local conditions, err = select_conditions.parse(user_conditions)
135115
if err ~= nil then
@@ -165,33 +145,21 @@ local function build_select_iterator(space_name, user_conditions, opts)
165145
replicasets_to_select = get_replicasets_by_sharding_key(bucket_id)
166146
end
167147

168-
-- generate tuples comparator
169-
local scan_index = space.index[plan.index_id]
170-
local primary_index = space.index[0]
171-
local cmp_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
172-
local cmp_operator = select_comparators.get_cmp_operator(plan.iter)
173-
local tuples_comparator, err = select_comparators.gen_tuples_comparator(
174-
cmp_operator, cmp_key_parts
175-
)
176-
if err ~= nil then
177-
return nil, SelectError:new("Failed to generate comparator function: %s", err)
178-
end
179-
180-
local iter = Iterator.new({
181-
space_name = space_name,
182-
space_format = space_format,
183-
iteration_func = select_iteration,
184-
comparator = tuples_comparator,
185-
186-
plan = plan,
187-
188-
batch_size = batch_size,
189-
replicasets = replicasets_to_select,
148+
local select_opts = {
149+
scan_value = plan.scan_value,
150+
after_tuple = plan.after_tuple,
151+
iter = plan.iter,
152+
limit = opts.batch_size or DEFAULT_BATCH_SIZE,
153+
scan_condition_num = plan.scan_condition_num,
154+
}
190155

191-
timeout = opts.timeout,
192-
})
156+
local iter = Iterator.new(replicasets_to_select,
157+
space_name, plan.index_id, plan.conditions, select_opts)
193158

194-
return iter
159+
return {
160+
iter = iter,
161+
space_format = space_format,
162+
}
195163
end
196164

197165
function select_module.pairs(space_name, user_conditions, opts)
@@ -222,31 +190,32 @@ function select_module.pairs(space_name, user_conditions, opts)
222190
error(string.format("Failed to generate iterator: %s", err))
223191
end
224192

225-
local gen = function(_, iter)
226-
if not iter:has_next() then
193+
local tuple, _
194+
195+
if opts.use_tomap ~= true then
196+
return iter.iter:pairs()
197+
end
198+
199+
local merger_gen = iter.iter:pairs()
200+
local gen = function()
201+
_, tuple = merger_gen.gen(nil, merger_gen.state)
202+
if tuple == nil then
227203
return nil
228204
end
229205

230-
local tuple, err = iter:get()
206+
local result
207+
result, err = utils.unflatten(tuple, iter.space_format)
231208
if err ~= nil then
232-
error(string.format("Failed to get next object: %s", err))
233-
end
234-
235-
local result = tuple
236-
if opts.use_tomap == true then
237-
result, err = utils.unflatten(tuple, iter.space_format)
238-
if err ~= nil then
239-
error(string.format("Failed to unflatten next object: %s", err))
240-
end
209+
error(string.format("Failed to unflatten next object: %s", err))
241210
end
242211

243212
return iter, result
244213
end
245214

246-
return gen, nil, iter
215+
return gen
247216
end
248217

249-
function select_module.call(space_name, user_conditions, opts)
218+
local function select_module_call_xc(space_name, user_conditions, opts)
250219
checks('string', '?table', {
251220
after = '?table',
252221
first = '?number',
@@ -277,17 +246,15 @@ function select_module.call(space_name, user_conditions, opts)
277246

278247
local tuples = {}
279248

280-
while iter:has_next() do
281-
local tuple, err = iter:get()
282-
if err ~= nil then
283-
return nil, SelectError:new("Failed to get next object: %s", err)
284-
end
285-
286-
if tuple == nil then
249+
local count = 0
250+
local first = opts.first and math.abs(opts.first)
251+
for _, tuple in iter.iter:pairs() do
252+
if first ~= nil and count >= first then
287253
break
288254
end
289255

290256
table.insert(tuples, tuple)
257+
count = count + 1
291258
end
292259

293260
if opts.first ~= nil and opts.first < 0 then
@@ -300,4 +267,8 @@ function select_module.call(space_name, user_conditions, opts)
300267
}
301268
end
302269

270+
function select_module.call(space_name, user_conditions, opts)
271+
return SelectError:pcall(select_module_call_xc, space_name, user_conditions, opts)
272+
end
273+
303274
return select_module

0 commit comments

Comments
 (0)