@@ -2,15 +2,13 @@ local checks = require('checks')
2
2
local errors = require (' errors' )
3
3
local vshard = require (' vshard' )
4
4
5
- local call = require (' crud.common.call' )
6
5
local registry = require (' crud.common.registry' )
7
6
local utils = require (' crud.common.utils' )
8
7
local dev_checks = require (' crud.common.dev_checks' )
9
8
10
9
local select_conditions = require (' crud.select.conditions' )
11
10
local select_plan = require (' crud.select.plan' )
12
11
local select_executor = require (' crud.select.executor' )
13
- local select_comparators = require (' crud.select.comparators' )
14
12
local select_filters = require (' crud.select.filters' )
15
13
16
14
local Iterator = require (' crud.select.iterator' )
@@ -21,9 +19,20 @@ local GetReplicasetsError = errors.new_class('GetReplicasetsError')
21
19
local select_module = {}
22
20
23
21
local SELECT_FUNC_NAME = ' __select'
24
-
25
22
local DEFAULT_BATCH_SIZE = 100
26
23
24
+ local function make_cursor (opts , data )
25
+ local last_tuple = data [# data ]
26
+
27
+ return {
28
+ scan_value = opts .scan_value ,
29
+ after_tuple = last_tuple ,
30
+ iter = opts .iter ,
31
+ limit = opts .limit ,
32
+ scan_condition_num = opts .scan_condition_num ,
33
+ }
34
+ end
35
+
27
36
local function call_select_on_storage (space_name , index_id , conditions , opts )
28
37
dev_checks (' string' , ' number' , ' ?table' , {
29
38
scan_value = ' table' ,
@@ -62,7 +71,14 @@ local function call_select_on_storage(space_name, index_id, conditions, opts)
62
71
return nil , SelectError :new (" Failed to execute select: %s" , err )
63
72
end
64
73
65
- return tuples
74
+ local cursor
75
+ if # tuples < opts .limit or opts .limit == 0 then
76
+ cursor = {is_end = true }
77
+ else
78
+ cursor = make_cursor (opts , tuples )
79
+ end
80
+
81
+ return cursor , tuples
66
82
end
67
83
68
84
function select_module .init ()
@@ -71,39 +87,6 @@ function select_module.init()
71
87
})
72
88
end
73
89
74
- local function select_iteration (space_name , plan , opts )
75
- dev_checks (' string' , ' ?table' , {
76
- after_tuple = ' ?table' ,
77
- replicasets = ' table' ,
78
- timeout = ' ?number' ,
79
- limit = ' number' ,
80
- })
81
-
82
- -- call select on storages
83
- local storage_select_opts = {
84
- scan_value = plan .scan_value ,
85
- after_tuple = opts .after_tuple ,
86
- iter = plan .iter ,
87
- limit = opts .limit ,
88
- scan_condition_num = plan .scan_condition_num ,
89
- }
90
-
91
- local storage_select_args = {
92
- space_name , plan .index_id , plan .conditions , storage_select_opts ,
93
- }
94
-
95
- local results , err = call .ro (SELECT_FUNC_NAME , storage_select_args , {
96
- replicasets = opts .replicasets ,
97
- timeout = opts .timeout ,
98
- })
99
-
100
- if err ~= nil then
101
- return nil , err
102
- end
103
-
104
- return results
105
- end
106
-
107
90
local function get_replicasets_by_sharding_key (sharding_key )
108
91
local bucket_id = vshard .router .bucket_id_strcrc32 (sharding_key )
109
92
local replicaset , err = vshard .router .route (bucket_id )
@@ -130,8 +113,6 @@ local function build_select_iterator(space_name, user_conditions, opts)
130
113
return nil , SelectError :new (" batch_size should be > 0" )
131
114
end
132
115
133
- local batch_size = opts .batch_size or DEFAULT_BATCH_SIZE
134
-
135
116
if opts .limit ~= nil and opts .limit < 0 then
136
117
return nil , SelectError :new (" limit should be >= 0" )
137
118
end
@@ -172,34 +153,21 @@ local function build_select_iterator(space_name, user_conditions, opts)
172
153
-- set after tuple
173
154
local after_tuple = utils .flatten (opts .after , space_format )
174
155
175
- -- generate tuples comparator
176
- local scan_index = space .index [plan .index_id ]
177
- local primary_index = space .index [0 ]
178
- local cmp_key_parts = utils .merge_primary_key_parts (scan_index .parts , primary_index .parts )
179
- local cmp_operator = select_comparators .get_cmp_operator (plan .iter )
180
- local tuples_comparator , err = select_comparators .gen_tuples_comparator (
181
- cmp_operator , cmp_key_parts
182
- )
183
- if err ~= nil then
184
- return nil , SelectError :new (" Failed to generate comparator function: %s" , err )
185
- end
186
-
187
- local iter = Iterator .new ({
188
- space_name = space_name ,
189
- space_format = space_format ,
190
- iteration_func = select_iteration ,
191
- comparator = tuples_comparator ,
192
-
193
- plan = plan ,
156
+ local select_opts = {
157
+ scan_value = plan .scan_value ,
194
158
after_tuple = after_tuple ,
159
+ iter = plan .iter ,
160
+ limit = opts .batch_size or DEFAULT_BATCH_SIZE ,
161
+ scan_condition_num = plan .scan_condition_num ,
162
+ }
195
163
196
- batch_size = batch_size ,
197
- replicasets = replicasets_to_select ,
164
+ local iter = Iterator . new ( replicasets_to_select ,
165
+ space_name , plan . index_id , plan . conditions , select_opts )
198
166
199
- timeout = opts . timeout ,
200
- })
201
-
202
- return iter
167
+ return {
168
+ iter = iter ,
169
+ space_format = space_format ,
170
+ }
203
171
end
204
172
205
173
function select_module .pairs (space_name , user_conditions , opts )
@@ -223,20 +191,23 @@ function select_module.pairs(space_name, user_conditions, opts)
223
191
error (string.format (" Failed to generate iterator: %s" , err ))
224
192
end
225
193
226
- local gen = function (_ , iter )
227
- if not iter :has_next () then
194
+ local tuple , _
195
+ local merger_gen = iter .iter :pairs ()
196
+ local gen = function ()
197
+ _ , tuple = merger_gen .gen (nil , merger_gen .state )
198
+ if tuple == nil then
228
199
return nil
229
200
end
230
201
231
- local obj , err = iter : get ( )
202
+ local obj , err = utils . unflatten ( tuple , iter . space_format )
232
203
if err ~= nil then
233
204
error (string.format (" Failed to get next object: %s" , err ))
234
205
end
235
206
236
207
return iter , obj
237
208
end
238
209
239
- return gen , nil , iter
210
+ return gen
240
211
end
241
212
242
213
function select_module .call (space_name , user_conditions , opts )
@@ -262,17 +233,18 @@ function select_module.call(space_name, user_conditions, opts)
262
233
263
234
local objects = {}
264
235
265
- while iter :has_next () do
266
- local obj , err = iter :get ()
267
- if err ~= nil then
268
- return nil , SelectError :new (" Failed to get next object: %s" , err )
269
- end
270
-
271
- if obj == nil then
236
+ local count = 0
237
+ for _ , tuple in iter .iter :pairs () do
238
+ if opts .limit ~= nil and count >= opts .limit then
272
239
break
273
240
end
274
241
275
- table.insert (objects , obj )
242
+ local object , err = utils .unflatten (tuple , iter .space_format )
243
+ if err ~= nil then
244
+ return nil , err
245
+ end
246
+ table.insert (objects , object )
247
+ count = count + 1
276
248
end
277
249
278
250
return objects
0 commit comments