Skip to content

Commit 19e839e

Browse files
committed
refactoring: rename session to connection
The term "queue session" will be added to the queue while working on #85. One "queue session" will include many connections (box.session). For clarity, box.session has been renamed to connection. Needed for #85
1 parent 66c9c4e commit 19e839e

File tree

2 files changed

+30
-28
lines changed

2 files changed

+30
-28
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ Consumers may be simply waiting for tasks to be put in the queues.
245245

246246
## Fields of the `_queue_consumers` space
247247

248-
1. `session` - session (connection) ID of the client
248+
1. `connection_id` - connection ID of the client
249249
1. `fid` - client fiber ID
250250
1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue`
251251
space; the client waits for tasks in this queue
@@ -257,8 +257,8 @@ processing a task in the queue.
257257

258258
## Fields of the `_queue_taken` space
259259

260-
1. `session` - session (connection) ID of the client, referring to the
261-
`session_id` field of the `_queue_consumers` space
260+
1. `connection_id` - connection ID of the client, referring to the
261+
`connection_id` field of the `_queue_consumers` space
262262
1. `tube_id` - queue ID, to which the task belongs
263263
1. `task_id` - task ID (of the task being taken)
264264
1. `time` - the time when the client began to execute the task

queue/abstract.lua

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ local qc = require('queue.compat')
77
local num_type = qc.num_type
88
local str_type = qc.str_type
99

10-
local session = box.session
10+
local connection = box.session
1111

1212
local queue = {
1313
tube = setmetatable({}, {
@@ -85,7 +85,7 @@ function tube.put(self, data, opts)
8585
end
8686

8787
local conds = {}
88-
local releasing_sessions = {}
88+
local releasing_connections = {}
8989

9090
function tube.take(self, timeout)
9191
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -99,18 +99,18 @@ function tube.take(self, timeout)
9999
local time = event_time(timeout)
100100
local tid = self.tube_id
101101
local fid = fiber.id()
102-
local sid = session.id()
102+
local conn_id = connection.id()
103103

104-
box.space._queue_consumers:insert{sid, fid, tid, time, started}
104+
box.space._queue_consumers:insert{conn_id, fid, tid, time, started}
105105
conds[fid] = qc.waiter()
106106
conds[fid]:wait(tonumber(timeout) / 1000000)
107107
conds[fid]:free()
108-
box.space._queue_consumers:delete{ sid, fid }
108+
box.space._queue_consumers:delete{conn_id, fid}
109109

110-
-- We don't take a task if the session is in a
110+
-- We don't take a task if the connection is in a
111111
-- disconnecting state.
112-
if releasing_sessions[fid] then
113-
releasing_sessions[fid] = nil
112+
if releasing_connections[fid] then
113+
releasing_connections[fid] = nil
114114
return nil
115115
end
116116

@@ -140,7 +140,7 @@ function tube.touch(self, id, delta)
140140
return
141141
end
142142

143-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
143+
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
144144
if _taken == nil then
145145
error("Task was not taken in the session")
146146
end
@@ -152,7 +152,8 @@ function tube.touch(self, id, delta)
152152
end
153153

154154
function tube.ack(self, id)
155-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
155+
local conn_id = connection.id()
156+
local _taken = box.space._queue_taken:get{conn_id, self.tube_id, id}
156157
if _taken == nil then
157158
error("Task was not taken in the session")
158159
end
@@ -161,7 +162,7 @@ function tube.ack(self, id)
161162

162163
self:peek(id)
163164
-- delete task
164-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
165+
box.space._queue_taken:delete{conn_id, self.tube_id, id}
165166
local result = self.raw:normalize_task(
166167
self.raw:delete(id):transform(2, 1, state.DONE)
167168
)
@@ -171,20 +172,20 @@ function tube.ack(self, id)
171172
return result
172173
end
173174

174-
local function tube_release_internal(self, id, opts, session_id)
175+
local function tube_release_internal(self, id, opts, connection_id)
175176
opts = opts or {}
176-
local _taken = box.space._queue_taken:get{session_id, self.tube_id, id}
177+
local _taken = box.space._queue_taken:get{connection_id, self.tube_id, id}
177178
if _taken == nil then
178179
error("Task was not taken in the session")
179180
end
180181

181-
box.space._queue_taken:delete{session_id, self.tube_id, id}
182+
box.space._queue_taken:delete{connection_id, self.tube_id, id}
182183
self:peek(id)
183184
return self.raw:normalize_task(self.raw:release(id, opts))
184185
end
185186

186187
function tube.release(self, id, opts)
187-
return tube_release_internal(self, id, opts, session.id())
188+
return tube_release_internal(self, id, opts, connection.id())
188189
end
189190

190191
function tube.peek(self, id)
@@ -197,9 +198,9 @@ end
197198

198199
function tube.bury(self, id)
199200
local task = self:peek(id)
200-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
201+
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
201202
if _taken ~= nil then
202-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
203+
box.space._queue_taken:delete{connection.id(), self.tube_id, id}
203204
end
204205
if task[2] == state.BURIED then
205206
return task
@@ -362,7 +363,8 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
362363
end
363364
-- task switched to taken - register in taken space
364365
elseif task[2] == state.TAKEN then
365-
queue_taken:insert{session.id(), self.tube_id, task[1], fiber.time64()}
366+
queue_taken:insert{connection.id(), self.tube_id, task[1],
367+
fiber.time64()}
366368
end
367369
if stats_data ~= nil then
368370
queue.stat[space.name]:inc(stats_data)
@@ -395,7 +397,7 @@ end
395397

396398
function method._on_consumer_disconnect()
397399
local waiter, fb, task, tube, id
398-
id = session.id()
400+
id = connection.id()
399401
-- wakeup all waiters
400402
while true do
401403
waiter = box.space._queue_consumers.index.pk:min{id}
@@ -409,7 +411,7 @@ function method._on_consumer_disconnect()
409411
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
410412
local cond = conds[waiter[2]]
411413
if cond then
412-
releasing_sessions[waiter[2]] = true
414+
releasing_connections[waiter[2]] = true
413415
cond:signal(waiter[2])
414416
end
415417
end
@@ -523,11 +525,11 @@ function method.start()
523525

524526
local _cons = box.space._queue_consumers
525527
if _cons == nil then
526-
-- session, fid, tube, time
528+
-- connection, fid, tube, time
527529
_cons = box.schema.create_space('_queue_consumers', {
528530
temporary = true,
529531
format = {
530-
{name = 'session_id', type = num_type()},
532+
{name = 'connection_id', type = num_type()},
531533
{name = 'fiber_id', type = num_type()},
532534
{name = 'tube_id', type = num_type()},
533535
{name = 'event_time', type = num_type()},
@@ -548,11 +550,11 @@ function method.start()
548550

549551
local _taken = box.space._queue_taken
550552
if _taken == nil then
551-
-- session_id, tube_id, task_id, time
553+
-- connection_id, tube_id, task_id, time
552554
_taken = box.schema.create_space('_queue_taken', {
553555
temporary = true,
554556
format = {
555-
{name = 'session_id', type = num_type()},
557+
{name = 'connection_id', type = num_type()},
556558
{name = 'tube_id', type = num_type()},
557559
{name = 'task_id', type = num_type()},
558560
{name = 'taken_time', type = num_type()}
@@ -579,7 +581,7 @@ function method.start()
579581
end
580582
end
581583

582-
session.on_disconnect(queue._on_consumer_disconnect)
584+
connection.on_disconnect(queue._on_consumer_disconnect)
583585
return queue
584586
end
585587

0 commit comments

Comments
 (0)