Skip to content

Commit fe2fda4

Browse files
committed
benchmark: add a benchmark for working with multiple consumers
Related to #105 Follows up #85
1 parent 862afb9 commit fe2fda4

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed

t/benchmark/multi_consumer_work.lua

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#!/usr/bin/env tarantool
2+
3+
local clock = require('clock')
4+
local os = require('os')
5+
local fiber = require('fiber')
6+
local queue = require('queue')
7+
8+
-- Set the number of consumers.
9+
local consumers_count = 100
10+
-- Set the number of tasks processed by one consumer per iteration.
11+
local batch_size = 100
12+
13+
local barrier = fiber.cond()
14+
local wait_count = 0
15+
16+
box.cfg()
17+
18+
local test_queue = queue.create_tube('test_queue', 'fifo', {temporary = true})
19+
20+
local function prepare_consumers()
21+
local consumers = {}
22+
local test_data = 'test data'
23+
24+
for i = 1, consumers_count do
25+
consumers[i] = fiber.create(function()
26+
wait_count = wait_count + 1
27+
-- Wait for all consumers to start.
28+
barrier:wait()
29+
30+
-- Create test tasks.
31+
for j = 1, batch_size do
32+
test_queue:put(test_data)
33+
end
34+
35+
wait_count = wait_count + 1
36+
-- Wait for all consumers to create tasks.
37+
barrier:wait()
38+
39+
-- Ack the tasks.
40+
for j = 1, batch_size do
41+
local task = test_queue:take()
42+
test_queue:ack(task[1])
43+
end
44+
45+
wait_count = wait_count + 1
46+
end)
47+
end
48+
49+
return consumers
50+
end
51+
52+
local function multi_consumer_bench()
53+
--- Wait for all consumer fibers.
54+
local wait_all = function()
55+
while (wait_count ~= consumers_count) do
56+
fiber.yield()
57+
end
58+
wait_count = 0
59+
end
60+
61+
-- Wait for all consumers to start.
62+
local consumers = prepare_consumers()
63+
wait_all()
64+
65+
-- Start timing creation of tasks.
66+
local start_put_time = clock.proc64()
67+
barrier:broadcast()
68+
-- Wait for all consumers to create tasks.
69+
wait_all()
70+
-- Complete timing creation of tasks.
71+
local start_ack_time = clock.proc64()
72+
barrier:broadcast()
73+
-- Wait for all tasks to be acked.
74+
wait_all()
75+
-- Complete the timing of task confirmation.
76+
local complete_time = clock.proc64()
77+
78+
--Print the result in microseconds
79+
print(string.format("Time it takes to fill the queue: %i",
80+
tonumber((start_ack_time - start_put_time) / 10^3)))
81+
print(string.format("Time it takes to confirm the tasks: %i",
82+
tonumber((complete_time - start_ack_time) / 10^3)))
83+
end
84+
85+
-- Start benchmark.
86+
multi_consumer_bench()
87+
88+
-- Cleanup.
89+
test_queue:drop()
90+
91+
os.exit(0)

0 commit comments

Comments
 (0)