|
| 1 | +#!/usr/bin/env tarantool |
| 2 | + |
| 3 | +local clock = require('clock') |
| 4 | +local os = require('os') |
| 5 | +local fiber = require('fiber') |
| 6 | +local queue = require('queue') |
| 7 | + |
| 8 | +-- Set the number of consumers. |
| 9 | +local consumers_count = 100 |
| 10 | +-- Set the number of tasks processed by one consumer per iteration. |
| 11 | +local batch_size = 100 |
| 12 | + |
| 13 | +local barrier = fiber.cond() |
| 14 | +local wait_count = 0 |
| 15 | + |
| 16 | +box.cfg() |
| 17 | + |
| 18 | +local test_queue = queue.create_tube('test_queue', 'fifo', {temporary = true}) |
| 19 | + |
| 20 | +local function prepare_consumers() |
| 21 | + local consumers = {} |
| 22 | + |
| 23 | + for i = 1, consumers_count do |
| 24 | + consumers[i] = fiber.new(function() |
| 25 | + wait_count = wait_count + 1 |
| 26 | + -- Wait for all consumers to start. |
| 27 | + barrier:wait() |
| 28 | + |
| 29 | + -- Create test tasks. |
| 30 | + for j = 1, batch_size do |
| 31 | + test_queue:put('test, cons: ' .. i) |
| 32 | + end |
| 33 | + |
| 34 | + wait_count = wait_count + 1 |
| 35 | + -- Wait for all consumers to create tasks. |
| 36 | + barrier:wait() |
| 37 | + |
| 38 | + -- Ack the tasks. |
| 39 | + for j = 1, batch_size do |
| 40 | + local task = test_queue:take() |
| 41 | + test_queue:ack(task[1]) |
| 42 | + end |
| 43 | + |
| 44 | + wait_count = wait_count + 1 |
| 45 | + end) |
| 46 | + end |
| 47 | + |
| 48 | + return consumers |
| 49 | +end |
| 50 | + |
| 51 | +local function multi_consumer_bench() |
| 52 | + --- Wait for all consumer fibers. |
| 53 | + local wait_all = function() |
| 54 | + while (wait_count ~= consumers_count) do |
| 55 | + fiber.yield() |
| 56 | + end |
| 57 | + wait_count = 0 |
| 58 | + end |
| 59 | + |
| 60 | + -- Wait for all consumers to start. |
| 61 | + local consumers = prepare_consumers() |
| 62 | + wait_all() |
| 63 | + |
| 64 | + -- Start timing creation of tasks. |
| 65 | + local start_put_time = clock.proc64() |
| 66 | + barrier:broadcast() |
| 67 | + -- Wait for all consumers to create tasks. |
| 68 | + wait_all() |
| 69 | + -- Complete timing creation of tasks. |
| 70 | + local start_ack_time = clock.proc64() |
| 71 | + barrier:broadcast() |
| 72 | + -- Wait for all tasks to be acked. |
| 73 | + wait_all() |
| 74 | + -- Complete the timing of task confirmation. |
| 75 | + local complete_time = clock.proc64() |
| 76 | + |
| 77 | + --Print the result in microseconds |
| 78 | + print("Time it takes to fill the queue: " .. |
| 79 | + tostring((start_ack_time - start_put_time) / 10^3)) |
| 80 | + print("Time it takes to confirm the tasks: " .. |
| 81 | + tostring((complete_time - start_ack_time) / 10^3)) |
| 82 | +end |
| 83 | + |
| 84 | +-- Start benchmark. |
| 85 | +multi_consumer_bench() |
| 86 | + |
| 87 | +-- Cleanup. |
| 88 | +test_queue:drop() |
| 89 | + |
| 90 | +os.exit(0) |
0 commit comments