Skip to content

Commit 3074100

Browse files
committed
Implementation of batch upsert
Batch upsert is mostly used for operation with one bucket / one Tarantool node in a transaction. In this case batch upsert is more efficient then upserting tuple-by-tuple. Right now CRUD cannot provide batch upsert with full consistency. CRUD offers batch upsert with partial consistency. That means that full consistency can be provided only on single replicaset using `box` transactions. Part of #193
1 parent d4c8141 commit 3074100

File tree

8 files changed

+2369
-71
lines changed

8 files changed

+2369
-71
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
88
## [Unreleased]
99

1010
### Added
11-
* Batch insert operation `crud.insert_many()`/`crud.insert_object_many()`
11+
* Batch insert/upsert operation
12+
`crud.insert_many()`/`crud.insert_object_many()`/
13+
`crud.upsert_many()`/`crud.upsert_object_many()`
1214
with partial consistency
1315

1416
### Changed

README.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,94 @@ crud.upsert_object('customers',
457457
...
458458
```
459459

460+
### Upsert many
461+
462+
```lua
463+
-- Batch upsert tuples
464+
local result, err = crud.upsert_many(space_name, tuples, operations, opts)
465+
-- Batch upsert objects
466+
local result, err = crud.upsert_object_many(space_name, objects, operations, opts)
467+
```
468+
469+
where:
470+
471+
* `space_name` (`string`) - name of the space to insert an object
472+
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
473+
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple
474+
* `opts`:
475+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
476+
* `fields` (`?table`) - field names for getting only a subset of fields
477+
* `stop_on_error` (`?boolean`) - stop on a first error and report errors
478+
regarding the failed operation and all not performed ones., default is
479+
`false`
480+
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
481+
rollback on a storage, where the operation is failed, default is
482+
`false`
483+
484+
Returns metadata and array of empty arrays, array of errors
485+
(one error corresponds to one replicaset for which the error occurred).
486+
Error object can contain `tuple` field. This field contains the tuple
487+
for which the error occurred.
488+
489+
Right now CRUD cannot provide batch upsert with full consistency.
490+
CRUD offers batch upsert with partial consistency. That means
491+
that full consistency can be provided only on single replicaset
492+
using `box` transactions.
493+
494+
**Example:**
495+
496+
```lua
497+
crud.upsert_many('customers', {
498+
{1, box.NULL, 'Elizabeth', 23},
499+
{2, box.NULL, 'Anastasia', 22},
500+
},{{'+', 'age', 1}, {'+', 'age', 2}})
501+
---
502+
- metadata:
503+
- {'name': 'id', 'type': 'unsigned'}
504+
- {'name': 'bucket_id', 'type': 'unsigned'}
505+
- {'name': 'name', 'type': 'string'}
506+
- {'name': 'age', 'type': 'number'}
507+
rows:
508+
- []
509+
- []
510+
...
511+
crud.upsert_object_many('customers', {
512+
{id = 3, name = 'Elizabeth', age = 24},
513+
{id = 10, name = 'Anastasia', age = 21},
514+
}, {{'+', 'age', 1}, {'+', 'age', 2}})
515+
---
516+
- metadata:
517+
- {'name': 'id', 'type': 'unsigned'}
518+
- {'name': 'bucket_id', 'type': 'unsigned'}
519+
- {'name': 'name', 'type': 'string'}
520+
- {'name': 'age', 'type': 'number'}
521+
rows:
522+
- []
523+
- []
524+
525+
-- Partial success
526+
local res, errs = crud.upsert_object_many('customers', {
527+
{id = 22, name = 'Alex', age = 34},
528+
{id = 3, name = 'Anastasia', age = 22},
529+
{id = 5, name = 'Sergey', age = 25},
530+
}, {{'+', 'age', 12}, {'=', 'age', 'invalid type'}, {'+', 'age', 10}})
531+
---
532+
res
533+
- metadata:
534+
- {'name': 'id', 'type': 'unsigned'}
535+
- {'name': 'bucket_id', 'type': 'unsigned'}
536+
- {'name': 'name', 'type': 'string'}
537+
- {'name': 'age', 'type': 'number'}
538+
rows:
539+
- [],
540+
- [],
541+
542+
#errs -- 1
543+
errs[1].class_name -- BatchUpsertError
544+
errs[1].err -- 'Tuple field 4 (age) type does not match one required by operation <...>'
545+
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
546+
...
547+
```
460548

461549
### Select
462550

crud.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ local replace = require('crud.replace')
99
local get = require('crud.get')
1010
local update = require('crud.update')
1111
local upsert = require('crud.upsert')
12+
local batch_upsert = require('crud.batch_upsert')
1213
local delete = require('crud.delete')
1314
local select = require('crud.select')
1415
local truncate = require('crud.truncate')
@@ -60,6 +61,14 @@ crud.update = stats.wrap(update.call, stats.op.UPDATE)
6061
-- @function upsert
6162
crud.upsert = stats.wrap(upsert.tuple, stats.op.UPSERT)
6263

64+
-- @refer batch_upsert.tuples_batch
65+
-- @function upsert_many
66+
crud.upsert_many = batch_upsert.tuples_batch
67+
68+
-- @refer batch_upsert.objects_batch
69+
-- @function upsert_object_many
70+
crud.upsert_object_many = batch_upsert.objects_batch
71+
6372
-- @refer upsert.object
6473
-- @function upsert
6574
crud.upsert_object = stats.wrap(upsert.object, stats.op.UPSERT)
@@ -138,6 +147,7 @@ function crud.init_storage()
138147
replace.init()
139148
update.init()
140149
upsert.init()
150+
batch_upsert.init()
141151
delete.init()
142152
select.init()
143153
truncate.init()

crud/batch_upsert.lua

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
local checks = require('checks')
2+
local errors = require('errors')
3+
local vshard = require('vshard')
4+
5+
local call = require('crud.common.call')
6+
local const = require('crud.common.const')
7+
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
9+
local dev_checks = require('crud.common.dev_checks')
10+
local schema = require('crud.common.schema')
11+
12+
local BatchUpsertIterator = require('crud.common.map_call_cases.batch_upsert_iter')
13+
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
14+
15+
local BatchUpsertError = errors.new_class('BatchUpsertError', {capture_stack = false})
16+
17+
local batch_upsert = {}
18+
19+
local BATCH_UPSERT_FUNC_NAME = '_crud.batch_upsert_on_storage'
20+
21+
local function batch_upsert_on_storage(space_name, batch, operations, opts)
22+
dev_checks('string', 'table', 'table', {
23+
add_space_schema_hash = '?boolean',
24+
stop_on_error = '?boolean',
25+
rollback_on_error = '?boolean',
26+
sharding_key_hash = '?number',
27+
sharding_func_hash = '?number',
28+
skip_sharding_hash_check = '?boolean',
29+
})
30+
31+
opts = opts or {}
32+
33+
local space = box.space[space_name]
34+
if space == nil then
35+
return nil, BatchUpsertError:new("Space %q doesn't exist", space_name)
36+
end
37+
38+
local _, err = sharding.check_sharding_hash(space_name,
39+
opts.sharding_func_hash,
40+
opts.sharding_key_hash,
41+
opts.skip_sharding_hash_check)
42+
43+
if err ~= nil then
44+
return nil, {err}
45+
end
46+
47+
local inserted_tuples = {}
48+
local errs = {}
49+
50+
box.begin()
51+
for i, tuple in ipairs(batch) do
52+
-- add_space_schema_hash is true only in case of upsert_object_many
53+
-- the only one case when reloading schema can avoid upsert error
54+
-- is flattening object on router
55+
local insert_result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations[i]}, {
56+
add_space_schema_hash = opts.add_space_schema_hash,
57+
})
58+
59+
if insert_result.err ~= nil then
60+
local err = {
61+
err = insert_result.err,
62+
tuple = tuple,
63+
}
64+
65+
if opts.stop_on_error == true then
66+
if opts.rollback_on_error == true then
67+
box.rollback()
68+
return nil, {err}
69+
end
70+
71+
box.commit()
72+
73+
return inserted_tuples, {err}
74+
end
75+
76+
table.insert(errs, err)
77+
else
78+
table.insert(inserted_tuples, {})
79+
end
80+
end
81+
82+
if next(errs) ~= nil then
83+
if opts.rollback_on_error == true then
84+
box.rollback()
85+
return nil, errs
86+
end
87+
88+
box.commit()
89+
90+
return inserted_tuples, errs
91+
end
92+
93+
box.commit()
94+
95+
return inserted_tuples
96+
end
97+
98+
function batch_upsert.init()
99+
_G._crud.batch_upsert_on_storage = batch_upsert_on_storage
100+
end
101+
102+
-- returns result, err, need_reload
103+
-- need_reload indicates if reloading schema could help
104+
-- see crud.common.schema.wrap_func_reload()
105+
local function call_batch_upsert_on_router(space_name, original_tuples, user_operations, opts)
106+
dev_checks('string', 'table', 'table', {
107+
timeout = '?number',
108+
fields = '?table',
109+
add_space_schema_hash = '?boolean',
110+
stop_on_error = '?boolean',
111+
rollback_on_error = '?boolean',
112+
})
113+
114+
opts = opts or {}
115+
116+
local space = utils.get_space(space_name, vshard.router.routeall())
117+
if space == nil then
118+
return nil, {BatchUpsertError:new("Space %q doesn't exist", space_name)}, true
119+
end
120+
121+
local tuples = table.deepcopy(original_tuples)
122+
123+
local space_format = space:format()
124+
local operations = user_operations
125+
if not utils.tarantool_supports_fieldpaths() then
126+
operations = {}
127+
for _, operations_for_tuple in ipairs(user_operations) do
128+
local converted_operations, err = utils.convert_operations(operations_for_tuple, space_format)
129+
if err ~= nil then
130+
return nil, {BatchUpsertError:new("Wrong operations are specified: %s", err)}, true
131+
end
132+
table.insert(operations, converted_operations)
133+
end
134+
end
135+
136+
local batch_upsert_on_storage_opts = {
137+
add_space_schema_hash = opts.add_space_schema_hash,
138+
stop_on_error = opts.stop_on_error,
139+
rollback_on_error = opts.rollback_on_error,
140+
}
141+
142+
local iter, err = BatchUpsertIterator:new({
143+
tuples = tuples,
144+
space = space,
145+
operations = operations,
146+
execute_on_storage_opts = batch_upsert_on_storage_opts,
147+
})
148+
if err ~= nil then
149+
return nil, {err}, const.NEED_SCHEMA_RELOAD
150+
end
151+
152+
local postprocessor = BatchPostprocessor:new()
153+
154+
local rows, errs = call.map(BATCH_UPSERT_FUNC_NAME, nil, {
155+
timeout = opts.timeout,
156+
mode = 'write',
157+
iter = iter,
158+
postprocessor = postprocessor,
159+
})
160+
161+
if next(rows) == nil then
162+
return nil, errs
163+
end
164+
165+
local res, err = utils.format_result(rows, space, opts.fields)
166+
if err ~= nil then
167+
return nil, {err}
168+
end
169+
170+
return res, errs
171+
end
172+
173+
--- Batch update or insert tuples to the specified space
174+
--
175+
-- @function tuples_batch
176+
--
177+
-- @param string space_name
178+
-- A space name
179+
--
180+
-- @param table tuples
181+
-- Tuples
182+
--
183+
-- @tparam ?table opts
184+
-- Options of batch_upsert.tuples_batch
185+
--
186+
-- @return[1] tuples
187+
-- @treturn[2] nil
188+
-- @treturn[2] table of tables Error description
189+
190+
function batch_upsert.tuples_batch(space_name, tuples, user_operations, opts)
191+
checks('string', 'table', 'table', {
192+
timeout = '?number',
193+
fields = '?table',
194+
add_space_schema_hash = '?boolean',
195+
stop_on_error = '?boolean',
196+
rollback_on_error = '?boolean',
197+
})
198+
199+
return schema.wrap_func_reload(call_batch_upsert_on_router, space_name, tuples, user_operations, opts)
200+
end
201+
202+
--- Batch update or insert objects to the specified space
203+
--
204+
-- @function objects_batch
205+
--
206+
-- @param string space_name
207+
-- A space name
208+
--
209+
-- @param table objs
210+
-- Objects
211+
--
212+
-- @tparam ?table opts
213+
-- Options of batch_upsert.tuples_batch
214+
--
215+
-- @return[1] objects
216+
-- @treturn[2] nil
217+
-- @treturn[2] table of tables Error description
218+
219+
function batch_upsert.objects_batch(space_name, objs, user_operations, opts)
220+
checks('string', 'table', 'table', {
221+
timeout = '?number',
222+
fields = '?table',
223+
stop_on_error = '?boolean',
224+
rollback_on_error = '?boolean',
225+
})
226+
227+
-- upsert can fail if router uses outdated schema to flatten object
228+
opts = utils.merge_options(opts, {add_space_schema_hash = true})
229+
230+
local tuples = {}
231+
local errs = {}
232+
233+
for _, obj in ipairs(objs) do
234+
235+
local tuple, err = utils.flatten_obj_reload(space_name, obj)
236+
if err ~= nil then
237+
local err_obj = BatchUpsertError:new("Failed to flatten object: %s", err)
238+
err_obj.tuple = obj
239+
240+
if opts.stop_on_error == true then
241+
return nil, {err_obj}
242+
end
243+
244+
table.insert(errs, err_obj)
245+
end
246+
247+
table.insert(tuples, tuple)
248+
end
249+
250+
if next(errs) ~= nil then
251+
return nil, errs
252+
end
253+
254+
return batch_upsert.tuples_batch(space_name, tuples, user_operations, opts)
255+
end
256+
257+
return batch_upsert

0 commit comments

Comments
 (0)