diff --git a/.github/workflows/test_on_push.yaml b/.github/workflows/test_on_push.yaml index bcf6d348..2743aa57 100644 --- a/.github/workflows/test_on_push.yaml +++ b/.github/workflows/test_on_push.yaml @@ -13,13 +13,21 @@ jobs: matrix: # We need 1.10.6 here to check that module works with # old Tarantool versions that don't have "tuple-keydef"/"tuple-merger" support. - tarantool-version: ["1.10.6", "1.10", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7"] + tarantool-version: ["1.10.6", "1.10", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8"] + metrics-version: [""] remove-merger: [false] include: + - tarantool-version: "1.10" + metrics-version: "0.12.0" - tarantool-version: "2.7" remove-merger: true + - tarantool-version: "2.8" + metrics-version: "0.1.8" + - tarantool-version: "2.8" + metrics-version: "0.10.0" - tarantool-version: "2.8" coveralls: true + metrics-version: "0.12.0" fail-fast: false runs-on: [ubuntu-latest] steps: @@ -47,6 +55,10 @@ jobs: tarantool --version ./deps.sh + - name: Install metrics + if: matrix.metrics-version != '' + run: tarantoolctl rocks install metrics ${{ matrix.metrics-version }} + - name: Remove external merger if needed if: ${{ matrix.remove-merger }} run: rm .rocks/lib/tarantool/tuple/merger.so @@ -66,11 +78,47 @@ jobs: run: make -C build coveralls if: ${{ matrix.coveralls }} + run-perf-tests-ce: + if: | + github.event_name == 'push' || + github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository + strategy: + matrix: + tarantool-version: ["1.10", "2.8"] + metrics-version: ["0.12.0"] + fail-fast: false + runs-on: [ubuntu-latest] + steps: + - uses: actions/checkout@master + + - name: Setup Tarantool CE + uses: tarantool/setup-tarantool@v1 + with: + tarantool-version: ${{ matrix.tarantool-version }} + + - name: Install requirements for community + run: | + tarantool --version + ./deps.sh + + - name: Install metrics + run: tarantoolctl rocks install metrics ${{ matrix.metrics-version }} + + # 
This server starts and listen on 8084 port that is used for tests + - name: Stop Mono server + run: sudo kill -9 $(sudo lsof -t -i tcp:8084) || true + + - run: cmake -S . -B build + + - name: Run performance tests + run: make -C build performance + run-tests-ee: if: github.event_name == 'push' strategy: matrix: bundle_version: [ "1.10.11-0-gf0b0e7ecf-r422", "2.7.3-0-gdddf926c3-r422" ] + metrics-version: ["", "0.12.0"] fail-fast: false runs-on: [ ubuntu-latest ] steps: @@ -86,6 +134,12 @@ jobs: tarantool --version ./deps.sh + - name: Install metrics + if: matrix.metrics-version != '' + run: | + source tarantool-enterprise/env.sh + tarantoolctl rocks install metrics ${{ matrix.metrics-version }} + # This server starts and listen on 8084 port that is used for tests - name: Stop Mono server run: sudo kill -9 $(sudo lsof -t -i tcp:8084) || true diff --git a/.gitignore b/.gitignore index f7d45ec5..36d65d57 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ CMakeCache.txt CMakeFiles/ luacov.report.out luacov.stats.out +build/*.cmake +build/Makefile diff --git a/.luacheckrc b/.luacheckrc index 79021dfe..cf34b61f 100644 --- a/.luacheckrc +++ b/.luacheckrc @@ -3,3 +3,4 @@ globals = {'box', 'utf8', 'checkers', '_TARANTOOL'} include_files = {'**/*.lua', '*.luacheckrc', '*.rockspec'} exclude_files = {'**/*.rocks/', 'tmp/', 'tarantool-enterprise/'} max_line_length = 120 +max_comment_line_length = 150 diff --git a/CHANGELOG.md b/CHANGELOG.md index 3093ddef..135bfffe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +* Statistics for CRUD operations on router (#224). +* Integrate CRUD statistics with [`metrics`](https://github.com/tarantool/metrics) (#224). 
### Changed diff --git a/CMakeLists.txt b/CMakeLists.txt index 714474a1..2e11fbf3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,14 @@ add_custom_target(luatest COMMENT "Run regression tests" ) +set(PERFORMANCE_TESTS_SUBDIR "test/performance") + +add_custom_target(performance + COMMAND PERF_MODE_ON=true ${LUATEST} -v -c ${PERFORMANCE_TESTS_SUBDIR} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + COMMENT "Run performance tests" +) + add_custom_target(coverage COMMAND ${LUACOV} ${PROJECT_SOURCE_DIR} && grep -A999 '^Summary' ${CODE_COVERAGE_REPORT} DEPENDS ${CODE_COVERAGE_STATS} diff --git a/README.md b/README.md index 7ce5f820..6805ca73 100644 --- a/README.md +++ b/README.md @@ -674,6 +674,171 @@ Combinations of `mode`, `prefer_replica` and `balance` options lead to: * prefer_replica, balance - [vshard call `callbre`](https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_api/#router-api-callbre) +### Statistics + +`crud` routers can provide statistics on called operations. +```lua +-- Enable statistics collect. +crud.cfg{ stats = true } + +-- Returns table with statistics information. +crud.stats() + +-- Returns table with statistics information for specific space. +crud.stats('my_space') + +-- Disables statistics collect and destroys all collectors. +crud.cfg{ stats = false } + +-- Destroys all statistics collectors and creates them again. +crud.reset_stats() +``` + +If [`metrics`](https://github.com/tarantool/metrics) `0.10.0` or greater +found, metrics collectors will be used by default to store statistics +instead of local collectors. Quantiles in metrics summary collections +are disabled by default. You can manually choose driver and enable quantiles. +```lua +-- Use simple local collectors (default if no required metrics version found). +crud.cfg{ stats = true, stats_driver = 'local' } + +-- Use metrics collectors (default if metrics rock found). 
+crud.cfg{ stats = true, stats_driver = 'metrics' } + +-- Use metrics collectors with 0.99 quantiles. +crud.cfg{ stats = true, stats_driver = 'metrics', stats_quantiles = true } +``` + +You can use `crud.cfg` to check current stats state. +```lua +crud.cfg +--- +- stats_quantiles: true + stats: true + stats_driver: metrics +... +``` +Performance overhead is 3-10% in case of `local` driver and +5-15% in case of `metrics` driver, up to 20% for `metrics` with quantiles. + +Beware that iterating through `crud.cfg` with pairs is not supported yet, +refer to [tarantool/crud#265](https://github.com/tarantool/crud/issues/265). + +Format is as follows. +```lua +crud.stats() +--- +- spaces: + my_space: + insert: + ok: + latency: 0.002 + count: 19800 + time: 39.6 + error: + latency: 0.000001 + count: 4 + time: 0.000004 +... +crud.stats('my_space') +--- +- insert: + ok: + latency: 0.002 + count: 19800 + time: 39.6 + error: + latency: 0.000001 + count: 4 + time: 0.000004 +... +``` +`spaces` section contains statistics for each observed space. +If operation has never been called for a space, the corresponding +field will be empty. If no requests has been called for a +space, it will not be represented. Space data is based on +client requests rather than storages schema, so requests +for non-existing spaces are also collected. + +Possible statistics operation labels are +`insert` (for `insert` and `insert_object` calls), +`get`, `replace` (for `replace` and `replace_object` calls), `update`, +`upsert` (for `upsert` and `upsert_object` calls), `delete`, +`select` (for `select` and `pairs` calls), `truncate`, `len`, `count` +and `borders` (for `min` and `max` calls). + +Each operation section consists of different collectors +for success calls and error (both error throw and `nil, err`) +returns. `count` is the total requests count since instance start +or stats restart. 
`latency` is the 0.99 quantile of request execution +time if `metrics` driver used and quantiles enabled, +otherwise `latency` is the total average. +`time` is the total time of requests execution. + +In [`metrics`](https://www.tarantool.io/en/doc/latest/book/monitoring/) +registry statistics are stored as `tnt_crud_stats` metrics +with `operation`, `status` and `name` labels. +``` +metrics:collect() +--- +- - label_pairs: + status: ok + operation: insert + name: customers + value: 221411 + metric_name: tnt_crud_stats_count + - label_pairs: + status: ok + operation: insert + name: customers + value: 10.49834896344692 + metric_name: tnt_crud_stats_sum + - label_pairs: + status: ok + operation: insert + name: customers + quantile: 0.99 + value: 0.00023606420935973 + metric_name: tnt_crud_stats +... +``` + +`select` section additionally contains `details` collectors. +```lua +crud.stats('my_space').select.details +--- +- map_reduces: 4 + tuples_fetched: 10500 + tuples_lookup: 238000 +... +``` +`map_reduces` is the count of planned map reduces (including those not +executed successfully). `tuples_fetched` is the count of tuples fetched +from storages during execution, `tuples_lookup` is the count of tuples +looked up on storages while collecting responses for calls (including +scrolls for multibatch requests). Details data is updated as part of +the request process, so you may get new details before `select`/`pairs` +call is finished and observed with count, latency and time collectors. +In [`metrics`](https://www.tarantool.io/en/doc/latest/book/monitoring/) +registry they are stored as `tnt_crud_map_reduces`, +`tnt_crud_tuples_fetched` and `tnt_crud_tuples_lookup` metrics +with `{ operation = 'select', name = space_name }` labels. + +Since `pairs` request behavior differs from any other crud request, its +statistics collection also has specific behavior. 
Statistics (`select` +section) are updated after `pairs` cycle is finished: you +either have iterated through all records or an error was thrown. +If your pairs cycle was interrupted with `break`, statistics will +be collected when pairs objects are cleaned up with Lua garbage +collector. + +Statistics are preserved between package reloads. Statistics are preserved +between [Tarantool Cartridge role reloads](https://www.tarantool.io/en/doc/latest/book/cartridge/cartridge_api/modules/cartridge.roles/#reload) +if you use CRUD Cartridge roles. Beware that metrics 0.12.0 and below do not +support preserving stats between role reload +(see [tarantool/metrics#334](https://github.com/tarantool/metrics/issues/334)), +thus this feature will be unsupported for `metrics` driver. + ## Cartridge roles `cartridge.roles.crud-storage` is a Tarantool Cartridge role that depends on the diff --git a/cartridge/roles/crud-router.lua b/cartridge/roles/crud-router.lua index ef510e51..1c4d43fe 100644 --- a/cartridge/roles/crud-router.lua +++ b/cartridge/roles/crud-router.lua @@ -1,8 +1,10 @@ local crud = require('crud') +local stash = require('crud.common.stash') -- removes routes that changed in config and adds new routes local function init() crud.init_router() + stash.setup_cartridge_reload() end local function stop() diff --git a/cartridge/roles/crud-storage.lua b/cartridge/roles/crud-storage.lua index 8371c428..3728c88c 100644 --- a/cartridge/roles/crud-storage.lua +++ b/cartridge/roles/crud-storage.lua @@ -1,7 +1,9 @@ local crud = require('crud') +local stash = require('crud.common.stash') local function init() crud.init_storage() + stash.setup_cartridge_reload() end local function stop() diff --git a/crud.lua b/crud.lua index b045e7fe..3f2b5c59 100644 --- a/crud.lua +++ b/crud.lua @@ -2,6 +2,7 @@ -- -- @module crud +local cfg = require('crud.cfg') local insert = require('crud.insert') local replace = require('crud.replace') local get = require('crud.get') @@ -15,6 +16,7 @@ local 
count = require('crud.count') local borders = require('crud.borders') local sharding_metadata = require('crud.common.sharding.sharding_metadata') local utils = require('crud.common.utils') +local stats = require('crud.stats') local crud = {} @@ -23,47 +25,47 @@ local crud = {} -- @refer insert.tuple -- @function insert -crud.insert = insert.tuple +crud.insert = stats.wrap(insert.tuple, stats.op.INSERT) -- @refer insert.object -- @function insert_object -crud.insert_object = insert.object +crud.insert_object = stats.wrap(insert.object, stats.op.INSERT) -- @refer get.call -- @function get -crud.get = get.call +crud.get = stats.wrap(get.call, stats.op.GET) -- @refer replace.tuple -- @function replace -crud.replace = replace.tuple +crud.replace = stats.wrap(replace.tuple, stats.op.REPLACE) -- @refer replace.object -- @function replace_object -crud.replace_object = replace.object +crud.replace_object = stats.wrap(replace.object, stats.op.REPLACE) -- @refer update.call -- @function update -crud.update = update.call +crud.update = stats.wrap(update.call, stats.op.UPDATE) -- @refer upsert.tuple -- @function upsert -crud.upsert = upsert.tuple +crud.upsert = stats.wrap(upsert.tuple, stats.op.UPSERT) -- @refer upsert.object -- @function upsert -crud.upsert_object = upsert.object +crud.upsert_object = stats.wrap(upsert.object, stats.op.UPSERT) -- @refer delete.call -- @function delete -crud.delete = delete.call +crud.delete = stats.wrap(delete.call, stats.op.DELETE) -- @refer select.call -- @function select -crud.select = select.call +crud.select = stats.wrap(select.call, stats.op.SELECT) -- @refer select.pairs -- @function pairs -crud.pairs = select.pairs +crud.pairs = stats.wrap(select.pairs, stats.op.SELECT, { pairs = true }) -- @refer utils.unflatten_rows -- @function unflatten_rows @@ -71,23 +73,23 @@ crud.unflatten_rows = utils.unflatten_rows -- @refer truncate.call -- @function truncate -crud.truncate = truncate.call +crud.truncate = stats.wrap(truncate.call, 
stats.op.TRUNCATE) -- @refer len.call -- @function len -crud.len = len.call +crud.len = stats.wrap(len.call, stats.op.LEN) -- @refer count.call -- @function count -crud.count = count.call +crud.count = stats.wrap(count.call, stats.op.COUNT) -- @refer borders.min -- @function min -crud.min = borders.min +crud.min = stats.wrap(borders.min, stats.op.BORDERS) -- @refer borders.max -- @function max -crud.max = borders.max +crud.max = stats.wrap(borders.max, stats.op.BORDERS) -- @refer utils.cut_rows -- @function cut_rows @@ -97,6 +99,18 @@ crud.cut_rows = utils.cut_rows -- @function cut_objects crud.cut_objects = utils.cut_objects +-- @refer cfg.cfg +-- @function cfg +crud.cfg = cfg.cfg + +-- @refer stats.get +-- @function stats +crud.stats = stats.get + +-- @refer stats.reset +-- @function reset_stats +crud.reset_stats = stats.reset + --- Initializes crud on node -- -- Exports all functions that are used for calls diff --git a/crud/cfg.lua b/crud/cfg.lua new file mode 100644 index 00000000..f816df6c --- /dev/null +++ b/crud/cfg.lua @@ -0,0 +1,118 @@ +---- Module for CRUD configuration. 
+-- @module crud.cfg +-- + +local checks = require('checks') +local errors = require('errors') + +local stash = require('crud.common.stash') +local stats = require('crud.stats') + +local CfgError = errors.new_class('CfgError', {capture_stack = false}) + +local cfg_module = {} + +local function set_defaults_if_empty(cfg) + if cfg.stats == nil then + cfg.stats = false + end + + if cfg.stats_driver == nil then + cfg.stats_driver = stats.get_default_driver() + end + + if cfg.stats_quantiles == nil then + cfg.stats_quantiles = false + end + + return cfg +end + +local cfg = set_defaults_if_empty(stash.get(stash.name.cfg)) + +local function configure_stats(cfg, opts) + if (opts.stats == nil) + and (opts.stats_driver == nil) + and (opts.stats_quantiles == nil) then + return + end + + if opts.stats == nil then + opts.stats = cfg.stats + end + + if opts.stats_driver == nil then + opts.stats_driver = cfg.stats_driver + end + + if opts.stats_quantiles == nil then + opts.stats_quantiles = cfg.stats_quantiles + end + + if opts.stats == true then + stats.enable{ driver = opts.stats_driver, quantiles = opts.stats_quantiles } + else + stats.disable() + end + + rawset(cfg, 'stats', opts.stats) + rawset(cfg, 'stats_driver', opts.stats_driver) + rawset(cfg, 'stats_quantiles', opts.stats_quantiles) +end + +--- Configure CRUD module. +-- +-- @function __call +-- +-- @tab self +-- +-- @tab[opt] opts +-- +-- @bool[opt] opts.stats +-- Enable or disable statistics collect. +-- Statistics are observed only on router instances. +-- +-- @string[opt] opts.stats_driver +-- `'local'` or `'metrics'`. +-- If `'local'`, stores statistics in local registry (some Lua tables) +-- and computes latency as overall average. `'metrics'` requires +-- `metrics >= 0.10.0` installed and stores statistics in +-- global metrics registry (integrated with exporters). +-- `'metrics'` driver supports computing latency as 0.99 quantile with aging. 
+-- If `'metrics'` driver is available, it is used by default, +-- otherwise `'local'` is used. +-- +-- @bool[opt] opts.stats_quantiles +-- Enable or disable statistics quantiles (only for metrics driver). +-- Quantiles computations increases performance overhead up to 10%. +-- +-- @return Configuration table. +-- +local function __call(self, opts) + checks('table', { + stats = '?boolean', + stats_driver = '?string', + stats_quantiles = '?boolean' + }) + + opts = table.deepcopy(opts) or {} + + configure_stats(cfg, opts) + + return self +end + +local function __newindex() + CfgError:assert(false, 'Use crud.cfg{} instead') +end + +-- Iterating through `crud.cfg` with pairs is not supported +-- yet, refer to tarantool/crud#265. +cfg_module.cfg = setmetatable({}, { + __index = cfg, + __newindex = __newindex, + __call = __call, + __serialize = function() return cfg end +}) + +return cfg_module diff --git a/crud/common/stash.lua b/crud/common/stash.lua new file mode 100644 index 00000000..0557cb40 --- /dev/null +++ b/crud/common/stash.lua @@ -0,0 +1,67 @@ +---- Module for preserving data between reloads. +-- @module crud.common.stash +-- +local dev_checks = require('crud.common.dev_checks') + +local stash = {} + +--- Available stashes list. +-- +-- @tfield string cfg +-- Stash for CRUD module configuration. +-- +-- @tfield string stats_internal +-- Stash for main stats module. +-- +-- @tfield string stats_local_registry +-- Stash for local metrics registry. +-- +-- @tfield string stats_metrics_registry +-- Stash for metrics rocks statistics registry. +-- +stash.name = { + cfg = '__crud_cfg', + stats_internal = '__crud_stats_internal', + stats_local_registry = '__crud_stats_local_registry', + stats_metrics_registry = '__crud_stats_metrics_registry' +} + +--- Setup Tarantool Cartridge reload. +-- +-- Call on Tarantool Cartridge roles that are expected +-- to use stashes. 
+-- +-- @function setup_cartridge_reload +-- +-- @return Returns +-- +function stash.setup_cartridge_reload() + local hotreload = require('cartridge.hotreload') + for _, name in pairs(stash.name) do + hotreload.whitelist_globals({ name }) + end +end + +--- Get a stash instance, initialize if needed. +-- +-- Stashes are persistent to package reload. +-- To use them with Cartridge roles reload, +-- call `stash.setup_cartridge_reload` in role. +-- +-- @function get +-- +-- @string name +-- Stash identifier. Use one from `stash.name` table. +-- +-- @treturn table A stash instance. +-- +function stash.get(name) + dev_checks('string') + + local instance = rawget(_G, name) or {} + rawset(_G, name, instance) + + return instance +end + +return stash diff --git a/crud/select.lua b/crud/select.lua index a633f86b..b0d1ef9b 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -59,7 +59,7 @@ local function select_on_storage(space_name, index_id, conditions, opts) end -- execute select - local tuples, err = select_executor.execute(space, index, filter_func, { + local resp, err = select_executor.execute(space, index, filter_func, { scan_value = opts.scan_value, after_tuple = opts.after_tuple, tarantool_iter = opts.tarantool_iter, @@ -70,15 +70,20 @@ local function select_on_storage(space_name, index_id, conditions, opts) end local cursor - if #tuples < opts.limit or opts.limit == 0 then + if resp.tuples_fetched < opts.limit or opts.limit == 0 then cursor = {is_end = true} else - cursor = make_cursor(tuples) + cursor = make_cursor(resp.tuples) end + cursor.stats = { + tuples_lookup = resp.tuples_lookup, + tuples_fetched = resp.tuples_fetched, + } + -- getting tuples with user defined fields (if `fields` option is specified) -- and fields that are needed for comparison on router (primary key + scan key) - return cursor, schema.filter_tuples_fields(tuples, opts.field_names) + return cursor, schema.filter_tuples_fields(resp.tuples, opts.field_names) end function select_module.init() 
diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 1984a87a..a05e7a93 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -8,6 +8,7 @@ local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') +local stats = require('crud.stats') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') @@ -115,6 +116,8 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + stats.update_map_reduces(space_name) end local tuples_limit = opts.first diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 1cf88744..a4d79e85 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -9,6 +9,7 @@ local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') +local stats = require('crud.stats') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') @@ -59,6 +60,14 @@ local function select_iteration(space_name, plan, opts) local tuples = {} for replicaset_uuid, replicaset_results in pairs(results) do + -- Stats extracted with callback here and not passed + -- outside to wrapper because fetch for pairs can be + -- called even after pairs() return from generators. 
+ local cursor = replicaset_results[1] + if cursor.stats ~= nil then + stats.update_fetch_stats(cursor.stats, space_name) + end + tuples[replicaset_uuid] = replicaset_results[2] end @@ -141,6 +150,8 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + stats.update_map_reduces(space_name) end -- generate tuples comparator diff --git a/crud/select/executor.lua b/crud/select/executor.lua index 6d6f7483..10309be2 100644 --- a/crud/select/executor.lua +++ b/crud/select/executor.lua @@ -1,4 +1,5 @@ local errors = require('errors') +local fun = require('fun') local dev_checks = require('crud.common.dev_checks') local select_comparators = require('crud.compare.comparators') @@ -68,13 +69,12 @@ function executor.execute(space, index, filter_func, opts) opts = opts or {} + local resp = { tuples_fetched = 0, tuples_lookup = 0, tuples = {} } + if opts.limit == 0 then - return {} + return resp end - local tuples = {} - local tuples_count = 0 - local value = opts.scan_value if opts.after_tuple ~= nil then local new_value = generate_value(opts.after_tuple, opts.scan_value, index.parts, opts.tarantool_iter) @@ -84,7 +84,16 @@ function executor.execute(space, index, filter_func, opts) end local tuple - local gen = index:pairs(value, {iterator = opts.tarantool_iter}) + local raw_gen, param, state = index:pairs(value, {iterator = opts.tarantool_iter}) + local gen = fun.wrap(function(param, state) + local next_state, var = raw_gen(param, state) + + if var ~= nil then + resp.tuples_lookup = resp.tuples_lookup + 1 + end + + return next_state, var + end, param, state) if opts.after_tuple ~= nil then local err @@ -94,7 +103,7 @@ function executor.execute(space, index, filter_func, opts) end if tuple == nil then - return {} + return resp end end @@ -110,10 +119,10 @@ function executor.execute(space, index, filter_func, opts) local matched, early_exit = filter_func(tuple) if matched then - table.insert(tuples, 
tuple) - tuples_count = tuples_count + 1 + table.insert(resp.tuples, tuple) + resp.tuples_fetched = resp.tuples_fetched + 1 - if opts.limit ~= nil and tuples_count >= opts.limit then + if opts.limit ~= nil and resp.tuples_fetched >= opts.limit then break end elseif early_exit then @@ -123,7 +132,7 @@ function executor.execute(space, index, filter_func, opts) gen.state, tuple = gen(gen.param, gen.state) end - return tuples + return resp end return executor diff --git a/crud/select/merger.lua b/crud/select/merger.lua index fa443b84..e3c1bdf4 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -7,6 +7,7 @@ local compat = require('crud.common.compat') local merger_lib = compat.require('tuple.merger', 'merger') local Keydef = require('crud.compare.keydef') +local stats = require('crud.stats') local function bswap_u16(num) return bit.rshift(bit.bswap(tonumber(num)), 16) @@ -93,6 +94,7 @@ local function fetch_chunk(context, state) local replicaset = context.replicaset local vshard_call_name = context.vshard_call_name local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local space_name = context.space_name local future = state.future -- The source was entirely drained. @@ -109,6 +111,14 @@ local function fetch_chunk(context, state) -- Decode metainfo, leave data to be processed by the merger. local cursor = decode_metainfo(buf) + -- Extract stats info. + -- Stats extracted with callback here and not passed + -- outside to wrapper because fetch for pairs can be + -- called even after pairs() return from generators. + if cursor.stats ~= nil then + stats.update_fetch_stats(cursor.stats, space_name) + end + -- Check whether we need the next call. 
if cursor.is_end then local next_state = {} @@ -157,6 +167,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts) replicaset = replicaset, vshard_call_name = vshard_call_name, timeout = call_opts.timeout, + space_name = space.name, } local state = {future = future} local source = merger_lib.new_buffer_source(fetch_chunk, context, state) diff --git a/crud/stats/init.lua b/crud/stats/init.lua new file mode 100644 index 00000000..741574e1 --- /dev/null +++ b/crud/stats/init.lua @@ -0,0 +1,472 @@ +---- CRUD statistics module. +-- @module crud.stats +-- + +local clock = require('clock') +local checks = require('checks') +local errors = require('errors') +local fiber = require('fiber') +local fun = require('fun') +local log = require('log') +local vshard = require('vshard') + +local dev_checks = require('crud.common.dev_checks') +local stash = require('crud.common.stash') +local utils = require('crud.common.utils') +local op_module = require('crud.stats.operation') + +local StatsError = errors.new_class('StatsError', {capture_stack = false}) + +local stats = {} +local internal = stash.get(stash.name.stats_internal) + +local local_registry = require('crud.stats.local_registry') +local metrics_registry = require('crud.stats.metrics_registry') + +local drivers = { + ['local'] = local_registry, +} +if metrics_registry.is_supported() then + drivers['metrics'] = metrics_registry +end + +function internal:get_registry() + if self.driver == nil then + return nil + end + return drivers[self.driver] +end + +--- Check if statistics module was enabled. +-- +-- @function is_enabled +-- +-- @treturn boolean Returns `true` or `false`. +-- +function stats.is_enabled() + return internal.driver ~= nil +end + +--- Get default statistics driver name. +-- +-- @function get_default_driver +-- +-- @treturn string `metrics` if supported, `local` if unsupported. 
+-- +function stats.get_default_driver() + if drivers.metrics ~= nil then + return 'metrics' + else + return 'local' + end +end + +--- Initializes statistics registry, enables callbacks and wrappers. +-- +-- If already enabled, do nothing. +-- +-- @function enable +-- +-- @tab[opt] opts +-- +-- @string[opt] opts.driver +-- `'local'` or `'metrics'`. +-- If `'local'`, stores statistics in local registry (some Lua tables) +-- and computes latency as overall average. `'metrics'` requires +-- `metrics >= 0.9.0` installed and stores statistics in +-- global metrics registry (integrated with exporters). +-- `'metrics'` driver supports computing latency as 0.99 quantile with aging. +-- If `'metrics'` driver is available, it is used by default, +-- otherwise `'local'` is used. +-- +-- @bool[opt=false] opts.quantiles +-- If `'metrics'` driver used, you can enable +-- computing requests latency as 0.99 quantile with aging. +-- Performance overhead for enabling is near 10%. +-- +-- @treturn boolean Returns `true`. +-- +function stats.enable(opts) + checks({ driver = '?string', quantiles = '?boolean' }) + + StatsError:assert( + rawget(_G, 'crud') ~= nil, + 'Can be enabled only on crud router' + ) + + opts = table.deepcopy(opts) or {} + if opts.driver == nil then + opts.driver = stats.get_default_driver() + end + + StatsError:assert( + drivers[opts.driver] ~= nil, + 'Unsupported driver: %s', opts.driver + ) + + if opts.quantiles == nil then + opts.quantiles = false + end + + -- Do not reinit if called with same options. + if internal.driver == opts.driver + and internal.quantiles == opts.quantiles then + return true + end + + -- Disable old driver registry, if another one was requested. + stats.disable() + + internal.driver = opts.driver + internal.quantiles = opts.quantiles + + internal:get_registry().init({ quantiles = internal.quantiles }) + + return true +end + +--- Resets statistics registry. 
+-- +-- After reset collectors are the same as right +-- after initial `stats.enable()`. +-- +-- @function reset +-- +-- @treturn boolean Returns true. +-- +function stats.reset() + if not stats.is_enabled() then + return true + end + + internal:get_registry().destroy() + internal:get_registry().init({ quantiles = internal.quantiles }) + + return true +end + +--- Destroys statistics registry and disable callbacks. +-- +-- If already disabled, do nothing. +-- +-- @function disable +-- +-- @treturn boolean Returns true. +-- +function stats.disable() + if not stats.is_enabled() then + return true + end + + internal:get_registry().destroy() + internal.driver = nil + internal.quantiles = nil + + return true +end + +--- Get statistics on CRUD operations. +-- +-- @function get +-- +-- @string[opt] space_name +-- If specified, returns table with statistics +-- of operations on space, separated by operation type and +-- execution status. If there wasn't any requests of "op" type +-- for space, there won't be corresponding collectors. +-- If not specified, returns table with statistics +-- about all observed spaces. +-- +-- @treturn table Statistics on CRUD operations. +-- If statistics disabled, returns `{}`. 
+-- +function stats.get(space_name) + checks('?string') + + if not stats.is_enabled() then + return {} + end + + return internal:get_registry().get(space_name) +end + +local function resolve_space_name(space_id) + local replicasets = vshard.router.routeall() + if next(replicasets) == nil then + log.warn('Failed to resolve space name for stats: no replicasets found') + return nil + end + + local space = utils.get_space(space_id, replicasets) + if space == nil then + log.warn('Failed to resolve space name for stats: no space found for id %d', space_id) + return nil + end + + return space.name +end + +-- Hack to set __gc for a table in Lua 5.1 +-- See https://stackoverflow.com/questions/27426704/lua-5-1-workaround-for-gc-metamethod-for-tables +-- or https://habr.com/ru/post/346892/ +local function setmt__gc(t, mt) + local prox = newproxy(true) + getmetatable(prox).__gc = function() mt.__gc(t) end + t[prox] = true + return setmetatable(t, mt) +end + +-- If jit will be enabled here, gc_observer usage +-- may be optimized so our __gc hack will not work. +local function keep_observer_alive(gc_observer) --luacheck: ignore +end +jit.off(keep_observer_alive) + +local function wrap_pairs_gen(build_latency, space_name, op, gen, param, state) + local total_latency = build_latency + + local registry = internal:get_registry() + + -- If pairs() cycle will be interrupted with break, + -- we'll never get a proper obervation. + -- We create an object with the same lifespan as gen() + -- function so if someone break pairs cycle, + -- it still will be observed. + local observed = false + + local gc_observer = setmt__gc({}, { + __gc = function() + if observed == false then + -- Do not call observe directly because metrics + -- collectors may yield, for example + -- https://github.com/tarantool/metrics/blob/a23f8d49779205dd45bd211e21a1d34f26010382/metrics/collectors/shared.lua#L85 + -- Calling fiber.yield is prohibited in gc. 
+ fiber.new(registry.observe, total_latency, space_name, op, 'ok') + observed = true + end + end + }) + + local wrapped_gen = function(param, state) + -- Mess with gc_observer so its lifespan will + -- be the same as wrapped_gen() function. + keep_observer_alive(gc_observer) + + local start_time = clock.monotonic() + + local status, next_state, var = pcall(gen, param, state) + + local finish_time = clock.monotonic() + + total_latency = total_latency + (finish_time - start_time) + + if status == false then + registry.observe(total_latency, space_name, op, 'error') + observed = true + error(next_state, 2) + end + + -- Observe stats in the end of pairs cycle + if var == nil then + registry.observe(total_latency, space_name, op, 'ok') + observed = true + return nil + end + + return next_state, var + end + + return fun.wrap(wrapped_gen, param, state) +end + +local function wrap_tail(space_name, op, pairs, start_time, call_status, ...) + dev_checks('string|number', 'string', 'boolean', 'number', 'boolean') + + local finish_time = clock.monotonic() + local latency = finish_time - start_time + + local registry = internal:get_registry() + + -- If space id is provided instead of name, try to resolve name. + -- If resolve have failed, use id as string to observe space. + -- If using space id will be deprecated, remove this code as well, + -- see https://github.com/tarantool/crud/issues/255 + if type(space_name) ~= 'string' then + local name = resolve_space_name(space_name) + if name ~= nil then + space_name = name + else + space_name = tostring(space_name) + end + end + + if call_status == false then + registry.observe(latency, space_name, op, 'error') + error((...), 2) + end + + if pairs == false then + if select(2, ...) ~= nil then + -- If not `pairs` call, return values `nil, err` + -- treated as error case. + registry.observe(latency, space_name, op, 'error') + return ... + else + registry.observe(latency, space_name, op, 'ok') + return ... 
+ end + else + return wrap_pairs_gen(latency, space_name, op, ...) + end +end + +--- Wrap CRUD operation call to collect statistics. +-- +-- Approach based on `box.atomic()`: +-- https://github.com/tarantool/tarantool/blob/b9f7204b5e0d10b443c6f198e9f7f04e0d16a867/src/box/lua/schema.lua#L369 +-- +-- @function wrap +-- +-- @func func +-- Function to wrap. First argument is expected to +-- be a space name string. If statistics enabled, +-- errors are caught and thrown again. +-- +-- @string op +-- Label of registry collectors. +-- Use `require('crud.stats').op` to pick one. +-- +-- @tab[opt] opts +-- +-- @bool[opt=false] opts.pairs +-- If false, wraps only function passed as argument. +-- Second return value of wrapped function is treated +-- as error (`nil, err` case). +-- If true, also wraps gen() function returned by +-- call. Statistics observed on cycle end (last +-- element was fetched or error was thrown). If pairs +-- cycle was interrupted with `break`, statistics will +-- be collected when pairs objects are cleaned up with +-- Lua garbage collector. +-- +-- @return Wrapped function output. +-- +function stats.wrap(func, op, opts) + dev_checks('function', 'string', { pairs = '?boolean' }) + + local pairs + if type(opts) == 'table' and opts.pairs ~= nil then + pairs = opts.pairs + else + pairs = false + end + + return function(space_name, ...) + if not stats.is_enabled() then + return func(space_name, ...) + end + + local start_time = clock.monotonic() + + return wrap_tail( + space_name, op, pairs, start_time, + pcall(func, space_name, ...) + ) + end +end + +local storage_stats_schema = { tuples_fetched = 'number', tuples_lookup = 'number' } +--- Callback to collect storage tuples stats (select/pairs). +-- +-- @function update_fetch_stats +-- +-- @tab storage_stats +-- Statistics from select storage call. +-- +-- @number storage_stats.tuples_fetched +-- Count of tuples fetched during storage call. 
+--
+-- @number storage_stats.tuples_lookup
+--     Count of tuples looked up on storages while collecting response.
+--
+-- @string space_name
+--     Name of space.
+--
+-- @treturn boolean Returns `true`.
+--
+function stats.update_fetch_stats(storage_stats, space_name)
+    dev_checks(storage_stats_schema, 'string')
+
+    if not stats.is_enabled() then
+        return true
+    end
+
+    internal:get_registry().observe_fetch(
+        storage_stats.tuples_fetched,
+        storage_stats.tuples_lookup,
+        space_name
+    )
+
+    return true
+end
+
+--- Callback to collect planned map reduces stats (select/pairs).
+--
+-- @function update_map_reduces
+--
+-- @string space_name
+--     Name of space.
+--
+-- @treturn boolean Returns `true`.
+--
+function stats.update_map_reduces(space_name)
+    dev_checks('string')
+
+    if not stats.is_enabled() then
+        return true
+    end
+
+    internal:get_registry().observe_map_reduces(1, space_name)
+
+    return true
+end
+
+--- Table with CRUD operation labels.
+--
+-- @tfield string INSERT
+--     Identifies both `insert` and `insert_object`.
+--
+-- @tfield string GET
+--
+-- @tfield string REPLACE
+--     Identifies both `replace` and `replace_object`.
+--
+-- @tfield string UPDATE
+--
+-- @tfield string UPSERT
+--     Identifies both `upsert` and `upsert_object`.
+--
+-- @tfield string DELETE
+--
+-- @tfield string SELECT
+--     Identifies both `pairs` and `select`.
+--
+-- @tfield string TRUNCATE
+--
+-- @tfield string LEN
+--
+-- @tfield string COUNT
+--
+-- @tfield string BORDERS
+--     Identifies both `min` and `max`.
+--
+stats.op = op_module
+
+--- Stats module internal state (for debug/test).
+--
+-- @tfield[opt] string driver Current statistics registry driver (if nil, stats disabled).
+--
+-- @tfield[opt] boolean quantiles Whether quantiles are computed.
+stats.internal = internal + +return stats diff --git a/crud/stats/local_registry.lua b/crud/stats/local_registry.lua new file mode 100644 index 00000000..0fd6be5e --- /dev/null +++ b/crud/stats/local_registry.lua @@ -0,0 +1,167 @@ +---- Internal module used to store statistics. +-- @module crud.stats.local_registry +-- + +local errors = require('errors') + +local dev_checks = require('crud.common.dev_checks') +local stash = require('crud.common.stash') +local op_module = require('crud.stats.operation') +local registry_utils = require('crud.stats.registry_utils') + +local registry = {} +local internal = stash.get(stash.name.stats_local_registry) +local StatsLocalError = errors.new_class('StatsLocalError', {capture_stack = false}) + +--- Initialize local metrics registry. +-- +-- Registries are not meant to used explicitly +-- by users, init is not guaranteed to be idempotent. +-- +-- @function init +-- +-- @tab opts +-- +-- @bool opts.quantiles +-- Quantiles is not supported for local, only `false` is valid. +-- +-- @treturn boolean Returns `true`. +-- +function registry.init(opts) + dev_checks({ quantiles = 'boolean' }) + + StatsLocalError:assert(opts.quantiles == false, + "Quantiles are not supported for 'local' statistics registry") + + internal.registry = {} + internal.registry.spaces = {} + + return true +end + +--- Destroy local metrics registry. +-- +-- Registries are not meant to used explicitly +-- by users, destroy is not guaranteed to be idempotent. +-- +-- @function destroy +-- +-- @treturn boolean Returns `true`. +-- +function registry.destroy() + internal.registry = nil + + return true +end + +--- Get copy of local metrics registry. +-- +-- Registries are not meant to used explicitly +-- by users, get is not guaranteed to work without init. +-- +-- @function get +-- +-- @string[opt] space_name +-- If specified, returns table with statistics +-- of operations on table, separated by operation type and +-- execution status. 
If there wasn't any requests for table, +-- returns `{}`. If not specified, returns table with statistics +-- about all observed spaces. +-- +-- @treturn table Returns copy of metrics registry (or registry section). +-- +function registry.get(space_name) + dev_checks('?string') + + if space_name ~= nil then + return table.deepcopy(internal.registry.spaces[space_name]) or {} + end + + return table.deepcopy(internal.registry) +end + +--- Increase requests count and update latency info. +-- +-- @function observe +-- +-- @string space_name +-- Name of space. +-- +-- @number latency +-- Time of call execution. +-- +-- @string op +-- Label of registry collectors. +-- Use `require('crud.stats').op` to pick one. +-- +-- @string success +-- `'ok'` if no errors on execution, `'error'` otherwise. +-- +-- @treturn boolean Returns `true`. +-- +function registry.observe(latency, space_name, op, status) + dev_checks('number', 'string', 'string', 'string') + + registry_utils.init_collectors_if_required(internal.registry.spaces, space_name, op) + local collectors = internal.registry.spaces[space_name][op][status] + + collectors.count = collectors.count + 1 + collectors.time = collectors.time + latency + collectors.latency = collectors.time / collectors.count + + return true +end + +--- Increase statistics of storage select/pairs calls +-- +-- @function observe_fetch +-- +-- @string space_name +-- Name of space. +-- +-- @number tuples_fetched +-- Count of tuples fetched during storage call. +-- +-- @number tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @treturn boolean Returns true. 
+-- +function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name) + dev_checks('number', 'number', 'string') + + local op = op_module.SELECT + registry_utils.init_collectors_if_required(internal.registry.spaces, space_name, op) + local collectors = internal.registry.spaces[space_name][op].details + + collectors.tuples_fetched = collectors.tuples_fetched + tuples_fetched + collectors.tuples_lookup = collectors.tuples_lookup + tuples_lookup + + return true +end + +--- Increase statistics of planned map reduces during select/pairs +-- +-- @function observe_map_reduces +-- +-- @number count +-- Count of map reduces planned. +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns true. +-- +function registry.observe_map_reduces(count, space_name) + dev_checks('number', 'string') + + local op = op_module.SELECT + registry_utils.init_collectors_if_required(internal.registry.spaces, space_name, op) + local collectors = internal.registry.spaces[space_name][op].details + + collectors.map_reduces = collectors.map_reduces + count + + return true +end + +return registry diff --git a/crud/stats/metrics_registry.lua b/crud/stats/metrics_registry.lua new file mode 100644 index 00000000..0716aa4c --- /dev/null +++ b/crud/stats/metrics_registry.lua @@ -0,0 +1,376 @@ +---- Internal module used to store statistics in `metrics` registry. +-- @module crud.stats.metrics_registry +-- + +local is_package, metrics = pcall(require, 'metrics') + +local dev_checks = require('crud.common.dev_checks') +local op_module = require('crud.stats.operation') +local stash = require('crud.common.stash') +local registry_utils = require('crud.stats.registry_utils') + +local registry = {} +-- Used to cache collectors. +local internal = stash.get(stash.name.stats_metrics_registry) + +local metric_name = { + -- Summary collector for all operations. + stats = 'tnt_crud_stats', + -- `*_count` and `*_sum` are automatically created + -- by summary collector. 
+    stats_count = 'tnt_crud_stats_count',
+    stats_sum = 'tnt_crud_stats_sum',
+
+    -- Counter collectors for select/pairs details.
+    details = {
+        tuples_fetched = 'tnt_crud_tuples_fetched',
+        tuples_lookup = 'tnt_crud_tuples_lookup',
+        map_reduces = 'tnt_crud_map_reduces',
+    }
+}
+
+local LATENCY_QUANTILE = 0.99
+
+-- Increasing tolerance threshold affects performance.
+local DEFAULT_QUANTILES = {
+    [LATENCY_QUANTILE] = 1e-2,
+}
+
+local DEFAULT_AGE_PARAMS = {
+    age_buckets_count = 2,
+    max_age_time = 60,
+}
+
+--- Check if application supports metrics rock for registry
+--
+-- `metrics >= 0.10.0` is required.
+-- `metrics >= 0.9.0` is required to use summary quantiles with
+-- age buckets. `metrics >= 0.5.0, < 0.9.0` is unsupported
+-- due to quantile overflow bug
+-- (https://github.com/tarantool/metrics/issues/235).
+-- `metrics == 0.9.0` has a bug that does not permit
+-- creating a summary collector without quantiles
+-- (https://github.com/tarantool/metrics/issues/262).
+-- In fact, a user may use `metrics >= 0.5.0`, `metrics != 0.9.0`
+-- if they want to use metrics without quantiles, and `metrics >= 0.9.0`
+-- if they want to use metrics with quantiles. But this is confusing,
+-- so we use a single restriction solving both cases.
+--
+-- @function is_supported
+--
+-- @treturn boolean Returns `true` if `metrics >= 0.10.0` found, `false` otherwise.
+--
+function registry.is_supported()
+    if is_package == false then
+        return false
+    end
+
+    -- Only metrics >= 0.10.0 supported.
+    if metrics.unregister_callback == nil then
+        return false
+    end
+
+    return true
+end
+
+--- Initialize collectors in global metrics registry
+--
+-- Registries are not meant to be used explicitly
+-- by users, init is not guaranteed to be idempotent.
+-- Destroy collectors only through this registry's methods.
+--
+-- @function init
+--
+-- @tab opts
+--
+-- @bool opts.quantiles
+--     If `true`, computes latency as 0.99 quantile with aging.
+--
+-- @treturn boolean Returns `true`.
+-- +function registry.init(opts) + dev_checks({ quantiles = 'boolean' }) + + internal.opts = table.deepcopy(opts) + + local quantile_params = nil + local age_params = nil + if opts.quantiles == true then + quantile_params = DEFAULT_QUANTILES + age_params = DEFAULT_AGE_PARAMS + end + + internal.registry = {} + internal.registry[metric_name.stats] = metrics.summary( + metric_name.stats, + 'CRUD router calls statistics', + quantile_params, + age_params) + + internal.registry[metric_name.details.tuples_fetched] = metrics.counter( + metric_name.details.tuples_fetched, + 'Tuples fetched from CRUD storages during select/pairs') + + internal.registry[metric_name.details.tuples_lookup] = metrics.counter( + metric_name.details.tuples_lookup, + 'Tuples looked up on CRUD storages while collecting response during select/pairs') + + internal.registry[metric_name.details.map_reduces] = metrics.counter( + metric_name.details.map_reduces, + 'Map reduces planned during CRUD select/pairs') + + return true +end + +--- Unregister collectors in global metrics registry. +-- +-- Registries are not meant to used explicitly +-- by users, destroy is not guaranteed to be idempotent. +-- Destroy collectors only through this registry methods. +-- +-- @function destroy +-- +-- @treturn boolean Returns `true`. +-- +function registry.destroy() + for _, c in pairs(internal.registry) do + metrics.registry:unregister(c) + end + + internal.registry = nil + internal.opts = nil + + return true +end + +--- Compute `latency` field of an observation. +-- +-- If it is a `{ time = ..., count = ... }` observation, +-- compute latency as overall average and store it +-- inside observation object. +-- +-- @function compute_obs_latency +-- @local +-- +-- @tab obs +-- Objects from `registry_utils` +-- `stats.spaces[name][op][status]`. +-- If something like `details` collector +-- passed, do nothing. 
+-- +local function compute_obs_latency(obs) + if obs.count == nil or obs.time == nil then + return + end + + if obs.count == 0 then + obs.latency = 0 + else + obs.latency = obs.time / obs.count + end +end + +--- Compute `latency` field of each observation. +-- +-- If quantiles disabled, we need to compute +-- latency as overall average from `time` and +-- `count` values. +-- +-- @function compute_latencies +-- @local +-- +-- @tab stats +-- Object from registry_utils stats. +-- +local function compute_latencies(stats) + for _, space_stats in pairs(stats.spaces) do + for _, op_stats in pairs(space_stats) do + for _, obs in pairs(op_stats) do + compute_obs_latency(obs) + end + end + end +end + +--- Get copy of global metrics registry. +-- +-- Registries are not meant to used explicitly +-- by users, get is not guaranteed to work without init. +-- +-- @function get +-- +-- @string[opt] space_name +-- If specified, returns table with statistics +-- of operations on table, separated by operation type and +-- execution status. If there wasn't any requests for table, +-- returns `{}`. If not specified, returns table with statistics +-- about all existing spaces, count of calls to spaces +-- that wasn't found and count of schema reloads. +-- +-- @treturn table Returns copy of metrics registry. +function registry.get(space_name) + dev_checks('?string') + + local stats = { + spaces = {}, + } + + -- Fill operation basic statistics values. + for _, obs in ipairs(internal.registry[metric_name.stats]:collect()) do + local op = obs.label_pairs.operation + local status = obs.label_pairs.status + local name = obs.label_pairs.name + + if space_name ~= nil and name ~= space_name then + goto stats_continue + end + + registry_utils.init_collectors_if_required(stats.spaces, name, op) + local space_stats = stats.spaces[name] + + -- metric_name.stats presents only if quantiles enabled. 
+ if obs.metric_name == metric_name.stats then + if obs.label_pairs.quantile == LATENCY_QUANTILE then + space_stats[op][status].latency = obs.value + end + elseif obs.metric_name == metric_name.stats_sum then + space_stats[op][status].time = obs.value + elseif obs.metric_name == metric_name.stats_count then + space_stats[op][status].count = obs.value + end + + :: stats_continue :: + end + + if not internal.opts.quantiles then + compute_latencies(stats) + end + + -- Fill select/pairs detail statistics values. + for stat_name, metric_name in pairs(metric_name.details) do + for _, obs in ipairs(internal.registry[metric_name]:collect()) do + local name = obs.label_pairs.name + local op = obs.label_pairs.operation + + if space_name ~= nil and name ~= space_name then + goto details_continue + end + + registry_utils.init_collectors_if_required(stats.spaces, name, op) + stats.spaces[name][op].details[stat_name] = obs.value + + :: details_continue :: + end + end + + if space_name ~= nil then + return stats.spaces[space_name] or {} + end + + return stats +end + +--- Increase requests count and update latency info. +-- +-- @function observe +-- +-- @string space_name +-- Name of space. +-- +-- @number latency +-- Time of call execution. +-- +-- @string op +-- Label of registry collectors. +-- Use `require('crud.stats').op` to pick one. +-- +-- @string success +-- `'ok'` if no errors on execution, `'error'` otherwise. +-- +-- @treturn boolean Returns `true`. +-- +function registry.observe(latency, space_name, op, status) + dev_checks('number', 'string', 'string', 'string') + + -- Use `operations` label to be consistent with `tnt_stats_op_*` labels. + -- Use `name` label to be consistent with `tnt_space_*` labels. + -- Use `status` label to be consistent with `tnt_vinyl_*` and HTTP metrics labels. 
+ local label_pairs = { operation = op, name = space_name, status = status } + + internal.registry[metric_name.stats]:observe(latency, label_pairs) + + return true +end + +--- Increase statistics of storage select/pairs calls. +-- +-- @function observe_fetch +-- +-- @string space_name +-- Name of space. +-- +-- @number tuples_fetched +-- Count of tuples fetched during storage call. +-- +-- @number tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @treturn boolean Returns `true`. +-- +function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name) + dev_checks('number', 'number', 'string') + + local label_pairs = { name = space_name, operation = op_module.SELECT } + + internal.registry[metric_name.details.tuples_fetched]:inc(tuples_fetched, label_pairs) + internal.registry[metric_name.details.tuples_lookup]:inc(tuples_lookup, label_pairs) + + return true +end + +--- Increase statistics of planned map reduces during select/pairs. +-- +-- @function observe_map_reduces +-- +-- @number count +-- Count of map reduces planned. +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns `true`. +-- +function registry.observe_map_reduces(count, space_name) + dev_checks('number', 'string') + + local label_pairs = { name = space_name, operation = op_module.SELECT } + internal.registry[metric_name.details.map_reduces]:inc(count, label_pairs) + + return true +end + +-- Workaround for https://github.com/tarantool/metrics/issues/334 . +-- This workaround does not prevent observations reset between role reloads, +-- but it fixes collector unlink from registry. Without this workaround, +-- we will continue to use cached collectors that are already cleaned up +-- from registry and changes will not appear in metrics export output. +local function workaround_role_reload() + if not registry.is_supported() then + return + end + + -- Check if this registry was enabled before reload. 
+    if internal.registry == nil then
+        return
+    end
+
+    -- Check if base collector is in metrics package registry.
+    -- If it's not, then registry has been cleaned up on role reload.
+    if metrics.registry:find('summary', metric_name.stats) == nil then
+        registry.init(internal.opts)
+    end
+end
+
+workaround_role_reload()
+
+return registry
\ No newline at end of file
diff --git a/crud/stats/operation.lua b/crud/stats/operation.lua
new file mode 100644
index 00000000..a6a9627a
--- /dev/null
+++ b/crud/stats/operation.lua
@@ -0,0 +1,23 @@
+-- It is not clear how to describe modules
+-- with constants for ldoc. ldoc-styled description
+-- for this module is available at `crud.stats.init`.
+-- See https://github.com/lunarmodules/LDoc/issues/369
+-- for possible updates.
+return {
+    -- INSERT identifies both `insert` and `insert_object`.
+    INSERT = 'insert',
+    GET = 'get',
+    -- REPLACE identifies both `replace` and `replace_object`.
+    REPLACE = 'replace',
+    UPDATE = 'update',
+    -- UPSERT identifies both `upsert` and `upsert_object`.
+    UPSERT = 'upsert',
+    DELETE = 'delete',
+    -- SELECT identifies both `pairs` and `select`.
+    SELECT = 'select',
+    TRUNCATE = 'truncate',
+    LEN = 'len',
+    COUNT = 'count',
+    -- BORDERS identifies both `min` and `max`.
+    BORDERS = 'borders',
+}
diff --git a/crud/stats/registry_utils.lua b/crud/stats/registry_utils.lua
new file mode 100644
index 00000000..95654461
--- /dev/null
+++ b/crud/stats/registry_utils.lua
@@ -0,0 +1,76 @@
+---- Internal module used by statistics registries.
+-- @module crud.stats.registry_utils
+--
+
+local dev_checks = require('crud.common.dev_checks')
+local op_module = require('crud.stats.operation')
+
+local registry_utils = {}
+
+--- Build collectors for local registry.
+--
+-- @function build_collectors
+--
+-- @string op
+--     Label of registry collectors.
+--     Use `require('crud.stats').op` to pick one.
+--
+-- @treturn table Returns collectors for success and error requests.
+-- Collectors store 'count', 'latency' and 'time' values. Also +-- returns additional collectors for select operation. +-- +function registry_utils.build_collectors(op) + dev_checks('string') + + local collectors = { + ok = { + count = 0, + latency = 0, + time = 0, + }, + error = { + count = 0, + latency = 0, + time = 0, + }, + } + + if op == op_module.SELECT then + collectors.details = { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + } + end + + return collectors +end + +--- Initialize all statistic collectors for a space operation. +-- +-- @function init_collectors_if_required +-- +-- @tab spaces +-- `spaces` section of registry. +-- +-- @string space_name +-- Name of space. +-- +-- @string op +-- Label of registry collectors. +-- Use `require('crud.stats').op` to pick one. +-- +function registry_utils.init_collectors_if_required(spaces, space_name, op) + dev_checks('table', 'string', 'string') + + if spaces[space_name] == nil then + spaces[space_name] = {} + end + + local space_collectors = spaces[space_name] + if space_collectors[op] == nil then + space_collectors[op] = registry_utils.build_collectors(op) + end +end + +return registry_utils diff --git a/deps.sh b/deps.sh index 87ce6b92..abff8f4b 100755 --- a/deps.sh +++ b/deps.sh @@ -4,7 +4,7 @@ set -e # Test dependencies: -tarantoolctl rocks install luatest 0.5.5 +tarantoolctl rocks install luatest 0.5.7 tarantoolctl rocks install luacheck 0.25.0 tarantoolctl rocks install luacov 0.13.0 diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua index 30f432b8..ab8fc5ce 100755 --- a/test/entrypoint/srv_ddl.lua +++ b/test/entrypoint/srv_ddl.lua @@ -102,6 +102,12 @@ package.preload['customers-storage'] = function() }, } + local customers_id_schema = table.deepcopy(customers_schema) + customers_id_schema.sharding_key = {'id'} + table.insert(customers_id_schema.indexes, primary_index_id) + table.insert(customers_id_schema.indexes, bucket_id_index) + 
table.insert(customers_id_schema.indexes, age_index) + local customers_name_key_schema = table.deepcopy(customers_schema) customers_name_key_schema.sharding_key = {'name'} table.insert(customers_name_key_schema.indexes, primary_index) @@ -157,6 +163,7 @@ package.preload['customers-storage'] = function() local schema = { spaces = { + customers = customers_id_schema, customers_name_key = customers_name_key_schema, customers_name_key_uniq_index = customers_name_key_uniq_index_schema, customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema, @@ -195,8 +202,13 @@ local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, { 'customers-storage', 'cartridge.roles.crud-router', 'cartridge.roles.crud-storage', - }, -}) + }}, + -- Increase readahead for performance tests. + -- Performance tests on HP ProBook 440 G5 16 Gb + -- bump into default readahead limit and thus not + -- give a full picture. + { readahead = 20 * 1024 * 1024 } +) if not ok then log.error('%s', err) diff --git a/test/entrypoint/srv_stats.lua b/test/entrypoint/srv_stats.lua new file mode 100755 index 00000000..b8bd813f --- /dev/null +++ b/test/entrypoint/srv_stats.lua @@ -0,0 +1,64 @@ +#!/usr/bin/env tarantool + +require('strict').on() +_G.is_initialized = function() return false end + +local log = require('log') +local errors = require('errors') +local cartridge = require('cartridge') + +package.preload['customers-storage'] = function() + local engine = os.getenv('ENGINE') or 'memtx' + return { + role_name = 'customers-storage', + init = function() + local customers_space = box.schema.space.create('customers', { + format = { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'last_name', type = 'string'}, + {name = 'age', type = 'number'}, + {name = 'city', type = 'string'}, + }, + if_not_exists = true, + engine = engine, + id = 542, + }) + -- primary index + customers_space:create_index('id_index', { + 
parts = { {field = 'id'} }, + if_not_exists = true, + }) + customers_space:create_index('bucket_id', { + parts = { {field = 'bucket_id'} }, + unique = false, + if_not_exists = true, + }) + customers_space:create_index('age_index', { + parts = { {field = 'age'} }, + unique = false, + if_not_exists = true, + }) + end, + } +end + +local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, { + advertise_uri = 'localhost:3301', + http_port = 8081, + bucket_count = 3000, + roles = { + 'cartridge.roles.crud-router', + 'cartridge.roles.crud-storage', + 'customers-storage', + }, + roles_reload_allowed = true, +}) + +if not ok then + log.error('%s', err) + os.exit(1) +end + +_G.is_initialized = cartridge.is_healthy diff --git a/test/helper.lua b/test/helper.lua index f2cdb6ab..f355efed 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -378,4 +378,102 @@ function helpers.get_sharding_func_cache_size(cluster) ]]) end +function helpers.simple_functions_params() + return { + sleep_time = 0.01, + error = { err = 'err' }, + error_msg = 'throw me', + } +end + +function helpers.prepare_simple_functions(router) + local params = helpers.simple_functions_params() + + local _, err = router:eval([[ + local clock = require('clock') + local fiber = require('fiber') + + local params = ... + local sleep_time = params.sleep_time + local error_table = params.error + local error_msg = params.error_msg + + -- Using `fiber.sleep(time)` between two `clock.monotonic()` + -- may return diff less than `time`. 
+ sleep_for = function(time) + local start = clock.monotonic() + while (clock.monotonic() - start) < time do + fiber.sleep(time / 10) + end + end + + return_true = function(space_name) + sleep_for(sleep_time) + return true + end + + return_err = function(space_name) + sleep_for(sleep_time) + return nil, error_table + end + + throws_error = function() + sleep_for(sleep_time) + error(error_msg) + end + ]], { params }) + + t.assert_equals(err, nil) +end + +function helpers.is_space_exist(router, space_name) + local res, err = router:eval([[ + local vshard = require('vshard') + local utils = require('crud.common.utils') + + local space, err = utils.get_space(..., vshard.router.routeall()) + if err ~= nil then + return nil, err + end + return space ~= nil + ]], { space_name }) + + t.assert_equals(err, nil) + return res +end + +function helpers.reload_package(srv) + srv.net_box:eval([[ + local function startswith(text, prefix) + return text:find(prefix, 1, true) == 1 + end + + for k, _ in pairs(package.loaded) do + if startswith(k, 'crud') then + package.loaded[k] = nil + end + end + + crud = require('crud') + ]]) +end + +function helpers.reload_roles(srv) + local ok, err = srv.net_box:eval([[ + return require('cartridge.roles').reload() + ]]) + + t.assert_equals({ok, err}, {true, nil}) +end + +function helpers.get_map_reduces_stat(router, space_name) + return router:eval([[ + local stats = require('crud').stats(...) 
+ if stats.select == nil then + return 0 + end + return stats.select.details.map_reduces + ]], { space_name }) +end + return helpers diff --git a/test/helpers/storage_stat.lua b/test/helpers/storage_stat.lua deleted file mode 100644 index 2bb4dcd4..00000000 --- a/test/helpers/storage_stat.lua +++ /dev/null @@ -1,118 +0,0 @@ -local checks = require('checks') -local helpers = require('test.helper') - -local storage_stat = {} - --- Wrap crud's select_on_storage()/count_on_storage() --- function to count selects/counts and add storage_stat() --- function that returns resulting statistics. --- --- Call it after crud's initialization. -function storage_stat.init_on_storage(method_on_storage_name) - assert(_G._crud[method_on_storage_name] ~= nil) - - -- Here we count requests. - local storage_stat_table = { - requests = 0, - } - - -- Wrap method on storage function. - local requests_on_storage_saved = _G._crud[method_on_storage_name] - _G._crud[method_on_storage_name] = function(...) - local requests = storage_stat_table.requests - storage_stat_table.requests = requests + 1 - return requests_on_storage_saved(...) - end - - -- Accessor for the statistics. - rawset(_G, 'storage_stat', function() - return storage_stat_table - end) -end - -function storage_stat.init_on_storage_for_select() - storage_stat.init_on_storage('select_on_storage') -end - -function storage_stat.init_on_storage_for_count() - storage_stat.init_on_storage('count_on_storage') -end - --- Accumulate statistics from storages. --- --- The statistics is grouped by replicasets. --- --- Example of a return value: --- --- | { --- | ['s-1'] = { --- | select_requests = 1, --- | }, --- | ['s-2'] = { --- | select_requests = 0, --- | }, --- | } -function storage_stat.collect(cluster) - checks('table') - - local res = {} - - helpers.call_on_storages(cluster, function(server, replicaset) - checks('table', 'table') - - -- Collect the statistics. 
- local storage_stat = server.net_box:call('storage_stat') - - -- Initialize if needed. - if res[replicaset.alias] == nil then - res[replicaset.alias] = {} - end - - -- Accumulate the collected statistics. - for key, val in pairs(storage_stat) do - local old = res[replicaset.alias][key] or 0 - res[replicaset.alias][key] = old + val - end - end) - - return res -end - --- Difference between 'a' and 'b' storage statistics. --- --- The return value structure is the same as for --- storage_stat.collect(). -function storage_stat.diff(a, b) - checks('table', 'table') - - local diff = table.deepcopy(a) - - for replicaset_alias, stat_b in pairs(b) do - -- Initialize if needed. - if diff[replicaset_alias] == nil then - diff[replicaset_alias] = {} - end - - -- Substract 'b' statistics from 'a'. - for key, val in pairs(stat_b) do - local old = diff[replicaset_alias][key] or 0 - diff[replicaset_alias][key] = old - val - end - end - - return diff -end - --- Accepts collect (or diff) return value and returns --- total number of select/count requests across all storages. 
-function storage_stat.total(stats) - local total = 0 - - for _, stat in pairs(stats) do - total = total + (stat.requests or 0) - end - - return total -end - -return storage_stat diff --git a/test/integration/cfg_test.lua b/test/integration/cfg_test.lua new file mode 100644 index 00000000..81ae0e1a --- /dev/null +++ b/test/integration/cfg_test.lua @@ -0,0 +1,79 @@ +local fio = require('fio') + +local t = require('luatest') + +local stats = require('crud.stats') +local helpers = require('test.helper') + +local group = t.group('cfg') + +group.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_stats'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + }) + + g.cluster:start() +end) + +group.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +group.test_defaults = function(g) + local cfg = g.cluster:server('router'):eval("return require('crud').cfg") + t.assert_equals(cfg, { + stats = false, + stats_driver = stats.get_default_driver(), + stats_quantiles = false, + }) +end + +group.test_change_value = function(g) + local new_cfg = g.cluster:server('router'):eval("return require('crud').cfg({ stats = true })") + t.assert_equals(new_cfg.stats, true) +end + +group.test_table_is_immutable = function(g) + local router = g.cluster:server('router') + + t.assert_error_msg_contains( + 'Use crud.cfg{} instead', + router.eval, router, + [[ + local cfg = require('crud').cfg() + cfg.stats = 'newvalue' + ]]) + + t.assert_error_msg_contains( + 'Use crud.cfg{} instead', + router.eval, router, + [[ + local cfg = require('crud').cfg() + cfg.newfield = 'newvalue' + ]]) +end + +group.test_package_reload_preserves_values = function(g) + local router = g.cluster:server('router') + + -- Generate some non-default values. 
+ router:eval("return require('crud').cfg({ stats = true })") + + helpers.reload_package(router) + + local cfg = router:eval("return require('crud').cfg") + t.assert_equals(cfg.stats, true) +end + +group.test_role_reload_preserves_values = function(g) + local router = g.cluster:server('router') + + -- Generate some non-default values. + router:eval("return require('crud').cfg({ stats = true })") + + helpers.reload_roles(router) + + local cfg = router:eval("return require('crud').cfg") + t.assert_equals(cfg.stats, true) +end diff --git a/test/integration/count_test.lua b/test/integration/count_test.lua index 9fb62a10..008ea788 100644 --- a/test/integration/count_test.lua +++ b/test/integration/count_test.lua @@ -4,7 +4,6 @@ local clock = require('clock') local t = require('luatest') local helpers = require('test.helper') -local storage_stat = require('test.helpers.storage_stat') local pgroup = t.group('count', { {engine = 'memtx'}, @@ -24,12 +23,9 @@ pgroup.before_all(function(g) g.cluster:start() - helpers.call_on_storages(g.cluster, function(server) - server.net_box:eval([[ - local storage_stat = require('test.helpers.storage_stat') - storage_stat.init_on_storage_for_count() - ]]) - end) + g.cluster:server('router').net_box:eval([[ + require('crud').cfg{ stats = true } + ]]) end) pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) @@ -583,7 +579,8 @@ pgroup.test_count_no_map_reduce = function(g) }, }) - local stat_a = storage_stat.collect(g.cluster) + local router = g.cluster:server('router').net_box + local map_reduces_before = helpers.get_map_reduces_stat(router, 'customers') -- Case: no conditions, just bucket id. 
local result, err = g.cluster.main_server.net_box:call('crud.count', { @@ -594,15 +591,9 @@ pgroup.test_count_no_map_reduce = function(g) t.assert_equals(err, nil) t.assert_equals(result, 1) - local stat_b = storage_stat.collect(g.cluster) - t.assert_equals(storage_stat.diff(stat_b, stat_a), { - ['s-1'] = { - requests = 1, - }, - ['s-2'] = { - requests = 0, - }, - }) + local map_reduces_after_1 = helpers.get_map_reduces_stat(router, 'customers') + local diff_1 = map_reduces_after_1 - map_reduces_before + t.assert_equals(diff_1, 0, 'Count request was not a map reduce') -- Case: EQ on secondary index, which is not in the sharding -- index (primary index in the case). @@ -614,15 +605,9 @@ pgroup.test_count_no_map_reduce = function(g) t.assert_equals(err, nil) t.assert_equals(result, 1) - local stat_c = storage_stat.collect(g.cluster) - t.assert_equals(storage_stat.diff(stat_c, stat_b), { - ['s-1'] = { - requests = 0, - }, - ['s-2'] = { - requests = 1, - }, - }) + local map_reduces_after_2 = helpers.get_map_reduces_stat(router, 'customers') + local diff_2 = map_reduces_after_2 - map_reduces_after_1 + t.assert_equals(diff_2, 0, 'Count request was not a map reduce') end pgroup.test_count_timeout = function(g) diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 8a874ad0..28b2676f 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -3,7 +3,6 @@ local crud = require('crud') local t = require('luatest') local helpers = require('test.helper') -local storage_stat = require('test.helpers.storage_stat') local ok = pcall(require, 'ddl') if not ok then @@ -35,12 +34,9 @@ pgroup.before_all(function(g) t.assert_equals(type(result), 'table') t.assert_equals(err, nil) - helpers.call_on_storages(g.cluster, function(server) - server.net_box:eval([[ - local storage_stat = require('test.helpers.storage_stat') - storage_stat.init_on_storage_for_select() - ]]) - end) + 
g.cluster.main_server.net_box:eval([[ + require('crud').cfg{ stats = true } + ]]) end) pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) @@ -367,22 +363,19 @@ for name, case in pairs(cases) do pgroup[('test_%s_wont_lead_to_map_reduce'):format(name)] = function(g) case.prepare_data(g, case.space_name) - local stat_a = storage_stat.collect(g.cluster) + local router = g.cluster:server('router').net_box + local map_reduces_before = helpers.get_map_reduces_stat(router, case.space_name) - local result, err = g.cluster.main_server.net_box:call('crud.select', { + local result, err = router:call('crud.select', { case.space_name, case.conditions }) t.assert_equals(err, nil) t.assert_not_equals(result, nil) t.assert_equals(#result.rows, 1) - local stat_b = storage_stat.collect(g.cluster) - - -- Check a number of select() requests made by CRUD on cluster's storages - -- after calling select() on a router. Make sure only a single storage has - -- a single select() request. Otherwise we lead to map-reduce. 
- local stats = storage_stat.diff(stat_b, stat_a) - t.assert_equals(storage_stat.total(stats), 1, 'Select request was not a map reduce') + local map_reduces_after = helpers.get_map_reduces_stat(router, case.space_name) + local diff = map_reduces_after - map_reduces_before + t.assert_equals(diff, 0, 'Select request was not a map reduce') end end @@ -390,22 +383,19 @@ pgroup.test_select_for_part_of_sharding_key_will_lead_to_map_reduce = function(g local space_name = 'customers_name_age_key_different_indexes' prepare_data_name_age_sharding_key(g, space_name) - local stat_a = storage_stat.collect(g.cluster) + local router = g.cluster:server('router').net_box + local map_reduces_before = helpers.get_map_reduces_stat(router, space_name) - local result, err = g.cluster.main_server.net_box:call('crud.select', { + local result, err = router:call('crud.select', { space_name, {{'==', 'age', 58}}, }) t.assert_equals(err, nil) t.assert_not_equals(result, nil) t.assert_equals(#result.rows, 1) - local stat_b = storage_stat.collect(g.cluster) - - -- Check a number of select() requests made by CRUD on cluster's storages - -- after calling select() on a router. Make sure it was a map-reduce - -- since we do not have sharding key values in conditions. 
- local stats = storage_stat.diff(stat_b, stat_a) - t.assert_equals(storage_stat.total(stats), 2, 'Select request was a map reduce') + local map_reduces_after = helpers.get_map_reduces_stat(router, space_name) + local diff = map_reduces_after - map_reduces_before + t.assert_equals(diff, 1, 'Select request was a map reduce') end pgroup.test_select_secondary_idx = function(g) @@ -706,6 +696,7 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- records for all spaces exist sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers = {parts = {{fieldno = 1}}}, customers_G_func = {parts = {{fieldno = 1}}}, customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, @@ -732,6 +723,7 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- other records for correct spaces exist in cache sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers = {parts = {{fieldno = 1}}}, customers_G_func = {parts = {{fieldno = 1}}}, customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, @@ -757,6 +749,7 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- other records for correct spaces exist in cache sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers = {parts = {{fieldno = 1}}}, customers_G_func = {parts = {{fieldno = 1}}}, customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, diff --git a/test/integration/pairs_test.lua b/test/integration/pairs_test.lua index aacb052f..a0c90618 100644 --- a/test/integration/pairs_test.lua +++ b/test/integration/pairs_test.lua @@ -5,7 +5,6 @@ local t = require('luatest') local crud_utils = require('crud.common.utils') local helpers = require('test.helper') -local storage_stat = 
require('test.helpers.storage_stat') local pgroup = t.group('pairs', { {engine = 'memtx'}, @@ -27,12 +26,9 @@ pgroup.before_all(function(g) g.space_format = g.cluster.servers[2].net_box.space.customers:format() - helpers.call_on_storages(g.cluster, function(server) - server.net_box:eval([[ - local storage_stat = require('test.helpers.storage_stat') - storage_stat.init_on_storage_for_select() - ]]) - end) + g.cluster.main_server.net_box:eval([[ + require('crud').cfg{ stats = true } + ]]) end) pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) @@ -842,10 +838,11 @@ pgroup.test_pairs_no_map_reduce = function(g) table.sort(customers, function(obj1, obj2) return obj1.id < obj2.id end) - local stat_a = storage_stat.collect(g.cluster) + local router = g.cluster:server('router').net_box + local map_reduces_before = helpers.get_map_reduces_stat(router, 'customers') -- Case: no conditions, just bucket id. - local rows = g.cluster.main_server.net_box:eval([[ + local rows = router:eval([[ local crud = require('crud') return crud.pairs(...):totable() @@ -858,15 +855,9 @@ pgroup.test_pairs_no_map_reduce = function(g) {3, 2804, 'David', 'Smith', 33, 'Los Angeles'}, }) - local stat_b = storage_stat.collect(g.cluster) - t.assert_equals(storage_stat.diff(stat_b, stat_a), { - ['s-1'] = { - requests = 1, - }, - ['s-2'] = { - requests = 0, - }, - }) + local map_reduces_after_1 = helpers.get_map_reduces_stat(router, 'customers') + local diff_1 = map_reduces_after_1 - map_reduces_before + t.assert_equals(diff_1, 0, 'Select request was not a map reduce') -- Case: EQ on secondary index, which is not in the sharding -- index (primary index in the case). 
@@ -883,13 +874,7 @@ pgroup.test_pairs_no_map_reduce = function(g) {4, 1161, 'William', 'White', 81, 'Chicago'}, }) - local stat_c = storage_stat.collect(g.cluster) - t.assert_equals(storage_stat.diff(stat_c, stat_b), { - ['s-1'] = { - requests = 0, - }, - ['s-2'] = { - requests = 1, - }, - }) + local map_reduces_after_2 = helpers.get_map_reduces_stat(router, 'customers') + local diff_2 = map_reduces_after_2 - map_reduces_after_1 + t.assert_equals(diff_2, 0, 'Select request was not a map reduce') end diff --git a/test/integration/reload_test.lua b/test/integration/reload_test.lua index c1f20c67..5d8b25fb 100644 --- a/test/integration/reload_test.lua +++ b/test/integration/reload_test.lua @@ -8,14 +8,6 @@ local g = t.group() local helpers = require('test.helper') -local function reload(srv) - local ok, err = srv.net_box:eval([[ - return require("cartridge.roles").reload() - ]]) - - t.assert_equals({ok, err}, {true, nil}) -end - g.before_all(function() g.cluster = helpers.Cluster:new({ datadir = fio.tempdir(), @@ -92,7 +84,7 @@ function g.test_router() t.assert_equals(last_insert[3], 'A', 'No workload for label A') end) - reload(g.router) + helpers.reload_roles(g.router) local cnt = #g.insertions_passed g.cluster:retrying({}, function() @@ -117,7 +109,7 @@ function g.test_storage() -- snapshot with a signal g.s1_master.process:kill('USR1') - reload(g.s1_master) + helpers.reload_roles(g.s1_master) g.cluster:retrying({}, function() g.s1_master.net_box:call('box.snapshot') diff --git a/test/integration/select_test.lua b/test/integration/select_test.lua index f42896f3..97eadf35 100644 --- a/test/integration/select_test.lua +++ b/test/integration/select_test.lua @@ -6,7 +6,6 @@ local crud = require('crud') local crud_utils = require('crud.common.utils') local helpers = require('test.helper') -local storage_stat = require('test.helpers.storage_stat') local pgroup = t.group('select', { {engine = 'memtx'}, @@ -28,12 +27,9 @@ pgroup.before_all(function(g) g.space_format = 
g.cluster.servers[2].net_box.space.customers:format() - helpers.call_on_storages(g.cluster, function(server) - server.net_box:eval([[ - local storage_stat = require('test.helpers.storage_stat') - storage_stat.init_on_storage_for_select() - ]]) - end) + g.cluster:server('router').net_box:eval([[ + require('crud').cfg{ stats = true } + ]]) end) pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) @@ -1624,7 +1620,8 @@ pgroup.test_select_no_map_reduce = function(g) table.sort(customers, function(obj1, obj2) return obj1.id < obj2.id end) - local stat_a = storage_stat.collect(g.cluster) + local router = g.cluster:server('router').net_box + local map_reduces_before = helpers.get_map_reduces_stat(router, 'customers') -- Case: no conditions, just bucket id. local result, err = g.cluster.main_server.net_box:call('crud.select', { @@ -1637,15 +1634,9 @@ pgroup.test_select_no_map_reduce = function(g) {3, 2804, 'David', 'Smith', 33, 'Los Angeles'}, }) - local stat_b = storage_stat.collect(g.cluster) - t.assert_equals(storage_stat.diff(stat_b, stat_a), { - ['s-1'] = { - requests = 1, - }, - ['s-2'] = { - requests = 0, - }, - }) + local map_reduces_after_1 = helpers.get_map_reduces_stat(router, 'customers') + local diff_1 = map_reduces_after_1 - map_reduces_before + t.assert_equals(diff_1, 0, 'Select request was not a map reduce') -- Case: EQ on secondary index, which is not in the sharding -- index (primary index in the case). 
@@ -1659,13 +1650,7 @@ pgroup.test_select_no_map_reduce = function(g) {4, 1161, 'William', 'White', 81, 'Chicago'}, }) - local stat_c = storage_stat.collect(g.cluster) - t.assert_equals(storage_stat.diff(stat_c, stat_b), { - ['s-1'] = { - requests = 0, - }, - ['s-2'] = { - requests = 1, - }, - }) + local map_reduces_after_2 = helpers.get_map_reduces_stat(router, 'customers') + local diff_2 = map_reduces_after_2 - map_reduces_after_1 + t.assert_equals(diff_2, 0, 'Select request was not a map reduce') end diff --git a/test/integration/stats_test.lua b/test/integration/stats_test.lua new file mode 100644 index 00000000..f712c69d --- /dev/null +++ b/test/integration/stats_test.lua @@ -0,0 +1,1022 @@ +local fio = require('fio') +local clock = require('clock') +local t = require('luatest') + +local stats_registry_utils = require('crud.stats.registry_utils') + +local pgroup = t.group('stats_integration', { + { driver = 'local' }, + { driver = 'metrics', quantiles = false }, + { driver = 'metrics', quantiles = true }, +}) +local group_metrics = t.group('stats_metrics_integration', { + { driver = 'metrics', quantiles = false }, + { driver = 'metrics', quantiles = true }, +}) + +local helpers = require('test.helper') + +local space_id = 542 +local space_name = 'customers' +local non_existing_space_id = 100500 +local non_existing_space_name = 'non_existing_space' +local new_space_name = 'newspace' + +local function before_all(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_stats'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + }) + g.cluster:start() + g.router = g.cluster:server('router').net_box + + if g.params.driver == 'metrics' then + local is_metrics_supported = g.router:eval([[ + return require('crud.stats.metrics_registry').is_supported() + ]]) + t.skip_if(is_metrics_supported == false, 'Metrics registry is unsupported') + end +end + +local function after_all(g) + 
helpers.stop_cluster(g.cluster) +end + +local function get_stats(g, space_name) + return g.router:eval("return require('crud').stats(...)", { space_name }) +end + +local function enable_stats(g, params) + params = params or g.params + g.router:eval([[ + local params = ... + require('crud').cfg{ + stats = true, + stats_driver = params.driver, + stats_quantiles = params.quantiles + } + ]], { params }) +end + +local function disable_stats(g) + g.router:eval("require('crud').cfg{ stats = false }") +end + +local function before_each(g) + g.router:eval("crud = require('crud')") + enable_stats(g) + helpers.truncate_space_on_cluster(g.cluster, space_name) + helpers.drop_space_on_cluster(g.cluster, new_space_name) +end + +local function get_metrics(g) + return g.router:eval("return require('metrics').collect()") +end + +pgroup.before_all(before_all) + +pgroup.after_all(after_all) + +pgroup.before_each(before_each) + +pgroup.after_each(disable_stats) + + +group_metrics.before_all(before_all) + +group_metrics.after_all(after_all) + +group_metrics.before_each(before_each) + +group_metrics.after_each(disable_stats) + + +local function create_new_space(g) + helpers.call_on_storages(g.cluster, function(server) + server.net_box:eval([[ + local space_name = ... + if not box.cfg.read_only then + local sp = box.schema.space.create(space_name, { format = { + {name = 'id', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + }}) + + sp:create_index('pk', { + parts = { {field = 'id'} }, + }) + + sp:create_index('bucket_id', { + parts = { {field = 'bucket_id'} }, + unique = false, + }) + end + ]], { new_space_name }) + end) +end + +-- If there weren't any operations, space stats is {}. +-- To compute stats diff, this helper return real stats +-- if they're already present or default stats if +-- this operation of space hasn't been observed yet. 
+local function set_defaults_if_empty(space_stats, op) + if space_stats[op] ~= nil then + return space_stats[op] + else + return stats_registry_utils.build_collectors(op) + end +end + +local eval = { + pairs = [[ + local space_name = select(1, ...) + local conditions = select(2, ...) + + local result = {} + for _, v in crud.pairs(space_name, conditions, { batch_size = 1 }) do + table.insert(result, v) + end + + return result + ]], + + pairs_pcall = [[ + local space_name = select(1, ...) + local conditions = select(2, ...) + + local _, err = pcall(crud.pairs, space_name, conditions, { batch_size = 1 }) + + return nil, tostring(err) + ]], +} + +local simple_operation_cases = { + insert = { + func = 'crud.insert', + args = { + space_name, + { 12, box.NULL, 'Ivan', 'Ivanov', 20, 'Moscow' }, + }, + op = 'insert', + }, + insert_object = { + func = 'crud.insert_object', + args = { + space_name, + { id = 13, name = 'Ivan', last_name = 'Ivanov', age = 20, city = 'Moscow' }, + }, + op = 'insert', + }, + get = { + func = 'crud.get', + args = { space_name, { 12 } }, + op = 'get', + }, + select = { + func = 'crud.select', + args = { space_name, {{ '==', 'id_index', 3 }} }, + op = 'select', + }, + pairs = { + eval = eval.pairs, + args = { space_name, {{ '==', 'id_index', 3 }} }, + op = 'select', + }, + replace = { + func = 'crud.replace', + args = { + space_name, + { 12, box.NULL, 'Ivan', 'Ivanov', 20, 'Moscow' }, + }, + op = 'replace', + }, + replace_object = { + func = 'crud.replace_object', + args = { + space_name, + { id = 12, name = 'Ivan', last_name = 'Ivanov', age = 20, city = 'Moscow' }, + }, + op = 'replace', + }, + update = { + prepare = function(g) + helpers.insert_objects(g, space_name, {{ + id = 15, name = 'Ivan', last_name = 'Ivanov', + age = 20, city = 'Moscow' + }}) + end, + func = 'crud.update', + args = { space_name, 12, {{'+', 'age', 10}} }, + op = 'update', + }, + upsert = { + func = 'crud.upsert', + args = { + space_name, + { 16, box.NULL, 'Ivan', 'Ivanov', 
20, 'Moscow' }, + {{'+', 'age', 1}}, + }, + op = 'upsert', + }, + upsert_object = { + func = 'crud.upsert_object', + args = { + space_name, + { id = 17, name = 'Ivan', last_name = 'Ivanov', age = 20, city = 'Moscow' }, + {{'+', 'age', 1}} + }, + op = 'upsert', + }, + delete = { + func = 'crud.delete', + args = { space_name, { 12 } }, + op = 'delete', + }, + truncate = { + func = 'crud.truncate', + args = { space_name }, + op = 'truncate', + }, + len = { + func = 'crud.len', + args = { space_name }, + op = 'len', + }, + count = { + func = 'crud.count', + args = { space_name, {{ '==', 'id_index', 3 }} }, + op = 'count', + }, + min = { + func = 'crud.min', + args = { space_name }, + op = 'borders', + }, + max = { + func = 'crud.max', + args = { space_name }, + op = 'borders', + }, + insert_error = { + func = 'crud.insert', + args = { space_name, { 'id' } }, + op = 'insert', + expect_error = true, + }, + insert_object_error = { + func = 'crud.insert_object', + args = { space_name, { 'id' } }, + op = 'insert', + expect_error = true, + }, + get_error = { + func = 'crud.get', + args = { space_name, { 'id' } }, + op = 'get', + expect_error = true, + }, + select_error = { + func = 'crud.select', + args = { space_name, {{ '==', 'id_index', 'sdf' }} }, + op = 'select', + expect_error = true, + }, + pairs_error = { + eval = eval.pairs, + args = { space_name, {{ '%=', 'id_index', 'sdf' }} }, + op = 'select', + expect_error = true, + pcall = true, + }, + replace_error = { + func = 'crud.replace', + args = { space_name, { 'id' } }, + op = 'replace', + expect_error = true, + }, + replace_object_error = { + func = 'crud.replace_object', + args = { space_name, { 'id' } }, + op = 'replace', + expect_error = true, + }, + update_error = { + func = 'crud.update', + args = { space_name, { 'id' }, {{'+', 'age', 1}} }, + op = 'update', + expect_error = true, + }, + upsert_error = { + func = 'crud.upsert', + args = { space_name, { 'id' }, {{'+', 'age', 1}} }, + op = 'upsert', + expect_error 
= true, + }, + upsert_object_error = { + func = 'crud.upsert_object', + args = { space_name, { 'id' }, {{'+', 'age', 1}} }, + op = 'upsert', + expect_error = true, + }, + delete_error = { + func = 'crud.delete', + args = { space_name, { 'id' } }, + op = 'delete', + expect_error = true, + }, + count_error = { + func = 'crud.count', + args = { space_name, {{ '==', 'id_index', 'sdf' }} }, + op = 'count', + expect_error = true, + }, + min_error = { + func = 'crud.min', + args = { space_name, 'badindex' }, + op = 'borders', + expect_error = true, + }, + max_error = { + func = 'crud.max', + args = { space_name, 'badindex' }, + op = 'borders', + expect_error = true, + }, +} + +local prepare_select_data = function(g) + helpers.insert_objects(g, space_name, { + -- Storage is s-2. + { + id = 1, name = "Elizabeth", last_name = "Jackson", + age = 12, city = "New York", + }, + -- Storage is s-2. + { + id = 2, name = "Mary", last_name = "Brown", + age = 46, city = "Los Angeles", + }, + -- Storage is s-1. + { + id = 3, name = "David", last_name = "Smith", + age = 33, city = "Los Angeles", + }, + -- Storage is s-2. + { + id = 4, name = "William", last_name = "White", + age = 81, city = "Chicago", + } + }) +end + +local select_cases = { + select_by_primary_index = { + func = 'crud.select', + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_by_secondary_index = { + func = 'crud.select', + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_full_scan = { + func = 'crud.select', + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, + pairs_by_primary_index = { + eval = eval.pairs, + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + -- Since batch_size == 1, extra lookup is generated with + -- after_tuple scroll for second batch. 
+ tuples_lookup = 2, + }, + pairs_by_secondary_index = { + eval = eval.pairs, + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + -- Since batch_size == 1, extra lookup is generated with + -- after_tuple scroll for second batch. + tuples_lookup = 2, + }, + pairs_full_scan = { + eval = eval.pairs, + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, +} + +-- Generate non-null stats for all cases. +local function generate_stats(g) + for _, case in pairs(simple_operation_cases) do + if case.prepare ~= nil then + case.prepare(g) + end + + local _, err + if case.eval ~= nil then + if case.pcall then + _, err = pcall(g.router.eval, g.router, case.eval, case.args) + else + _, err = g.router:eval(case.eval, case.args) + end + else + _, err = g.router:call(case.func, case.args) + end + + if case.expect_error ~= true then + t.assert_equals(err, nil) + else + t.assert_not_equals(err, nil) + end + end + + -- Generate non-null select details. + prepare_select_data(g) + for _, case in pairs(select_cases) do + local _, err + if case.eval ~= nil then + _, err = g.router:eval(case.eval, { space_name, case.conditions }) + else + _, err = g.router:call(case.func, { space_name, case.conditions }) + end + + t.assert_equals(err, nil) + end +end + + +-- Call some operations for existing +-- spaces and ensure statistics is updated. +for name, case in pairs(simple_operation_cases) do + local test_name = ('test_%s'):format(name) + + if case.prepare ~= nil then + pgroup.before_test(test_name, case.prepare) + end + + pgroup[test_name] = function(g) + -- Collect stats before call. + local stats_before = get_stats(g, space_name) + t.assert_type(stats_before, 'table') + + -- Call operation. 
+ local before_start = clock.monotonic() + + local _, err + if case.eval ~= nil then + if case.pcall then + _, err = pcall(g.router.eval, g.router, case.eval, case.args) + else + _, err = g.router:eval(case.eval, case.args) + end + else + _, err = g.router:call(case.func, case.args) + end + + local after_finish = clock.monotonic() + + if case.expect_error ~= true then + t.assert_equals(err, nil) + else + t.assert_not_equals(err, nil) + end + + -- Collect stats after call. + local stats_after = get_stats(g, space_name) + t.assert_type(stats_after, 'table') + t.assert_not_equals(stats_after[case.op], nil) + + -- Expecting 'ok' metrics to change on `expect_error == false` + -- or 'error' to change otherwise. + local changed, unchanged + if case.expect_error == true then + changed = 'error' + unchanged = 'ok' + else + unchanged = 'error' + changed = 'ok' + end + + local op_before = set_defaults_if_empty(stats_before, case.op) + local changed_before = op_before[changed] + local op_after = set_defaults_if_empty(stats_after, case.op) + local changed_after = op_after[changed] + + t.assert_equals(changed_after.count - changed_before.count, 1, + 'Expected count incremented') + + local ok_latency_max = math.max(changed_before.latency, after_finish - before_start) + + t.assert_gt(changed_after.latency, 0, + 'Changed latency has appropriate value') + t.assert_le(changed_after.latency, ok_latency_max, + 'Changed latency has appropriate value') + + local time_diff = changed_after.time - changed_before.time + + t.assert_gt(time_diff, 0, 'Total time increase has appropriate value') + t.assert_le(time_diff, after_finish - before_start, + 'Total time increase has appropriate value') + + local unchanged_before = op_before[unchanged] + local unchanged_after = stats_after[case.op][unchanged] + + t.assert_equals(unchanged_before, unchanged_after, 'Other stats remained the same') + end +end + + +-- Call some operation on non-existing +-- space and ensure statistics are updated. 
+pgroup.before_test('test_non_existing_space', function(g) + t.assert_equals( + helpers.is_space_exist(g.router, non_existing_space_name), + false, + ('Space %s does not exist'):format(non_existing_space_name) + ) +end) + +pgroup.test_non_existing_space = function(g) + local op = 'get' + + -- Collect stats before call. + local stats_before = get_stats(g, non_existing_space_name) + t.assert_type(stats_before, 'table') + local op_before = set_defaults_if_empty(stats_before, op) + + -- Call operation. + local _, err = g.router:call('crud.get', { non_existing_space_name, { 1 } }) + t.assert_not_equals(err, nil) + + -- Collect stats after call. + local stats_after = get_stats(g, non_existing_space_name) + t.assert_type(stats_after, 'table') + local op_after = stats_after[op] + t.assert_type(op_after, 'table', 'Section has been created if not existed') + + t.assert_equals(op_after.error.count - op_before.error.count, 1, + 'Error count for non-existing space incremented') +end + + +for name, case in pairs(select_cases) do + local test_name = ('test_%s_details'):format(name) + + pgroup.before_test(test_name, prepare_select_data) + + pgroup[test_name] = function(g) + local op = 'select' + local space_name = space_name + + -- Collect stats before call. + local stats_before = get_stats(g, space_name) + t.assert_type(stats_before, 'table') + + -- Call operation. + local _, err + if case.eval ~= nil then + _, err = g.router:eval(case.eval, { space_name, case.conditions }) + else + _, err = g.router:call(case.func, { space_name, case.conditions }) + end + + t.assert_equals(err, nil) + + -- Collect stats after call. 
+ local stats_after = get_stats(g, space_name) + t.assert_type(stats_after, 'table') + + local op_before = set_defaults_if_empty(stats_before, op) + local details_before = op_before.details + local op_after = set_defaults_if_empty(stats_after, op) + local details_after = op_after.details + + local tuples_fetched_diff = details_after.tuples_fetched - details_before.tuples_fetched + t.assert_equals(tuples_fetched_diff, case.tuples_fetched, + 'Expected count of tuples fetched') + + local tuples_lookup_diff = details_after.tuples_lookup - details_before.tuples_lookup + t.assert_equals(tuples_lookup_diff, case.tuples_lookup, + 'Expected count of tuples looked up on storage') + + local map_reduces_diff = details_after.map_reduces - details_before.map_reduces + t.assert_equals(map_reduces_diff, case.map_reduces, + 'Expected count of map reduces planned') + end +end + + +pgroup.test_resolve_name_from_id = function(g) + local op = 'len' + g.router:call('crud.len', { space_id }) + + local stats = get_stats(g, space_name) + t.assert_not_equals(stats[op], nil, "Statistics is filled by name") +end + + +pgroup.test_resolve_nonexisting_space_from_id = function(g) + local op = 'len' + g.router:call('crud.len', { non_existing_space_id }) + + local stats = get_stats(g, tostring(non_existing_space_id)) + t.assert_not_equals(stats[op], nil, "Statistics is filled by id as string") +end + + +pgroup.before_test( + 'test_role_reload_do_not_reset_observations', + generate_stats) + +pgroup.test_role_reload_do_not_reset_observations = function(g) + t.xfail_if(g.params.driver == 'metrics', + 'See https://github.com/tarantool/metrics/issues/334') + + local stats_before = get_stats(g) + + helpers.reload_roles(g.cluster:server('router')) + + local stats_after = get_stats(g) + t.assert_equals(stats_after, stats_before) +end + + +pgroup.before_test( + 'test_module_reload_do_not_reset_observations', + generate_stats) + +pgroup.test_module_reload_do_not_reset_observations = function(g) + local 
stats_before = get_stats(g) + + helpers.reload_package(g.cluster:server('router')) + + local stats_after = get_stats(g) + t.assert_equals(stats_after, stats_before) +end + + +pgroup.test_spaces_created_in_runtime_supported_with_stats = function(g) + local op = 'insert' + local stats_before = get_stats(g, new_space_name) + local op_before = set_defaults_if_empty(stats_before, op) + + create_new_space(g) + + local _, err = g.router:call('crud.insert', { new_space_name, { 1, box.NULL }}) + t.assert_equals(err, nil) + + local stats_after = get_stats(g, new_space_name) + local op_after = stats_after[op] + t.assert_type(op_after, 'table', "'insert' stats found for new space") + t.assert_type(op_after.ok, 'table', "success 'insert' stats found for new space") + t.assert_equals(op_after.ok.count - op_before.ok.count, 1, + "Success requests count incremented for new space") +end + + +pgroup.before_test( + 'test_spaces_dropped_in_runtime_supported_with_stats', + function(g) + create_new_space(g) + + local _, err = g.router:call('crud.insert', { new_space_name, { 1, box.NULL }}) + t.assert_equals(err, nil) + end) + +pgroup.test_spaces_dropped_in_runtime_supported_with_stats = function(g) + local op = 'insert' + local stats_before = get_stats(g, new_space_name) + local op_before = set_defaults_if_empty(stats_before, op) + t.assert_type(op_before, 'table', "'insert' stats found for new space") + + helpers.drop_space_on_cluster(g.cluster, new_space_name) + + local _, err = g.router:call('crud.insert', { new_space_name, { 2, box.NULL }}) + t.assert_not_equals(err, nil, "Should trigger 'space not found' error") + + local stats_after = get_stats(g, new_space_name) + local op_after = stats_after[op] + t.assert_type(op_after, 'table', "'insert' stats found for dropped new space") + t.assert_type(op_after.error, 'table', "error 'insert' stats found for dropped new space") + t.assert_equals(op_after.error.count - op_before.error.count, 1, + "Error requests count incremented since space 
was known to registry before drop") +end + +-- https://github.com/tarantool/metrics/blob/fc5a67072340b12f983f09b7d383aca9e2f10cf1/test/utils.lua#L22-L31 +local function find_obs(metric_name, label_pairs, observations) + for _, obs in pairs(observations) do + local same_label_pairs = pcall(t.assert_equals, obs.label_pairs, label_pairs) + if obs.metric_name == metric_name and same_label_pairs then + return obs + end + end + + return { value = 0 } +end + +-- https://github.com/tarantool/metrics/blob/fc5a67072340b12f983f09b7d383aca9e2f10cf1/test/utils.lua#L55-L63 +local function find_metric(metric_name, metrics_data) + local m = {} + for _, v in ipairs(metrics_data) do + if v.metric_name == metric_name then + table.insert(m, v) + end + end + return #m > 0 and m or nil +end + +local function get_unique_label_values(metrics_data, label_key) + local label_values_map = {} + for _, v in ipairs(metrics_data) do + local label_pairs = v.label_pairs or {} + if label_pairs[label_key] ~= nil then + label_values_map[label_pairs[label_key]] = true + end + end + + local label_values = {} + for k, _ in pairs(label_values_map) do + table.insert(label_values, k) + end + + return label_values +end + +local function validate_metrics(g, metrics) + local quantile_stats + if g.params.quantiles == true then + quantile_stats = find_metric('tnt_crud_stats', metrics) + t.assert_type(quantile_stats, 'table', '`tnt_crud_stats` summary metrics found') + end + + local stats_count = find_metric('tnt_crud_stats_count', metrics) + t.assert_type(stats_count, 'table', '`tnt_crud_stats` summary metrics found') + + local stats_sum = find_metric('tnt_crud_stats_sum', metrics) + t.assert_type(stats_sum, 'table', '`tnt_crud_stats` summary metrics found') + + + local expected_operations = { 'insert', 'get', 'replace', 'update', + 'upsert', 'delete', 'select', 'truncate', 'len', 'count', 'borders' } + + if g.params.quantiles == true then + t.assert_items_equals(get_unique_label_values(quantile_stats, 
'operation'), expected_operations, + 'Metrics are labelled with operation') + end + + t.assert_items_equals(get_unique_label_values(stats_count, 'operation'), expected_operations, + 'Metrics are labelled with operation') + + t.assert_items_equals(get_unique_label_values(stats_sum, 'operation'), expected_operations, + 'Metrics are labelled with operation') + + + local expected_statuses = { 'ok', 'error' } + + if g.params.quantiles == true then + t.assert_items_equals( + get_unique_label_values(quantile_stats, 'status'), + expected_statuses, + 'Metrics are labelled with status') + end + + t.assert_items_equals(get_unique_label_values(stats_count, 'status'), expected_statuses, + 'Metrics are labelled with status') + + t.assert_items_equals(get_unique_label_values(stats_sum, 'status'), expected_statuses, + 'Metrics are labelled with status') + + + local expected_names = { space_name } + + if g.params.quantiles == true then + t.assert_items_equals( + get_unique_label_values(quantile_stats, 'name'), + expected_names, + 'Metrics are labelled with space name') + end + + t.assert_items_equals(get_unique_label_values(stats_count, 'name'), + expected_names, + 'Metrics are labelled with space name') + + t.assert_items_equals( + get_unique_label_values(stats_sum, 'name'), + expected_names, + 'Metrics are labelled with space name') + + if g.params.quantiles == true then + local expected_quantiles = { 0.99 } + t.assert_items_equals(get_unique_label_values(quantile_stats, 'quantile'), expected_quantiles, + 'Quantile metrics presents') + end + + + local tuples_fetched = find_metric('tnt_crud_tuples_fetched', metrics) + t.assert_type(tuples_fetched, 'table', '`tnt_crud_tuples_fetched` metrics found') + + t.assert_items_equals(get_unique_label_values(tuples_fetched, 'operation'), { 'select' }, + 'Metrics are labelled with operation') + + t.assert_items_equals(get_unique_label_values(tuples_fetched, 'name'), expected_names, + 'Metrics are labelled with space name') + + + local 
tuples_lookup = find_metric('tnt_crud_tuples_lookup', metrics) + t.assert_type(tuples_lookup, 'table', '`tnt_crud_tuples_lookup` metrics found') + + t.assert_items_equals(get_unique_label_values(tuples_lookup, 'operation'), { 'select' }, + 'Metrics are labelled with operation') + + t.assert_items_equals(get_unique_label_values(tuples_lookup, 'name'), expected_names, + 'Metrics are labelled with space name') + + + local map_reduces = find_metric('tnt_crud_map_reduces', metrics) + t.assert_type(map_reduces, 'table', '`tnt_crud_map_reduces` metrics found') + + t.assert_items_equals(get_unique_label_values(map_reduces, 'operation'), { 'select' }, + 'Metrics are labelled with operation') + + t.assert_items_equals(get_unique_label_values(map_reduces, 'name'), expected_names, + 'Metrics are labelled with space name') +end + +local function check_updated_per_call(g) + local metrics_before = get_metrics(g) + local stats_labels = { operation = 'select', status = 'ok', name = space_name } + local details_labels = { operation = 'select', name = space_name } + + local count_before = find_obs('tnt_crud_stats_count', stats_labels, metrics_before) + local time_before = find_obs('tnt_crud_stats_sum', stats_labels, metrics_before) + local tuples_lookup_before = find_obs('tnt_crud_tuples_lookup', details_labels, metrics_before) + local tuples_fetched_before = find_obs('tnt_crud_tuples_fetched', details_labels, metrics_before) + local map_reduces_before = find_obs('tnt_crud_map_reduces', details_labels, metrics_before) + + local case = select_cases['select_by_secondary_index'] + local _, err = g.router:call(case.func, { space_name, case.conditions }) + t.assert_equals(err, nil) + + local metrics_after = get_metrics(g) + local count_after = find_obs('tnt_crud_stats_count', stats_labels, metrics_after) + local time_after = find_obs('tnt_crud_stats_sum', stats_labels, metrics_after) + local tuples_lookup_after = find_obs('tnt_crud_tuples_lookup', details_labels, metrics_after) + local 
tuples_fetched_after = find_obs('tnt_crud_tuples_fetched', details_labels, metrics_after) + local map_reduces_after = find_obs('tnt_crud_map_reduces', details_labels, metrics_after) + + t.assert_equals(count_after.value - count_before.value, 1, + '`select` metrics count increased') + t.assert_ge(time_after.value - time_before.value, 0, + '`select` total time increased') + t.assert_ge(tuples_lookup_after.value - tuples_lookup_before.value, case.tuples_lookup, + '`select` tuples lookup expected change') + t.assert_ge(tuples_fetched_after.value - tuples_fetched_before.value, case.tuples_fetched, + '`select` tuples fetched expected change') + t.assert_ge(map_reduces_after.value - map_reduces_before.value, case.map_reduces, + '`select` map reduces expected change') +end + + +group_metrics.before_test( + 'test_stats_stored_in_global_metrics_registry', + generate_stats) + +group_metrics.test_stats_stored_in_global_metrics_registry = function(g) + local metrics = get_metrics(g) + validate_metrics(g, metrics) +end + + +group_metrics.before_test('test_metrics_updated_per_call', generate_stats) + +group_metrics.test_metrics_updated_per_call = check_updated_per_call + + + +group_metrics.before_test( + 'test_metrics_collectors_destroyed_if_stats_disabled', + generate_stats) + +group_metrics.test_metrics_collectors_destroyed_if_stats_disabled = function(g) + disable_stats(g) + + local metrics = get_metrics(g) + + local stats = find_metric('tnt_crud_stats', metrics) + t.assert_equals(stats, nil, '`tnt_crud_stats` summary metrics not found') + + local stats_count = find_metric('tnt_crud_stats_count', metrics) + t.assert_equals(stats_count, nil, '`tnt_crud_stats` summary metrics not found') + + local stats_sum = find_metric('tnt_crud_stats_sum', metrics) + t.assert_equals(stats_sum, nil, '`tnt_crud_stats` summary metrics not found') + + local tuples_fetched = find_metric('tnt_crud_tuples_fetched', metrics) + t.assert_equals(tuples_fetched, nil, '`tnt_crud_tuples_fetched` metrics 
not found') + + local tuples_lookup = find_metric('tnt_crud_tuples_lookup', metrics) + t.assert_equals(tuples_lookup, nil, '`tnt_crud_tuples_lookup` metrics not found') + + local map_reduces = find_metric('tnt_crud_map_reduces', metrics) + t.assert_equals(map_reduces, nil, '`tnt_crud_map_reduces` metrics not found') +end + + +group_metrics.before_test( + 'test_stats_stored_in_metrics_registry_after_switch_to_metrics_driver', + disable_stats) + +group_metrics.test_stats_stored_in_metrics_registry_after_switch_to_metrics_driver = function(g) + enable_stats(g, { driver = 'local', quantiles = false }) + -- Switch to metrics driver. + enable_stats(g) + + generate_stats(g) + local metrics = get_metrics(g) + validate_metrics(g, metrics) +end + +group_metrics.before_test( + 'test_role_reload_do_not_reset_metrics_observations', + generate_stats) + +group_metrics.test_role_reload_do_not_reset_metrics_observations = function(g) + t.xfail('See https://github.com/tarantool/metrics/issues/334') + + helpers.reload_roles(g.cluster:server('router')) + g.router:eval("crud = require('crud')") + local metrics = get_metrics(g) + validate_metrics(g, metrics) +end + + +group_metrics.before_test( + 'test_module_reload_do_not_reset_metrics_observations', + generate_stats) + +group_metrics.test_module_reload_do_not_reset_metrics_observations = function(g) + g.router:eval([[ + local function startswith(text, prefix) + return text:find(prefix, 1, true) == 1 + end + + for k, _ in pairs(package.loaded) do + if startswith(k, 'crud') then + package.loaded[k] = nil + end + end + + crud = require('crud') + ]]) + + local metrics = get_metrics(g) + validate_metrics(g, metrics) +end + + +group_metrics.before_test( + 'test_stats_changed_in_metrics_registry_after_role_reload', + prepare_select_data) + +group_metrics.test_stats_changed_in_metrics_registry_after_role_reload = function(g) + helpers.reload_roles(g.cluster:server('router')) + g.router:eval("crud = require('crud')") + 
check_updated_per_call(g) +end + + +group_metrics.before_test( + 'test_stats_changed_in_metrics_registry_after_module_reload', + prepare_select_data) + +group_metrics.test_stats_changed_in_metrics_registry_after_module_reload = function(g) + g.router:eval([[ + local function startswith(text, prefix) + return text:find(prefix, 1, true) == 1 + end + + for k, _ in pairs(package.loaded) do + if startswith(k, 'crud') then + package.loaded[k] = nil + end + end + + crud = require('crud') + ]]) + + check_updated_per_call(g) +end diff --git a/test/performance/perf_test.lua b/test/performance/perf_test.lua new file mode 100644 index 00000000..4d161951 --- /dev/null +++ b/test/performance/perf_test.lua @@ -0,0 +1,531 @@ +local fio = require('fio') +local clock = require('clock') +local fiber = require('fiber') +local errors = require('errors') +local net_box = require('net.box') +local log = require('log') + +local t = require('luatest') +local g = t.group('perf') + +local helpers = require('test.helper') + + +local id = 0 +local function gen() + id = id + 1 + return id +end + +local function reset_gen() + id = 0 +end + +g.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl'), + use_vshard = true, + replicasets = { + { + uuid = helpers.uuid('a'), + alias = 'router', + roles = { 'crud-router' }, + servers = { + { instance_uuid = helpers.uuid('a', 1), alias = 'router' }, + }, + }, + { + uuid = helpers.uuid('b'), + alias = 's-1', + roles = { 'customers-storage', 'crud-storage' }, + servers = { + { instance_uuid = helpers.uuid('b', 1), alias = 's1-master' }, + { instance_uuid = helpers.uuid('b', 2), alias = 's1-replica' }, + }, + }, + { + uuid = helpers.uuid('c'), + alias = 's-2', + roles = { 'customers-storage', 'crud-storage' }, + servers = { + { instance_uuid = helpers.uuid('c', 1), alias = 's2-master' }, + { instance_uuid = helpers.uuid('c', 2), alias = 's2-replica' }, + }, + }, + { + uuid = 
helpers.uuid('d'), + alias = 's-2', + roles = { 'customers-storage', 'crud-storage' }, + servers = { + { instance_uuid = helpers.uuid('d', 1), alias = 's3-master' }, + { instance_uuid = helpers.uuid('d', 2), alias = 's3-replica' }, + }, + } + }, + }) + g.cluster:start() + + g.router = g.cluster:server('router').net_box + + g.router:eval([[ + rawset(_G, 'crud', require('crud')) + ]]) + + -- Run real perf tests only with flag, otherwise run short version + -- to test compatibility as part of unit/integration test run. + g.perf_mode_on = os.getenv('PERF_MODE_ON') + + g.total_report = {} +end) + +g.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') + reset_gen() +end) + +local function normalize(s, n) + if type(s) == 'number' then + s = ('%.2f'):format(s) + end + + local len = s:len() + if len > n then + return s:sub(1, n) + end + + return (' '):rep(n - len) .. s +end + +local row_name = { + insert = 'insert', + select_pk = 'select by pk', + select_gt_pk = 'select gt by pk (limit 10)', + pairs_gt = 'pairs gt by pk (limit 100)', +} + +local column_name = { + without_stats_wrapper = 'without stats wrapper', + stats_disabled = 'stats disabled', + local_stats = 'local stats', + metrics_stats = 'metrics stats (no quantiles)', + metrics_quantile_stats = 'metrics stats (with quantiles)', +} + +local function visualize_section(total_report, name, comment, section, params) + local report_str = ('== %s ==\n(%s)\n\n'):format(name, comment or '') + + local normalized_row_header = normalize('', params.row_header_width) + local headers = '| ' .. normalized_row_header .. ' |' + local after_headers = '| ' .. ('-'):rep(normalized_row_header:len()) .. ' |' + + for _, column in ipairs(params.columns) do + local normalized_column_header = normalize(column, params.col_width[column]) + headers = headers .. ' ' .. normalized_column_header .. ' |' + after_headers = after_headers .. ' ' .. ('-'):rep(normalized_column_header:len()) .. 
' |' + end + + report_str = report_str .. headers .. '\n' + report_str = report_str .. after_headers .. '\n' + + for _, row in ipairs(params.rows) do + local row_str = '| ' .. normalize(row, params.row_header_width) .. ' |' + + for _, column in ipairs(params.columns) do + local report = total_report[row][column] + + local report_str + if report ~= nil then + report_str = report.str[section] + else + report_str = 'unknown' + end + + row_str = row_str .. ' ' .. normalize(report_str, params.col_width[column]) .. ' |' + end + + report_str = report_str .. row_str .. '\n' + end + + report_str = report_str .. '\n\n\n' + + return report_str +end + +local function visualize_report(report) + local params = {} + + params.col_width = 2 + for _, name in pairs(column_name) do + params.col_width = math.max(name:len() + 2, params.col_width) + end + + params.row_header_width = 30 + + -- Set columns and rows explicitly to preserve custom order. + params.columns = { + column_name.without_stats_wrapper, + column_name.stats_disabled, + column_name.local_stats, + column_name.metrics_stats, + column_name.metrics_quantile_stats, + } + + params.rows = { + row_name.select_pk, + row_name.select_gt_pk, + row_name.pairs_gt, + row_name.insert, + } + + params.row_header_width = 1 + for _, name in pairs(row_name) do + params.row_header_width = math.max(name:len(), params.row_header_width) + end + + local min_col_width = 12 + params.col_width = {} + for _, name in ipairs(params.columns) do + params.col_width[name] = math.max(name:len(), min_col_width) + end + + local report_str = '\n==== PERFORMANCE REPORT ====\n\n\n' + + report_str = report_str .. visualize_section(report, 'SUCCESS REQUESTS', + 'The higher the better', 'success_count', params) + report_str = report_str .. visualize_section(report, 'SUCCESS REQUESTS PER SECOND', + 'The higher the better', 'success_rps', params) + report_str = report_str .. 
visualize_section(report, 'ERRORS', + 'Bad if higher than zero', 'error_count', params) + report_str = report_str .. visualize_section(report, 'AVERAGE CALL TIME', + 'The lower the better', 'average_time', params) + report_str = report_str .. visualize_section(report, 'MAX CALL TIME', + 'The lower the better', 'max_time', params) + + log.info(report_str) +end + +g.after_each(function(g) + g.router:call('crud.cfg', {{ stats = false }}) +end) + +g.after_all(function(g) + g.cluster:stop() + fio.rmtree(g.cluster.datadir) + + visualize_report(g.total_report) +end) + +local function generate_customer() + return { gen(), box.NULL, 'David Smith', 33 } +end + +local select_prepare = function(g) + local count + if g.perf_mode_on then + count = 10100 + else + count = 100 + end + + for _ = 1, count do + g.router:call('crud.insert', { 'customers', generate_customer() }) + end + reset_gen() +end + +local insert_params = function() + return { 'customers', generate_customer() } +end + +local select_params_pk_eq = function() + return { 'customers', {{'==', 'id', gen() % 10000}} } +end + +local select_params_pk_gt = function() + return { 'customers', {{'>', 'id', gen() % 10000}}, { first = 10 } } +end + +local pairs_params_pk_gt = function() + return { 'customers', {{'>', 'id', gen() % 10000}}, { first = 100, batch_size = 50 } } +end + +local stats_cases = { + stats_disabled = { + column_name = column_name.stats_disabled, + }, + local_stats = { + prepare = function(g) + g.router:call('crud.cfg', {{ stats = true, stats_driver = 'local', stats_quantiles = false }}) + end, + column_name = column_name.local_stats, + }, + metrics_stats = { + prepare = function(g) + local is_metrics_supported = g.router:eval([[ + return require('crud.stats.metrics_registry').is_supported() + ]]) + t.skip_if(is_metrics_supported == false, 'Metrics registry is unsupported') + g.router:call('crud.cfg', {{ stats = true, stats_driver = 'metrics', stats_quantiles = false }}) + end, + column_name = 
column_name.metrics_stats, + }, + metrics_quantile_stats = { + prepare = function(g) + local is_metrics_supported = g.router:eval([[ + return require('crud.stats.metrics_registry').is_supported() + ]]) + t.skip_if(is_metrics_supported == false, 'Metrics registry is unsupported') + g.router:call('crud.cfg', {{ stats = true, stats_driver = 'metrics', stats_quantiles = true }}) + end, + column_name = column_name.metrics_quantile_stats, + }, +} + +local integration_params = { + timeout = 2, + fiber_count = 5, + connection_count = 2, +} + +local pairs_integration = { + timeout = 5, + fiber_count = 1, + connection_count = 1, +} + +local insert_perf = { + timeout = 30, + fiber_count = 600, + connection_count = 10, +} + +-- Higher load may lead to net_msg_max limit break. +local select_perf = { + timeout = 30, + fiber_count = 200, + connection_count = 10, +} + +local pairs_perf = { + timeout = 30, + fiber_count = 100, + connection_count = 10, +} + +local cases = { + crud_insert = { + call = 'crud.insert', + params = insert_params, + matrix = stats_cases, + integration_params = integration_params, + perf_params = insert_perf, + row_name = row_name.insert, + }, + + crud_insert_without_stats_wrapper = { + prepare = function(g) + g.router:eval([[ + rawset(_G, '_plain_insert', require('crud.insert').tuple) + ]]) + end, + call = '_plain_insert', + params = insert_params, + matrix = { [''] = { column_name = column_name.without_stats_wrapper } }, + integration_params = integration_params, + perf_params = insert_perf, + row_name = row_name.insert, + }, + + crud_select_pk_eq = { + prepare = select_prepare, + call = 'crud.select', + params = select_params_pk_eq, + matrix = stats_cases, + integration_params = integration_params, + perf_params = select_perf, + row_name = row_name.select_pk, + }, + + crud_select_without_stats_wrapper_pk_eq = { + prepare = function(g) + g.router:eval("_plain_select = require('crud.select').call") + select_prepare(g) + end, + call = '_plain_select', + 
params = select_params_pk_eq, + matrix = { [''] = { column_name = column_name.without_stats_wrapper } }, + integration_params = integration_params, + perf_params = select_perf, + row_name = row_name.select_pk, + }, + + crud_select_pk_gt = { + prepare = select_prepare, + call = 'crud.select', + params = select_params_pk_gt, + matrix = stats_cases, + integration_params = integration_params, + perf_params = select_perf, + row_name = row_name.select_gt_pk, + }, + + crud_select_without_stats_wrapper_pk_gt = { + prepare = function(g) + g.router:eval([[ + rawset(_G, '_plain_select', require('crud.select').call) + ]]) + select_prepare(g) + end, + call = '_plain_select', + params = select_params_pk_gt, + matrix = { [''] = { column_name = column_name.without_stats_wrapper } }, + integration_params = integration_params, + perf_params = select_perf, + row_name = row_name.select_gt_pk, + }, + + crud_pairs_gt = { + prepare = function(g) + g.router:eval([[ + _run_pairs = function(...) + local t = {} + for _, tuple in require('crud').pairs(...) do + table.insert(t, tuple) + end + end + ]]) + select_prepare(g) + end, + call = '_run_pairs', + params = pairs_params_pk_gt, + matrix = stats_cases, + integration_params = pairs_integration, + perf_params = pairs_perf, + row_name = row_name.pairs_gt, + }, + + crud_pairs_without_stats_wrapper_pk_gt = { + prepare = function(g) + g.router:eval([[ + _run_pairs = function(...) + local t = {} + for _, tuple in require('crud.select').pairs(...) 
do + table.insert(t, tuple) + end + end + ]]) + select_prepare(g) + end, + call = '_run_pairs', + params = pairs_params_pk_gt, + matrix = { [''] = { column_name = column_name.without_stats_wrapper } }, + integration_params = pairs_integration, + perf_params = pairs_perf, + row_name = row_name.pairs_gt, + }, +} + +local function generator_f(conn, call, params, report, timeout) + local start = clock.monotonic() + + while (clock.monotonic() - start) < timeout do + local call_start = clock.monotonic() + local ok, res, err = pcall(conn.call, conn, call, params()) + local call_time = clock.monotonic() - call_start + + if not ok then + log.error(res) + table.insert(report.errors, res) + elseif err ~= nil then + errors.wrap(err) + log.error(err) + table.insert(report.errors, err) + else + report.count = report.count + 1 + end + + report.total_time = report.total_time + call_time + report.max_time = math.max(report.max_time, call_time) + end +end + +for name, case in pairs(cases) do + local matrix = case.matrix or { [''] = { { column_name = '' } } } + + for subname, subcase in pairs(matrix) do + local name_tail = '' + if subname ~= '' then + name_tail = ('_with_%s'):format(subname) + end + + local test_name = ('test_%s%s'):format(name, name_tail) + + g.before_test(test_name, function(g) + if case.prepare ~= nil then + case.prepare(g) + end + + if subcase.prepare ~= nil then + subcase.prepare(g) + end + end) + + g[test_name] = function(g) + local params + if g.perf_mode_on then + params = case.perf_params + else + params = case.integration_params + end + + local connections = {} + + local router = g.cluster:server('router') + for _ = 1, params.connection_count do + local c = net_box:connect(router.net_box_uri, router.net_box_credentials) + if c == nil then + t.fail('Failed to prepare connections') + end + table.insert(connections, c) + end + + local fibers = {} + local report = { errors = {}, count = 0, total_time = 0, max_time = 0 } + for id = 1, params.fiber_count do + 
local conn_id = id % params.connection_count + 1 + local conn = connections[conn_id] + local f = fiber.new(generator_f, conn, case.call, case.params, report, params.timeout) + f:set_joinable(true) + table.insert(fibers, f) + end + + local start_time = clock.monotonic() + for i = 1, params.fiber_count do + fibers[i]:join() + end + local run_time = clock.monotonic() - start_time + + report.str = { + success_count = ('%d'):format(report.count), + error_count = ('%d'):format(#report.errors), + success_rps = ('%.2f'):format(report.count / run_time), + max_time = ('%.3f ms'):format(report.max_time * 1e3), + } + + local total_count = report.count + #report.errors + if total_count > 0 then + report.str.average_time = ('%.3f ms'):format(report.total_time / total_count * 1e3) + else + report.str.average_time = 'unknown' + end + + g.total_report[case.row_name] = g.total_report[case.row_name] or {} + g.total_report[case.row_name][subcase.column_name] = report + + log.info('\n%s: %s success requests (rps %s), %s errors, call average time %s, call max time %s \n', + test_name, report.str.success_count, report.str.success_rps, report.str.error_count, + report.str.average_time, report.str.max_time) + end + end +end diff --git a/test/performance/select_perf_test.lua b/test/performance/select_perf_test.lua deleted file mode 100644 index 683e7708..00000000 --- a/test/performance/select_perf_test.lua +++ /dev/null @@ -1,167 +0,0 @@ -local fio = require('fio') -local fiber = require('fiber') -local errors = require('errors') -local net_box = require('net.box') -local log = require('log') - -local t = require('luatest') -local g = t.group('perf') - -local helpers = require('test.helper') - -g.before_all = function() - g.cluster = helpers.Cluster:new({ - datadir = fio.tempdir(), - server_command = helpers.entrypoint('srv_select'), - use_vshard = true, - replicasets = { - { - uuid = helpers.uuid('a'), - alias = 'router', - roles = { 'crud-router' }, - servers = { - { instance_uuid = 
helpers.uuid('a', 1), alias = 'router' }, - }, - }, - { - uuid = helpers.uuid('b'), - alias = 's-1', - roles = { 'customers-storage', 'crud-storage' }, - servers = { - { instance_uuid = helpers.uuid('b', 1), alias = 's1-master' }, - { instance_uuid = helpers.uuid('b', 2), alias = 's1-replica' }, - }, - }, - { - uuid = helpers.uuid('c'), - alias = 's-2', - roles = { 'customers-storage', 'crud-storage' }, - servers = { - { instance_uuid = helpers.uuid('c', 1), alias = 's2-master' }, - { instance_uuid = helpers.uuid('c', 2), alias = 's2-replica' }, - }, - }, - { - uuid = helpers.uuid('d'), - alias = 's-2', - roles = { 'customers-storage', 'crud-storage' }, - servers = { - { instance_uuid = helpers.uuid('d', 1), alias = 's3-master' }, - { instance_uuid = helpers.uuid('d', 2), alias = 's3-replica' }, - }, - } - }, - }) - g.cluster:start() -end - -g.after_all = function() - g.cluster:stop() - fio.rmtree(g.cluster.datadir) -end - -g.before_each(function() end) - -local function insert_customers(conn, id, count, timeout, report) - local customer = {id, box.NULL, 'David', 'Smith', 33, 'Los Angeles'} - local start = fiber.clock() - - while (fiber.clock() - start) < timeout do - local ok, res, err = pcall(conn.call, conn, [[package.loaded.crud.insert]], {'customers', customer}) - if not ok then - log.error('Insert error: %s', res) - table.insert(report.errors, res) - elseif err ~= nil then - errors.wrap(err) - log.error('Insert error: %s', err) - table.insert(report.errors, err) - else - report.count = report.count + 1 - end - customer[1] = customer[1] + count - end -end - -local function select_customers(conn, id, timeout, report) - local start = fiber.clock() - local ok, err = pcall(function() - while (fiber.clock() - start) < timeout do - local _, err = conn:call([[package.loaded.crud.select]], {'customers', {{'>', 'id', id}}, {first = 10}}) - if err ~= nil then - errors.wrap(err) - log.error(err) - table.insert(report.errors, err) - else - report.count = report.count + 1 
- end - end - end) - if not ok then - table.insert(report.errors, err) - log.error(err) - end -end - -g.test_insert = function() - local timeout = 30 - local fiber_count = 600 - local connection_count = 10 - local connections = {} - - local server = g.cluster.main_server - server.net_box:eval([[require('crud')]]) - for _ = 1, connection_count do - local c = net_box:connect(server.net_box_uri, server.net_box_credentials) - assert(c) - table.insert(connections, c) - end - - local fibers = {} - local report = {errors = {}, count = 0} - for id = 1, fiber_count do - local conn_id = id % connection_count + 1 - local conn = connections[conn_id] - local f = fiber.new(insert_customers, conn, id, fiber_count, timeout, report) - f:set_joinable(true) - table.insert(fibers, f) - end - - for i = 1, fiber_count do - fibers[i]:join() - end - - log.error('\nINSERT: requests %d, rps %d, errors %d', - report.count, report.count / timeout, #report.errors) -end - -g.test_select = function() - local timeout = 30 - local fiber_count = 200 - local connection_count = 10 - local connections = {} - - local server = g.cluster.main_server - server.net_box:eval([[require('crud')]]) - for _ = 1, connection_count do - local c = net_box:connect(server.net_box_uri, server.net_box_credentials) - assert(c) - table.insert(connections, c) - end - - local fibers = {} - local report = {errors = {}, count = 0} - for id = 1, fiber_count do - local conn_id = id % connection_count + 1 - local conn = connections[conn_id] - local f = fiber.new(select_customers, conn, id, timeout, report) - f:set_joinable(true) - table.insert(fibers, f) - end - - for i = 1, fiber_count do - fibers[i]:join() - end - - log.error('\nSELECT: requests %d, rps %d, errors %d', - report.count, report.count / timeout, #report.errors) -end diff --git a/test/unit/select_executor_test.lua b/test/unit/select_executor_test.lua index ff09b9ca..da014875 100644 --- a/test/unit/select_executor_test.lua +++ b/test/unit/select_executor_test.lua @@ 
-105,7 +105,7 @@ g.test_one_condition_no_index = function() tarantool_iter = plan.tarantool_iter, scan_condition_num = plan.scan_condition_num, }) - t.assert_equals(get_ids(results), {2, 3}) + t.assert_equals(get_ids(results.tuples), {2, 3}) -- after tuple 2 local after_tuple = space:frommap(customers[2]):totable() @@ -115,7 +115,7 @@ g.test_one_condition_no_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {3}) + t.assert_equals(get_ids(results.tuples), {3}) -- after tuple 3 local after_tuple = space:frommap(customers[3]):totable() @@ -125,7 +125,7 @@ g.test_one_condition_no_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(#results, 0) + t.assert_equals(#results.tuples, 0) end g.test_one_condition_with_index = function() @@ -164,7 +164,7 @@ g.test_one_condition_with_index = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {3, 2, 4}) -- in age order + t.assert_equals(get_ids(results.tuples), {3, 2, 4}) -- in age order -- after tuple 3 local after_tuple = space:frommap(customers[3]):totable() @@ -174,7 +174,7 @@ g.test_one_condition_with_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {2, 4}) -- in age order + t.assert_equals(get_ids(results.tuples), {2, 4}) -- in age order end g.test_multiple_conditions = function() @@ -220,7 +220,7 @@ g.test_multiple_conditions = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {5, 2}) -- in age order + t.assert_equals(get_ids(results.tuples), {5, 2}) -- in age order -- after tuple 5 local after_tuple = space:frommap(customers[5]):totable() @@ -230,7 +230,7 @@ g.test_multiple_conditions = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - 
t.assert_equals(get_ids(results), {2}) + t.assert_equals(get_ids(results.tuples), {2}) end g.test_composite_index = function() @@ -271,7 +271,7 @@ g.test_composite_index = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {2, 1, 4}) -- in full_name order + t.assert_equals(get_ids(results.tuples), {2, 1, 4}) -- in full_name order -- after tuple 2 local after_tuple = space:frommap(customers[2]):totable() @@ -281,7 +281,7 @@ g.test_composite_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {1, 4}) + t.assert_equals(get_ids(results.tuples), {1, 4}) end g.test_get_by_id = function() @@ -319,7 +319,7 @@ g.test_get_by_id = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {2}) + t.assert_equals(get_ids(results.tuples), {2}) end g.test_early_exit = function() @@ -360,7 +360,7 @@ g.test_early_exit = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {4, 2}) + t.assert_equals(get_ids(results.tuples), {4, 2}) end g.test_select_all = function() @@ -397,7 +397,7 @@ g.test_select_all = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {1, 2, 3, 4}) + t.assert_equals(get_ids(results.tuples), {1, 2, 3, 4}) end g.test_limit = function() @@ -435,7 +435,7 @@ g.test_limit = function() tarantool_iter = plan.tarantool_iter, limit = 0, }) - t.assert_equals(#results, 0) + t.assert_equals(#results.tuples, 0) -- limit 2 local results = select_executor.execute(space, index, filter_func, { @@ -443,5 +443,5 @@ g.test_limit = function() tarantool_iter = plan.tarantool_iter, limit = 2, }) - t.assert_equals(get_ids(results), {1, 2}) + t.assert_equals(get_ids(results.tuples), {1, 2}) end diff --git a/test/unit/stats_test.lua b/test/unit/stats_test.lua new 
file mode 100644 index 00000000..d8e4b2ed --- /dev/null +++ b/test/unit/stats_test.lua @@ -0,0 +1,704 @@ +local clock = require('clock') +local fio = require('fio') +local fun = require('fun') +local t = require('luatest') + +local stats_module = require('crud.stats') + +local pgroup = t.group('stats_unit', { + { driver = 'local' }, + { driver = 'metrics', quantiles = false }, + { driver = 'metrics', quantiles = true }, +}) +local group_driver = t.group('stats_driver_unit') +local helpers = require('test.helper') + +local space_name = 'customers' + +local function before_all(g) + -- Enable test cluster for "is space exist?" checks. + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_stats'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + }) + g.cluster:start() + g.router = g.cluster:server('router').net_box + + helpers.prepare_simple_functions(g.router) + g.router:eval("stats_module = require('crud.stats')") + + g.is_metrics_supported = g.router:eval([[ + return require('crud.stats.metrics_registry').is_supported() + ]]) + + if g.params ~= nil and g.params.driver == 'metrics' then + t.skip_if(g.is_metrics_supported == false, 'Metrics registry is unsupported') + end +end + +local function after_all(g) + helpers.stop_cluster(g.cluster) +end + +local function get_stats(g, space_name) + return g.router:eval("return stats_module.get(...)", { space_name }) +end + +local function enable_stats(g, params) + params = params or g.params + g.router:eval("stats_module.enable(...)", { params }) +end + +local function disable_stats(g) + g.router:eval("stats_module.disable()") +end + +local function reset_stats(g) + g.router:eval("return stats_module.reset()") +end + +pgroup.before_all(before_all) + +pgroup.after_all(after_all) + +-- Reset statistics between tests, reenable if needed. 
+pgroup.before_each(enable_stats) + +pgroup.after_each(disable_stats) + + +group_driver.before_all(before_all) + +group_driver.after_all(after_all) + +group_driver.after_each(disable_stats) + +pgroup.test_get_format_after_enable = function(g) + local stats = get_stats(g) + + t.assert_type(stats, 'table') + t.assert_equals(stats.spaces, {}) +end + +pgroup.test_get_by_space_name_format_after_enable = function(g) + local stats = get_stats(g, space_name) + + t.assert_type(stats, 'table') + t.assert_equals(stats, {}) +end + +-- Test statistics values after wrapped functions call. +local observe_cases = { + wrapper_observes_expected_values_on_ok = { + operations = stats_module.op, + func = 'return_true', + changed_coll = 'ok', + unchanged_coll = 'error', + }, + wrapper_observes_expected_values_on_error_return = { + operations = stats_module.op, + func = 'return_err', + changed_coll = 'error', + unchanged_coll = 'ok', + }, + wrapper_observes_expected_values_on_error_throw = { + operations = stats_module.op, + func = 'throws_error', + changed_coll = 'error', + unchanged_coll = 'ok', + pcall = true, + }, +} + +local call_wrapped = [[ + local func = rawget(_G, select(1, ...)) + local op = select(2, ...) + local opts = select(3, ...) + local space_name = select(4, ...) + + stats_module.wrap(func, op, opts)(space_name) +]] + +for name, case in pairs(observe_cases) do + for _, op in pairs(case.operations) do + local test_name = ('test_%s_%s'):format(op, name) + + pgroup[test_name] = function(g) + -- Call wrapped functions on server side. + -- Collect execution times from outside. 
+ local run_count = 10 + local time_diffs = {} + + local args = { case.func, op, case.opts, space_name } + + for _ = 1, run_count do + local before_start = clock.monotonic() + + if case.pcall then + pcall(g.router.eval, g.router, call_wrapped, args) + else + g.router:eval(call_wrapped, args) + end + + local after_finish = clock.monotonic() + + table.insert(time_diffs, after_finish - before_start) + end + + table.sort(time_diffs) + local total_time = fun.sum(time_diffs) + + -- Validate stats format after execution. + local total_stats = get_stats(g) + t.assert_type(total_stats, 'table', 'Total stats present after observations') + + local space_stats = get_stats(g, space_name) + t.assert_type(space_stats, 'table', 'Space stats present after observations') + + t.assert_equals(total_stats.spaces[space_name], space_stats, + 'Space stats is a section of total stats') + + local op_stats = space_stats[op] + t.assert_type(op_stats, 'table', 'Op stats present after observations for the space') + + -- Expected collectors (changed_coll: 'ok' or 'error') have changed. + local changed = op_stats[case.changed_coll] + t.assert_type(changed, 'table', 'Status stats present after observations') + + t.assert_equals(changed.count, run_count, 'Count incremented by count of runs') + + local sleep_time = helpers.simple_functions_params().sleep_time + t.assert_ge(changed.latency, sleep_time, 'Latency has appropriate value') + t.assert_le(changed.latency, time_diffs[#time_diffs], 'Latency has appropriate value') + + t.assert_ge(changed.time, sleep_time * run_count, + 'Total time increase has appropriate value') + t.assert_le(changed.time, total_time, 'Total time increase has appropriate value') + + -- Other collectors (unchanged_coll: 'error' or 'ok') + -- have been initialized and have default values. 
+ local unchanged = op_stats[case.unchanged_coll] + t.assert_type(unchanged, 'table', 'Other status stats present after observations') + + t.assert_equals( + unchanged, + { + count = 0, + latency = 0, + time = 0 + }, + 'Other status collectors initialized after observations' + ) + + -- SELECT collectors have additional details section. + if op == stats_module.op.SELECT then + t.assert_equals( + op_stats.details, + { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + }, + 'Detail collectors initialized after select observations' + ) + end + end + end +end + +local pairs_cases = { + success_run = { + prepare = [[ + local params = ... + local sleep_time = params.sleep_time + + local function sleep_ten_times(param, state) + if state == 10 then + return nil + end + + sleep_for(sleep_time) + + return state + 1, param + end + rawset(_G, 'sleep_ten_times', sleep_ten_times) + ]], + eval = [[ + local params, space_name, op = ... + local sleep_time = params.sleep_time + + local build_sleep_multiplier = 2 + + local wrapped = stats_module.wrap( + function(space_name) + sleep_for(build_sleep_multiplier * sleep_time) + + return sleep_ten_times, {}, 0 + end, + op, + { pairs = true } + ) + + for _, _ in wrapped(space_name) do end + ]], + build_sleep_multiplier = 2, + iterations_expected = 10, + changed_coll = 'ok', + unchanged_coll = 'error', + }, + error_throw = { + prepare = [[ + local params = ... + local sleep_time = params.sleep_time + local error_table = params.error + + + local function sleep_five_times_and_throw_error(param, state) + if state == 5 then + error(error_table) + end + + sleep_for(sleep_time) + + return state + 1, param + end + rawset(_G, 'sleep_five_times_and_throw_error', sleep_five_times_and_throw_error) + ]], + eval = [[ + local params, space_name, op = ... 
+ local sleep_time = params.sleep_time + + local build_sleep_multiplier = 2 + + local wrapped = stats_module.wrap( + function(space_name) + sleep_for(build_sleep_multiplier * sleep_time) + + return sleep_five_times_and_throw_error, {}, 0 + end, + op, + { pairs = true } + ) + + for _, _ in wrapped(space_name) do end + ]], + build_sleep_multiplier = 2, + iterations_expected = 5, + changed_coll = 'error', + unchanged_coll = 'ok', + pcall = true, + }, + break_after_gc = { + prepare = [[ + local params = ... + local sleep_time = params.sleep_time + + local function sleep_ten_times(param, state) + if state == 10 then + return nil + end + + sleep_for(sleep_time) + + return state + 1, param + end + rawset(_G, 'sleep_ten_times', sleep_ten_times) + ]], + eval = [[ + local params, space_name, op = ... + local sleep_time = params.sleep_time + + local build_sleep_multiplier = 2 + + local wrapped = stats_module.wrap( + function(space_name) + sleep_for(build_sleep_multiplier * sleep_time) + + return sleep_ten_times, {}, 0 + end, + op, + { pairs = true } + ) + + for i, _ in wrapped(space_name) do + if i == 5 then + break + end + end + ]], + post_eval = [[ + collectgarbage('collect') + collectgarbage('collect') + require('fiber').yield() + ]], + build_sleep_multiplier = 2, + iterations_expected = 5, + changed_coll = 'ok', + unchanged_coll = 'error', + } +} + +for name, case in pairs(pairs_cases) do + local test_name = ('test_pairs_wrapper_observes_all_iterations_on_%s'):format(name) + + pgroup.before_test(test_name, function(g) + g.router:eval(case.prepare, { helpers.simple_functions_params() }) + end) + + pgroup[test_name] = function(g) + local op = stats_module.op.SELECT + + local params = helpers.simple_functions_params() + local args = { params, space_name, op } + + local before_start = clock.monotonic() + + if case.pcall then + pcall(g.router.eval, g.router, case.eval, args) + else + g.router:eval(case.eval, args) + end + + if case.post_eval then + 
g.router:eval(case.post_eval) + end + + local after_finish = clock.monotonic() + local time_diff = after_finish - before_start + + -- Validate stats format after execution. + local total_stats = get_stats(g) + t.assert_type(total_stats, 'table', 'Total stats present after observations') + + local space_stats = get_stats(g, space_name) + t.assert_type(space_stats, 'table', 'Space stats present after observations') + + t.assert_equals(total_stats.spaces[space_name], space_stats, + 'Space stats is a section of total stats') + + local op_stats = space_stats[op] + t.assert_type(op_stats, 'table', 'Op stats present after observations for the space') + + -- Expected collectors (changed_coll: 'ok' or 'error') have changed. + local changed = op_stats[case.changed_coll] + t.assert_type(changed, 'table', 'Status stats present after observations') + + t.assert_equals(changed.count, 1, 'Count incremented by 1') + + t.assert_ge(changed.latency, + params.sleep_time * (case.build_sleep_multiplier + case.iterations_expected), + 'Latency has appropriate value') + t.assert_le(changed.latency, time_diff, 'Latency has appropriate value') + + t.assert_ge(changed.time, + params.sleep_time * (case.build_sleep_multiplier + case.iterations_expected), + 'Total time has appropriate value') + t.assert_le(changed.time, time_diff, 'Total time has appropriate value') + + -- Other collectors (unchanged_coll: 'error' or 'ok') + -- have been initialized and have default values. + local unchanged = op_stats[case.unchanged_coll] + t.assert_type(unchanged, 'table', 'Other status stats present after observations') + + t.assert_equals( + unchanged, + { + count = 0, + latency = 0, + time = 0 + }, + 'Other status collectors initialized after observations' + ) + end +end + +-- Test wrapper preserves return values. 
+local disable_stats_cases = { + stats_disable_before_wrap_ = { + before_wrap = 'stats_module.disable()', + after_wrap = '', + }, + stats_disable_after_wrap_ = { + before_wrap = '', + after_wrap = 'stats_module.disable()', + }, + [''] = { + before_wrap = '', + after_wrap = '', + }, +} + +local preserve_return_cases = { + wrapper_preserves_return_values_on_ok = { + func = 'return_true', + res = true, + err = nil, + }, + wrapper_preserves_return_values_on_error = { + func = 'return_err', + res = nil, + err = helpers.simple_functions_params().error, + }, +} + +local preserve_throw_cases = { + wrapper_preserves_error_throw = { + opts = { pairs = false }, + }, + pairs_wrapper_preserves_error_throw = { + opts = { pairs = true }, + }, +} + +for name_head, disable_case in pairs(disable_stats_cases) do + for name_tail, return_case in pairs(preserve_return_cases) do + local test_name = ('test_%s%s'):format(name_head, name_tail) + + pgroup[test_name] = function(g) + local op = stats_module.op.INSERT + + local eval = ([[ + local func = rawget(_G, select(1, ...)) + local op = select(2, ...) + local space_name = select(3, ...) + + %s -- before_wrap + local w_func = stats_module.wrap(func, op) + %s -- after_wrap + + return w_func(space_name) + ]]):format(disable_case.before_wrap, disable_case.after_wrap) + + local res, err = g.router:eval(eval, { return_case.func, op, space_name }) + + t.assert_equals(res, return_case.res, 'Wrapper preserves first return value') + t.assert_equals(err, return_case.err, 'Wrapper preserves second return value') + end + end + + local test_name = ('test_%spairs_wrapper_preserves_return_values'):format(name_head) + + pgroup[test_name] = function(g) + local op = stats_module.op.INSERT + + local input = { a = 'a', b = 'b' } + local eval = ([[ + local input = select(1, ...) + local func = function() return pairs(input) end + local op = select(2, ...) + local space_name = select(3, ...) 
+ + %s -- before_wrap + local w_func = stats_module.wrap(func, op, { pairs = true }) + %s -- after_wrap + + local res = {} + for k, v in w_func(space_name) do + res[k] = v + end + + return res + ]]):format(disable_case.before_wrap, disable_case.after_wrap) + + local res = g.router:eval(eval, { input, op, space_name }) + + t.assert_equals(input, res, 'Wrapper preserves pairs return values') + end + + for name_tail, throw_case in pairs(preserve_throw_cases) do + local test_name = ('test_%s%s'):format(name_head, name_tail) + + pgroup[test_name] = function(g) + local op = stats_module.op.INSERT + + local eval = ([[ + local func = rawget(_G, 'throws_error') + local opts = select(1, ...) + local op = select(2, ...) + local space_name = select(3, ...) + + %s -- before_wrap + local w_func = stats_module.wrap(func, op, opts) + %s -- after_wrap + + w_func(space_name) + ]]):format(disable_case.before_wrap, disable_case.after_wrap) + + t.assert_error_msg_contains( + helpers.simple_functions_params().error_msg, + g.router.eval, g.router, eval, { throw_case.opts, op, space_name } + ) + end + end +end + +pgroup.test_stats_is_empty_after_disable = function(g) + disable_stats(g) + + local op = stats_module.op.INSERT + g.router:eval(call_wrapped, { 'return_true', op, {}, space_name }) + + local stats = get_stats(g) + t.assert_equals(stats, {}) +end + + +local function prepare_non_default_stats(g) + local op = stats_module.op.INSERT + g.router:eval(call_wrapped, { 'return_true', op, {}, space_name }) + + local stats = get_stats(g, space_name) + t.assert_equals(stats[op].ok.count, 1, 'Non-zero stats prepared') + + return stats +end + +pgroup.test_enable_is_idempotent = function(g) + local stats_before = prepare_non_default_stats(g) + + enable_stats(g) + + local stats_after = get_stats(g, space_name) + + t.assert_equals(stats_after, stats_before, 'Stats have not been reset') +end + +pgroup.test_reset = function(g) + prepare_non_default_stats(g) + + reset_stats(g) + + local stats = 
get_stats(g, space_name) + + t.assert_equals(stats, {}, 'Stats have been reset') +end + +pgroup.test_reset_for_disabled_stats_does_not_init_module = function(g) + disable_stats(g) + + local stats_before = get_stats(g) + t.assert_equals(stats_before, {}, "Stats is empty") + + reset_stats(g) + + local stats_after = get_stats(g) + t.assert_equals(stats_after, {}, "Stats is still empty") +end + +pgroup.test_fetch_stats_update = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + g.router:eval([[ stats_module.update_fetch_stats(...) ]], + { storage_cursor_stats, space_name }) + + local op = stats_module.op.SELECT + local stats = get_stats(g, space_name) + + t.assert_not_equals(stats[op], nil, + 'Fetch stats update inits SELECT collectors') + + local details = stats[op].details + + t.assert_equals(details.tuples_fetched, 5, + 'tuples_fetched is inremented by expected value') + t.assert_equals(details.tuples_lookup, 25, + 'tuples_lookup is inremented by expected value') +end + +pgroup.test_disable_stats_do_not_break_fetch_stats_update_call = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + disable_stats(g) + + local _, err = g.router:eval([[ stats_module.update_fetch_stats(...) ]], + { storage_cursor_stats, space_name }) + t.assert_equals(err, nil) +end + +pgroup.test_map_reduce_increment = function(g) + local op = stats_module.op.SELECT + + local _, err = g.router:eval([[ stats_module.update_map_reduces(...) ]], { space_name }) + t.assert_equals(err, nil) + + local stats = get_stats(g) + + t.assert_equals(stats.spaces[space_name][op].details.map_reduces, 1, + "Counter of map reduces incremented") +end + +pgroup.test_disable_stats_do_not_break_map_reduce_update_call = function(g) + disable_stats(g) + + local _, err = g.router:eval([[ stats_module.update_map_reduces(...) 
]], { space_name }) + t.assert_equals(err, nil) +end + + +group_driver.test_default_driver = function(g) + enable_stats(g) + + local driver = g.router:eval(" return stats_module.internal.driver ") + + if g.is_metrics_supported then + t.assert_equals(driver, 'metrics') + else + t.assert_equals(driver, 'local') + end +end + + +group_driver.test_default_quantiles = function(g) + enable_stats(g) + + local quantiles = g.router:eval(" return stats_module.internal.quantiles ") + t.assert_equals(quantiles, false) +end + + +group_driver.before_test( + 'test_stats_reenable_with_different_driver_reset_stats', + function(g) + t.skip_if(g.is_metrics_supported == false, 'Metrics registry is unsupported') + end +) + +group_driver.test_stats_reenable_with_different_driver_reset_stats = function(g) + enable_stats(g, { driver = 'metrics' }) + + prepare_non_default_stats(g) + + enable_stats(g, { driver = 'local' }) + local stats = get_stats(g) + t.assert_equals(stats.spaces, {}, 'Stats have been reset') +end + + +group_driver.test_unknown_driver_throws_error = function(g) + t.assert_error_msg_contains( + 'Unsupported driver: unknown', + enable_stats, g, { driver = 'unknown' }) +end + + +group_driver.before_test( + 'test_stats_enable_with_metrics_throws_error_if_unsupported', + function(g) + t.skip_if(g.is_metrics_supported == true, 'Metrics registry is supported') + end +) + +group_driver.test_stats_enable_with_metrics_throws_error_if_unsupported = function(g) + t.assert_error_msg_contains( + 'Unsupported driver: metrics', + enable_stats, g, { driver = 'metrics' }) +end + + +group_driver.test_stats_enable_with_local_throws_error_if_quantiles_enabled = function(g) + t.assert_error_msg_contains( + 'Quantiles are not supported', + enable_stats, g, { driver = 'local', quantiles = true }) +end