Skip to content

Commit bcb0653

Browse files
committed
Event loop updates.
1 parent 117904a commit bcb0653

File tree

9 files changed

+50
-55
lines changed

9 files changed

+50
-55
lines changed

async.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ Gem::Specification.new do |spec|
1313

1414
spec.files = Dir.glob('{lib}/**/*', File::FNM_DOTMATCH, base: __dir__)
1515

16-
spec.required_ruby_version = ">= 3.1.0"
16+
spec.required_ruby_version = ">= 3.0.2"
1717

1818
spec.add_dependency "console", "~> 1.10"
1919

20-
spec.add_dependency "event", "~> 0.5.0"
20+
spec.add_dependency "event", "~> 0.9.3"
2121
spec.add_dependency "timers", "~> 4.1"
2222

2323
spec.add_development_dependency "async-rspec", "~> 1.1"

lib/async/debug/selector.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ module Async
2727
module Debug
2828
class Selector < Event::Debug::Selector
2929
def initialize(selector = nil)
30-
super(selector || Event::Backend.new(Fiber.current))
30+
super(selector || Event::Selector.new(Fiber.current))
3131
end
3232
end
3333
end

lib/async/scheduler.rb

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,14 @@ def self.supported?
3636
def initialize(parent = nil, selector: nil)
3737
super(parent)
3838

39-
@selector = selector || Event::Backend.new(Fiber.current)
40-
@timers = Timers::Group.new
41-
42-
@ready = []
43-
@running = []
39+
@selector = selector || ::Event::Selector.new(Fiber.current)
40+
@timers = ::Timers::Group.new
4441

4542
@guard = Mutex.new
4643
@interrupted = false
4744
@blocked = 0
4845
@unblocked = []
4946

50-
@loop = nil
51-
5247
@interrupt = Interrupt.new(@selector) do |event|
5348
case event
5449
when '!'
@@ -59,44 +54,47 @@ def initialize(parent = nil, selector: nil)
5954

6055
def set!
6156
Fiber.set_scheduler(self)
62-
@loop = Fiber.current
6357
end
6458

6559
def clear!
6660
Fiber.set_scheduler(nil)
67-
@loop = nil
6861
end
6962

63+
# Interrupt the event loop.
7064
def interrupt
7165
@interrupt.signal('!')
7266
end
7367

74-
# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
75-
# @param fiber [#resume] The object to be resumed on the next iteration of the run-loop.
76-
def << fiber
77-
@ready << fiber
68+
# Transfer from the calling fiber to the event loop.
69+
def transfer
70+
@selector.transfer
7871
end
7972

8073
# Yield the current fiber and resume it on the next iteration of the event loop.
8174
def yield
82-
@ready << Fiber.current
83-
@loop.transfer
75+
@selector.yield
76+
end
77+
78+
# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
79+
# @param fiber [#resume] The object to be resumed on the next iteration of the run-loop.
80+
def push(fiber)
81+
@selector.push(fiber)
82+
end
83+
84+
alias << push
85+
86+
def raise(*arguments)
87+
@selector.raise(*arguments)
8488
end
8589

8690
def resume(fiber, *arguments)
87-
if @loop
88-
@ready << Fiber.current
89-
fiber.transfer(*arguments)
91+
if Fiber.scheduler
92+
@selector.resume(fiber, *arguments)
9093
else
91-
@ready << fiber
94+
@selector.push(fiber)
9295
end
9396
end
9497

95-
# Transfer from te calling fiber to the event loop.
96-
def transfer
97-
@loop.transfer
98-
end
99-
10098
# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
10199
# @reentrant Not thread safe.
102100
def block(blocker, timeout)
@@ -113,7 +111,7 @@ def block(blocker, timeout)
113111

114112
begin
115113
@blocked += 1
116-
@loop.transfer
114+
@selector.transfer
117115
ensure
118116
@blocked -= 1
119117
end
@@ -159,33 +157,27 @@ def io_wait(io, events, timeout = nil)
159157
timer&.cancel
160158
end
161159

160+
# def io_read(io, buffer, length)
161+
# @selector.io_read(Fiber.current, io, buffer, length)
162+
# end
163+
#
164+
# def io_write(io, buffer, length)
165+
# @selector.io_write(Fiber.current, io, buffer, length)
166+
# end
167+
162168
# Wait for the specified process ID to exit.
163169
# @parameter pid [Integer] The process ID to wait for.
164170
# @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
165171
# @returns [Process::Status] A process status instance.
166172
def process_wait(pid, flags)
167-
fiber = Fiber.current
168-
169-
return @selector.process_wait(fiber, pid, flags)
173+
return @selector.process_wait(Fiber.current, pid, flags)
170174
end
171175

