Skip to content

Commit 94c0997

Browse files
committed
Release all taken tasks on start
All taken tasks will be released after the server restart If some tasks have been taken and don't released before shutdown of the tarantool instance (for example: tarantool instance has been killed) such task go to 'hung' state (noone can take the task now). So, we must release all taken tasks on start of the queue module. Fixes #66
1 parent a812c10 commit 94c0997

File tree

10 files changed

+154
-5
lines changed

10 files changed

+154
-5
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ If there are no 'ready' tasks in the queue, returns nil.
637637
* `tube:bury(task_id)` - buries a task
638638
* `tube:kick(count)` - digs out `count` tasks
639639
* `tube:peek(task_id)` - return the task state by ID
640+
* `tube:tasks_by_state(task_state)` - return the iterator to tasks in a certain state
640641
* `tube:truncate()` - delete all tasks from the tube. Note that `tube:truncate`
641642
must be called only by the user who created this tube (has space ownership) OR
642643
under a `setuid` function. Read more about `setuid` functions

queue/abstract.lua

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,16 @@ function method.start()
511511
})
512512
end
513513

514-
_queue:pairs():each(recreate_tube)
514+
for _, tube_tuple in _queue:pairs() do
515+
local space_name = tube_tuple[3]
516+
tube = recreate_tube(tube_tuple)
517+
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
518+
-- Release all taken tasks on start
519+
-- See https://github.com/tarantool/queue/issues/66
520+
-- for more information
521+
tube.raw:release(task[1])
522+
end
523+
end
515524

516525
session.on_disconnect(queue._on_consumer_disconnect)
517526
return queue

queue/abstract/driver/fifo.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ function method.peek(self, id)
121121
return self.space:get{id}
122122
end
123123

124+
-- get iterator to tasks in a certain state
125+
function method.tasks_by_state(self, task_state)
126+
return self.space.index.status:pairs(task_state)
127+
end
128+
124129
function method.truncate(self)
125130
self.space:truncate()
126131
end

queue/abstract/driver/fifottl.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,11 @@ function method.peek(self, id)
348348
return self.space:get{id}
349349
end
350350

351+
-- get iterator to tasks in a certain state
352+
function method.tasks_by_state(self, task_state)
353+
return self.space.index.status:pairs(task_state)
354+
end
355+
351356
function method.truncate(self)
352357
self.space:truncate()
353358
end

queue/abstract/driver/utube.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ function method.peek(self, id)
151151
return self.space:get{id}
152152
end
153153

154+
-- get iterator to tasks in a certain state
155+
function method.tasks_by_state(self, task_state)
156+
return self.space.index.status:pairs(task_state)
157+
end
158+
154159
function method.truncate(self)
155160
self.space:truncate()
156161
end

queue/abstract/driver/utubettl.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,11 @@ function method.peek(self, id)
370370
return self.space:get{id}
371371
end
372372

373+
-- get iterator to tasks in a certain state
374+
function method.tasks_by_state(self, task_state)
375+
return self.space.index.status:pairs(task_state)
376+
end
377+
373378
function method.truncate(self)
374379
self.space:truncate()
375380
end

t/010-fifo.t

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ local yaml = require('yaml')
33
local fiber = require('fiber')
44

55
local test = require('tap').test()
6-
test:plan(14)
6+
test:plan(15)
77

88
local queue = require('queue')
99
local state = require('queue.abstract.state')
@@ -292,6 +292,36 @@ test:test('if_not_exists test', function(test)
292292
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
293293
end)
294294

295+
test:test('Get tasks by state test', function(test)
296+
test:plan(2)
297+
local tube = queue.create_tube('test_task_it', 'fifo')
298+
299+
for i = 1, 10 do
300+
tube:put('test_data' .. tostring(i))
301+
end
302+
for i = 1, 4 do
303+
tube:take(0.001)
304+
end
305+
306+
local count_taken = 0
307+
local count_ready = 0
308+
309+
for _, task in tube.raw:tasks_by_state(state.READY) do
310+
if task[2] == state.READY then
311+
count_ready = count_ready + 1
312+
end
313+
end
314+
315+
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
316+
if task[2] == state.TAKEN then
317+
count_taken = count_taken + 1
318+
end
319+
end
320+
321+
test:is(count_ready, 6, 'Check tasks count in a ready state')
322+
test:is(count_taken, 4, 'Check tasks count in a taken state')
323+
end)
324+
295325
tnt.finish()
296326
os.exit(test:check() == true and 0 or -1)
297327
-- vim: set ft=lua :

