Skip to content

Commit d4c8141

Browse files
committed
Implementation of batch insert
Batch insert is mostly used for operations with one bucket / one Tarantool node in a transaction. In this case batch insert is more efficient than inserting tuple-by-tuple. Right now CRUD cannot provide batch insert with full consistency. CRUD offers batch insert with partial consistency. That means that full consistency can be provided only on a single replicaset using `box` transactions. Part of #193
1 parent 52b2b4a commit d4c8141

File tree

13 files changed

+2432
-18
lines changed

13 files changed

+2432
-18
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
88
## [Unreleased]
99

1010
### Added
11+
* Batch insert operation `crud.insert_many()`/`crud.insert_object_many()`
12+
with partial consistency
1113

1214
### Changed
1315

README.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,94 @@ crud.insert_object('customers', {
173173
...
174174
```
175175

176+
### Insert many
177+
178+
```lua
179+
-- Batch insert tuples
180+
local result, err = crud.insert_many(space_name, tuples, opts)
181+
-- Batch insert objects
182+
local result, err = crud.insert_object_many(space_name, objects, opts)
183+
```
184+
185+
where:
186+
187+
* `space_name` (`string`) - name of the space to insert tuples/objects into
188+
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
189+
* `opts`:
190+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
191+
* `fields` (`?table`) - field names for getting only a subset of fields
192+
* `stop_on_error` (`?boolean`) - stop on a first error and report errors
193+
regarding the failed operation and all unperformed ones; default is
194+
`false`
195+
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
196+
rollback on the storage where the operation failed; default is
197+
`false`
198+
199+
Returns metadata and an array of inserted rows, plus an array of errors
200+
(one error corresponds to one replicaset for which the error occurred).
201+
Error object can contain `tuple` field. This field contains the tuple
202+
for which the error occurred.
203+
204+
Right now CRUD cannot provide batch insert with full consistency.
205+
CRUD offers batch insert with partial consistency. That means
206+
that full consistency can be provided only on single replicaset
207+
using `box` transactions.
208+
209+
**Example:**
210+
211+
```lua
212+
crud.insert_many('customers', {
213+
{1, box.NULL, 'Elizabeth', 23},
214+
{2, box.NULL, 'Anastasia', 22},
215+
})
216+
---
217+
- metadata:
218+
- {'name': 'id', 'type': 'unsigned'}
219+
- {'name': 'bucket_id', 'type': 'unsigned'}
220+
- {'name': 'name', 'type': 'string'}
221+
- {'name': 'age', 'type': 'number'}
222+
rows:
223+
- [1, 477, 'Elizabeth', 23]
224+
- [2, 401, 'Anastasia', 22]
225+
...
226+
crud.insert_object_many('customers', {
227+
{id = 3, name = 'Elizabeth', age = 24},
228+
{id = 10, name = 'Anastasia', age = 21},
229+
})
230+
---
231+
- metadata:
232+
- {'name': 'id', 'type': 'unsigned'}
233+
- {'name': 'bucket_id', 'type': 'unsigned'}
234+
- {'name': 'name', 'type': 'string'}
235+
- {'name': 'age', 'type': 'number'}
236+
rows:
237+
- [3, 2804, 'Elizabeth', 24]
238+
- [10, 569, 'Anastasia', 21]
239+
240+
-- Partial success
241+
local res, errs = crud.insert_object_many('customers', {
242+
{id = 22, name = 'Alex', age = 34},
243+
{id = 3, name = 'Anastasia', age = 22},
244+
{id = 5, name = 'Sergey', age = 25},
245+
})
246+
---
247+
res
248+
- metadata:
249+
- {'name': 'id', 'type': 'unsigned'}
250+
- {'name': 'bucket_id', 'type': 'unsigned'}
251+
- {'name': 'name', 'type': 'string'}
252+
- {'name': 'age', 'type': 'number'}
253+
rows:
254+
- [5, 1172, 'Sergey', 25],
255+
- [22, 655, 'Alex', 34],
256+
257+
#errs -- 1
258+
errs[1].class_name -- BatchInsertError
259+
errs[1].err -- 'Duplicate key exists <...>'
260+
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
261+
...
262+
```
263+
176264
### Get
177265

178266
```lua

crud.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
local cfg = require('crud.cfg')
66
local insert = require('crud.insert')
7+
local batch_insert = require('crud.batch_insert')
78
local replace = require('crud.replace')
89
local get = require('crud.get')
910
local update = require('crud.update')
@@ -31,6 +32,14 @@ crud.insert = stats.wrap(insert.tuple, stats.op.INSERT)
3132
-- @function insert_object
3233
crud.insert_object = stats.wrap(insert.object, stats.op.INSERT)
3334

35+
-- @refer batch_insert.tuples_batch
36+
-- @function insert_many
37+
crud.insert_many = batch_insert.tuples_batch
38+
39+
-- @refer batch_insert.objects_batch
40+
-- @function insert_object_many
41+
crud.insert_object_many = batch_insert.objects_batch
42+
3443
-- @refer get.call
3544
-- @function get
3645
crud.get = stats.wrap(get.call, stats.op.GET)
@@ -124,6 +133,7 @@ function crud.init_storage()
124133
end
125134

126135
insert.init()
136+
batch_insert.init()
127137
get.init()
128138
replace.init()
129139
update.init()

crud/batch_insert.lua

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
local checks = require('checks')
2+
local errors = require('errors')
3+
local vshard = require('vshard')
4+
5+
local call = require('crud.common.call')
6+
local const = require('crud.common.const')
7+
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
9+
local dev_checks = require('crud.common.dev_checks')
10+
local schema = require('crud.common.schema')
11+
12+
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
13+
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
14+
15+
local BatchInsertError = errors.new_class('BatchInsertError', {capture_stack = false})
16+
17+
local batch_insert = {}
18+
19+
local BATCH_INSERT_FUNC_NAME = '_crud.batch_insert_on_storage'
20+
21+
--- Inserts a batch of tuples into a space on a storage within one transaction.
--
-- Registered in `_G._crud` by `batch_insert.init()`; the router refers to it
-- by the `BATCH_INSERT_FUNC_NAME` name when calling `call.map`.
--
-- @function batch_insert_on_storage
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Array of tuples to insert
--
-- @tparam ?table opts
--  Options:
--  `add_space_schema_hash`, `fields` - forwarded to
--  `schema.wrap_box_space_func_result`;
--  `stop_on_error` - stop on the first failed insert;
--  `rollback_on_error` - roll the transaction back if any insert failed;
--  `sharding_key_hash`, `sharding_func_hash`, `skip_sharding_hash_check` -
--  forwarded to `sharding.check_sharding_hash`
--
-- @return[1] array of results of successful inserts
-- @treturn[1] ?table array of `{err = ..., tuple = ...}`, one per failed insert
-- @treturn[2] nil
-- @treturn[2] table array of errors
local function batch_insert_on_storage(space_name, tuples, opts)
    dev_checks('string', 'table', {
        add_space_schema_hash = '?boolean',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
    })

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return nil, {BatchInsertError:new("Space %q doesn't exist", space_name)}
    end

    local _, err = sharding.check_sharding_hash(space_name,
                                                opts.sharding_func_hash,
                                                opts.sharding_key_hash,
                                                opts.skip_sharding_hash_check)

    if err ~= nil then
        return nil, {err}
    end

    local inserted_tuples = {}
    local errs = {}

    box.begin()
    for _, tuple in ipairs(tuples) do
        -- add_space_schema_hash is true only in case of insert_object_many
        -- the only one case when reloading schema can avoid insert error
        -- is flattening object on router
        local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
            add_space_schema_hash = opts.add_space_schema_hash,
            field_names = opts.fields,
        })

        if insert_result.err ~= nil then
            -- named differently from the outer `err` to avoid shadowing it
            local insert_err = {
                err = insert_result.err,
                tuple = tuple,
            }

            if opts.stop_on_error == true then
                if opts.rollback_on_error == true then
                    box.rollback()
                    return nil, {insert_err}
                end

                -- keep the tuples inserted before the failure
                box.commit()

                return inserted_tuples, {insert_err}
            end

            table.insert(errs, insert_err)
        else
            -- only successful inserts contribute a row to the result
            table.insert(inserted_tuples, insert_result.res)
        end
    end

    if next(errs) ~= nil then
        if opts.rollback_on_error == true then
            box.rollback()
            return nil, errs
        end

        box.commit()

        return inserted_tuples, errs
    end

    box.commit()

    return inserted_tuples
