@@ -414,15 +414,11 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
414
414
return self
415
415
end
416
416
417
+ -- Sessions that don't have any active connections.
418
+ local inactive_sessions = {}
419
+
417
420
--- Release all session tasks if the session has no actual connections.
418
421
local function cleanup_session_tasks (queue_uuid )
419
- local sesion_ids = box .space ._queue_session_ids .index .queue_uuid :select (
420
- queue_uuid ,
421
- {limit = 1 })[1 ]
422
- if sesion_ids ~= nil then
423
- return
424
- end
425
-
426
422
-- Release all session tasks.
427
423
while true do
428
424
local task = box .space ._queue_taken .index .pk :min {queue_uuid }
@@ -444,6 +440,26 @@ local function cleanup_session_tasks(queue_uuid)
444
440
end
445
441
end
446
442
443
+ --- Remove a connection from the list of active connections and
444
+ -- release its tasks if necessary.
445
+ local function remove_connection (box_sid )
446
+ local queue_session_ids = box .space ._queue_session_ids
447
+ local quuid = get_quuid_by_sid (box_sid )
448
+
449
+ queue_session_ids :delete {box_sid }
450
+ local session_ids = queue_session_ids .index .queue_uuid :select (quuid ,
451
+ {limit = 1 })[1 ]
452
+
453
+ if session_ids == nil then
454
+ local ttr = queue .settings [' ttr' ] or 0
455
+ if ttr > 0 then
456
+ inactive_sessions [quuid ] = event_time (ttr )
457
+ else
458
+ cleanup_session_tasks (quuid )
459
+ end
460
+ end
461
+ end
462
+
447
463
function method ._on_consumer_disconnect ()
448
464
local sid = session .id ()
449
465
@@ -465,11 +481,7 @@ function method._on_consumer_disconnect()
465
481
end
466
482
end
467
483
468
- -- Remove connection from the list of active connections and
469
- -- release tasks if necessary.
470
- local quuid = get_quuid_by_sid (sid )
471
- box .space ._queue_session_ids :delete {sid }
472
- cleanup_session_tasks (quuid )
484
+ remove_connection (sid )
473
485
end
474
486
475
487
-- function takes tuples and recreates tube
@@ -574,6 +586,24 @@ local function identification_init()
574
586
end
575
587
end
576
588
589
+ --- Create an expiration fiber to cleanup expired sessions.
590
+ local function create_expiration_fiber ()
591
+ local exp_fiber = fiber .new (function ()
592
+ while true do
593
+ local cur_time = time ()
594
+ for quuid , exp_time in pairs (inactive_sessions ) do
595
+ if cur_time >= exp_time then
596
+ cleanup_session_tasks (quuid )
597
+ inactive_sessions [quuid ] = nil
598
+ end
599
+ end
600
+ fiber .sleep (1 )
601
+ end
602
+ end )
603
+
604
+ return exp_fiber
605
+ end
606
+
577
607
-- create or join infrastructure
578
608
function method .start ()
579
609
-- tube_name, tube_id, space_name, tube_type, opts
@@ -667,7 +697,7 @@ function method.start()
667
697
end
668
698
669
699
identification_init ()
670
-
700
+ queue . expiration_fiber = create_expiration_fiber ()
671
701
session .on_disconnect (queue ._on_consumer_disconnect )
672
702
return queue
673
703
end
@@ -746,20 +776,29 @@ function method.identificate(queue_uuid)
746
776
queue_session_ids :insert {sid , quuid }
747
777
elseif queue_uuid ~= nil then
748
778
-- Identificate using a previously created session.
779
+ -- Check that a session with this uuid exists.
780
+ local ids_by_uuid = queue_session_ids .index .queue_uuid :select (
781
+ queue_uuid , {limit = 1 })[1 ]
782
+ if ids_by_uuid == nil and inactive_sessions [queue_uuid ] == nil then
783
+ error (" The UUID " .. uuid .frombin (queue_uuid ):str () ..
784
+ " is unknown." )
785
+ end
749
786
750
787
-- Validate UUID.
751
788
uuid .frombin (queue_uuid )
752
789
753
790
if quuid ~= queue_uuid then
754
- queue_session_ids :upsert ({sid , queue_uuid }, {{' =' , 2 , queue_uuid }})
755
791
if quuid ~= nil then
756
- -- Cleanup old session tasks.
757
- cleanup_session_tasks (quuid )
792
+ remove_connection (sid )
758
793
end
794
+ queue_session_ids :insert ({sid , queue_uuid })
759
795
quuid = queue_uuid
760
796
end
761
797
end
762
798
799
+ -- Exclude the session from inactive.
800
+ inactive_sessions [quuid ] = nil
801
+
763
802
return quuid
764
803
end
765
804
0 commit comments