Skip to content

Commit f7f774e

Browse files
committed
add implementation of the "ttr" queue option
"ttr" in seconds - the time after which, if there is no active connection in the session, it will be released with all its tasks. Closes #85
1 parent e0ef204 commit f7f774e

File tree

2 files changed

+141
-18
lines changed

2 files changed

+141
-18
lines changed

queue/abstract.lua

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -414,15 +414,11 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
414414
return self
415415
end
416416

417+
-- Sessions that don't have any active connections.
418+
local inactive_sessions = {}
419+
417420
--- Release all session tasks if the session has no actual connections.
418421
local function cleanup_session_tasks(queue_uuid)
419-
local sesion_ids = box.space._queue_session_ids.index.queue_uuid:select(
420-
queue_uuid,
421-
{limit = 1})[1]
422-
if sesion_ids ~= nil then
423-
return
424-
end
425-
426422
-- Release all session tasks.
427423
while true do
428424
local task = box.space._queue_taken.index.pk:min{queue_uuid}
@@ -444,6 +440,26 @@ local function cleanup_session_tasks(queue_uuid)
444440
end
445441
end
446442

443+
--- Remove a connection from the list of active connections and
444+
-- release its tasks if necessary.
445+
local function remove_connection(box_sid)
446+
local queue_session_ids = box.space._queue_session_ids
447+
local quuid = get_quuid_by_sid(box_sid)
448+
449+
queue_session_ids:delete{box_sid}
450+
local session_ids = queue_session_ids.index.queue_uuid:select(quuid,
451+
{limit = 1})[1]
452+
453+
if session_ids == nil then
454+
local ttr = queue.settings['ttr'] or 0
455+
if ttr > 0 then
456+
inactive_sessions[quuid] = event_time(ttr)
457+
else
458+
cleanup_session_tasks(quuid)
459+
end
460+
end
461+
end
462+
447463
function method._on_consumer_disconnect()
448464
local sid = session.id()
449465

@@ -465,11 +481,7 @@ function method._on_consumer_disconnect()
465481
end
466482
end
467483

468-
-- Remove connection from the list of active connections and
469-
-- release tasks if necessary.
470-
local quuid = get_quuid_by_sid(sid)
471-
box.space._queue_session_ids:delete{sid}
472-
cleanup_session_tasks(quuid)
484+
remove_connection(sid)
473485
end
474486

475487
-- function takes tuples and recreates tube
@@ -574,6 +586,24 @@ local function identification_init()
574586
end
575587
end
576588

589+
--- Create an expiration fiber to cleanup expired sessions.
590+
local function create_expiration_fiber()
591+
local exp_fiber = fiber.new(function()
592+
while true do
593+
local cur_time = time()
594+
for quuid, exp_time in pairs(inactive_sessions) do
595+
if cur_time >= exp_time then
596+
cleanup_session_tasks(quuid)
597+
inactive_sessions[quuid] = nil
598+
end
599+
end
600+
fiber.sleep(1)
601+
end
602+
end)
603+
604+
return exp_fiber
605+
end
606+
577607
-- create or join infrastructure
578608
function method.start()
579609
-- tube_name, tube_id, space_name, tube_type, opts
@@ -663,7 +693,7 @@ function method.start()
663693
end
664694

665695
identification_init()
666-
696+
queue.expiration_fiber = create_expiration_fiber()
667697
session.on_disconnect(queue._on_consumer_disconnect)
668698
return queue
669699
end
@@ -717,20 +747,29 @@ function method.identificate(queue_uuid)
717747
queue_session_ids:insert{sid, quuid}
718748
elseif queue_uuid ~= nil then
719749
-- Identificate using a previously created session.
750+
-- Check that a session with this uuid exists.
751+
local ids_by_uuid = queue_session_ids.index.queue_uuid:select(
752+
queue_uuid, {limit = 1})[1]
753+
if ids_by_uuid == nil and inactive_sessions[queue_uuid] == nil then
754+
error("The UUID " .. uuid.frombin(queue_uuid):str() ..
755+
" is unknown.")
756+
end
720757

721758
--Validate UUID.
722759
uuid.frombin(queue_uuid)
723760

724761
if quuid ~= queue_uuid then
725-
queue_session_ids:upsert({sid, queue_uuid}, {{'=', 2, queue_uuid}})
726762
if quuid ~= nil then
727-
-- Cleanup old session tasks.
728-
cleanup_session_tasks(quuid)
763+
remove_connection(sid)
729764
end
765+
queue_session_ids:insert({sid, queue_uuid})
730766
quuid = queue_uuid
731767
end
732768
end
733769

