Skip to content

Commit d8de8b9

Browse files
committed
refactoring: move time functions to util
The same time functions are used in several files, so move them to a separate file. Needed for #85
1 parent 211e4e8 commit d8de8b9

File tree

6 files changed

+71
-84
lines changed

6 files changed

+71
-84
lines changed

queue-scm-1.rockspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ build = {
2424
['queue.abstract.driver.utube'] = 'queue/abstract/driver/utube.lua',
2525
['queue.abstract.driver.limfifottl'] = 'queue/abstract/driver/limfifottl.lua',
2626
['queue.compat'] = 'queue/compat.lua',
27+
['queue.util'] = 'queue/util.lua',
2728
['queue'] = 'queue/init.lua'
2829
}
2930
}

queue/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/init.lua
22
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
33
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/compat.lua
44
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
5+
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/util.lua
6+
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
57
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract.lua
68
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
79
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/state.lua

queue/abstract.lua

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ local fiber = require('fiber')
33

44
local state = require('queue.abstract.state')
55

6+
local util = require('queue.util')
67
local qc = require('queue.compat')
78
local num_type = qc.num_type
89
local str_type = qc.str_type
@@ -29,30 +30,6 @@ local queue = {
2930
}),
3031
stat = {}
3132
}
32-
local MAX_TIMEOUT = 365 * 86400 * 100 -- MAX_TIMEOUT == 100 years
33-
local TIMEOUT_INFINITY = 18446744073709551615ULL -- Set to TIMEOUT_INFINITY
34-
-- instead
35-
-- returns time for next event
36-
local function time(tm)
37-
if tm == nil then
38-
tm = fiber.time64()
39-
elseif tm < 0 then
40-
tm = 0
41-
else
42-
tm = tm * 1000000
43-
end
44-
return 0ULL + tm
45-
end
46-
47-
local function event_time(tm)
48-
if tm == nil or tm < 0 then
49-
tm = 0
50-
elseif tm > MAX_TIMEOUT then
51-
return TIMEOUT_INFINITY
52-
end
53-
tm = 0ULL + tm * 1000000 + fiber.time64()
54-
return tm
55-
end
5633

5734
local function tube_release_all_tasks(tube)
5835
local prefix = ('queue: [tube "%s"] '):format(tube.name)
@@ -88,15 +65,15 @@ local conds = {}
8865
local releasing_connections = {}
8966

9067
function tube.take(self, timeout)
91-
timeout = time(timeout or TIMEOUT_INFINITY)
68+
timeout = util.time(timeout or util.timeout_infinity)
9269
local task = self.raw:take()
9370
if task ~= nil then
9471
return self.raw:normalize_task(task)
9572
end
9673

9774
while timeout > 0 do
9875
local started = fiber.time64()
99-
local time = event_time(timeout)
76+
local time = util.event_time(timeout)
10077
local tid = self.tube_id
10178
local fid = fiber.id()
10279
local conn_id = connection.id()
@@ -141,8 +118,8 @@ function tube.touch(self, id, delta)
141118
end
142119
if delta < 0 then -- if delta is lesser then 0, then it's zero
143120
delta = 0
144-
elseif delta > MAX_TIMEOUT then -- no ttl/ttr for this task
145-
delta = TIMEOUT_INFINITY
121+
elseif delta > util.max_timeout then -- no ttl/ttr for this task
122+
delta = util.timeout_infinity
146123
else -- convert to usec
147124
delta = delta * 1000000
148125
end

queue/abstract/driver/fifottl.lua

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ local log = require('log')
22
local fiber = require('fiber')
33
local state = require('queue.abstract.state')
44

5+
local util = require('queue.util')
56
local qc = require('queue.compat')
67
local num_type = qc.num_type
78
local str_type = qc.str_type
@@ -20,24 +21,6 @@ local i_pri = 6
2021
local i_created = 7
2122
local i_data = 8
2223

