Skip to content

Commit e567794

Browse files
committed
Rework reactor interface. Use Scheduler as base class.
1 parent 345010d commit e567794

17 files changed

+229
-345
lines changed

.github/workflows/development.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
- macos
1515

1616
ruby:
17-
- 3.0
17+
- head
1818

1919
experimental: [false]
2020
env: [""]

async.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ Gem::Specification.new do |spec|
1313

1414
spec.files = Dir.glob('{lib}/**/*', File::FNM_DOTMATCH, base: __dir__)
1515

16-
spec.required_ruby_version = ">= 3.0.1"
16+
spec.required_ruby_version = ">= 3.1.0"
1717

1818
spec.add_dependency "console", "~> 1.10"
1919

gems.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
gemspec
66

7+
# gem "event", path: "../event"
8+
# gem "async-rspec", path: "../async-rspec"
9+
710
group :maintenance, optional: true do
811
gem "bake-bundler"
912
gem "bake-modernize"

lib/async.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
# THE SOFTWARE.
2222

2323
require_relative "async/version"
24-
require_relative "async/logger"
2524
require_relative "async/reactor"
2625

2726
require_relative "kernel/async"
2827
require_relative "kernel/sync"
2928

3029
module Async
3130
# Invoke `Reactor.run` with all arguments/block.
32-
def self.run(*arguments, &block)
33-
Reactor.run(*arguments, &block)
31+
def self.run(...)
32+
Reactor.run(...)
3433
end
3534
end

lib/async/logger.rb renamed to lib/async/debug/selector.rb

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# frozen_string_literal: true
22

3-
# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
3+
# Copyright, 2021, by Samuel G. D. Williams. <http://www.codeotaku.com>
44
#
55
# Permission is hereby granted, free of charge, to any person obtaining a copy
66
# of this software and associated documentation files (the "Software"), to deal
@@ -20,9 +20,15 @@
2020
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2121
# THE SOFTWARE.
2222

23-
require 'console'
24-
require_relative 'task'
23+
require 'fiber'
24+
require 'event/debug/selector'
2525

2626
module Async
27-
extend Console
27+
module Debug
28+
class Selector < Event::Debug::Selector
29+
def initialize(selector = nil)
30+
super(selector || Event::Backend.new(Fiber.current))
31+
end
32+
end
33+
end
2834
end

lib/async/scheduler/interrupt.rb renamed to lib/async/interrupt.rb

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,32 @@
1919
# THE SOFTWARE.
2020

2121
module Async
22-
class Scheduler
23-
# A thread safe synchronisation primative.
24-
class Interrupt
25-
def initialize(scheduler)
26-
@scheduler = scheduler
27-
@input, @output = IO.pipe
28-
29-
@fiber = Fiber.new do
30-
while true
31-
@scheduler.io_wait(@fiber, @input, ::Event::READABLE)
32-
@input.read_nonblock(1)
33-
end
34-
end
35-
36-
@fiber.transfer
37-
end
22+
# A thread safe synchronisation primative.
23+
class Interrupt
24+
def initialize(scheduler, &block)
25+
@scheduler = scheduler
26+
@input, @output = IO.pipe
3827

39-
def signal
40-
@output.write('.')
41-
@output.flush
28+
@fiber = Fiber.new do
29+
while true
30+
@scheduler.io_wait(@fiber, @input, ::Event::READABLE)
31+
block.call(@input.read_nonblock(1))
32+
end
4233
end
4334

44-
def close
45-
@input.close
46-
@output.close
47-
end
35+
@fiber.transfer
36+
end
37+
38+
def signal(event = '.')
39+
@output.write('.')
40+
@output.flush
41+
end
42+
43+
def close
44+
@input.close
45+
@output.close
4846
end
4947
end
48+
49+
private_constant :Interrupt
5050
end

lib/async/node.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ def initialize(parent = nil, annotation: nil, transient: false)
175175
end
176176
end
177177

178+
def root
179+
@parent&.root || self
180+
end
181+
178182
# You should not directly rely on these pointers but instead use `#children`.
179183
# List pointers:
180184
attr_accessor :head
@@ -225,7 +229,7 @@ def backtrace(*arguments)
225229
end
226230

227231
def to_s
228-
"\#<#{description}>"
232+
"\#<#{self.description}>"
229233
end
230234

231235
# Change the parent of this node.
@@ -306,6 +310,10 @@ def stop
306310
@children&.each(&:stop)
307311
end
308312

313+
def stopped?
314+
@children.nil?
315+
end
316+
309317
def print_hierarchy(out = $stdout, backtrace: true)
310318
self.traverse do |node, level|
311319
indent = "\t" * level

lib/async/notification.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class Notification < Condition
3030
def signal(value = nil, task: Task.current)
3131
return if @waiting.empty?
3232

33-
task.reactor << Signal.new(@waiting, value)
33+
Fiber.scheduler << Signal.new(@waiting, value)
3434

