@@ -7,7 +7,7 @@ local qc = require('queue.compat')
7
7
local num_type = qc .num_type
8
8
local str_type = qc .str_type
9
9
10
- local session = box .session
10
+ local connection = box .session
11
11
12
12
local queue = {
13
13
tube = setmetatable ({}, {
@@ -85,7 +85,7 @@ function tube.put(self, data, opts)
85
85
end
86
86
87
87
local conds = {}
88
- local releasing_sessions = {}
88
+ local releasing_connections = {}
89
89
90
90
function tube .take (self , timeout )
91
91
timeout = time (timeout or TIMEOUT_INFINITY )
@@ -99,18 +99,18 @@ function tube.take(self, timeout)
99
99
local time = event_time (timeout )
100
100
local tid = self .tube_id
101
101
local fid = fiber .id ()
102
- local sid = session .id ()
102
+ local conn_id = connection .id ()
103
103
104
- box .space ._queue_consumers :insert {sid , fid , tid , time , started }
104
+ box .space ._queue_consumers :insert {conn_id , fid , tid , time , started }
105
105
conds [fid ] = qc .waiter ()
106
106
conds [fid ]:wait (tonumber (timeout ) / 1000000 )
107
107
conds [fid ]:free ()
108
- box .space ._queue_consumers :delete { sid , fid }
108
+ box .space ._queue_consumers :delete {conn_id , fid }
109
109
110
- -- We don't take a task if the session is in a
110
+ -- We don't take a task if the connection is in a
111
111
-- disconnecting state.
112
- if releasing_sessions [fid ] then
113
- releasing_sessions [fid ] = nil
112
+ if releasing_connections [fid ] then
113
+ releasing_connections [fid ] = nil
114
114
return nil
115
115
end
116
116
@@ -140,7 +140,7 @@ function tube.touch(self, id, delta)
140
140
return
141
141
end
142
142
143
- local _taken = box .space ._queue_taken :get {session .id (), self .tube_id , id }
143
+ local _taken = box .space ._queue_taken :get {connection .id (), self .tube_id , id }
144
144
if _taken == nil then
145
145
error (" Task was not taken in the session" )
146
146
end
@@ -152,7 +152,8 @@ function tube.touch(self, id, delta)
152
152
end
153
153
154
154
function tube .ack (self , id )
155
- local _taken = box .space ._queue_taken :get {session .id (), self .tube_id , id }
155
+ local conn_id = connection .id ()
156
+ local _taken = box .space ._queue_taken :get {conn_id , self .tube_id , id }
156
157
if _taken == nil then
157
158
error (" Task was not taken in the session" )
158
159
end
@@ -161,7 +162,7 @@ function tube.ack(self, id)
161
162
162
163
self :peek (id )
163
164
-- delete task
164
- box .space ._queue_taken :delete {session . id () , self .tube_id , id }
165
+ box .space ._queue_taken :delete {conn_id , self .tube_id , id }
165
166
local result = self .raw :normalize_task (
166
167
self .raw :delete (id ):transform (2 , 1 , state .DONE )
167
168
)
@@ -171,20 +172,20 @@ function tube.ack(self, id)
171
172
return result
172
173
end
173
174
174
- local function tube_release_internal (self , id , opts , session_id )
175
+ local function tube_release_internal (self , id , opts , connection_id )
175
176
opts = opts or {}
176
- local _taken = box .space ._queue_taken :get {session_id , self .tube_id , id }
177
+ local _taken = box .space ._queue_taken :get {connection_id , self .tube_id , id }
177
178
if _taken == nil then
178
179
error (" Task was not taken in the session" )
179
180
end
180
181
181
- box .space ._queue_taken :delete {session_id , self .tube_id , id }
182
+ box .space ._queue_taken :delete {connection_id , self .tube_id , id }
182
183
self :peek (id )
183
184
return self .raw :normalize_task (self .raw :release (id , opts ))
184
185
end
185
186
186
187
function tube .release (self , id , opts )
187
- return tube_release_internal (self , id , opts , session .id ())
188
+ return tube_release_internal (self , id , opts , connection .id ())
188
189
end
189
190
190
191
function tube .peek (self , id )
197
198
198
199
function tube .bury (self , id )
199
200
local task = self :peek (id )
200
- local _taken = box .space ._queue_taken :get {session .id (), self .tube_id , id }
201
+ local _taken = box .space ._queue_taken :get {connection .id (), self .tube_id , id }
201
202
if _taken ~= nil then
202
- box .space ._queue_taken :delete {session .id (), self .tube_id , id }
203
+ box .space ._queue_taken :delete {connection .id (), self .tube_id , id }
203
204
end
204
205
if task [2 ] == state .BURIED then
205
206
return task
@@ -362,7 +363,8 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
362
363
end
363
364
-- task switched to taken - register in taken space
364
365
elseif task [2 ] == state .TAKEN then
365
- queue_taken :insert {session .id (), self .tube_id , task [1 ], fiber .time64 ()}
366
+ queue_taken :insert {connection .id (), self .tube_id , task [1 ],
367
+ fiber .time64 ()}
366
368
end
367
369
if stats_data ~= nil then
368
370
queue .stat [space .name ]:inc (stats_data )
395
397
396
398
function method ._on_consumer_disconnect ()
397
399
local waiter , fb , task , tube , id
398
- id = session .id ()
400
+ id = connection .id ()
399
401
-- wakeup all waiters
400
402
while true do
401
403
waiter = box .space ._queue_consumers .index .pk :min {id }
@@ -409,7 +411,7 @@ function method._on_consumer_disconnect()
409
411
box .space ._queue_consumers :delete { waiter [1 ], waiter [2 ] }
410
412
local cond = conds [waiter [2 ]]
411
413
if cond then
412
- releasing_sessions [waiter [2 ]] = true
414
+ releasing_connections [waiter [2 ]] = true
413
415
cond :signal (waiter [2 ])
414
416
end
415
417
end
@@ -523,11 +525,11 @@ function method.start()
523
525
524
526
local _cons = box .space ._queue_consumers
525
527
if _cons == nil then
526
- -- session , fid, tube, time
528
+ -- connection , fid, tube, time
527
529
_cons = box .schema .create_space (' _queue_consumers' , {
528
530
temporary = true ,
529
531
format = {
530
- {name = ' session_id ' , type = num_type ()},
532
+ {name = ' connection_id ' , type = num_type ()},
531
533
{name = ' fiber_id' , type = num_type ()},
532
534
{name = ' tube_id' , type = num_type ()},
533
535
{name = ' event_time' , type = num_type ()},
@@ -548,11 +550,11 @@ function method.start()
548
550
549
551
local _taken = box .space ._queue_taken
550
552
if _taken == nil then
551
- -- session_id , tube_id, task_id, time
553
+ -- connection_id , tube_id, task_id, time
552
554
_taken = box .schema .create_space (' _queue_taken' , {
553
555
temporary = true ,
554
556
format = {
555
- {name = ' session_id ' , type = num_type ()},
557
+ {name = ' connection_id ' , type = num_type ()},
556
558
{name = ' tube_id' , type = num_type ()},
557
559
{name = ' task_id' , type = num_type ()},
558
560
{name = ' taken_time' , type = num_type ()}
@@ -579,7 +581,7 @@ function method.start()
579
581
end
580
582
end
581
583
582
- session .on_disconnect (queue ._on_consumer_disconnect )
584
+ connection .on_disconnect (queue ._on_consumer_disconnect )
583
585
return queue
584
586
end
585
587
0 commit comments