23-
local function time(tm)
24-
if tm == nil then
25-
tm = fiber.time64()
26-
elseif tm < 0 then
27-
tm = 0
28-
else
29-
tm = tm * 1000000
30-
end
31-
return 0ULL + tm
32-
end
33-
34-
local function event_time(tm)
35-
if tm == nil or tm < 0 then
36-
tm = 0
37-
end
38-
return 0ULL + tm * 1000000 + fiber.time64()
39-
end
40-
4124
local function is_expired(task)
4225
local dead_event = task[i_created] + task[i_ttl]
4326
return (dead_event <= fiber.time64())
@@ -107,7 +90,7 @@ local ttl_states = { state.READY, state.BURIED }
10790
local ttr_state = { state.TAKEN }
10891

10992
local function fifottl_fiber_iteration(self, processed)
110-
local now = time()
93+
local now = util.time()
11194
local task = nil
11295
local estimated = TIMEOUT_INFINITY
11396

@@ -234,20 +217,20 @@ function method.put(self, data, opts)
234217
if opts.delay ~= nil and opts.delay > 0 then
235218
status = state.DELAYED
236219
ttl = ttl + opts.delay
237-
next_event = event_time(opts.delay)
220+
next_event = util.event_time(opts.delay)
238221
else
239222
status = state.READY
240-
next_event = event_time(ttl)
223+
next_event = util.event_time(ttl)
241224
end
242225

243226
local task = self.space:insert{
244227
id,
245228
status,
246229
next_event,
247-
time(ttl),
248-
time(ttr),
230+
util.time(ttl),
231+
util.time(ttr),
249232
pri,
250-
time(),
233+
util.time(),
251234
data
252235
}
253236
self:on_task_change(task, 'put')
@@ -288,7 +271,7 @@ function method.take(self)
288271
return
289272
end
290273

