Skip to content

Commit 4ada70e

Browse files
committed
Add implementation of count() method
This commit introduces count method that: * has arguments and options like `select()`/`pairs()`; * counts number of rows in space with yield by `yield_every`; * counts by any conditions. Closes #74
1 parent be2b7de commit 4ada70e

File tree

8 files changed

+1166
-0
lines changed

8 files changed

+1166
-0
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2121
key specified with DDL schema or in `_ddl_sharding_key` space.
2222
NOTE: CRUD methods delete(), get() and update() requires that sharding key
2323
must be a part of primary key.
24+
* `crud.count()` function to calculate the number of tuples
25+
in the space according to conditions.
2426

2527
### Fixed
2628

README.md

+43
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ Table below describe what operations supports custom sharding key:
8787
| `replace()` / `replace_object()` | Yes |
8888
| `upsert()` / `upsert_object()` | Yes |
8989
| `select()` / `pairs()` | Yes |
90+
| `count()` | Yes |
9091
| `update()` | Yes |
9192
| `upsert()` / `upsert_object()` | Yes |
9293
| `replace() / replace_object()` | Yes |
@@ -586,6 +587,48 @@ crud.len('customers')
586587
...
587588
```
588589

590+
### Count
591+
592+
`CRUD` supports multi-conditional count, treating a cluster as a single space.
593+
The same as with `select()` the conditions may include field names or numbers,
594+
as well as index names. The recommended first condition is a TREE index; this
595+
helps to reduce the number of tuples to scan. Otherwise a full scan is performed.
596+
If compared with `len()`, `count()` method scans the entire space to count the
597+
tuples according user conditions. This method does yield that's why result may
598+
be approximate. Number of tuples before next `yield()` is under control with
599+
option `yield_every`.
600+
601+
```lua
602+
local result, err = crud.count(space_name, conditions, opts)
603+
```
604+
605+
where:
606+
607+
* `space_name` (`string`) - name of the space
608+
* `conditions` (`?table`) - array of [conditions](#select-conditions)
609+
* `opts`:
610+
* `yield_every` (`?number`) - number of tuples processed to yield after,
611+
`yield_every` should be > 0, default value is 100
612+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds), default value is 2
613+
* `bucket_id` (`?number|cdata`) - bucket ID
614+
* `force_map_call` (`?boolean`) - if `true`
615+
then the map call is performed without any optimizations even,
616+
default value is `false`
617+
* `mode` (`?string`, `read` or `write`) - if `write` is specified then `count` is
618+
performed on master, default value is `read`
619+
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
620+
the replicas, default value is `false`
621+
* `balance` (`?boolean`) - use replica according to
622+
[vshard load balancing policy](https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_api/#router-api-call),
623+
default value is `false`
624+
625+
```lua
626+
crud.count('customers', {{'<=', 'age', 35}})
627+
---
628+
- 5
629+
...
630+
```
631+
589632
### Call options for crud methods
590633

591634
Combinations of `mode`, `prefer_replica` and `balance` options lead to:

crud.lua

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ local delete = require('crud.delete')
1111
local select = require('crud.select')
1212
local truncate = require('crud.truncate')
1313
local len = require('crud.len')
14+
local count = require('crud.count')
1415
local borders = require('crud.borders')
1516
local sharding_key = require('crud.common.sharding_key')
1617
local utils = require('crud.common.utils')
@@ -76,6 +77,10 @@ crud.truncate = truncate.call
7677
-- @function len
7778
crud.len = len.call
7879

80+
-- @refer count.call
81+
-- @function count
82+
crud.count = count.call
83+
7984
-- @refer borders.min
8085
-- @function min
8186
crud.min = borders.min
@@ -113,6 +118,7 @@ function crud.init_storage()
113118
select.init()
114119
truncate.init()
115120
len.init()
121+
count.init()
116122
borders.init()
117123
sharding_key.init()
118124
end

crud/count.lua

+271
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
local checks = require('checks')
2+
local errors = require('errors')
3+
local vshard = require('vshard')
4+
local fiber = require('fiber')
5+
6+
local call = require('crud.common.call')
7+
local utils = require('crud.common.utils')
8+
local sharding = require('crud.common.sharding')
9+
local filters = require('crud.compare.filters')
10+
local count_plan = require('crud.compare.plan')
11+
local dev_checks = require('crud.common.dev_checks')
12+
local schema = require('crud.common.schema')
13+
local sharding_key_module = require('crud.common.sharding_key')
14+
15+
local compare_conditions = require('crud.compare.conditions')
16+
17+
local CountError = errors.new_class('CountError', {capture_stack = false})
18+
19+
local COUNT_FUNC_NAME = '_crud.count_on_storage'
20+
local DEFAULT_YIELD_EVERY = 1000
21+
22+
local count = {}
23+
24+
local function count_on_storage(space_name, index_id, conditions, opts)
25+
dev_checks('string', 'number', '?table', {
26+
scan_value = 'table',
27+
tarantool_iter = 'number',
28+
yield_every = '?number',
29+
scan_condition_num = '?number',
30+
})
31+
32+
opts = opts or {}
33+
34+
local space = box.space[space_name]
35+
36+
local index = space.index[index_id]
37+
if index == nil then
38+
return nil, CountError:new("Index with ID %s doesn't exist", index_id)
39+
end
40+
41+
local value = opts.scan_value
42+
43+
local filter_func, err = filters.gen_func(space, conditions, {
44+
tarantool_iter = opts.tarantool_iter,
45+
scan_condition_num = opts.scan_condition_num,
46+
})
47+
if err ~= nil then
48+
return nil, CountError:new("Failed to generate tuples filter: %s", err)
49+
end
50+
51+
local tuples_count = 0
52+
local looked_up_tuples = 0
53+
54+
for _, tuple in index:pairs(value, {iterator = opts.tarantool_iter}) do
55+
if tuple == nil then
56+
break
57+
end
58+
59+
looked_up_tuples = looked_up_tuples + 1
60+
61+
local matched, early_exit = filter_func(tuple)
62+
63+
if matched then
64+
tuples_count = tuples_count + 1
65+
66+
if opts.yield_every ~= nil and looked_up_tuples % opts.yield_every == 0 then
67+
fiber.yield()
68+
end
69+
elseif early_exit then
70+
break
71+
end
72+
end
73+
74+
return tuples_count
75+
end
76+
77+
function count.init()
78+
_G._crud.count_on_storage = count_on_storage
79+
end
80+
81+
-- returns result, err, need_reload
82+
-- need_reload indicates if reloading schema could help
83+
-- see crud.common.schema.wrap_func_reload()
84+
local function call_count_on_router(space_name, user_conditions, opts)
85+
checks('string', '?table', {
86+
timeout = '?number',
87+
bucket_id = '?number|cdata',
88+
force_map_call = '?boolean',
89+
yield_every = '?number',
90+
prefer_replica = '?boolean',
91+
balance = '?boolean',
92+
mode = '?string',
93+
})
94+
95+
opts = opts or {}
96+
97+
if opts.yield_every ~= nil and opts.yield_every < 1 then
98+
return nil, CountError:new("yield_every should be > 0")
99+
end
100+
101+
-- check conditions
102+
local conditions, err = compare_conditions.parse(user_conditions)
103+
if err ~= nil then
104+
return nil, CountError:new("Failed to parse conditions: %s", err)
105+
end
106+
107+
local replicasets, err = vshard.router.routeall()
108+
if err ~= nil then
109+
return nil, CountError:new("Failed to get all replicasets: %s", err)
110+
end
111+
112+
local space = utils.get_space(space_name, replicasets)
113+
if space == nil then
114+
return nil, CountError:new("Space %q doesn't exist", space_name), true
115+
end
116+
117+
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
118+
if err ~= nil then
119+
return nil, err
120+
end
121+
122+
-- plan count
123+
local plan, err = count_plan.new(space, conditions, {
124+
sharding_key_as_index_obj = sharding_key_as_index_obj,
125+
})
126+
if err ~= nil then
127+
return nil, CountError:new("Failed to plan count: %s", err), true
128+
end
129+
130+
-- set replicasets to count from
131+
local replicasets_to_count = replicasets
132+
133+
-- Whether to call one storage replicaset or perform
134+
-- map-reduce?
135+
--
136+
-- If map-reduce is requested explicitly, ignore provided
137+
-- bucket_id and fetch data from all storage replicasets.
138+
--
139+
-- Otherwise:
140+
--
141+
-- 1. If particular replicaset is pointed by a caller (using
142+
-- the bucket_id option[^1]), crud MUST fetch data only
143+
-- from this storage replicaset: disregarding whether other
144+
-- storages have tuples that fit given condition.
145+
--
146+
-- 2. If a replicaset may be deduced from conditions
147+
-- (conditions -> sharding key -> bucket id -> replicaset),
148+
-- fetch data only from the replicaset. It does not change
149+
-- the result[^2], but significantly reduces network
150+
-- pressure.
151+
--
152+
-- 3. Fallback to map-reduce otherwise.
153+
--
154+
-- [^1]: We can change meaning of this option in a future,
155+
-- see gh-190. But now bucket_id points a storage
156+
-- replicaset, not a virtual bucket.
157+
--
158+
-- [^2]: It is correct statement only if we'll turn a blind
159+
-- eye to resharding. However, AFAIU, the optimization
160+
-- does not make the result less consistent (sounds
161+
-- weird, huh?).
162+
local perform_map_reduce = opts.force_map_call == true or
163+
(opts.bucket_id == nil and plan.sharding_key == nil)
164+
if not perform_map_reduce then
165+
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
166+
assert(bucket_id ~= nil)
167+
168+
local err
169+
replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id)
170+
if err ~= nil then
171+
return nil, err, true
172+
end
173+
end
174+
175+
local yield_every = opts.yield_every or DEFAULT_YIELD_EVERY
176+
177+
-- call `count_on_storage` on all replicasets
178+
local call_opts = {
179+
mode = opts.mode or 'read',
180+
prefer_replica = opts.prefer_replica,
181+
balance = opts.balance,
182+
timeout = opts.timeout,
183+
replicasets = replicasets_to_count,
184+
}
185+
186+
local count_opts = {
187+
scan_value = plan.scan_value,
188+
tarantool_iter = plan.tarantool_iter,
189+
yield_every = yield_every,
190+
scan_condition_num = plan.scan_condition_num,
191+
}
192+
193+
local results, err = call.map(COUNT_FUNC_NAME, {
194+
space_name, plan.index_id, plan.conditions, count_opts
195+
}, call_opts)
196+
197+
if err ~= nil then
198+
return nil, CountError:new("Failed to call count on storage-side: %s", err)
199+
end
200+
201+
if results.err ~= nil then
202+
return nil, CountError:new("Failed to call count: %s", err)
203+
end
204+
205+
local total_count = 0
206+
for _, replicaset_results in pairs(results) do
207+
if replicaset_results[1] ~= nil then
208+
total_count = total_count + replicaset_results[1]
209+
end
210+
end
211+
212+
return total_count
213+
end
214+
215+
--- Calculates the number of tuples by conditions
216+
--
217+
-- @function call
218+
--
219+
-- @param string space_name
220+
-- A space name
221+
--
222+
-- @param ?table user_conditions
223+
-- Conditions by which tuples are counted,
224+
-- default value is nil
225+
--
226+
-- @tparam ?number opts.timeout
227+
-- Function call timeout in seconds,
228+
-- default value is 2 seconds
229+
--
230+
-- @tparam ?number opts.bucket_id
231+
-- Bucket ID
232+
-- default is vshard.router.bucket_id_strcrc32 of primary key
233+
--
234+
-- @tparam ?boolean opts.force_map_call
235+
-- Call is performed without any optimizations
236+
-- default is `false`
237+
--
238+
-- @tparam ?number opts.yield_every
239+
-- Number of tuples processed to yield after,
240+
-- default value is 1000
241+
--
242+
-- @tparam ?boolean opts.prefer_replica
243+
-- Call on replica if it's possible,
244+
-- default value is `nil`, which works as with `false`
245+
--
246+
-- @tparam ?boolean opts.balance
247+
-- Use replica according to round-robin load balancing
248+
-- default value is `nil`, which works as with `false`
249+
--
250+
-- @tparam ?string opts.mode
251+
-- vshard call mode, default value is `read`
252+
--
253+
-- @return[1] number
254+
-- @treturn[2] nil
255+
-- @treturn[2] table Error description
256+
--
257+
function count.call(space_name, user_conditions, opts)
258+
checks('string', '?table', {
259+
timeout = '?number',
260+
bucket_id = '?number|cdata',
261+
force_map_call = '?boolean',
262+
yield_every = '?number',
263+
prefer_replica = '?boolean',
264+
balance = '?boolean',
265+
mode = '?string',
266+
})
267+
268+
return schema.wrap_func_reload(call_count_on_router, space_name, user_conditions, opts)
269+
end
270+
271+
return count

test/entrypoint/srv_select.lua

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ package.preload['customers-storage'] = function()
111111
})
112112
coord_space:create_index('bucket_id', {
113113
parts = { {field = 'bucket_id'} },
114+
unique = false,
114115
if_not_exists = true,
115116
})
116117

0 commit comments

Comments
 (0)