172176
# Run one iteration of the event loop.
173177
# @param timeout [Float | nil] the maximum timeout, or if nil, indefinite.
174178
# @return [Boolean] whether there is more work to do.
175179
def run_once(timeout = nil)
176-
raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
177-
# Console.logger.info(self) {"@ready = #{@ready} @running = #{@running}"}
178-
179-
if @ready.any?
180-
# running used to correctly answer on `finished?`, and to reuse Array object.
181-
@running, @ready = @ready, @running
182-
183-
@running.each do |fiber|
184-
fiber.transfer if fiber.alive?
185-
end
186-
187-
@running.clear
188-
end
180+
Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
189181

190182
if @unblocked.any?
191183
unblocked = Array.new
@@ -199,7 +191,7 @@ def run_once(timeout = nil)
199191
end
200192
end
201193

202-
if @ready.empty? and @unblocked.empty?
194+
if !@selector.ready? and @unblocked.empty?
203195
interval = @timers.wait_interval
204196
else
205197
# if there are tasks ready to execute, don't sleep:
@@ -244,7 +236,7 @@ def run_once(timeout = nil)
244236

245237
# Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
246238
def run(*arguments, **options, &block)
247-
raise RuntimeError, 'Reactor has been closed' if @selector.nil?
239+
Kernel::raise RuntimeError, 'Reactor has been closed' if @selector.nil?
248240

249241
initial_task = self.async(*arguments, **options, &block) if block_given?
250242

@@ -261,7 +253,7 @@ def close
261253
# This is a critical step. Because tasks could be stored as instance variables, and since the reactor is (probably) going out of scope, we need to ensure they are stopped. Otherwise, the tasks will belong to a reactor that will never run again and are not stopped.
262254
self.terminate
263255

264-
raise "Closing scheduler with blocked operations!" if @blocked > 0
256+
Kernel::raise "Closing scheduler with blocked operations!" if @blocked > 0
265257

266258
@guard.synchronize do
267259
@interrupt.close

lib/async/task.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ def stop(later = false)
190190
end
191191
elsif @fiber&.alive?
192192
begin
193-
Fiber.scheduler << Fiber.current
194-
@fiber.raise(Stop)
193+
Fiber.scheduler.raise(@fiber, Stop)
195194
rescue FiberError
196195
Fiber.scheduler << Stop::Later.new(self)
197196
end

spec/async/barrier_spec.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
require 'async/barrier'
2424
require 'async/clock'
2525
require 'async/rspec'
26-
2726
require 'async/semaphore'
2827

2928
require_relative 'chainable_async_examples'

spec/async/reactor_spec.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,12 @@
149149

150150
subject.async do |task|
151151
events << true
152+
thread.join
152153
end
153154

154155
subject.run
155156

156-
puts "join"
157-
thread.join
158-
157+
expect(thread).to_not be_alive
159158
expect(subject).to be_stopped
160159
end
161160
end

spec/async/scheduler/thread_spec.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
include_context Async::RSpec::Reactor
2525

2626
describe ::Thread do
27+
# I saw this hang.
2728
it "can wait for value" do
2829
value = Thread.new do
2930
sleep(0)

spec/async/scheduler_spec.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,6 @@
116116

117117
expect(waiting).to be == 3
118118
queue.close
119-
120-
puts "Done."
121119
end
122120
end
123121

spec/async/task_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
require 'async'
2424
require 'async/clock'
25+
require 'async/queue'
2526

2627
RSpec.describe Async::Task do
2728
let(:reactor) {Async::Reactor.new}
@@ -229,10 +230,13 @@
229230
reactor.run do
230231
reactor.async do |task|
231232
parent_task = task
233+
232234
task.async do |task|
233235
child_task = task
236+
234237
task.sleep(10)
235238
end
239+
236240
task.sleep(10)
237241
end
238242

@@ -244,6 +248,9 @@
244248

245249
parent_task.stop
246250

251+
# We need to yield here to allow the tasks to be terminated. The parent task raises an exception in the child task and adds itself to the selector ready queue. It takes at least one iteration for the parent task to exit as well:
252+
reactor.yield
253+
247254
expect(parent_task).to_not be_alive
248255
expect(child_task).to_not be_alive
249256
end

0 commit comments

Comments
 (0)