Skip to content

Commit 5e97f88

Browse files
committed
select: first dirty merger integraion attempt
In scope of #33
1 parent dcb04ff commit 5e97f88

File tree

2 files changed

+141
-248
lines changed

2 files changed

+141
-248
lines changed

crud/select.lua

+44-75
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ local checks = require('checks')
22
local errors = require('errors')
33
local vshard = require('vshard')
44

5-
local call = require('crud.common.call')
65
local registry = require('crud.common.registry')
76
local utils = require('crud.common.utils')
87
local dev_checks = require('crud.common.dev_checks')
98

109
local select_conditions = require('crud.select.conditions')
1110
local select_plan = require('crud.select.plan')
1211
local select_executor = require('crud.select.executor')
13-
local select_comparators = require('crud.select.comparators')
1412
local select_filters = require('crud.select.filters')
1513

1614
local Iterator = require('crud.select.iterator')
@@ -21,9 +19,20 @@ local GetReplicasetsError = errors.new_class('GetReplicasetsError')
2119
local select_module = {}
2220

2321
local SELECT_FUNC_NAME = '__select'
24-
2522
local DEFAULT_BATCH_SIZE = 100
2623

24+
local function make_cursor(opts, data)
25+
local last_tuple = data[#data]
26+
27+
return {
28+
scan_value = opts.scan_value,
29+
after_tuple = last_tuple,
30+
iter = opts.iter,
31+
limit = opts.limit,
32+
scan_condition_num = opts.scan_condition_num,
33+
}
34+
end
35+
2736
local function call_select_on_storage(space_name, index_id, conditions, opts)
2837
dev_checks('string', 'number', '?table', {
2938
scan_value = 'table',
@@ -62,7 +71,14 @@ local function call_select_on_storage(space_name, index_id, conditions, opts)
6271
return nil, SelectError:new("Failed to execute select: %s", err)
6372
end
6473

65-
return tuples
74+
local cursor
75+
if #tuples < opts.limit or opts.limit == 0 then
76+
cursor = {is_end = true}
77+
else
78+
cursor = make_cursor(opts, tuples)
79+
end
80+
81+
return cursor, tuples
6682
end
6783

6884
function select_module.init()
@@ -71,39 +87,6 @@ function select_module.init()
7187
})
7288
end
7389

74-
local function select_iteration(space_name, plan, opts)
75-
dev_checks('string', '?table', {
76-
after_tuple = '?table',
77-
replicasets = 'table',
78-
timeout = '?number',
79-
limit = 'number',
80-
})
81-
82-
-- call select on storages
83-
local storage_select_opts = {
84-
scan_value = plan.scan_value,
85-
after_tuple = opts.after_tuple,
86-
iter = plan.iter,
87-
limit = opts.limit,
88-
scan_condition_num = plan.scan_condition_num,
89-
}
90-
91-
local storage_select_args = {
92-
space_name, plan.index_id, plan.conditions, storage_select_opts,
93-
}
94-
95-
local results, err = call.ro(SELECT_FUNC_NAME, storage_select_args, {
96-
replicasets = opts.replicasets,
97-
timeout = opts.timeout,
98-
})
99-
100-
if err ~= nil then
101-
return nil, err
102-
end
103-
104-
return results
105-
end
106-
10790
local function get_replicasets_by_sharding_key(sharding_key)
10891
local bucket_id = vshard.router.bucket_id_strcrc32(sharding_key)
10992
local replicaset, err = vshard.router.route(bucket_id)
@@ -130,8 +113,6 @@ local function build_select_iterator(space_name, user_conditions, opts)
130113
return nil, SelectError:new("batch_size should be > 0")
131114
end
132115

133-
local batch_size = opts.batch_size or DEFAULT_BATCH_SIZE
134-
135116
-- check conditions
136117
local conditions, err = select_conditions.parse(user_conditions)
137118
if err ~= nil then
@@ -169,33 +150,21 @@ local function build_select_iterator(space_name, user_conditions, opts)
169150
replicasets_to_select = get_replicasets_by_sharding_key(plan.sharding_key)
170151
end
171152

172-
-- generate tuples comparator
173-
local scan_index = space.index[plan.index_id]
174-
local primary_index = space.index[0]
175-
local cmp_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
176-
local cmp_operator = select_comparators.get_cmp_operator(plan.iter)
177-
local tuples_comparator, err = select_comparators.gen_tuples_comparator(
178-
cmp_operator, cmp_key_parts
179-
)
180-
if err ~= nil then
181-
return nil, SelectError:new("Failed to generate comparator function: %s", err)
182-
end
183-
184-
local iter = Iterator.new({
185-
space_name = space_name,
186-
space_format = space_format,
187-
iteration_func = select_iteration,
188-
comparator = tuples_comparator,
189-
190-
plan = plan,
191-
192-
batch_size = batch_size,
193-
replicasets = replicasets_to_select,
153+
local select_opts = {
154+
scan_value = plan.scan_value,
155+
after_tuple = plan.after_tuple,
156+
iter = plan.iter,
157+
limit = opts.batch_size or DEFAULT_BATCH_SIZE,
158+
scan_condition_num = plan.scan_condition_num,
159+
}
194160

195-
timeout = opts.timeout,
196-
})
161+
local iter = Iterator.new(replicasets_to_select,
162+
space_name, plan.index_id, plan.conditions, select_opts)
197163

198-
return iter
164+
return {
165+
iter = iter,
166+
space_format = space_format,
167+
}
199168
end
200169

201170
function select_module.pairs(space_name, user_conditions, opts)
@@ -223,12 +192,14 @@ function select_module.pairs(space_name, user_conditions, opts)
223192
error(string.format("Failed to generate iterator: %s", err))
224193
end
225194

226-
local gen = function(_, iter)
227-
if not iter:has_next() then
195+
local tuple, _
196+
local merger_gen = iter.iter:pairs()
197+
local gen = function()
198+
_, tuple = merger_gen.gen(nil, merger_gen.state)
199+
if tuple == nil then
228200
return nil
229201
end
230202

231-
local tuple, err = iter:get()
232203
if err ~= nil then
233204
error(string.format("Failed to get next object: %s", err))
234205
end
@@ -241,7 +212,7 @@ function select_module.pairs(space_name, user_conditions, opts)
241212
return iter, obj
242213
end
243214

244-
return gen, nil, iter
215+
return gen
245216
end
246217

247218
function select_module.call(space_name, user_conditions, opts)
@@ -273,17 +244,15 @@ function select_module.call(space_name, user_conditions, opts)
273244

274245
local tuples = {}
275246

276-
while iter:has_next() do
277-
local tuple, err = iter:get()
278-
if err ~= nil then
279-
return nil, SelectError:new("Failed to get next object: %s", err)
280-
end
281-
282-
if tuple == nil then
247+
local count = 0
248+
local first = opts.first and math.abs(opts.first)
249+
for _, tuple in iter.iter:pairs() do
250+
if first ~= nil and count >= first then
283251
break
284252
end
285253

286254
table.insert(tuples, tuple)
255+
count = count + 1
287256
end
288257

289258
if opts.first ~= nil and opts.first < 0 then

0 commit comments

Comments
 (0)