Skip to content

Commit b2481f7

Browse files
committed
refactoring: move task check to "taken" into a separate function
Needed for #85
1 parent 19e839e commit b2481f7

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

queue/abstract.lua

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ local function tube_release_all_tasks(tube)
7575
log.info(prefix .. ('released %d tasks'):format(released))
7676
end
7777

78+
--- Check whether the task has been taken in a session with
79+
-- connection id == "conn_id".
80+
-- Throw an error if task is not take in the session.
81+
local function check_task_is_taken(tube_id, task_id, conn_id)
82+
local _taken = box.space._queue_taken:get{conn_id, tube_id, task_id}
83+
if _taken == nil then
84+
error("Task was not taken in the session")
85+
end
86+
end
87+
7888
-- tube methods
7989
local tube = {}
8090

@@ -140,10 +150,7 @@ function tube.touch(self, id, delta)
140150
return
141151
end
142152

143-
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
144-
if _taken == nil then
145-
error("Task was not taken in the session")
146-
end
153+
check_task_is_taken(self.tube_id, id, connection.id())
147154

148155
local space_name = box.space._queue:get{self.name}[3]
149156
queue.stat[space_name]:inc('touch')
@@ -153,10 +160,7 @@ end
153160

154161
function tube.ack(self, id)
155162
local conn_id = connection.id()
156-
local _taken = box.space._queue_taken:get{conn_id, self.tube_id, id}
157-
if _taken == nil then
158-
error("Task was not taken in the session")
159-
end
163+
check_task_is_taken(self.tube_id, id, conn_id)
160164
local tube = box.space._queue:get{self.name}
161165
local space_name = tube[3]
162166

@@ -174,10 +178,7 @@ end
174178

175179
local function tube_release_internal(self, id, opts, connection_id)
176180
opts = opts or {}
177-
local _taken = box.space._queue_taken:get{connection_id, self.tube_id, id}
178-
if _taken == nil then
179-
error("Task was not taken in the session")
180-
end
181+
check_task_is_taken(self.tube_id, id, connection_id)
181182

182183
box.space._queue_taken:delete{connection_id, self.tube_id, id}
183184
self:peek(id)
@@ -198,9 +199,10 @@ end
198199

199200
function tube.bury(self, id)
200201
local task = self:peek(id)
201-
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
202-
if _taken ~= nil then
203-
box.space._queue_taken:delete{connection.id(), self.tube_id, id}
202+
local conn_id = connection.id()
203+
local is_taken, _ = pcall(check_task_is_taken, self.tube_id, id, conn_id)
204+
if is_taken then
205+
box.space._queue_taken:delete{conn_id, self.tube_id, id}
204206
end
205207
if task[2] == state.BURIED then
206208
return task

0 commit comments

Comments
 (0)