Skip to content

Commit 31d45ea

Browse files
committed
minimize using of the "pk" index of the "_queue_taken" space
In the future, we will refuse to use the "pk" index of the "_queue_taken" space to improve performance (the conclusion about the improvement in performance is based on the benchmarks that will be added in one of the next commits). So, let's minimize its use. Needed for #85
1 parent fd83a27 commit 31d45ea

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

queue/abstract.lua

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ end
7979
-- connection id == "conn_id".
8080
-- Throw an error if task is not take in the session.
8181
local function check_task_is_taken(tube_id, task_id, conn_id)
82-
local _taken = box.space._queue_taken:get{conn_id, tube_id, task_id}
83-
if _taken == nil then
82+
local _taken = box.space._queue_taken.index.task:get{tube_id, task_id}
83+
if _taken == nil or _taken[1] ~= conn_id then
8484
error("Task was not taken in the session")
8585
end
8686
end
@@ -166,7 +166,7 @@ function tube.ack(self, id)
166166

167167
self:peek(id)
168168
-- delete task
169-
box.space._queue_taken:delete{conn_id, self.tube_id, id}
169+
box.space._queue_taken.index.task:delete{self.tube_id, id}
170170
local result = self.raw:normalize_task(
171171
self.raw:delete(id):transform(2, 1, state.DONE)
172172
)
@@ -180,7 +180,7 @@ local function tube_release_internal(self, id, opts, connection_id)
180180
opts = opts or {}
181181
check_task_is_taken(self.tube_id, id, connection_id)
182182

183-
box.space._queue_taken:delete{connection_id, self.tube_id, id}
183+
box.space._queue_taken.index.task:delete{self.tube_id, id}
184184
self:peek(id)
185185
return self.raw:normalize_task(self.raw:release(id, opts))
186186
end
@@ -202,7 +202,7 @@ function tube.bury(self, id)
202202
local conn_id = connection.id()
203203
local is_taken, _ = pcall(check_task_is_taken, self.tube_id, id, conn_id)
204204
if is_taken then
205-
box.space._queue_taken:delete{conn_id, self.tube_id, id}
205+
box.space._queue_taken.index.task:delete{self.tube_id, id}
206206
end
207207
if task[2] == state.BURIED then
208208
return task
@@ -428,7 +428,7 @@ function method._on_consumer_disconnect()
428428
tube = box.space._queue.index.tube_id:get{task[2]}
429429
if tube == nil then
430430
log.error("Inconsistent queue state: tube %d not found", task[2])
431-
box.space._queue_taken:delete{task[1], task[2], task[3] }
431+
box.space._queue_taken.index.task:delete{task[2], task[3]}
432432
else
433433
log.warn("Consumer %s disconnected, release task %s(%s)",
434434
id, task[3], tube[1])

0 commit comments

Comments
 (0)