@@ -374,6 +374,28 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
374
374
return self
375
375
end
376
376
377
+ --- Release all session tasks.
378
+ local function release_session_tasks (connection_id )
379
+ while true do
380
+ local task = box .space ._queue_taken .index .pk :min {connection_id }
381
+ if task == nil or task [1 ] ~= connection_id then
382
+ break
383
+ end
384
+
385
+ local tube = box .space ._queue .index .tube_id :get {task [2 ]}
386
+ if tube == nil then
387
+ log .error (" Inconsistent queue state: tube %d not found" , task [2 ])
388
+ box .space ._queue_taken .index .task :delete {task [2 ], task [3 ]}
389
+ else
390
+ log .warn (" Consumer %s disconnected, release task %s(%s)" ,
391
+ connection_id , task [3 ], tube [1 ])
392
+
393
+ tube_release_internal (queue .tube [tube [1 ]], task [3 ], nil ,
394
+ connection_id )
395
+ end
396
+ end
397
+ end
398
+
377
399
function method ._on_consumer_disconnect ()
378
400
local waiter , fb , task , tube , id
379
401
id = connection .id ()
@@ -395,24 +417,7 @@ function method._on_consumer_disconnect()
395
417
end
396
418
end
397
419
398
- -- release all session tasks
399
- while true do
400
- task = box .space ._queue_taken .index .pk :min {id }
401
- if task == nil or task [1 ] ~= id then
402
- break
403
- end
404
-
405
- tube = box .space ._queue .index .tube_id :get {task [2 ]}
406
- if tube == nil then
407
- log .error (" Inconsistent queue state: tube %d not found" , task [2 ])
408
- box .space ._queue_taken .index .task :delete {task [2 ], task [3 ]}
409
- else
410
- log .warn (" Consumer %s disconnected, release task %s(%s)" ,
411
- id , task [3 ], tube [1 ])
412
-
413
- tube_release_internal (queue .tube [tube [1 ]], task [3 ], nil , id )
414
- end
415
- end
420
+ release_session_tasks (id )
416
421
end
417
422
418
423
-- function takes tuples and recreates tube
0 commit comments