Skip to content

Commit 62c1132

Browse files
committed
add "shared" sessions to queue
Before the patch a connection to server was synonym of the queue session. Now the session has a unique UUID (returned by the "queue.identify()" method), and one session can have many connections. The session will be deleted (all session tasks will be released) after "ttr" seconds have passed since the last connection was disconnected. To connect to an existing session, call "queue.identificate(uuid)" with the previously obtained UUID. "ttr" in seconds - the time after which, if there is no active connection in the session, it will be released with all its tasks. Also, the "_queue_taken" internal space has been replaced with the "_queue_taken_2" with a change format and used indexes (for better performance). The downgrade to previous version works correctly. Closes of #85
1 parent 69375cc commit 62c1132

File tree

6 files changed

+489
-62
lines changed

6 files changed

+489
-62
lines changed

README.md

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ align="right">
1818
* [The underlying spaces](#the-underlying-spaces)
1919
* [Fields of the \_queue space](#fields-of-the-_queue-space)
2020
* [Fields of the \_queue\_consumers space](#fields-of-the-_queue_consumers-space)
21-
* [Fields of the \_queue\_taken space](#fields-of-the-_queue_taken-space)
21+
* [Fields of the \_queue\_taken\_2 space](#fields-of-the-_queue_taken_2-space)
22+
* [Fields of the \_queue\_session\_ids space](#fields-of-the-_queue_session_ids-space)
2223
* [Fields of the space associated with each queue](#fields-of-the-space-associated-with-each-queue)
2324
* [Installing](#installing)
2425
* [Using the queue module](#using-the-queue-module)
@@ -252,17 +253,26 @@ space; the client waits for tasks in this queue
252253
1. `timeout` - the client wait timeout
253254
1. `time` - the time when the client took a task
254255

255-
The `_queue_taken` temporary space contains tuples for each job which is
256-
processing a task in the queue.
256+
The `_queue_taken_2` (`_queue_taken` is deprecated) temporary space contains
257+
tuples for each job which is processing a task in the queue.
257258

258-
## Fields of the `_queue_taken` space
259+
## Fields of the `_queue_taken_2` space
259260

260-
1. `connection_id` - connection ID of the client, referring to the
261-
`connection_id` field of the `_queue_consumers` space
262261
1. `tube_id` - queue ID, to which the task belongs
263262
1. `task_id` - task ID (of the task being taken)
263+
1. `connection_id` - connection ID of the client, referring to the
264+
`connection_id` field of the `_queue_consumers` space
265+
1. `session_uuid` - session UUID (string)
264266
1. `time` - the time when the client began to execute the task
265267

268+
The `_queue_session_ids` temporary space contains a map: box session id to the
269+
session UUID.
270+
271+
## Fields of the `_queue_session_ids` space
272+
273+
1. `box_session_id` - connection id (numeric)
274+
2. `session_uuid` - session UUID (string)
275+
266276
Also, there is a space which is associated with each queue,
267277
which is named in the `space` field of the `_queue` space.
268278
The associated space contains one tuple for each task.
@@ -405,7 +415,7 @@ the job waits until a task becomes ready or the timeout expires.
405415
Effect: the value of `task_state` changes to 't' (taken).
406416
The `take` request tells the system that the task is being worked on.
407417
It should be followed by an `ack` request when the work is finished.
408-
Additional effect: a tuple is added to the `_queue_taken` space.
418+
Additional effect: a tuple is added to the `_queue_taken_2` space.
409419

410420
Returns: the value of the taken tuple, or nil if none was found.
411421
The value of the first field in the tuple (`task_id`) is important

queue-scm-1.rockspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ build = {
1818
modules = {
1919
['queue.abstract'] = 'queue/abstract.lua',
2020
['queue.abstract.state'] = 'queue/abstract/state.lua',
21+
['queue.abstract.queue_session'] = 'queue/abstract/queue_session.lua',
2122
['queue.abstract.driver.fifottl'] = 'queue/abstract/driver/fifottl.lua',
2223
['queue.abstract.driver.utubettl'] = 'queue/abstract/driver/utubettl.lua',
2324
['queue.abstract.driver.fifo'] = 'queue/abstract/driver/fifo.lua',

queue/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract.lua
88
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
99
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/state.lua
1010
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/abstract)
11+
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/queue_session.lua
12+
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/abstract)
1113
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/driver/fifo.lua
1214
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/abstract/driver/)
1315
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/driver/utube.lua

queue/abstract.lua

Lines changed: 87 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
local log = require('log')
22
local fiber = require('fiber')
3+
local uuid = require('uuid')
34

5+
local session = require('queue.abstract.queue_session')
46
local state = require('queue.abstract.state')
57

68
local util = require('queue.util')
79
local qc = require('queue.compat')
810
local num_type = qc.num_type
911
local str_type = qc.str_type
1012

13+
-- The term "queue session" has been added to the queue. One "queue session"
14+
-- can include many connections (box.session). For clarity, the box.session
15+
-- will be referred to as connection below.
1116
local connection = box.session
1217

1318
local queue = {
@@ -52,13 +57,15 @@ local function tube_release_all_tasks(tube)
5257
log.info(prefix .. ('released %d tasks'):format(released))
5358
end
5459

55-
--- Check whether the task has been taken in a session with
56-
-- connection id == "conn_id".
60+
--- Check whether the task has been taken in a current session or in a session
61+
-- with session uuid = session_uuid.
5762
-- Throw an error if task is not take in the session.
58-
local function check_task_is_taken(tube_id, task_id, conn_id)
59-
local _taken = box.space._queue_taken.index.task:get{tube_id, task_id}
60-
if _taken == nil or _taken[1] ~= conn_id then
61-
error("Task was not taken in the session")
63+
local function check_task_is_taken(tube_id, task_id, session_uuid)
64+
local _taken = box.space._queue_taken_2.index.task:get{tube_id, task_id}
65+
66+
session_uuid = session_uuid or session.identify(connection.id())
67+
if _taken == nil or _taken[4] ~= session_uuid then
68+
error("Task was not taken")
6269
end
6370
end
6471

@@ -127,7 +134,7 @@ function tube.touch(self, id, delta)
127134
return
128135
end
129136

130-
check_task_is_taken(self.tube_id, id, connection.id())
137+
check_task_is_taken(self.tube_id, id)
131138

132139
local space_name = box.space._queue:get{self.name}[3]
133140
queue.stat[space_name]:inc('touch')
@@ -136,14 +143,13 @@ function tube.touch(self, id, delta)
136143
end
137144

138145
function tube.ack(self, id)
139-
local conn_id = connection.id()
140-
check_task_is_taken(self.tube_id, id, conn_id)
146+
check_task_is_taken(self.tube_id, id)
141147
local tube = box.space._queue:get{self.name}
142148
local space_name = tube[3]
143149

144150
self:peek(id)
145151
-- delete task
146-
box.space._queue_taken.index.task:delete{self.tube_id, id}
152+
box.space._queue_taken_2.index.task:delete{self.tube_id, id}
147153
local result = self.raw:normalize_task(
148154
self.raw:delete(id):transform(2, 1, state.DONE)
149155
)
@@ -153,17 +159,17 @@ function tube.ack(self, id)
153159
return result
154160
end
155161

156-
local function tube_release_internal(self, id, opts, connection_id)
162+
local function tube_release_internal(self, id, opts, session_uuid)
157163
opts = opts or {}
158-
check_task_is_taken(self.tube_id, id, connection_id)
164+
check_task_is_taken(self.tube_id, id, session_uuid)
159165

160-
box.space._queue_taken.index.task:delete{self.tube_id, id}
166+
box.space._queue_taken_2.index.task:delete{self.tube_id, id}
161167
self:peek(id)
162168
return self.raw:normalize_task(self.raw:release(id, opts))
163169
end
164170

165171
function tube.release(self, id, opts)
166-
return tube_release_internal(self, id, opts, connection.id())
172+
return tube_release_internal(self, id, opts)
167173
end
168174

169175
function tube.peek(self, id)
@@ -176,10 +182,9 @@ end
176182

177183
function tube.bury(self, id)
178184
local task = self:peek(id)
179-
local conn_id = connection.id()
180-
local is_taken, _ = pcall(check_task_is_taken, self.tube_id, id, conn_id)
185+
local is_taken, _ = pcall(check_task_is_taken, self.tube_id, id)
181186
if is_taken then
182-
box.space._queue_taken.index.task:delete{self.tube_id, id}
187+
box.space._queue_taken_2.index.task:delete{self.tube_id, id}
183188
end
184189
if task[2] == state.BURIED then
185190
return task
@@ -214,8 +219,8 @@ function tube.drop(self)
214219
error("There are consumers connected the tube")
215220
end
216221

217-
local taken = box.space._queue_taken.index.task:min{tube_id}
218-
if taken ~= nil and taken[2] == tube_id then
222+
local taken = box.space._queue_taken_2.index.task:min{tube_id}
223+
if taken ~= nil and taken[1] == tube_id then
219224
error("There are taken tasks in the tube")
220225
end
221226

@@ -260,10 +265,12 @@ function tube.grant(self, user, args)
260265

261266
tube_grant_space(user, '_queue', 'read')
262267
tube_grant_space(user, '_queue_consumers')
263-
tube_grant_space(user, '_queue_taken')
268+
tube_grant_space(user, '_queue_taken_2')
264269
tube_grant_space(user, self.name)
270+
session.grant(user)
265271

266272
if args.call then
273+
tube_grant_func(user, 'queue.identify')
267274
local prefix = (args.prefix or 'queue.tube') .. ('.%s:'):format(self.name)
268275
tube_grant_func(user, prefix .. 'take')
269276
tube_grant_func(user, prefix .. 'touch')
@@ -319,12 +326,12 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
319326
if task == nil then return end
320327

321328
local queue_consumers = box.space._queue_consumers
322-
local queue_taken = box.space._queue_taken
329+
local queue_taken = box.space._queue_taken_2
323330

324331
-- if task was taken and become other state
325332
local taken = queue_taken.index.task:get{tube_id, task[1]}
326333
if taken ~= nil then
327-
queue_taken:delete{taken[1], taken[2], taken[3]}
334+
queue_taken:delete{taken[1], taken[2]}
328335
end
329336
-- task switched to ready (or new task)
330337
if task[2] == state.READY then
@@ -342,8 +349,15 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
342349
end
343350
-- task switched to taken - register in taken space
344351
elseif task[2] == state.TAKEN then
345-
queue_taken:insert{connection.id(), self.tube_id, task[1],
346-
fiber.time64()}
352+
local conn_id = connection.id()
353+
local session_uuid = session.identify(conn_id)
354+
queue_taken:insert{
355+
self.tube_id,
356+
task[1],
357+
conn_id,
358+
session_uuid,
359+
fiber.time64()
360+
}
347361
end
348362
if stats_data ~= nil then
349363
queue.stat[space.name]:inc(stats_data)
@@ -375,49 +389,45 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
375389
end
376390

377391
--- Release all session tasks.
378-
local function release_session_tasks(connection_id)
379-
while true do
380-
local task = box.space._queue_taken.index.pk:min{connection_id}
381-
if task == nil or task[1] ~= connection_id then
382-
break
383-
end
392+
local function release_session_tasks(session_uuid)
393+
local taken_tasks = box.space._queue_taken_2.index.uuid:select{session_uuid}
384394

385-
local tube = box.space._queue.index.tube_id:get{task[2]}
395+
for _, task in pairs(taken_tasks) do
396+
local tube = box.space._queue.index.tube_id:get{task[1]}
386397
if tube == nil then
387-
log.error("Inconsistent queue state: tube %d not found", task[2])
388-
box.space._queue_taken.index.task:delete{task[2], task[3]}
398+
log.error("Inconsistent queue state: tube %d not found", task[1])
399+
box.space._queue_taken_2.index.task:delete{task[1], task[2]}
389400
else
390-
log.warn("Consumer %s disconnected, release task %s(%s)",
391-
connection_id, task[3], tube[1])
392-
393-
tube_release_internal(queue.tube[tube[1]], task[3], nil,
394-
connection_id)
401+
log.warn("Session %s closed, release task %s(%s)",
402+
uuid.frombin(session_uuid):str(), task[2], tube[1])
403+
tube_release_internal(queue.tube[tube[1]], task[2], nil,
404+
session_uuid)
395405
end
396406
end
397407
end
398408

399409
function method._on_consumer_disconnect()
400-
local waiter, fb, task, tube, id
401-
id = connection.id()
410+
local conn_id = connection.id()
411+
402412
-- wakeup all waiters
403413
while true do
404-
waiter = box.space._queue_consumers.index.pk:min{id}
414+
local waiter = box.space._queue_consumers.index.pk:min{conn_id}
405415
if waiter == nil then
406416
break
407417
end
408418
-- Don't touch the other consumers
409-
if waiter[1] ~= id then
419+
if waiter[1] ~= conn_id then
410420
break
411421
end
412-
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
422+
box.space._queue_consumers:delete{waiter[1], waiter[2]}
413423
local cond = conds[waiter[2]]
414424
if cond then
415425
releasing_connections[waiter[2]] = true
416426
cond:signal(waiter[2])
417427
end
418428
end
419429

420-
release_session_tasks(id)
430+
session.disconnect(conn_id)
421431
end
422432

423433
-- function takes tuples and recreates tube
@@ -532,27 +542,34 @@ function method.start()
532542
})
533543
end
534544

535-
local _taken = box.space._queue_taken
545+
-- Remove deprecated space
546+
if box.space._queue_taken ~= nil then
547+
box.space._queue_taken:drop()
548+
end
549+
550+
local _taken = box.space._queue_taken_2
536551
if _taken == nil then
537-
-- connection_id, tube_id, task_id, time
538-
_taken = box.schema.create_space('_queue_taken', {
552+
-- tube_id, task_id, connection_id, session_uuid, time
553+
_taken = box.schema.create_space('_queue_taken_2', {
539554
temporary = true,
540555
format = {
541-
{name = 'connection_id', type = num_type()},
542556
{name = 'tube_id', type = num_type()},
543557
{name = 'task_id', type = num_type()},
558+
{name = 'connection_id', type = num_type()},
559+
{name = 'session_uuid', type = str_type()},
544560
{name = 'taken_time', type = num_type()}
545561
}})
546-
_taken:create_index('pk', {
547-
type = 'tree',
548-
parts = {1, num_type(), 2, num_type(), 3, num_type()},
549-
unique = true})
550562

551-
_taken:create_index('task',{
563+
_taken:create_index('task', {
552564
type = 'tree',
553-
parts = {2, num_type(), 3, num_type()},
565+
parts = {1, num_type(), 2, num_type()},
554566
unique = true
555567
})
568+
_taken:create_index('uuid', {
569+
type = 'tree',
570+
parts = {4, str_type()},
571+
unique = false
572+
})
556573
end
557574

558575
for _, tube_tuple in _queue:pairs() do
@@ -565,6 +582,9 @@ function method.start()
565582
end
566583
end
567584

585+
session.on_session_remove(release_session_tasks)
586+
session.start()
587+
568588
connection.on_disconnect(queue._on_consumer_disconnect)
569589
return queue
570590
end
@@ -628,20 +648,32 @@ local function build_stats(space)
628648
return stats
629649
end
630650

651+
--- Identifies the connection and return the UUID of the current session.
652+
-- If session_uuid ~= nil: associate the connection with given session.
653+
function method.identify(session_uuid)
654+
return session.identify(connection.id(), session_uuid)
655+
end
656+
631657
--- Configure of the queue module.
632658
-- If an invalid value or an unknown option
633659
-- is used, an error will be thrown.
634660
local function cfg(self, opts)
635661
opts = opts or {}
662+
local session_opts = {}
636663

637664
-- Check all options before configuring so that
638665
-- the configuration is done transactionally.
639666
for key, val in pairs(opts) do
640-
if key ~= 'ttr' then
667+
if key == 'ttr' then
668+
session_opts[key] = val
669+
else
641670
error('Unknown option ' .. tostring(key))
642671
end
643672
end
644673

674+
-- Configuring the queue_session module.
675+
session.cfg(session_opts)
676+
645677
for key, val in pairs(opts) do
646678
self[key] = val
647679
end

0 commit comments

Comments
 (0)