770+
-- Exclude the session from inactive.
771+
inactive_sessions[quuid] = nil
772+
734773
return quuid
735774
end
736775

t/170-work-with-quuid.t

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ local os = require('os')
77
local tnt = require('t.tnt')
88

99
local test = tap.test('test work with quuid')
10-
test:plan(1)
10+
test:plan(3)
1111

1212
local listen = 'localhost:1918'
1313
tnt.cfg{listen = listen}
@@ -17,7 +17,7 @@ rawset(_G, 'queue', require('queue'))
1717
test:test('test work with two consumers with the same quuid', function(test)
1818
test:plan(4)
1919

20-
-- Preparing of the tube
20+
-- Preparing of the tube.
2121
local tube = queue.create_tube('test_tube', 'fifo')
2222
tube:grant('guest', {call = true})
2323
local quuid_con1
@@ -81,5 +81,89 @@ test:test('test work with two consumers with the same quuid', function(test)
8181
tube:drop()
8282
end)
8383

84+
test:test('test reconnect and ack the task', function(test)
85+
test:plan(2)
86+
87+
-- Preparing of the tube.
88+
queue.set_settings({ttr = 60})
89+
local tube = queue.create_tube('test_tube', 'fifo')
90+
tube:put('test_data')
91+
tube:grant('guest', {call = true})
92+
93+
local fiber_consumer = fiber.new(function()
94+
-- Connect and take a task.
95+
local conn = netbox.connect(listen)
96+
local task = conn:call('queue.tube.test_tube:take')
97+
local task_id = task[1]
98+
local session_id = conn:call('queue.identificate')
99+
conn:close()
100+
101+
-- Reconnect and ack the task.
102+
conn = netbox.connect(listen)
103+
conn:call('queue.identificate', {session_id})
104+
task = conn:call('queue.tube.test_tube:ack', {task_id})
105+
test:ok(task[1] == task_id and task[2] == '-', 'task has been acked')
106+
107+
conn:close()
108+
end)
109+
110+
fiber_consumer:set_joinable(true)
111+
112+
local ok = fiber_consumer:join()
113+
test:ok(ok, 'reconnect and ack the task test done')
114+
115+
tube:drop()
116+
end)
117+
118+
test:test('test expiration', function(test)
119+
test:plan(5)
120+
121+
-- Preparing of the tube.
122+
local tube = queue.create_tube('test_tube', 'fifo')
123+
tube:put('test_data')
124+
tube:grant('guest', {call = true})
125+
126+
local fiber_consumer = fiber.new(function()
127+
queue.set_settings({ttr = 1})
128+
-- Connect and take a task.
129+
local conn = netbox.connect(listen)
130+
local task_id = conn:call('queue.tube.test_tube:take')[1]
131+
local quuid_con = conn:call('queue.identificate')
132+
conn:close()
133+
134+
-- Check that the task is in a "taken" state before ttr expires.
135+
fiber.sleep(0.1)
136+
test:ok(tube:peek(task_id)[2] == 't', 'task in taken state')
137+
fiber.sleep(2)
138+
139+
-- The task must be released after the ttr expires.
140+
test:ok(tube:peek(task_id)[2] == 'r', 'task in ready state after ttr')
141+
142+
-- The old queue session must expire.
143+
conn = netbox.connect(listen)
144+
local ok, err = pcall(conn.call, conn, 'queue.identificate',
145+
{quuid_con})
146+
local check_ident = not ok and err:match('UUID .* is unknown.')
147+
test:ok(check_ident, "the old queue session has expired.")
148+
conn:close()
149+
150+
-- If ttr = 0, the task should be released immediately.
151+
queue.set_settings({ttr = 0})
152+
conn = netbox.connect(listen)
153+
task_id = conn:call('queue.tube.test_tube:take')[1]
154+
conn:close()
155+
fiber.sleep(0.1)
156+
test:ok(tube:peek(task_id)[2] == 'r',
157+
'task has been immediately released')
158+
end)
159+
160+
fiber_consumer:set_joinable(true)
161+
162+
local ok = fiber_consumer:join()
163+
test:ok(ok, 'expiration test done')
164+
165+
tube:drop()
166+
end)
167+
84168
tnt.finish()
85169
os.exit(test:check() and 0 or 1)

0 commit comments

Comments
 (0)