1
1
local log = require (' log' )
2
2
local fiber = require (' fiber' )
3
+ local uuid = require (' uuid' )
3
4
5
+ local session = require (' queue.abstract.queue_session' )
4
6
local state = require (' queue.abstract.state' )
5
7
6
8
local util = require (' queue.util' )
7
9
local qc = require (' queue.compat' )
8
10
local num_type = qc .num_type
9
11
local str_type = qc .str_type
10
12
13
+ -- The term "queue session" has been added to the queue. One "queue session"
14
+ -- can include many connections (box.session). For clarity, the box.session
15
+ -- will be referred to as connection below.
11
16
local connection = box .session
12
17
13
18
local queue = {
@@ -102,13 +107,15 @@ function tube.take(self, timeout)
102
107
end
103
108
end
104
109
105
- --- Checks is the task has been taken in a session with
106
- -- connection id == "conn_id" .
110
+ --- Checks is the task has been taken in a current session or in a session
111
+ -- with session uuid = session_uuid .
107
112
-- Throw an error on failure.
108
- local function check_task_on_taken (tube_id , task_id , conn_id )
109
- local _taken = box .space ._queue_taken .index .task :get {tube_id , task_id }
110
- if _taken == nil or _taken [1 ] ~= conn_id then
111
- error (" Task was not taken in the session" )
113
+ local function check_task_on_taken (tube_id , task_id , session_uuid )
114
+ local _taken = box .space ._queue_taken_2 .index .task :get {tube_id , task_id }
115
+
116
+ if _taken == nil or ((session_uuid ~= nil and _taken [4 ] ~= session_uuid ) and
117
+ (_taken [4 ] ~= session .get_uuid (connection .id ()))) then
118
+ error (" Task was not taken" )
112
119
end
113
120
end
114
121
@@ -127,7 +134,7 @@ function tube.touch(self, id, delta)
127
134
return
128
135
end
129
136
130
- check_task_on_taken (self .tube_id , id , connection . id () )
137
+ check_task_on_taken (self .tube_id , id )
131
138
132
139
local space_name = box .space ._queue :get {self .name }[3 ]
133
140
queue .stat [space_name ]:inc (' touch' )
@@ -136,14 +143,13 @@ function tube.touch(self, id, delta)
136
143
end
137
144
138
145
function tube .ack (self , id )
139
- local conn_id = connection .id ()
140
- check_task_on_taken (self .tube_id , id , conn_id )
146
+ check_task_on_taken (self .tube_id , id )
141
147
local tube = box .space ._queue :get {self .name }
142
148
local space_name = tube [3 ]
143
149
144
150
self :peek (id )
145
151
-- delete task
146
- box .space ._queue_taken .index .task :delete {self .tube_id , id }
152
+ box .space ._queue_taken_2 .index .task :delete {self .tube_id , id }
147
153
local result = self .raw :normalize_task (
148
154
self .raw :delete (id ):transform (2 , 1 , state .DONE )
149
155
)
@@ -153,17 +159,17 @@ function tube.ack(self, id)
153
159
return result
154
160
end
155
161
156
- local function tube_release_internal (self , id , opts , connection_id )
162
+ local function tube_release_internal (self , id , opts , session_uuid )
157
163
opts = opts or {}
158
- check_task_on_taken (self .tube_id , id , connection_id )
164
+ check_task_on_taken (self .tube_id , id , session_uuid )
159
165
160
- box .space ._queue_taken .index .task :delete {self .tube_id , id }
166
+ box .space ._queue_taken_2 .index .task :delete {self .tube_id , id }
161
167
self :peek (id )
162
168
return self .raw :normalize_task (self .raw :release (id , opts ))
163
169
end
164
170
165
171
function tube .release (self , id , opts )
166
- return tube_release_internal (self , id , opts , connection . id () )
172
+ return tube_release_internal (self , id , opts )
167
173
end
168
174
169
175
function tube .peek (self , id )
176
182
177
183
function tube .bury (self , id )
178
184
local task = self :peek (id )
179
- local conn_id = connection .id ()
180
- local is_taken , _ = pcall (check_task_on_taken , self .tube_id , id , conn_id )
185
+ local is_taken , _ = pcall (check_task_on_taken , self .tube_id , id )
181
186
if is_taken then
182
- box .space ._queue_taken .index .task :delete {self .tube_id , id }
187
+ box .space ._queue_taken_2 .index .task :delete {self .tube_id , id }
183
188
end
184
189
if task [2 ] == state .BURIED then
185
190
return task
@@ -214,8 +219,8 @@ function tube.drop(self)
214
219
error (" There are consumers connected the tube" )
215
220
end
216
221
217
- local taken = box .space ._queue_taken .index .task :min {tube_id }
218
- if taken ~= nil and taken [2 ] == tube_id then
222
+ local taken = box .space ._queue_taken_2 .index .task :min {tube_id }
223
+ if taken ~= nil and taken [1 ] == tube_id then
219
224
error (" There are taken tasks in the tube" )
220
225
end
221
226
@@ -260,10 +265,12 @@ function tube.grant(self, user, args)
260
265
261
266
tube_grant_space (user , ' _queue' , ' read' )
262
267
tube_grant_space (user , ' _queue_consumers' )
263
- tube_grant_space (user , ' _queue_taken ' )
268
+ tube_grant_space (user , ' _queue_taken_2 ' )
264
269
tube_grant_space (user , self .name )
270
+ session .session_grant (user )
265
271
266
272
if args .call then
273
+ tube_grant_func (user , ' queue.identify' )
267
274
local prefix = (args .prefix or ' queue.tube' ) .. (' .%s:' ):format (self .name )
268
275
tube_grant_func (user , prefix .. ' take' )
269
276
tube_grant_func (user , prefix .. ' touch' )
@@ -319,12 +326,12 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
319
326
if task == nil then return end
320
327
321
328
local queue_consumers = box .space ._queue_consumers
322
- local queue_taken = box .space ._queue_taken
329
+ local queue_taken = box .space ._queue_taken_2
323
330
324
331
-- if task was taken and become other state
325
332
local taken = queue_taken .index .task :get {tube_id , task [1 ]}
326
333
if taken ~= nil then
327
- queue_taken :delete {taken [1 ], taken [2 ], taken [ 3 ] }
334
+ queue_taken :delete {taken [1 ], taken [2 ]}
328
335
end
329
336
-- task switched to ready (or new task)
330
337
if task [2 ] == state .READY then
@@ -342,8 +349,15 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
342
349
end
343
350
-- task switched to taken - register in taken space
344
351
elseif task [2 ] == state .TAKEN then
345
- queue_taken :insert {connection .id (), self .tube_id , task [1 ],
346
- fiber .time64 ()}
352
+ local conn_id = connection .id ()
353
+ local session_uuid = session .get_uuid (conn_id )
354
+ queue_taken :insert {
355
+ self .tube_id ,
356
+ task [1 ],
357
+ conn_id ,
358
+ session_uuid ,
359
+ fiber .time64 ()
360
+ }
347
361
end
348
362
if stats_data ~= nil then
349
363
queue .stat [space .name ]:inc (stats_data )
@@ -375,49 +389,45 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
375
389
end
376
390
377
391
--- Release all session tasks.
378
- local function release_session_tasks (connection_id )
379
- while true do
380
- task = box .space ._queue_taken .index .pk :min {connection_id }
381
- if task == nil or task [1 ] ~= connection_id then
382
- break
383
- end
392
+ local function release_session_tasks (session_uuid )
393
+ local taken_tasks = box .space ._queue_taken_2 .index .uuid :select (session_uuid )
384
394
385
- local tube = box .space ._queue .index .tube_id :get {task [2 ]}
395
+ for _ , task in pairs (taken_tasks ) do
396
+ local tube = box .space ._queue .index .tube_id :get {task [1 ]}
386
397
if tube == nil then
387
- log .error (" Inconsistent queue state: tube %d not found" , task [2 ])
388
- box .space ._queue_taken .index .task :delete {task [2 ], task [3 ]}
398
+ log .error (" Inconsistent queue state: tube %d not found" , task [1 ])
399
+ box .space ._queue_taken_2 .index .task :delete {task [1 ], task [2 ]}
389
400
else
390
- log .warn (" Consumer %s disconnected, release task %s(%s)" ,
391
- connection_id , task [3 ], tube [1 ])
392
-
393
- tube_release_internal (queue .tube [tube [1 ]], task [3 ], nil ,
394
- connection_id )
401
+ log .warn (" Session %s closed, release task %s(%s)" ,
402
+ uuid .frombin (session_uuid ):str (), task [2 ], tube [1 ])
403
+ tube_release_internal (queue .tube [tube [1 ]], task [2 ], nil ,
404
+ session_uuid )
395
405
end
396
406
end
397
407
end
398
408
399
409
function method ._on_consumer_disconnect ()
400
- local waiter , fb , task , tube , id
401
- id = connection . id ()
410
+ local conn_id = connection . id ()
411
+
402
412
-- wakeup all waiters
403
413
while true do
404
- waiter = box .space ._queue_consumers .index .pk :min {id }
414
+ waiter = box .space ._queue_consumers .index .pk :min {conn_id }
405
415
if waiter == nil then
406
416
break
407
417
end
408
418
-- Don't touch the other consumers
409
- if waiter [1 ] ~= id then
419
+ if waiter [1 ] ~= conn_id then
410
420
break
411
421
end
412
- box .space ._queue_consumers :delete { waiter [1 ], waiter [2 ] }
422
+ box .space ._queue_consumers :delete {waiter [1 ], waiter [2 ]}
413
423
local cond = conds [waiter [2 ]]
414
424
if cond then
415
425
releasing_connections [waiter [2 ]] = true
416
426
cond :signal (waiter [2 ])
417
427
end
418
428
end
419
429
420
- release_session_tasks ( id )
430
+ session . remove_connection ( conn_id )
421
431
end
422
432
423
433
-- function takes tuples and recreates tube
@@ -483,13 +493,17 @@ end
483
493
484
494
function method .set_settings (opts )
485
495
opts = opts or {}
496
+ local session_opts = {}
486
497
487
498
for key , val in pairs (opts ) do
488
- if key ~= ' ttr' then
499
+ if key == ' ttr' then
500
+ session_opts [key ] = val
501
+ else
489
502
error (' Unknown option ' .. key )
490
503
end
491
504
end
492
505
506
+ session .set_settings (session_opts )
493
507
queue .settings = opts
494
508
end
495
509
@@ -544,27 +558,34 @@ function method.start()
544
558
})
545
559
end
546
560
547
- local _taken = box .space ._queue_taken
561
+ -- Remove deprecated space
562
+ if box .space ._queue_taken ~= nil then
563
+ box .space ._queue_taken :drop ()
564
+ end
565
+
566
+ local _taken = box .space ._queue_taken_2
548
567
if _taken == nil then
549
- -- connection_id, tube_id, task_id, time
550
- _taken = box .schema .create_space (' _queue_taken ' , {
568
+ -- tube_id, task_id, connection_id, session_uuid , time
569
+ _taken = box .schema .create_space (' _queue_taken_2 ' , {
551
570
temporary = true ,
552
571
format = {
553
- {name = ' connection_id' , type = num_type ()},
554
572
{name = ' tube_id' , type = num_type ()},
555
573
{name = ' task_id' , type = num_type ()},
574
+ {name = ' connection_id' , type = num_type ()},
575
+ {name = ' session_uuid' , type = str_type ()},
556
576
{name = ' taken_time' , type = num_type ()}
557
577
}})
558
- _taken :create_index (' pk' , {
559
- type = ' tree' ,
560
- parts = {1 , num_type (), 2 , num_type (), 3 , num_type ()},
561
- unique = true })
562
578
563
- _taken :create_index (' task' ,{
579
+ _taken :create_index (' task' , {
564
580
type = ' tree' ,
565
- parts = {2 , num_type (), 3 , num_type ()},
581
+ parts = {1 , num_type (), 2 , num_type ()},
566
582
unique = true
567
583
})
584
+ _taken :create_index (' uuid' , {
585
+ type = ' tree' ,
586
+ parts = {4 , str_type ()},
587
+ unique = false
588
+ })
568
589
end
569
590
570
591
for _ , tube_tuple in _queue :pairs () do
@@ -577,6 +598,9 @@ function method.start()
577
598
end
578
599
end
579
600
601
+ session .on_session_remove (release_session_tasks )
602
+ session .start ()
603
+
580
604
connection .on_disconnect (queue ._on_consumer_disconnect )
581
605
return queue
582
606
end
@@ -640,6 +664,13 @@ local function build_stats(space)
640
664
return stats
641
665
end
642
666
667
+ --- identify the session.
668
+ -- If "session_uuid" == nil - create a new UUID for the session
669
+ -- and return it, otherwise connect to the existing one.
670
+ function method .identify (session_uuid )
671
+ return session .identify (connection .id (), session_uuid )
672
+ end
673
+
643
674
queue .statistics = function (space )
644
675
if space ~= nil then
645
676
return build_stats (space )
0 commit comments