@@ -63,6 +63,26 @@ queue.driver = {
63
63
limfifottl = require (' queue.abstract.driver.limfifottl' )
64
64
}
65
65
66
+ local function tube_release_all_tasks (tube )
67
+ local prefix = (' queue: [tube "%s"] ' ):format (tube .name )
68
+
69
+ -- We lean on stable iterators in this function.
70
+ -- https://github.com/tarantool/tarantool/issues/1796
71
+ if not qc .check_version ({1 , 7 , 5 }) then
72
+ log .error (prefix .. ' no stable iterator support: skip task releasing' )
73
+ log .error (prefix .. ' some tasks may stuck in taken state perpetually' )
74
+ log .error (prefix .. ' update tarantool to >= 1.7.5 or take the risk' )
75
+ end
76
+
77
+ log .info (prefix .. ' releasing all taken task (may take a while)' )
78
+ local released = 0
79
+ for _ , task in tube .raw :tasks_by_state (state .TAKEN ) do
80
+ tube .raw :release (task [1 ])
81
+ released = released + 1
82
+ end
83
+ log .info (prefix .. (' released %d tasks' ):format (released ))
84
+ end
85
+
66
86
-- tube methods
67
87
local tube = {}
68
88
@@ -512,14 +532,9 @@ function method.start()
512
532
end
513
533
514
534
for _ , tube_tuple in _queue :pairs () do
515
- local space_name = tube_tuple [3 ]
516
- tube = recreate_tube (tube_tuple )
517
- for _ , task in tube .raw :tasks_by_state (state .TAKEN ) do
518
- -- Release all taken tasks on start
519
- -- See https://github.com/tarantool/queue/issues/66
520
- -- for more information
521
- tube .raw :release (task [1 ])
522
- end
535
+ local tube = recreate_tube (tube_tuple )
536
+ -- gh-66: release all taken tasks on start
537
+ tube_release_all_tasks (tube )
523
538
end
524
539
525
540
session .on_disconnect (queue ._on_consumer_disconnect )
0 commit comments