129
129
-- connection id == "conn_id".
130
130
-- Throw an error on failure.
131
131
local function check_task_on_taken (tube_id , task_id , conn_id )
132
- local _taken = box .space ._queue_taken :get {conn_id , tube_id , task_id }
133
- if _taken == nil then
132
+ local _taken = box .space ._queue_taken . index . task :get {tube_id , task_id }
133
+ if _taken == nil or _taken [ 1 ] ~= conn_id then
134
134
error (" Task was not taken in the session" )
135
135
end
136
136
end
@@ -166,7 +166,7 @@ function tube.ack(self, id)
166
166
167
167
self :peek (id )
168
168
-- delete task
169
- box .space ._queue_taken :delete {conn_id , self .tube_id , id }
169
+ box .space ._queue_taken . index . task :delete {self .tube_id , id }
170
170
local result = self .raw :normalize_task (
171
171
self .raw :delete (id ):transform (2 , 1 , state .DONE )
172
172
)
@@ -180,7 +180,7 @@ local function tube_release_internal(self, id, opts, connection_id)
180
180
opts = opts or {}
181
181
check_task_on_taken (self .tube_id , id , connection_id )
182
182
183
- box .space ._queue_taken :delete {connection_id , self .tube_id , id }
183
+ box .space ._queue_taken . index . task :delete {self .tube_id , id }
184
184
self :peek (id )
185
185
return self .raw :normalize_task (self .raw :release (id , opts ))
186
186
end
@@ -202,7 +202,7 @@ function tube.bury(self, id)
202
202
local conn_id = connection .id ()
203
203
local is_taken , _ = pcall (check_task_on_taken , self .tube_id , id , conn_id )
204
204
if is_taken then
205
- box .space ._queue_taken :delete {conn_id , self .tube_id , id }
205
+ box .space ._queue_taken . index . task :delete {self .tube_id , id }
206
206
end
207
207
if task [2 ] == state .BURIED then
208
208
return task
@@ -428,7 +428,7 @@ function method._on_consumer_disconnect()
428
428
tube = box .space ._queue .index .tube_id :get {task [2 ]}
429
429
if tube == nil then
430
430
log .error (" Inconsistent queue state: tube %d not found" , task [2 ])
431
- box .space ._queue_taken :delete {task [1 ], task [ 2 ], task [3 ] }
431
+ box .space ._queue_taken . index . task :delete {task [2 ], task [3 ]}
432
432
else
433
433
log .warn (" Consumer %s disconnected, release task %s(%s)" ,
434
434
id , task [3 ], tube [1 ])
0 commit comments