Skip to content

Commit 630b8fd

Browse files
committed
select: first dirty merger integraion attempt
In scope of #33
1 parent c7dfdbd commit 630b8fd

File tree

2 files changed

+145
-257
lines changed

2 files changed

+145
-257
lines changed

crud/select.lua

+48-76
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ local checks = require('checks')
22
local errors = require('errors')
33
local vshard = require('vshard')
44

5-
local call = require('crud.common.call')
65
local registry = require('crud.common.registry')
76
local utils = require('crud.common.utils')
87

98
local select_conditions = require('crud.select.conditions')
109
local select_plan = require('crud.select.plan')
1110
local select_executor = require('crud.select.executor')
12-
local select_comparators = require('crud.select.comparators')
1311
local select_filters = require('crud.select.filters')
1412

1513
local Iterator = require('crud.select.iterator')
@@ -22,9 +20,20 @@ local GetReplicasetsError = errors.new_class('GetReplicasetsError')
2220
local select_module = {}
2321

2422
local SELECT_FUNC_NAME = '__select'
25-
2623
local DEFAULT_BATCH_SIZE = 100
2724

25+
local function make_cursor(opts, data)
26+
local last_tuple = data[#data]
27+
28+
return {
29+
scan_value = opts.scan_value,
30+
after_tuple = last_tuple,
31+
iter = opts.iter,
32+
limit = opts.limit,
33+
scan_condition_num = opts.limit,
34+
}
35+
end
36+
2837
local function call_select_on_storage(space_name, index_id, conditions, opts)
2938
checks('string', 'number', '?table', {
3039
scan_value = 'table',
@@ -63,7 +72,14 @@ local function call_select_on_storage(space_name, index_id, conditions, opts)
6372
return nil, SelectError:new("Failed to execute select: %s", err)
6473
end
6574

66-
return tuples
75+
local cursor
76+
if #tuples < opts.limit or opts.limit == 0 then
77+
cursor = {is_end = true}
78+
else
79+
cursor = make_cursor(opts, tuples)
80+
end
81+
82+
return cursor, tuples
6783
end
6884

6985
function select_module.init()
@@ -72,39 +88,6 @@ function select_module.init()
7288
})
7389
end
7490

75-
local function select_iteration(space_name, plan, opts)
76-
checks('string', '?table', {
77-
after_tuple = '?table',
78-
replicasets = 'table',
79-
timeout = '?number',
80-
limit = 'number',
81-
})
82-
83-
-- call select on storages
84-
local storage_select_opts = {
85-
scan_value = plan.scan_value,
86-
after_tuple = opts.after_tuple,
87-
iter = plan.iter,
88-
limit = opts.limit,
89-
scan_condition_num = plan.scan_condition_num,
90-
}
91-
92-
local storage_select_args = {
93-
space_name, plan.index_id, plan.conditions, storage_select_opts,
94-
}
95-
96-
local results, err = call.ro(SELECT_FUNC_NAME, storage_select_args, {
97-
replicasets = opts.replicasets,
98-
timeout = opts.timeout,
99-
})
100-
101-
if err ~= nil then
102-
return nil, err
103-
end
104-
105-
return results
106-
end
107-
10891
local function get_replicasets_by_sharding_key(sharding_key)
10992
local bucket_id = vshard.router.bucket_id_strcrc32(sharding_key)
11093
local replicaset, err = vshard.router.route(bucket_id)
@@ -131,8 +114,6 @@ local function build_select_iterator(space_name, user_conditions, opts)
131114
return nil, SelectError:new("batch_size should be > 0")
132115
end
133116

134-
local batch_size = opts.batch_size or DEFAULT_BATCH_SIZE
135-
136117
if opts.limit ~= nil and opts.limit < 0 then
137118
return nil, SelectError:new("limit should be >= 0")
138119
end
@@ -173,34 +154,21 @@ local function build_select_iterator(space_name, user_conditions, opts)
173154
-- set after tuple
174155
local after_tuple = utils.flatten(opts.after, space_format)
175156

176-
-- generate tuples comparator
177-
local scan_index = space.index[plan.index_id]
178-
local primary_index = space.index[0]
179-
local cmp_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
180-
local cmp_operator = select_comparators.get_cmp_operator(plan.iter)
181-
local tuples_comparator, err = select_comparators.gen_tuples_comparator(
182-
cmp_operator, cmp_key_parts
183-
)
184-
if err ~= nil then
185-
return nil, SelectError:new("Failed to generate comparator function: %s", err)
186-
end
187-
188-
local iter = Iterator.new({
189-
space_name = space_name,
190-
space_format = space_format,
191-
iteration_func = select_iteration,
192-
comparator = tuples_comparator,
193-
194-
plan = plan,
157+
local select_opts = {
158+
scan_value = plan.scan_value,
195159
after_tuple = after_tuple,
160+
iter = plan.iter,
161+
limit = opts.batch_size or DEFAULT_BATCH_SIZE,
162+
scan_condition_num = plan.scan_condition_num,
163+
}
196164

197-
batch_size = batch_size,
198-
replicasets = replicasets_to_select,
165+
local iter = Iterator.new(replicasets_to_select,
166+
space_name, plan.index_id, plan.conditions, select_opts)
199167

200-
timeout = opts.timeout,
201-
})
202-
203-
return iter
168+
return {
169+
iter = iter,
170+
space_format = space_format,
171+
}
204172
end
205173

206174
function select_module.pairs(space_name, user_conditions, opts)
@@ -224,20 +192,23 @@ function select_module.pairs(space_name, user_conditions, opts)
224192
error(string.format("Failed to generate iterator: %s", err))
225193
end
226194

227-
local gen = function(_, iter)
228-
if not iter:has_next() then
195+
local tuple, _
196+
local merger_gen = iter.iter:pairs()
197+
local gen = function()
198+
_, tuple = merger_gen.gen(nil, merger_gen.state)
199+
if tuple == nil then
229200
return nil
230201
end
231202

232-
local obj, err = iter:get()
203+
local obj, err = utils.unflatten(tuple, iter.space_format)
233204
if err ~= nil then
234205
error(string.format("Failed to get next object: %s", err))
235206
end
236207

237208
return iter, obj
238209
end
239210

240-
return gen, nil, iter
211+
return gen
241212
end
242213

243214
function select_module.call(space_name, user_conditions, opts)
@@ -263,17 +234,18 @@ function select_module.call(space_name, user_conditions, opts)
263234

264235
local objects = {}
265236

266-
while iter:has_next() do
267-
local obj, err = iter:get()
268-
if err ~= nil then
269-
return nil, SelectError:new("Failed to get next object: %s", err)
270-
end
271-
272-
if obj == nil then
237+
local count = 0
238+
for _, tuple in iter.iter:pairs() do
239+
if opts.limit ~= nil and count >= opts.limit then
273240
break
274241
end
275242

276-
table.insert(objects, obj)
243+
local object, err = utils.unflatten(tuple, iter.space_format)
244+
if err ~= nil then
245+
return nil, err
246+
end
247+
table.insert(objects, object)
248+
count = count + 1
277249
end
278250

279251
return objects

0 commit comments

Comments
 (0)