Skip to content

Commit 2747a8e

Browse files
committed
add "shared" sessions to queue
Before the patch a connection to server was synonym of the queue session. Now the session has a unique UUID (returned by the "queue.identificate()" method), and one session can have many connections. The session will be deleted (all session tasks will be released) after the last connection is disconnected. To connect to an existing session, call "queue.identificate(uuid)" with the previously obtained UUID. Part of #85
1 parent 532243b commit 2747a8e

File tree

3 files changed

+235
-38
lines changed

3 files changed

+235
-38
lines changed

README.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ align="right">
1919
* [Fields of the \_queue space](#fields-of-the-_queue-space)
2020
* [Fields of the \_queue\_consumers space](#fields-of-the-_queue_consumers-space)
2121
* [Fields of the \_queue\_taken space](#fields-of-the-_queue_taken-space)
22+
* [Fields of the \_queue\_session\_ids space](#fields-of-the-_queue_session_ids-space)
2223
* [Fields of the space associated with each queue](#fields-of-the-space-associated-with-each-queue)
2324
* [Installing](#installing)
2425
* [Using the queue module](#using-the-queue-module)
@@ -245,7 +246,7 @@ Consumers may be simply waiting for tasks to be put in the queues.
245246

246247
## Fields of the `_queue_consumers` space
247248

248-
1. `session` - session (connection) ID of the client
249+
1. `session` - connection ID of the client
249250
1. `fid` - client fiber ID
250251
1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue`
251252
space; the client waits for tasks in this queue
@@ -257,8 +258,7 @@ processing a task in the queue.
257258

258259
## Fields of the `_queue_taken` space
259260

260-
1. `session` - session (connection) ID of the client, referring to the
261-
`session_id` field of the `_queue_consumers` space
261+
1. `session_uuid` - session UUID (string)
262262
1. `tube_id` - queue ID, to which the task belongs
263263
1. `task_id` - task ID (of the task being taken)
264264
1. `time` - the time when the client began to execute the task
@@ -267,6 +267,14 @@ Also, there is a space which is associated with each queue,
267267
which is named in the `space` field of the `_queue` space.
268268
The associated space contains one tuple for each task.
269269

270+
The `_queue_session_ids` temporary space contains a map: box session id to the
271+
session UUID.
272+
273+
## Fields of the `_queue_session_ids` space
274+
275+
1. box_session_id - connection id (numeric)
276+
2. queue_session_uuid - session UUID (string)
277+
270278
## Fields of the space associated with each queue
271279

272280
1. task_id - numeric - see below

queue/abstract.lua

Lines changed: 139 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
local log = require('log')
22
local fiber = require('fiber')
3+
local uuid = require('uuid')
34

45
local state = require('queue.abstract.state')
56

@@ -75,6 +76,19 @@ local function tube_release_all_tasks(tube)
7576
log.info(prefix .. ('released %d tasks'):format(released))
7677
end
7778

79+
--- Get session UUID by box session ID.
80+
local function get_quuid_by_sid(box_sid)
81+
local session_ids = box.space._queue_session_ids.index.box_sid:get(box_sid)
82+
local quuid
83+
if session_ids ~= nil then
84+
quuid = session_ids[2]
85+
else
86+
quuid = queue.identificate()
87+
end
88+
89+
return quuid
90+
end
91+
7892
-- tube methods
7993
local tube = {}
8094

@@ -140,7 +154,8 @@ function tube.touch(self, id, delta)
140154
return
141155
end
142156

143-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
157+
local _taken = box.space._queue_taken:get{get_quuid_by_sid(session.id()),
158+
self.tube_id, id}
144159
if _taken == nil then
145160
error("Task was not taken in the session")
146161
end
@@ -152,7 +167,8 @@ function tube.touch(self, id, delta)
152167
end
153168

154169
function tube.ack(self, id)
155-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
170+
local quuid = get_quuid_by_sid(session.id())
171+
local _taken = box.space._queue_taken:get{quuid, self.tube_id, id}
156172
if _taken == nil then
157173
error("Task was not taken in the session")
158174
end
@@ -161,7 +177,7 @@ function tube.ack(self, id)
161177

162178
self:peek(id)
163179
-- delete task
164-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
180+
box.space._queue_taken:delete{quuid, self.tube_id, id}
165181
local result = self.raw:normalize_task(
166182
self.raw:delete(id):transform(2, 1, state.DONE)
167183
)
@@ -171,20 +187,20 @@ function tube.ack(self, id)
171187
return result
172188
end
173189

174-
local function tube_release_internal(self, id, opts, session_id)
190+
local function tube_release_internal(self, id, opts, queue_uuid)
175191
opts = opts or {}
176-
local _taken = box.space._queue_taken:get{session_id, self.tube_id, id}
192+
local _taken = box.space._queue_taken:get{queue_uuid, self.tube_id, id}
177193
if _taken == nil then
178194
error("Task was not taken in the session")
179195
end
180196

181-
box.space._queue_taken:delete{session_id, self.tube_id, id}
197+
box.space._queue_taken:delete{queue_uuid, self.tube_id, id}
182198
self:peek(id)
183199
return self.raw:normalize_task(self.raw:release(id, opts))
184200
end
185201

186202
function tube.release(self, id, opts)
187-
return tube_release_internal(self, id, opts, session.id())
203+
return tube_release_internal(self, id, opts, get_quuid_by_sid(session.id()))
188204
end
189205

190206
function tube.peek(self, id)
@@ -197,9 +213,10 @@ end
197213

198214
function tube.bury(self, id)
199215
local task = self:peek(id)
200-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
216+
local quuid = get_quuid_by_sid(session.id())
217+
local _taken = box.space._queue_taken:get{quuid, self.tube_id, id}
201218
if _taken ~= nil then
202-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
219+
box.space._queue_taken:delete{quuid, self.tube_id, id}
203220
end
204221
if task[2] == state.BURIED then
205222
return task
@@ -281,9 +298,11 @@ function tube.grant(self, user, args)
281298
tube_grant_space(user, '_queue', 'read')
282299
tube_grant_space(user, '_queue_consumers')
283300
tube_grant_space(user, '_queue_taken')
301+
tube_grant_space(user, '_queue_session_ids')
284302
tube_grant_space(user, self.name)
285303

286304
if args.call then
305+
tube_grant_func(user, 'queue.identificate')
287306
local prefix = (args.prefix or 'queue.tube') .. ('.%s:'):format(self.name)
288307
tube_grant_func(user, prefix .. 'take')
289308
tube_grant_func(user, prefix .. 'touch')
@@ -362,7 +381,8 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
362381
end
363382
-- task switched to taken - register in taken space
364383
elseif task[2] == state.TAKEN then
365-
queue_taken:insert{session.id(), self.tube_id, task[1], fiber.time64()}
384+
queue_taken:insert{get_quuid_by_sid(session.id()), self.tube_id,
385+
task[1], fiber.time64()}
366386
end
367387
if stats_data ~= nil then
368388
queue.stat[space.name]:inc(stats_data)
@@ -393,17 +413,47 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
393413
return self
394414
end
395415

416+
--- Release all session tasks if the session has no actual connections.
417+
local function cleanup_session_tasks(queue_uuid)
418+
local sesion_ids = box.space._queue_session_ids.index.queue_uuid:select(
419+
queue_uuid,
420+
{limit = 1})[1]
421+
if sesion_ids ~= nil then
422+
return
423+
end
424+
425+
-- Release all session tasks.
426+
while true do
427+
local task = box.space._queue_taken.index.pk:min{queue_uuid}
428+
if task == nil or task[1] ~= queue_uuid then
429+
break
430+
end
431+
432+
local tube_row = box.space._queue.index.tube_id:get{task[2]}
433+
if tube_row == nil then
434+
log.error("Inconsistent queue state: tube %d not found", task[2])
435+
box.space._queue_taken:delete{queue_uuid, task[2], task[3]}
436+
else
437+
log.warn("Session %s closed, release task %s",
438+
uuid.frombin(queue_uuid):str(), task[3])
439+
440+
tube_release_internal(queue.tube[tube_row[1]], task[3], nil,
441+
queue_uuid)
442+
end
443+
end
444+
end
445+
396446
function method._on_consumer_disconnect()
397-
local waiter, fb, task, tube, id
398-
id = session.id()
447+
local sid = session.id()
448+
399449
-- wakeup all waiters
400450
while true do
401-
waiter = box.space._queue_consumers.index.pk:min{id}
451+
local waiter = box.space._queue_consumers.index.pk:min{sid}
402452
if waiter == nil then
403453
break
404454
end
405455
-- Don't touch the other consumers
406-
if waiter[1] ~= id then
456+
if waiter[1] ~= sid then
407457
break
408458
end
409459
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
@@ -414,24 +464,11 @@ function method._on_consumer_disconnect()
414464
end
415465
end
416466

417-
-- release all session tasks
418-
while true do
419-
task = box.space._queue_taken.index.pk:min{id}
420-
if task == nil or task[1] ~= id then
421-
break
422-
end
423-
424-
tube = box.space._queue.index.tube_id:get{task[2]}
425-
if tube == nil then
426-
log.error("Inconsistent queue state: tube %d not found", task[2])
427-
box.space._queue_taken:delete{task[1], task[2], task[3] }
428-
else
429-
log.warn("Consumer %s disconnected, release task %s(%s)",
430-
id, task[3], tube[1])
431-
432-
tube_release_internal(queue.tube[tube[1]], task[3], nil, id)
433-
end
434-
end
467+
-- Remove connection from the list of active connections and
468+
-- release tasks if necessary.
469+
local quuid = get_quuid_by_sid(sid)
470+
box.space._queue_session_ids:delete{sid}
471+
cleanup_session_tasks(quuid)
435472
end
436473

437474
-- function takes tuples and recreates tube
@@ -495,6 +532,31 @@ function method.create_tube(tube_name, tube_type, opts)
495532
return self
496533
end
497534

535+
--- Create everything that's needed to work with "shared" sessions.
536+
local function identification_init()
537+
local queue_session_ids = box.space._queue_session_ids
538+
if queue_session_ids == nil then
539+
queue_session_ids = box.schema.create_space('_queue_session_ids', {
540+
temporary = true,
541+
format = {
542+
{name = 'box_session_id', type = num_type()},
543+
{name = 'queue_session_uuid', type = str_type()}
544+
}
545+
})
546+
547+
queue_session_ids:create_index('box_sid', {
548+
type = 'tree',
549+
parts = {1, num_type()},
550+
unique = true
551+
})
552+
queue_session_ids:create_index('queue_uuid', {
553+
type = 'tree',
554+
parts = {2, str_type()},
555+
unique = false
556+
})
557+
end
558+
end
559+
498560
-- create or join infrastructure
499561
function method.start()
500562
-- tube_name, tube_id, space_name, tube_type, opts
@@ -527,7 +589,7 @@ function method.start()
527589
_cons = box.schema.create_space('_queue_consumers', {
528590
temporary = true,
529591
format = {
530-
{name = 'session_id', type = num_type()},
592+
{name = 'box_session_id', type = num_type()},
531593
{name = 'fiber_id', type = num_type()},
532594
{name = 'tube_id', type = num_type()},
533595
{name = 'event_time', type = num_type()},
@@ -547,19 +609,27 @@ function method.start()
547609
end
548610

549611
local _taken = box.space._queue_taken
612+
613+
-- Format of the temporary space has been changed.
614+
-- So recreate it if needed.
615+
if _taken ~= nil and _taken:format()[1]['type'] == 'unsigned' then
616+
_taken:drop()
617+
_taken = nil
618+
end
619+
550620
if _taken == nil then
551621
-- session_id, tube_id, task_id, time
552622
_taken = box.schema.create_space('_queue_taken', {
553623
temporary = true,
554624
format = {
555-
{name = 'session_id', type = num_type()},
625+
{name = 'queue_session_uuid', type = str_type()},
556626
{name = 'tube_id', type = num_type()},
557627
{name = 'task_id', type = num_type()},
558628
{name = 'taken_time', type = num_type()}
559629
}})
560630
_taken:create_index('pk', {
561631
type = 'tree',
562-
parts = {1, num_type(), 2, num_type(), 3, num_type()},
632+
parts = {1, str_type(), 2, num_type(), 3, num_type()},
563633
unique = true})
564634

565635
_taken:create_index('task',{
@@ -575,6 +645,8 @@ function method.start()
575645
tube_release_all_tasks(tube)
576646
end
577647

648+
identification_init()
649+
578650
session.on_disconnect(queue._on_consumer_disconnect)
579651
return queue
580652
end
@@ -613,6 +685,38 @@ local function build_stats(space)
613685
return stats
614686
end
615687

688+
--- Identificate the session.
689+
-- If "queue_uuid" == nil - create a new UUID for the session
690+
-- and return it, otherwise connect to the existing one.
691+
function method.identificate(queue_uuid)
692+
local queue_session_ids = box.space._queue_session_ids
693+
local sid = session.id()
694+
local session_ids = queue_session_ids.index.box_sid:get(sid)
695+
local quuid = session_ids and session_ids[2]
696+
697+
if queue_uuid == nil and quuid == nil then
698+
-- Generate new UUID for the session.
699+
quuid = uuid.bin()
700+
queue_session_ids:insert{sid, quuid}
701+
elseif queue_uuid ~= nil then
702+
-- Identificate using a previously created session.
703+
704+
--Validate UUID.
705+
uuid.frombin(queue_uuid)
706+
707+
if quuid ~= queue_uuid then
708+
queue_session_ids:upsert({sid, queue_uuid}, {{'=', 2, queue_uuid}})
709+
if quuid ~= nil then
710+
-- Cleanup old session tasks.
711+
cleanup_session_tasks(quuid)
712+
end
713+
quuid = queue_uuid
714+
end
715+
end
716+
717+
return quuid
718+
end
719+
616720
queue.statistics = function(space)
617721
if space ~= nil then
618722
return build_stats(space)

0 commit comments

Comments
 (0)