end
99+
100+
--- Registers the storage-side batch insert function.
--
-- Stores `batch_insert_on_storage` in the `_G._crud` table under the
-- `batch_insert_on_storage` key, which matches `BATCH_INSERT_FUNC_NAME`.
--
-- @function init
function batch_insert.init()
    _G._crud['batch_insert_on_storage'] = batch_insert_on_storage
end
103+
104+
-- Sends a batch of tuples to the storages through `call.map` and formats
-- the collected rows.
--
-- returns result, err, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
local function call_batch_insert_on_router(space_name, original_tuples, opts)
    dev_checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
    })

    opts = opts or {}

    local space = utils.get_space(space_name, vshard.router.routeall())
    if space == nil then
        -- same reload marker as the iterator failure path below
        return nil, {BatchInsertError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
    end

    -- the iterator gets a copy of the caller's tuples
    -- NOTE(review): presumably because tuples are modified while being
    -- distributed (e.g. bucket_id is filled in) - confirm in the iterator
    local tuples = table.deepcopy(original_tuples)

    local batch_insert_on_storage_opts = {
        add_space_schema_hash = opts.add_space_schema_hash,
        fields = opts.fields,
        stop_on_error = opts.stop_on_error,
        rollback_on_error = opts.rollback_on_error,
    }

    local iter, err = BatchInsertIterator:new({
        tuples = tuples,
        space = space,
        execute_on_storage_opts = batch_insert_on_storage_opts,
    })
    if err ~= nil then
        return nil, {err}, const.NEED_SCHEMA_RELOAD
    end

    local postprocessor = BatchPostprocessor:new()

    local rows, errs = call.map(BATCH_INSERT_FUNC_NAME, nil, {
        timeout = opts.timeout,
        mode = 'write',
        iter = iter,
        postprocessor = postprocessor,
    })

    -- `next(nil)` raises an error, so a missing rows table is handled
    -- the same way as an empty one
    if rows == nil or next(rows) == nil then
        return nil, errs
    end

    local res, err = utils.format_result(rows, space, opts.fields)
    if err ~= nil then
        return nil, {err}
    end

    return res, errs
end
161+
162+
--- Inserts an array of tuples into the specified space
--
-- @function tuples_batch
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Array of tuples to insert
--
-- @tparam ?table opts
--  Options of batch_insert.tuples_batch:
--  `timeout`, `fields`, `add_space_schema_hash`,
--  `stop_on_error`, `rollback_on_error`
--
-- @return[1] tuples
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function batch_insert.tuples_batch(space_name, tuples, opts)
    checks('string', 'table', {
        rollback_on_error = '?boolean',
        stop_on_error = '?boolean',
        add_space_schema_hash = '?boolean',
        fields = '?table',
        timeout = '?number',
    })

    -- the router call is wrapped in schema.wrap_func_reload()
    return schema.wrap_func_reload(call_batch_insert_on_router, space_name, tuples, opts)
end
190+
191+
--- Batch inserts objects to the specified space
--
-- Every object is flattened to a tuple first; if any object cannot be
-- flattened, nothing is inserted and only the flatten errors are returned.
--
-- @function objects_batch
--
-- @tparam string space_name
--  A space name
--
-- @tparam table objs
--  Array of objects to insert
--
-- @tparam ?table opts
--  Options of batch_insert.objects_batch:
--  `timeout`, `fields`, `stop_on_error`, `rollback_on_error`
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function batch_insert.objects_batch(space_name, objs, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
    })

    -- insert can fail if router uses outdated schema to flatten object
    opts = utils.merge_options(opts, {add_space_schema_hash = true})

    local tuples = {}
    local errs = {}

    for _, obj in ipairs(objs) do
        local tuple, err = utils.flatten_obj_reload(space_name, obj)
        if err ~= nil then
            local err_obj = BatchInsertError:new("Failed to flatten object: %s", err)
            -- attach the original object so the caller can see what failed
            err_obj.tuple = obj

            if opts.stop_on_error == true then
                return nil, {err_obj}
            end

            table.insert(errs, err_obj)
        else
            -- only successfully flattened objects become tuples
            table.insert(tuples, tuple)
        end
    end

    if next(errs) ~= nil then
        return nil, errs
    end

    return batch_insert.tuples_batch(space_name, tuples, opts)
end
245+
246+
return batch_insert

0 commit comments

Comments
 (0)