3535
@waiting = []
3636

lib/async/reactor.rb

Lines changed: 15 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
# frozen_string_literal: true
2-
3-
# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
1+
# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
42
#
53
# Permission is hereby granted, free of charge, to any person obtaining a copy
64
# of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +18,11 @@
2018
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2119
# THE SOFTWARE.
2220

23-
require_relative 'logger'
24-
require_relative 'task'
2521
require_relative 'scheduler'
26-
27-
require 'timers'
28-
require 'forwardable'
22+
require_relative 'task'
2923

3024
module Async
31-
# Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`.
32-
class TimeoutError < StandardError
33-
def initialize(message = "execution expired")
34-
super
35-
end
36-
end
37-
38-
# An asynchronous, cooperatively scheduled event reactor.
39-
class Reactor < Node
40-
extend Forwardable
41-
25+
class Reactor < Scheduler
4226
# The preferred method to invoke asynchronous behavior at the top level.
4327
#
4428
# - When invoked within an existing reactor task, it will run the given block
@@ -60,24 +44,17 @@ def self.run(*arguments, **options, &block)
6044
end
6145
end
6246

63-
def initialize(parent = nil, selector: nil, logger: nil)
64-
super(parent)
47+
def initialize(...)
48+
super
6549

66-
@scheduler = Scheduler.new(self, selector)
67-
@logger = logger
50+
@closing = false
51+
self.set!
6852
end
6953

70-
attr :scheduler
71-
attr :logger
72-
7354
def to_s
7455
"\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
7556
end
7657

77-
def stopped?
78-
@children.nil?
79-
end
80-
8158
# Start an asynchronous task within the specified reactor. The task will be
8259
# executed until the first blocking call, at which point it will yield and
8360
# and this method will return.
@@ -102,95 +79,17 @@ def async(*arguments, **options, &block)
10279
return task
10380
end
10481

105-
# Interrupt the reactor at the earliest convenience. Can be called from a different thread safely.
106-
def interrupt
107-
@scheduler.interrupt
108-
end
109-
110-
# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
111-
# @param fiber [#resume] The object to be resumed on the next iteration of the run-loop.
112-
def << fiber
113-
@scheduler << fiber
114-
end
115-
116-
# Yield the current fiber and resume it on the next iteration of the event loop.
117-
def yield
118-
@scheduler.yield
119-
end
120-
121-
def transfer
122-
@scheduler.transfer
123-
end
124-
125-
def resume(fiber)
126-
@scheduler.resume(fiber)
127-
end
128-
129-
def finished?
130-
super && @scheduler.finished?
131-
end
132-
133-
# Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
134-
def run(*arguments, **options, &block)
135-
raise RuntimeError, 'Reactor has been closed' if @scheduler.nil?
136-
137-
@scheduler&.set!
138-
139-
initial_task = self.async(*arguments, **options, &block) if block_given?
140-
141-
@scheduler.run
142-
143-
return initial_task
144-
ensure
145-
@scheduler&.clear!
146-
Console.logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
147-
end
148-
149-
def stop(later = true)
150-
@children&.each do |child|
151-
# We don't want this process to propagate `Async::Stop` exceptions, so we schedule tasks to stop later.
152-
child.stop(later)
153-
end
154-
end
155-
156-
# Stop each of the children tasks and close the selector.
157-
#
158-
# @return [void]
15982
def close
160-
# This is a critical step. Because tasks could be stored as instance variables, and since the reactor is (probably) going out of scope, we need to ensure they are stopped. Otherwise, the tasks will belong to a reactor that will never run again and are not stopped.
161-
self.stop(false)
162-
163-
@scheduler.close
164-
@scheduler = nil
165-
end
166-
167-
# Check if the selector has been closed.
168-
# @return [Boolean]
169-
def closed?
170-
@scheduler.nil?
171-
end
172-
173-
# Put the calling fiber to sleep for a given ammount of time.
174-
# @param duration [Numeric] The time in seconds, to sleep for.
175-
def sleep(duration)
176-
@scheduler.kernel_sleep(duration)
83+
if @closing
84+
super
85+
else
86+
@closing = true
87+
self.clear!
88+
end
17789
end
17890

179-
# Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
180-
# @param duration [Numeric] The time in seconds, in which the task should
181-
# complete.
182-
def with_timeout(timeout, exception = TimeoutError)
183-
fiber = Fiber.current
184-
185-
timer = @scheduler.after(timeout) do
186-
if fiber.alive?
187-
fiber.raise(exception, "execution expired")
188-
end
189-
end
190-
191-
yield timer
192-
ensure
193-
timer.cancel if timer
91+
def with_timeout(timeout, exception = TimeoutError, message = "execution expired", &block)
92+
timeout_after(timeout, exception, message, &block)
19493
end
19594
end
19695
end

0 commit comments

Comments
 (0)