@@ -2,15 +2,13 @@ local checks = require('checks')
2
2
local errors = require (' errors' )
3
3
local vshard = require (' vshard' )
4
4
5
- local call = require (' crud.common.call' )
6
5
local utils = require (' crud.common.utils' )
7
6
local sharding = require (' crud.common.sharding' )
8
7
local dev_checks = require (' crud.common.dev_checks' )
9
8
10
9
local select_conditions = require (' crud.select.conditions' )
11
10
local select_plan = require (' crud.select.plan' )
12
11
local select_executor = require (' crud.select.executor' )
13
- local select_comparators = require (' crud.select.comparators' )
14
12
local select_filters = require (' crud.select.filters' )
15
13
16
14
local Iterator = require (' crud.select.iterator' )
@@ -20,10 +18,16 @@ local GetReplicasetsError = errors.new_class('GetReplicasetsError')
20
18
21
19
local select_module = {}
22
20
23
- local SELECT_FUNC_NAME = ' _crud.select_on_storage'
24
-
25
21
local DEFAULT_BATCH_SIZE = 100
26
22
23
+ local function make_cursor (data )
24
+ local last_tuple = data [# data ]
25
+
26
+ return {
27
+ after_tuple = last_tuple ,
28
+ }
29
+ end
30
+
27
31
local function select_on_storage (space_name , index_id , conditions , opts )
28
32
dev_checks (' string' , ' number' , ' ?table' , {
29
33
scan_value = ' table' ,
@@ -62,46 +66,20 @@ local function select_on_storage(space_name, index_id, conditions, opts)
62
66
return nil , SelectError :new (" Failed to execute select: %s" , err )
63
67
end
64
68
65
- return tuples
69
+ local cursor
70
+ if # tuples < opts .limit or opts .limit == 0 then
71
+ cursor = {is_end = true }
72
+ else
73
+ cursor = make_cursor (tuples )
74
+ end
75
+
76
+ return cursor , tuples
66
77
end
67
78
68
79
function select_module .init ()
69
80
_G ._crud .select_on_storage = select_on_storage
70
81
end
71
82
72
- local function select_iteration (space_name , plan , opts )
73
- dev_checks (' string' , ' ?table' , {
74
- after_tuple = ' ?table' ,
75
- replicasets = ' table' ,
76
- timeout = ' ?number' ,
77
- limit = ' number' ,
78
- })
79
-
80
- -- call select on storages
81
- local storage_select_opts = {
82
- scan_value = plan .scan_value ,
83
- after_tuple = opts .after_tuple ,
84
- iter = plan .iter ,
85
- limit = opts .limit ,
86
- scan_condition_num = plan .scan_condition_num ,
87
- }
88
-
89
- local storage_select_args = {
90
- space_name , plan .index_id , plan .conditions , storage_select_opts ,
91
- }
92
-
93
- local results , err = call .ro (SELECT_FUNC_NAME , storage_select_args , {
94
- replicasets = opts .replicasets ,
95
- timeout = opts .timeout ,
96
- })
97
-
98
- if err ~= nil then
99
- return nil , err
100
- end
101
-
102
- return results
103
- end
104
-
105
83
local function get_replicasets_by_sharding_key (bucket_id )
106
84
local replicaset , err = vshard .router .route (bucket_id )
107
85
if replicaset == nil then
@@ -128,8 +106,6 @@ local function build_select_iterator(space_name, user_conditions, opts)
128
106
return nil , SelectError :new (" batch_size should be > 0" )
129
107
end
130
108
131
- local batch_size = opts .batch_size or DEFAULT_BATCH_SIZE
132
-
133
109
-- check conditions
134
110
local conditions , err = select_conditions .parse (user_conditions )
135
111
if err ~= nil then
@@ -165,33 +141,33 @@ local function build_select_iterator(space_name, user_conditions, opts)
165
141
replicasets_to_select = get_replicasets_by_sharding_key (bucket_id )
166
142
end
167
143
168
- -- generate tuples comparator
169
- local scan_index = space . index [ plan . index_id ]
170
- local primary_index = space . index [ 0 ]
171
- local cmp_key_parts = utils . merge_primary_key_parts ( scan_index . parts , primary_index . parts )
172
- local cmp_operator = select_comparators . get_cmp_operator ( plan . iter )
173
- local tuples_comparator , err = select_comparators . gen_tuples_comparator (
174
- cmp_operator , cmp_key_parts
175
- )
176
- if err ~= nil then
177
- return nil , SelectError : new ( " Failed to generate comparator function: %s " , err )
144
+ -- If opts.batch_size is missed we should specify it to min(first, DEFAULT_BATCH_SIZE)
145
+ local batch_size
146
+ if opts . batch_size == nil then
147
+ if opts . first ~= nil and opts . first < DEFAULT_BATCH_SIZE then
148
+ batch_size = opts . first
149
+ else
150
+ batch_size = DEFAULT_BATCH_SIZE
151
+ end
152
+ else
153
+ batch_size = opts . batch_size
178
154
end
179
155
180
- local iter = Iterator .new ({
181
- space_name = space_name ,
182
- space_format = space_format ,
183
- iteration_func = select_iteration ,
184
- comparator = tuples_comparator ,
185
-
186
- plan = plan ,
187
-
188
- batch_size = batch_size ,
189
- replicasets = replicasets_to_select ,
156
+ local select_opts = {
157
+ scan_value = plan .scan_value ,
158
+ after_tuple = plan .after_tuple ,
159
+ iter = plan .iter ,
160
+ limit = batch_size ,
161
+ scan_condition_num = plan .scan_condition_num ,
162
+ }
190
163
191
- timeout = opts . timeout ,
192
- } )
164
+ local iter = Iterator . new ( replicasets_to_select ,
165
+ space_name , plan . index_id , plan . conditions , select_opts )
193
166
194
- return iter
167
+ return {
168
+ iter = iter ,
169
+ space_format = space_format ,
170
+ }
195
171
end
196
172
197
173
function select_module .pairs (space_name , user_conditions , opts )
@@ -222,31 +198,32 @@ function select_module.pairs(space_name, user_conditions, opts)
222
198
error (string.format (" Failed to generate iterator: %s" , err ))
223
199
end
224
200
225
- local gen = function (_ , iter )
226
- if not iter :has_next () then
201
+ local tuple , _
202
+
203
+ if opts .use_tomap ~= true then
204
+ return iter .iter :pairs ()
205
+ end
206
+
207
+ local merger_gen = iter .iter :pairs ()
208
+ local gen = function ()
209
+ _ , tuple = merger_gen .gen (nil , merger_gen .state )
210
+ if tuple == nil then
227
211
return nil
228
212
end
229
213
230
- local tuple , err = iter :get ()
214
+ local result
215
+ result , err = utils .unflatten (tuple , iter .space_format )
231
216
if err ~= nil then
232
- error (string.format (" Failed to get next object: %s" , err ))
233
- end
234
-
235
- local result = tuple
236
- if opts .use_tomap == true then
237
- result , err = utils .unflatten (tuple , iter .space_format )
238
- if err ~= nil then
239
- error (string.format (" Failed to unflatten next object: %s" , err ))
240
- end
217
+ error (string.format (" Failed to unflatten next object: %s" , err ))
241
218
end
242
219
243
220
return iter , result
244
221
end
245
222
246
- return gen , nil , iter
223
+ return gen
247
224
end
248
225
249
- function select_module . call (space_name , user_conditions , opts )
226
+ local function select_module_call_xc (space_name , user_conditions , opts )
250
227
checks (' string' , ' ?table' , {
251
228
after = ' ?table' ,
252
229
first = ' ?number' ,
@@ -277,17 +254,15 @@ function select_module.call(space_name, user_conditions, opts)
277
254
278
255
local tuples = {}
279
256
280
- while iter :has_next () do
281
- local tuple , err = iter :get ()
282
- if err ~= nil then
283
- return nil , SelectError :new (" Failed to get next object: %s" , err )
284
- end
285
-
286
- if tuple == nil then
257
+ local count = 0
258
+ local first = opts .first and math.abs (opts .first )
259
+ for _ , tuple in iter .iter :pairs () do
260
+ if first ~= nil and count >= first then
287
261
break
288
262
end
289
263
290
264
table.insert (tuples , tuple )
265
+ count = count + 1
291
266
end
292
267
293
268
if opts .first ~= nil and opts .first < 0 then
@@ -300,4 +275,8 @@ function select_module.call(space_name, user_conditions, opts)
300
275
}
301
276
end
302
277
278
+ function select_module .call (space_name , user_conditions , opts )
279
+ return SelectError :pcall (select_module_call_xc , space_name , user_conditions , opts )
280
+ end
281
+
303
282
return select_module
0 commit comments