t/020-fifottl.t

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
local fiber = require('fiber')
33

44
local test = require('tap').test()
5-
test:plan(15)
5+
test:plan(16)
66

77
local queue = require('queue')
88
local state = require('queue.abstract.state')
@@ -262,6 +262,35 @@ test:test('buried task in a dropped queue', function(test)
262262
test:ok(true, 'queue does not hang')
263263
end)
264264

265+
test:test('Get tasks by state test', function(test)
266+
test:plan(2)
267+
local tube = queue.create_tube('test_task_it', 'fifottl')
268+
269+
for i = 1, 10 do
270+
tube:put('test_data' .. tostring(i))
271+
end
272+
for i = 1, 4 do
273+
tube:take(0.001)
274+
end
275+
276+
local count_taken = 0
277+
local count_ready = 0
278+
279+
for _, task in tube.raw:tasks_by_state(state.READY) do
280+
if task[2] == state.READY then
281+
count_ready = count_ready + 1
282+
end
283+
end
284+
285+
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
286+
if task[2] == state.TAKEN then
287+
count_taken = count_taken + 1
288+
end
289+
end
290+
291+
test:is(count_ready, 6, 'Check tasks count in a ready state')
292+
test:is(count_taken, 4, 'Check tasks count in a taken state')
293+
end)
265294

266295
tnt.finish()
267296
os.exit(test:check() == true and 0 or -1)

t/030-utube.t

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ local yaml = require('yaml')
33
local fiber = require('fiber')
44

55
local test = (require('tap')).test()
6-
test:plan(11)
6+
test:plan(12)
77

88
local queue = require('queue')
99
local state = require('queue.abstract.state')
@@ -156,6 +156,36 @@ test:test('if_not_exists test', function(test)
156156
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
157157
end)
158158

159+
test:test('Get tasks by state test', function(test)
160+
test:plan(2)
161+
local tube = queue.create_tube('test_task_it', 'utube')
162+
163+
for i = 1, 10 do
164+
tube:put('test_data' .. tostring(i), { utube = i })
165+
end
166+
for i = 1, 4 do
167+
tube:take(0.001)
168+
end
169+
170+
local count_taken = 0
171+
local count_ready = 0
172+
173+
for _, task in tube.raw:tasks_by_state(state.READY) do
174+
if task[2] == state.READY then
175+
count_ready = count_ready + 1
176+
end
177+
end
178+
179+
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
180+
if task[2] == state.TAKEN then
181+
count_taken = count_taken + 1
182+
end
183+
end
184+
185+
test:is(count_ready, 6, 'Check tasks count in a ready state')
186+
test:is(count_taken, 4, 'Check tasks count in a taken state')
187+
end)
188+
159189
tnt.finish()
160190
os.exit(test:check() == true and 0 or -1)
161191
-- vim: set ft=lua :

t/040-utubettl.t

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ local yaml = require('yaml')
33
local fiber = require('fiber')
44

55
local test = (require('tap')).test()
6-
test:plan(16)
6+
test:plan(17)
77

88
local queue = require('queue')
99
local state = require('queue.abstract.state')
@@ -257,6 +257,36 @@ test:test('ttl after delay test', function(test)
257257
test:is(task.ttr, TTR * 1000000, 'check TTR after release')
258258
end)
259259

260+
test:test('Get tasks by state test', function(test)
261+
test:plan(2)
262+
local tube = queue.create_tube('test_task_it', 'utubettl')
263+
264+
for i = 1, 10 do
265+
tube:put('test_data' .. tostring(i), { utube = i })
266+
end
267+
for i = 1, 4 do
268+
tube:take(0.001)
269+
end
270+
271+
local count_taken = 0
272+
local count_ready = 0
273+
274+
for _, task in tube.raw:tasks_by_state(state.READY) do
275+
if task[2] == state.READY then
276+
count_ready = count_ready + 1
277+
end
278+
end
279+
280+
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
281+
if task[2] == state.TAKEN then
282+
count_taken = count_taken + 1
283+
end
284+
end
285+
286+
test:is(count_ready, 6, 'Check tasks count in a ready state')
287+
test:is(count_taken, 4, 'Check tasks count in a taken state')
288+
end)
289+
260290
tnt.finish()
261291
os.exit(test:check() == true and 0 or -1)
262292
-- vim: set ft=lua :

0 commit comments

Comments
 (0)