Skip to content

Commit 8efc8c5

Browse files
committed
refactoring: rename session to connection
The term "queue session" will be added to the queue while working on #85. One "queue session" will include many connections (box.session). For clarity, box.session has been renamed to connection. Needed for #85
1 parent d8126d9 commit 8efc8c5

File tree

2 files changed

+29
-28
lines changed

2 files changed

+29
-28
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ Consumers may be simply waiting for tasks to be put in the queues.
245245

246246
## Fields of the `_queue_consumers` space
247247

248-
1. `session` - session (connection) ID of the client
248+
1. `connection_id` - connection ID of the client
249249
1. `fid` - client fiber ID
250250
1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue`
251251
space; the client waits for tasks in this queue
@@ -257,8 +257,8 @@ processing a task in the queue.
257257

258258
## Fields of the `_queue_taken` space
259259

260-
1. `session` - session (connection) ID of the client, referring to the
261-
`session_id` field of the `_queue_consumers` space
260+
1. `connection_id` - connection ID of the client, referring to the
261+
`connection_id` field of the `_queue_consumers` space
262262
1. `tube_id` - queue ID, to which the task belongs
263263
1. `task_id` - task ID (of the task being taken)
264264
1. `time` - the time when the client began to execute the task

queue/abstract.lua

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ local qc = require('queue.compat')
77
local num_type = qc.num_type
88
local str_type = qc.str_type
99

10-
local session = box.session
10+
local connection = box.session
1111

1212
local queue = {
1313
tube = setmetatable({}, {
@@ -85,7 +85,7 @@ function tube.put(self, data, opts)
8585
end
8686

8787
local conds = {}
88-
local releasing_sessions = {}
88+
local releasing_connections = {}
8989

9090
function tube.take(self, timeout)
9191
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -99,18 +99,18 @@ function tube.take(self, timeout)
9999
local time = event_time(timeout)
100100
local tid = self.tube_id
101101
local fid = fiber.id()
102-
local sid = session.id()
102+
local conn_id = connection.id()
103103

104-
box.space._queue_consumers:insert{sid, fid, tid, time, started}
104+
box.space._queue_consumers:insert{conn_id, fid, tid, time, started}
105105
conds[fid] = qc.waiter()
106106
conds[fid]:wait(tonumber(timeout) / 1000000)
107107
conds[fid]:free()
108-
box.space._queue_consumers:delete{ sid, fid }
108+
box.space._queue_consumers:delete{conn_id, fid}
109109

110-
-- We don't take a task if the session is in a
110+
-- We don't take a task if the connection is in a
111111
-- disconnecting state.
112-
if releasing_sessions[fid] then
113-
releasing_sessions[fid] = nil
112+
if releasing_connections[fid] then
113+
releasing_connections[fid] = nil
114114
return nil
115115
end
116116

@@ -140,7 +140,7 @@ function tube.touch(self, id, delta)
140140
return
141141
end
142142

143-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
143+
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
144144
if _taken == nil then
145145
error("Task was not taken in the session")
146146
end
@@ -152,7 +152,7 @@ function tube.touch(self, id, delta)
152152
end
153153

154154
function tube.ack(self, id)
155-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
155+
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
156156
if _taken == nil then
157157
error("Task was not taken in the session")
158158
end
@@ -161,7 +161,7 @@ function tube.ack(self, id)
161161

162162
self:peek(id)
163163
-- delete task
164-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
164+
box.space._queue_taken:delete{connection.id(), self.tube_id, id}
165165
local result = self.raw:normalize_task(
166166
self.raw:delete(id):transform(2, 1, state.DONE)
167167
)
@@ -171,20 +171,20 @@ function tube.ack(self, id)
171171
return result
172172
end
173173

174-
local function tube_release_internal(self, id, opts, session_id)
174+
local function tube_release_internal(self, id, opts, connection_id)
175175
opts = opts or {}
176-
local _taken = box.space._queue_taken:get{session_id, self.tube_id, id}
176+
local _taken = box.space._queue_taken:get{connection_id, self.tube_id, id}
177177
if _taken == nil then
178178
error("Task was not taken in the session")
179179
end
180180

181-
box.space._queue_taken:delete{session_id, self.tube_id, id}
181+
box.space._queue_taken:delete{connection_id, self.tube_id, id}
182182
self:peek(id)
183183
return self.raw:normalize_task(self.raw:release(id, opts))
184184
end
185185

186186
function tube.release(self, id, opts)
187-
return tube_release_internal(self, id, opts, session.id())
187+
return tube_release_internal(self, id, opts, connection.id())
188188
end
189189

190190
function tube.peek(self, id)
@@ -197,9 +197,9 @@ end
197197

198198
function tube.bury(self, id)
199199
local task = self:peek(id)
200-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
200+
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
201201
if _taken ~= nil then
202-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
202+
box.space._queue_taken:delete{connection.id(), self.tube_id, id}
203203
end
204204
if task[2] == state.BURIED then
205205
return task
@@ -362,7 +362,8 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
362362
end
363363
-- task switched to taken - register in taken space
364364
elseif task[2] == state.TAKEN then
365-
queue_taken:insert{session.id(), self.tube_id, task[1], fiber.time64()}
365+
queue_taken:insert{connection.id(), self.tube_id, task[1],
366+
fiber.time64()}
366367
end
367368
if stats_data ~= nil then
368369
queue.stat[space.name]:inc(stats_data)
@@ -395,7 +396,7 @@ end
395396

396397
function method._on_consumer_disconnect()
397398
local waiter, fb, task, tube, id
398-
id = session.id()
399+
id = connection.id()
399400
-- wakeup all waiters
400401
while true do
401402
waiter = box.space._queue_consumers.index.pk:min{id}
@@ -409,7 +410,7 @@ function method._on_consumer_disconnect()
409410
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
410411
local cond = conds[waiter[2]]
411412
if cond then
412-
releasing_sessions[waiter[2]] = true
413+
releasing_connections[waiter[2]] = true
413414
cond:signal(waiter[2])
414415
end
415416
end
@@ -523,11 +524,11 @@ function method.start()
523524

524525
local _cons = box.space._queue_consumers
525526
if _cons == nil then
526-
-- session, fid, tube, time
527+
-- connection, fid, tube, time
527528
_cons = box.schema.create_space('_queue_consumers', {
528529
temporary = true,
529530
format = {
530-
{name = 'session_id', type = num_type()},
531+
{name = 'connection_id', type = num_type()},
531532
{name = 'fiber_id', type = num_type()},
532533
{name = 'tube_id', type = num_type()},
533534
{name = 'event_time', type = num_type()},
@@ -548,11 +549,11 @@ function method.start()
548549

549550
local _taken = box.space._queue_taken
550551
if _taken == nil then
551-
-- session_id, tube_id, task_id, time
552+
-- connection_id, tube_id, task_id, time
552553
_taken = box.schema.create_space('_queue_taken', {
553554
temporary = true,
554555
format = {
555-
{name = 'session_id', type = num_type()},
556+
{name = 'connection_id', type = num_type()},
556557
{name = 'tube_id', type = num_type()},
557558
{name = 'task_id', type = num_type()},
558559
{name = 'taken_time', type = num_type()}
@@ -579,7 +580,7 @@ function method.start()
579580
end
580581
end
581582

582-
session.on_disconnect(queue._on_consumer_disconnect)
583+
connection.on_disconnect(queue._on_consumer_disconnect)
583584
return queue
584585
end
585586

0 commit comments

Comments
 (0)