Skip to content

Commit 905d912

Browse files
committed
fix custom driver registration after reboot
Previously, if a custom driver is registered after calling box.cfg() it causes a problem when the instance will be restarted. The cause of the problem is an attempt to recreate a tube of custom type (which is not registered yet). This bug is now fixed. Fixes #137
1 parent d709208 commit 905d912

File tree

3 files changed

+76
-4
lines changed

3 files changed

+76
-4
lines changed

queue/abstract.lua

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -570,15 +570,44 @@ function method.start()
570570
end
571571

572572
for _, tube_tuple in _queue:pairs() do
573-
local tube = recreate_tube(tube_tuple)
574-
-- gh-66: release all taken tasks on start
575-
tube_release_all_tasks(tube)
573+
-- Recreate tubes for registered drivers only.
574+
-- Check if a driver exists for this type of tube.
575+
if queue.driver[tube_tuple[4]] ~= nil then
576+
local tube = recreate_tube(tube_tuple)
577+
-- gh-66: release all taken tasks on start
578+
tube_release_all_tasks(tube)
579+
end
576580
end
577581

578582
session.on_disconnect(queue._on_consumer_disconnect)
579583
return queue
580584
end
581585

586+
--- Register the custom driver.
587+
-- Unlike the "register_driver" method from init.lua, this method
588+
-- recreates the existing tubes of the registered driver.
589+
function method.register_driver(driver_name, tube_ctr)
590+
if type(tube_ctr.create_space) ~= 'function' or
591+
type(tube_ctr.new) ~= 'function' then
592+
error('tube control methods must contain functions "create_space"'
593+
.. ' and "new"')
594+
end
595+
if queue.driver[driver_name] then
596+
error(('overriding registered driver "%s"'):format(driver_name))
597+
end
598+
queue.driver[driver_name] = tube_ctr
599+
600+
-- Recreates the existing tubes of the registered driver.
601+
local _queue = box.space._queue
602+
for _, tube_tuple in _queue:pairs() do
603+
if tube_tuple[4] == driver_name then
604+
local tube = recreate_tube(tube_tuple)
605+
-- Release all task for tube on start.
606+
tube_release_all_tasks(tube)
607+
end
608+
end
609+
end
610+
582611
local function build_stats(space)
583612
local stats = {tasks = {}, calls = {
584613
ack = 0, bury = 0, delete = 0,

queue/init.lua

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ function wrapper_impl(...)
7777
rawset(queue, name, val)
7878
end
7979
abstract.driver = queue.driver
80+
-- Now the "register_driver" method from abstract will be used.
81+
queue.register_driver = nil
8082
setmetatable(queue, getmetatable(abstract))
8183
queue.start()
8284
else
@@ -95,7 +97,6 @@ local function queue_init()
9597
if rawget(box, 'space') ~= nil and box.info.ro == false then
9698
-- The box was configured with read_only = false
9799
queue = require('queue.abstract')
98-
queue.register_driver = register_driver
99100
queue.driver = core_drivers
100101
queue.start()
101102
else

t/170-register-driver-after-reload.t

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/usr/bin/env tarantool
2+
3+
local os = require('os')
4+
local queue = require('queue')
5+
local tap = require('tap')
6+
local tnt = require('t.tnt')
7+
8+
local test = tap.test('custom driver registration after reload')
9+
test:plan(1)
10+
11+
tnt.cfg()
12+
13+
--- Accept gh-137, we need to check custom driver registration
14+
-- after restart. Instead of tarantool reboot, we will additionally
15+
-- call queue.start() to simulate the reload of the module. This
16+
-- is not a clean enough, because queue module doesn't provide the
17+
-- hot restart.
18+
--
19+
-- All tricks in this test are done by professionals, don't try
20+
-- to repeat it yourself!!!
21+
local function check_driver_registration_after_reload()
22+
local fifo = require('queue.abstract.driver.fifo')
23+
queue.register_driver('fifo_cust', fifo)
24+
25+
local tube = queue.create_tube('tube_cust', 'fifo_cust')
26+
tube:put('1')
27+
local task_id = tube:take()[1]
28+
29+
-- Simulate the module reload.
30+
queue.driver.fifo_cust = nil
31+
queue.start()
32+
33+
-- Check the task has been released after reload.
34+
queue.register_driver('fifo_cust', fifo)
35+
local task_status = queue.tube.tube_cust:peek(task_id)[2]
36+
test:is(task_status, 'r', 'check driver registration after reload')
37+
end
38+
39+
check_driver_registration_after_reload()
40+
41+
tnt.finish()
42+
os.exit(test:check() and 0 or 1)

0 commit comments

Comments
 (0)