291-
local next_event = time() + task[i_ttr]
274+
local next_event = util.time() + task[i_ttr]
292275
local dead_event = task[i_created] + task[i_ttl]
293276
if next_event > dead_event then
294277
next_event = dead_event
@@ -322,8 +305,8 @@ function method.release(self, id, opts)
322305
if opts.delay ~= nil and opts.delay > 0 then
323306
task = self.space:update(id, {
324307
{ '=', i_status, state.DELAYED },
325-
{ '=', i_next_event, event_time(opts.delay) },
326-
{ '+', i_ttl, time(opts.delay) }
308+
{ '=', i_next_event, util.event_time(opts.delay) },
309+
{ '+', i_ttl, util.time(opts.delay) }
327310
})
328311
else
329312
task = self.space:update(id, {

queue/abstract/driver/utubettl.lua

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ local fiber = require('fiber')
33

44
local state = require('queue.abstract.state')
55

6+
local util = require('queue.util')
67
local qc = require('queue.compat')
78
local num_type = qc.num_type
89
local str_type = qc.str_type
@@ -22,24 +23,6 @@ local i_created = 7
2223
local i_utube = 8
2324
local i_data = 9
2425

25-
local function time(tm)
26-
if tm == nil then
27-
tm = fiber.time64()
28-
elseif tm < 0 then
29-
tm = 0
30-
else
31-
tm = tm * 1000000
32-
end
33-
return 0ULL + tm
34-
end
35-
36-
local function event_time(tm)
37-
if tm == nil or tm < 0 then
38-
tm = 0
39-
end
40-
return 0ULL + tm * 1000000 + fiber.time64()
41-
end
42-
4326
local function is_expired(task)
4427
local dead_event = task[i_created] + task[i_ttl]
4528
return (dead_event <= fiber.time64())
@@ -114,7 +97,7 @@ local ttl_states = { state.READY, state.BURIED }
11497
local ttr_state = { state.TAKEN }
11598

11699
local function utubettl_fiber_iteration(self, processed)
117-
local now = time()
100+
local now = util.time()
118101
local task = nil
119102
local estimated = TIMEOUT_INFINITY
120103

@@ -242,28 +225,28 @@ function method.put(self, data, opts)
242225
if opts.delay ~= nil and opts.delay > 0 then
243226
status = state.DELAYED
244227
ttl = ttl + opts.delay
245-
next_event = event_time(opts.delay)
228+
next_event = util.event_time(opts.delay)
246229
else
247230
status = state.READY
248-
next_event = event_time(ttl)
231+
next_event = util.event_time(ttl)
249232
end
250233

251234
local task = self.space:insert{
252235
id,
253236
status,
254237
next_event,
255-
time(ttl),
256-
time(ttr),
238+
util.time(ttl),
239+
util.time(ttr),
257240
pri,
258-
time(),
241+
util.time(),
259242
tostring(opts.utube),
260243
data
261244
}
262245
self:on_task_change(task, 'put')
263246
return task
264247
end
265248

266-
local TIMEOUT_INFINITY_TIME = time(TIMEOUT_INFINITY)
249+
local TIMEOUT_INFINITY_TIME = util.time(TIMEOUT_INFINITY)
267250

268251
-- touch task
269252
function method.touch(self, id, delta)
@@ -291,7 +274,7 @@ function method.take(self)
291274
if t[2] ~= state.READY then
292275
break
293276
elseif not is_expired(t) then
294-
local next_event = time() + t[i_ttr]
277+
local next_event = util.time() + t[i_ttr]
295278
local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
296279
if taken == nil or taken[i_status] ~= state.TAKEN then
297280
t = self.space:update(t[1], {
@@ -344,16 +327,16 @@ function method.release(self, id, opts)
344327
if opts.delay ~= nil and opts.delay > 0 then
345328
task = self.space:update(id, {
346329
{ '=', i_status, state.DELAYED },
347-
{ '=', i_next_event, event_time(opts.delay) },
348-
{ '+', i_ttl, time(opts.delay) }
330+
{ '=', i_next_event, util.event_time(opts.delay) },
331+
{ '+', i_ttl, util.time(opts.delay) }
349332
})
350333
if task ~= nil then
351334
return process_neighbour(self, task, 'release')
352335
end
353336
else
354337
task = self.space:update(id, {
355338
{ '=', i_status, state.READY },
356-
{ '=', i_next_event, time(task[i_created] + task[i_ttl]) }
339+
{ '=', i_next_event, util.time(task[i_created] + task[i_ttl]) }
357340
})
358341
end
359342
self:on_task_change(task, 'release')

queue/util.lua

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
local fiber = require('fiber')
2+
3+
-- MAX_TIMEOUT == 100 years
4+
local MAX_TIMEOUT = 365 * 86400 * 100
5+
-- Set to TIMEOUT_INFINITY
6+
-- instead returns time for next event
7+
local TIMEOUT_INFINITY = 18446744073709551615ULL
8+
9+
local function time(tm)
10+
if tm == nil then
11+
tm = fiber.time64()
12+
elseif tm < 0 then
13+
tm = 0
14+
else
15+
tm = tm * 1000000
16+
end
17+
return 0ULL + tm
18+
end
19+
20+
local function event_time(tm)
21+
if tm == nil or tm < 0 then
22+
tm = 0
23+
elseif tm > MAX_TIMEOUT then
24+
return TIMEOUT_INFINITY
25+
end
26+
tm = 0ULL + tm * 1000000 + fiber.time64()
27+
return tm
28+
end
29+
30+
local util = {
31+
max_timeout = MAX_TIMEOUT,
32+
timeout_infinity = TIMEOUT_INFINITY
33+
}
34+
35+
-- methods
36+
local method = {
37+
time = time,
38+
event_time = event_time
39+
}
40+
41+
return setmetatable(util, { __index = method })

0 commit comments

Comments
 (0)