From a6d8e1a897090076af6a448e085c86670cc52b07 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 24 Apr 2021 21:36:55 +1200 Subject: [PATCH 1/9] Rework implementation to use `Fiber#transfer`. Fixes #23. Ruby 3 allows mixing `Fiber#resume/#yield` and `Fiber#transfer`. We can take advantage of that to minimise the impact of non-blocking operations on user flow control. Previously, non-blocking operations would invoke `Fiber.yield` and this was a user-visible side-effect. We did take advantage of it, but it also meant that integration of Async with existing usage of Fiber could be problematic. We tracked the most obvious issues in `enumerator_spec.rb`. Now, non-blocking operations transfer directly to the scheduler fiber and thus don't impact other usage of resume/yield. --- .github/workflows/development.yml | 7 +- async.gemspec | 5 +- examples/fibers.rb | 2 +- gems.rb | 3 + lib/async.rb | 5 +- lib/async/condition.rb | 8 +- lib/async/debug/selector.rb | 60 +--- lib/async/{debug/monitor.rb => interrupt.rb} | 45 +-- lib/async/logger.rb | 28 -- lib/async/node.rb | 10 +- lib/async/notification.rb | 6 +- lib/async/reactor.rb | 278 +------------------ lib/async/scheduler.rb | 267 +++++++++++++++--- lib/async/semaphore.rb | 4 +- lib/async/task.rb | 115 ++++---- lib/async/wrapper.rb | 190 +------------ spec/async/condition_spec.rb | 4 +- spec/async/logger_spec.rb | 74 ----- spec/async/performance_spec.rb | 72 ----- spec/async/queue_spec.rb | 9 +- spec/async/reactor_spec.rb | 49 ++-- spec/async/scheduler/io_spec.rb | 4 +- spec/async/scheduler_spec.rb | 32 ++- spec/async/semaphore_spec.rb | 6 +- spec/async/task_spec.rb | 276 +++++++++++------- spec/async/wrapper_spec.rb | 203 -------------- spec/enumerator_spec.rb | 71 ++--- spec/spec_helper.rb | 2 + 28 files changed, 651 insertions(+), 1184 deletions(-) rename lib/async/{debug/monitor.rb => interrupt.rb} (67%) delete mode 100644 lib/async/logger.rb delete mode 100644 spec/async/logger_spec.rb delete mode 100644 spec/async/performance_spec.rb delete mode 100644 spec/async/wrapper_spec.rb diff --git a/.github/workflows/development.yml b/.github/workflows/development.yml index 37175b59..6e331018 100644 --- a/.github/workflows/development.yml +++ b/.github/workflows/development.yml @@ -14,10 +14,7 @@ jobs: - macos ruby: - - 2.5 - - 2.6 - - 2.7 - - 3.0 + - head experimental: [false] env: [""] @@ -34,7 +31,7 @@ jobs: ruby: head experimental: true - os: ubuntu - ruby: 2.6 + ruby: head env: COVERAGE=PartialSummary,Coveralls experimental: true diff --git a/async.gemspec b/async.gemspec index f5625e5c..964004dc 100644 --- a/async.gemspec +++ b/async.gemspec @@ -13,10 +13,11 @@ Gem::Specification.new do |spec| spec.files = Dir.glob('{lib}/**/*', File::FNM_DOTMATCH, base: __dir__) - spec.required_ruby_version = ">= 2.5.0" + spec.required_ruby_version = ">= 3.1.0" spec.add_dependency "console", "~> 1.10" - spec.add_dependency "nio4r", "~> 2.3" + + spec.add_dependency "event" spec.add_dependency "timers", "~> 4.1" spec.add_development_dependency "async-rspec", "~> 1.1" diff --git a/examples/fibers.rb b/examples/fibers.rb index 17c03bad..6413f112 100644 --- a/examples/fibers.rb +++ b/examples/fibers.rb @@ -50,7 +50,7 @@ def timeout(duration) timer = reactor.add_timer(duration) do if self.alive? - error = Fiber::TimeoutError.new("execution expired") + error = Fiber::TimeoutError.new error.set_backtrace backtrace self.resume error end diff --git a/gems.rb b/gems.rb index 6110fab5..32ea77e1 100644 --- a/gems.rb +++ b/gems.rb @@ -4,6 +4,9 @@ gemspec +# gem "event", path: "../event" +# gem "async-rspec", path: "../async-rspec" + group :maintenance, optional: true do gem "bake-bundler" gem "bake-modernize" diff --git a/lib/async.rb b/lib/async.rb index 2a1393a8..cdfb217b 100644 --- a/lib/async.rb +++ b/lib/async.rb @@ -21,7 +21,6 @@ # THE SOFTWARE. require_relative "async/version" -require_relative "async/logger" require_relative "async/reactor" require_relative "kernel/async" @@ -29,7 +28,7 @@ module Async # Invoke `Reactor.run` with all arguments/block. - def self.run(*arguments, &block) - Reactor.run(*arguments, &block) + def self.run(...) + Reactor.run(...) end end diff --git a/lib/async/condition.rb b/lib/async/condition.rb index 0be6c1e6..5da7400b 100644 --- a/lib/async/condition.rb +++ b/lib/async/condition.rb @@ -37,11 +37,9 @@ def wait fiber = Fiber.current @waiting << fiber - Task.yield - - # It would be nice if there was a better construct for this. We only need to invoke #delete if the task was not resumed normally. This can only occur with `raise` and `throw`. But there is no easy way to detect this. - # ensure when not return or ensure when raise, throw + Fiber.scheduler.transfer rescue Exception + # It would be nice if there was a better construct for this. We only need to invoke #delete if the task was not resumed normally. This can only occur with `raise` and `throw`. But there is no easy way to detect this. @waiting.delete(fiber) raise end @@ -61,7 +59,7 @@ def signal(value = nil) @waiting = [] waiting.each do |fiber| - fiber.resume(value) if fiber.alive? + Fiber.scheduler.resume(fiber, value) if fiber.alive? end return nil diff --git a/lib/async/debug/selector.rb b/lib/async/debug/selector.rb index c3887930..d3bed797 100644 --- a/lib/async/debug/selector.rb +++ b/lib/async/debug/selector.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -# Copyright, 2018, by Samuel G. D. Williams. +# Copyright, 2021, by Samuel G. D. Williams. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -20,62 +20,14 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -require_relative 'monitor' -require_relative '../logger' - -require 'nio' -require 'set' +require 'fiber' +require 'event/debug/selector' module Async module Debug - class LeakError < RuntimeError - def initialize(monitors) - super "Trying to close selector with active monitors: #{monitors.inspect}! This may cause your socket or file descriptor to leak." - end - end - - class Selector - def initialize(selector = NIO::Selector.new) - @selector = selector - @monitors = Set.new - end - - def register(object, interests) - Async.logger.debug(self) {"Registering #{object.inspect} for #{interests}."} - - unless io = ::IO.try_convert(object) - raise RuntimeError, "Could not convert #{io} into IO!" - end - - monitor = Monitor.new(@selector.register(object, interests), self) - - @monitors.add(monitor) - - return monitor - end - - def deregister(monitor) - Async.logger.debug(self) {"Deregistering #{monitor.inspect}."} - - unless @monitors.delete?(monitor) - raise RuntimeError, "Trying to remove monitor for #{monitor.inspect} but it was not registered!" - end - end - - def wakeup - @selector.wakeup - end - - def close - if @monitors.any? - raise LeakError, @monitors - end - ensure - @selector.close - end - - def select(*arguments) - @selector.select(*arguments) + class Selector < Event::Debug::Selector + def initialize(selector = nil) + super(selector || Event::Backend.new(Fiber.current)) end end end diff --git a/lib/async/debug/monitor.rb b/lib/async/interrupt.rb similarity index 67% rename from lib/async/debug/monitor.rb rename to lib/async/interrupt.rb index 99837004..c4566b55 100644 --- a/lib/async/debug/monitor.rb +++ b/lib/async/interrupt.rb @@ -1,6 +1,4 @@ -# frozen_string_literal: true - -# Copyright, 2018, by Samuel G. D. Williams. +# Copyright, 2020, by Samuel G. D. Williams. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -20,28 +18,33 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -require 'delegate' - module Async - module Debug - class Monitor < Delegator - def initialize(monitor, selector) - @monitor = monitor - @selector = selector - end - - def __getobj__ - @monitor - end + # A thread safe synchronisation primative. + class Interrupt + def initialize(scheduler, &block) + @scheduler = scheduler + @input, @output = IO.pipe - def close - @selector.deregister(self) - @monitor.close + @fiber = Fiber.new do + while true + @scheduler.io_wait(@fiber, @input, ::Event::READABLE) + block.call(@input.read_nonblock(1)) + end end - def inspect - "\#<#{self.class} io=#{@monitor.io.inspect} interests=#{@monitor.interests.inspect} readiness=#{@monitor.readiness.inspect}>" - end + @fiber.transfer + end + + def signal(event = '.') + @output.write('.') + @output.flush + end + + def close + @input.close + @output.close end end + + private_constant :Interrupt end diff --git a/lib/async/logger.rb b/lib/async/logger.rb deleted file mode 100644 index 8f9ed9a4..00000000 --- a/lib/async/logger.rb +++ /dev/null @@ -1,28 +0,0 @@ -# frozen_string_literal: true - -# Copyright, 2017, by Samuel G. D. Williams. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -require 'console' -require_relative 'task' - -module Async - extend Console -end diff --git a/lib/async/node.rb b/lib/async/node.rb index a75c38b4..0c331790 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -175,6 +175,10 @@ def initialize(parent = nil, annotation: nil, transient: false) end end + def root + @parent&.root || self + end + # You should not directly rely on these pointers but instead use `#children`. # List pointers: attr_accessor :head @@ -225,7 +229,7 @@ def backtrace(*arguments) end def to_s - "\#<#{description}>" + "\#<#{self.description}>" end # Change the parent of this node. @@ -329,6 +333,10 @@ def stop(later = false) end end + def stopped? + @children.nil? + end + def print_hierarchy(out = $stdout, backtrace: true) self.traverse do |node, level| indent = "\t" * level diff --git a/lib/async/notification.rb b/lib/async/notification.rb index 5ae057c9..6f0eb97e 100644 --- a/lib/async/notification.rb +++ b/lib/async/notification.rb @@ -30,7 +30,7 @@ class Notification < Condition def signal(value = nil, task: Task.current) return if @waiting.empty? - task.reactor << Signal.new(@waiting, value) + Fiber.scheduler << Signal.new(@waiting, value) @waiting = [] @@ -42,9 +42,9 @@ def alive? true end - def resume + def transfer waiting.each do |fiber| - fiber.resume(value) if fiber.alive? + fiber.transfer(value) if fiber.alive? end end end diff --git a/lib/async/reactor.rb b/lib/async/reactor.rb index 41caf91f..c5a6863a 100644 --- a/lib/async/reactor.rb +++ b/lib/async/reactor.rb @@ -1,6 +1,4 @@ -# frozen_string_literal: true - -# Copyright, 2017, by Samuel G. D. Williams. +# Copyright, 2020, by Samuel G. D. Williams. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -20,24 +18,11 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -require_relative 'logger' -require_relative 'task' -require_relative 'wrapper' require_relative 'scheduler' - -require 'nio' -require 'timers' -require 'forwardable' +require_relative 'task' module Async - # Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`. - class TimeoutError < StandardError - end - - # An asynchronous, cooperatively scheduled event reactor. - class Reactor < Node - extend Forwardable - + class Reactor < Scheduler # The preferred method to invoke asynchronous behavior at the top level. # # - When invoked within an existing reactor task, it will run the given block @@ -59,89 +44,17 @@ def self.run(*arguments, **options, &block) end end - def self.selector - if backend = ENV['ASYNC_BACKEND']&.to_sym - if NIO::Selector.backends.include?(backend) - return NIO::Selector.new(backend) - else - warn "Could not find ASYNC_BACKEND=#{backend}!" - end - end - - return NIO::Selector.new - end - - def initialize(parent = nil, selector: self.class.selector, logger: nil) - super(parent) - - @selector = selector - @timers = Timers::Group.new - @logger = logger - - @ready = [] - @running = [] - - if Scheduler.supported? - @scheduler = Scheduler.new(self) - else - @scheduler = nil - end - - @interrupted = false - @guard = Mutex.new - @blocked = 0 - @unblocked = [] - end - - attr :scheduler - attr :logger - - # @reentrant Not thread safe. - def block(blocker, timeout) - fiber = Fiber.current - - if timeout - timer = @timers.after(timeout) do - if fiber.alive? - fiber.resume(false) - end - end - end + def initialize(...) + super - begin - @blocked += 1 - Task.yield - ensure - @blocked -= 1 - end - ensure - timer&.cancel - end - - # @reentrant Thread safe. - def unblock(blocker, fiber) - @guard.synchronize do - @unblocked << fiber - @selector.wakeup - end - end - - def fiber(&block) - if @scheduler - Fiber.new(blocking: false, &block) - else - Fiber.new(&block) - end + @closing = false + self.set! end def to_s "\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>" end - def stopped? - @children.nil? - end - # Start an asynchronous task within the specified reactor. The task will be # executed until the first blocking call, at which point it will yield and # and this method will return. @@ -166,180 +79,19 @@ def async(*arguments, **options, &block) return task end - def register(io, interest, value = Fiber.current) - monitor = @selector.register(io, interest) - monitor.value = value - - return monitor - end - - # Interrupt the reactor at the earliest convenience. Can be called from a different thread safely. - def interrupt - @guard.synchronize do - unless @interrupted - @interrupted = true - @selector.wakeup - end - end - end - - # Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor. - # @param fiber [#resume] The object to be resumed on the next iteration of the run-loop. - def << fiber - @ready << fiber - end - - # Yield the current fiber and resume it on the next iteration of the event loop. - def yield(fiber = Fiber.current) - @ready << fiber - - Task.yield - end - - def finished? - # TODO I'm not sure if checking `@running.empty?` is really required. - super && @ready.empty? && @running.empty? && @blocked.zero? - end - - # Run one iteration of the event loop. - # @param timeout [Float | nil] the maximum timeout, or if nil, indefinite. - # @return [Boolean] whether there is more work to do. - def run_once(timeout = nil) - # Console.logger.debug(self) {"@ready = #{@ready} @running = #{@running}"} - - if @ready.any? - # running used to correctly answer on `finished?`, and to reuse Array object. - @running, @ready = @ready, @running - - @running.each do |fiber| - fiber.resume if fiber.alive? - end - - @running.clear - end - - if @unblocked.any? - unblocked = Array.new - - @guard.synchronize do - unblocked, @unblocked = @unblocked, unblocked - end - - while fiber = unblocked.pop - fiber.resume if fiber.alive? - end - end - - if @ready.empty? - interval = @timers.wait_interval + def close + if @closing + super else - # if there are tasks ready to execute, don't sleep: - interval = 0 - end - - # If we are finished, we stop the task tree and exit: - if self.finished? - return false - end - - # If there is no interval to wait (thus no timers), and no tasks, we could be done: - if interval.nil? - # Allow the user to specify a maximum interval if we would otherwise be sleeping indefinitely: - interval = timeout - elsif interval < 0 - # We have timers ready to fire, don't sleep in the selctor: - interval = 0 - elsif timeout and interval > timeout - interval = timeout - end - - # Console.logger.info(self) {"Selecting with #{@children&.size} children with interval = #{interval ? interval.round(2) : 'infinite'}..."} - if monitors = @selector.select(interval) - monitors.each do |monitor| - monitor.value.resume - end - end - - @timers.fire - - # We check and clear the interrupted flag here: - if @interrupted - @guard.synchronize do - @interrupted = false - end - - return false + @closing = true + self.clear! end - - # The reactor still has work to do: - return true - end - - # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. - def run(*arguments, **options, &block) - raise RuntimeError, 'Reactor has been closed' if @selector.nil? - - @scheduler&.set! - - initial_task = self.async(*arguments, **options, &block) if block_given? - - while self.run_once - # Round and round we go! - end - - return initial_task - ensure - @scheduler&.clear! - Console.logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."} end - # Stop each of the children tasks and close the selector. - def close - # This is a critical step. Because tasks could be stored as instance variables, and since the reactor is (probably) going out of scope, we need to ensure they are stopped. Otherwise, the tasks will belong to a reactor that will never run again and are not stopped: - self.terminate - - @selector.close - @selector = nil - end + public :sleep - # Check if the selector has been closed. - # @returns [Boolean] - def closed? - @selector.nil? - end - - # Put the calling fiber to sleep for a given ammount of time. - # @parameter duration [Numeric] The time in seconds, to sleep for. - def sleep(duration) - fiber = Fiber.current - - timer = @timers.after(duration) do - if fiber.alive? - fiber.resume - end - end - - Task.yield - ensure - timer.cancel if timer - end - - # Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception. - # @param duration [Numeric] The time in seconds, in which the task should - # complete. - def with_timeout(timeout, exception = TimeoutError) - fiber = Fiber.current - - timer = @timers.after(timeout) do - if fiber.alive? - error = exception.new("execution expired") - fiber.resume(error) - end - end - - yield timer - ensure - timer.cancel if timer + def with_timeout(timeout, exception = TimeoutError, message = "execution expired", &block) + timeout_after(timeout, exception, message, &block) end end end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index e6baa96a..39236559 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -19,59 +19,138 @@ # THE SOFTWARE. require_relative 'clock' +require_relative 'interrupt' +require_relative 'node' + +require 'console' +require 'event' +require 'timers' module Async - class Scheduler - if Fiber.respond_to?(:set_scheduler) - def self.supported? - true + class Scheduler < Node + def self.supported? + true + end + + def initialize(parent = nil, selector: nil) + super(parent) + + @selector = selector || Event::Backend.new(Fiber.current) + @timers = Timers::Group.new + + @ready = [] + @running = [] + + @guard = Mutex.new + @interrupted = false + @blocked = 0 + @unblocked = [] + + @loop = nil + + @interrupt = Interrupt.new(@selector) do |event| + case event + when '!' + @interrupted = true + end end - else - def self.supported? - false + end + + def interrupt + @interrupt.signal('!') + end + + # Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor. + # @param fiber [#resume] The object to be resumed on the next iteration of the run-loop. + def << fiber + @ready << fiber + end + + # Yield the current fiber and resume it on the next iteration of the event loop. + def yield + @ready << Fiber.current + @loop.transfer + end + + def resume(fiber, *arguments) + if @loop + @ready << Fiber.current + fiber.transfer(*arguments) + else + @ready << fiber end end - def initialize(reactor) - @reactor = reactor + # Transfer from te calling fiber to the event loop. + def transfer + @loop.transfer end - attr :wrappers + def kernel_sleep(duration) + self.block(nil, duration) + end + + # @reentrant Not thread safe. + def block(blocker, timeout) + # $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})" + fiber = Fiber.current + + if timeout + timer = @timers.after(timeout) do + if fiber.alive? + fiber.transfer(false) + end + end + end + + begin + @blocked += 1 + @loop.transfer + ensure + @blocked -= 1 + end + ensure + timer&.cancel + end + + # @reentrant Thread safe. + def unblock(blocker, fiber) + # $stderr.puts "unblock(#{blocker}, #{fiber})" + + @guard.synchronize do + @unblocked << fiber + @interrupt&.signal + end + end def set! Fiber.set_scheduler(self) + @loop = Fiber.current end def clear! Fiber.set_scheduler(nil) - end - - private def from_io(io) - Wrapper.new(io, @reactor) + @loop = nil end def io_wait(io, events, timeout = nil) - wrapper = from_io(io) + fiber = Fiber.current - if events == ::IO::READABLE - if wrapper.wait_readable(timeout) - return ::IO::READABLE - end - elsif events == ::IO::WRITABLE - if wrapper.wait_writable(timeout) - return ::IO::WRITABLE - end - else - if wrapper.wait_any(timeout) - return events + if timeout + timer = @timers.after(timeout) do + fiber.raise(TimeoutError) end end - return false + # Console.logger.info(self, "-> io_wait", fiber, io, events) + events = @selector.io_wait(fiber, io, events) + # Console.logger.info(self, "<- io_wait", fiber, io, events) + + return events rescue TimeoutError - return nil + return false ensure - wrapper.reactor = nil + timer&.cancel end # Wait for the specified process ID to exit. @@ -84,29 +163,139 @@ def process_wait(pid, flags) end.value end - def kernel_sleep(duration) - self.block(nil, duration) + def finished? + super && @ready.empty? && @running.empty? && @blocked.zero? end - def block(blocker, timeout) - @reactor.block(blocker, timeout) + # Run one iteration of the event loop. + # @param timeout [Float | nil] the maximum timeout, or if nil, indefinite. + # @return [Boolean] whether there is more work to do. + def run_once(timeout = nil) + raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? + # Console.logger.info(self) {"@ready = #{@ready} @running = #{@running}"} + + if @ready.any? + # running used to correctly answer on `finished?`, and to reuse Array object. + @running, @ready = @ready, @running + + @running.each do |fiber| + fiber.transfer if fiber.alive? + end + + @running.clear + end + + if @unblocked.any? + unblocked = Array.new + + @guard.synchronize do + unblocked, @unblocked = @unblocked, unblocked + end + + while fiber = unblocked.pop + fiber.transfer if fiber.alive? + end + end + + if @ready.empty? and @unblocked.empty? + interval = @timers.wait_interval + else + # if there are tasks ready to execute, don't sleep: + interval = 0 + end + + # If we are finished, we stop the task tree and exit: + if self.finished? + return false + end + + # If there is no interval to wait (thus no timers), and no tasks, we could be done: + if interval.nil? + # Allow the user to specify a maximum interval if we would otherwise be sleeping indefinitely: + interval = timeout + elsif interval < 0 + # We have timers ready to fire, don't sleep in the selctor: + interval = 0 + elsif timeout and interval > timeout + interval = timeout + end + + begin + # Console.logger.info(self) {"@selector.select(#{interval ? interval.round(2) : 'forever'})..."} + @selector.select(interval) + rescue Errno::EINTR + # Ignore. + end + + @timers.fire + + # We check and clear the interrupted flag here: + if @interrupted + @interrupted = false + + return false + end + + # The reactor still has work to do: + return true end - def unblock(blocker, fiber) - @reactor.unblock(blocker, fiber) + # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. + def run(*arguments, **options, &block) + raise RuntimeError, 'Reactor has been closed' if @selector.nil? + + initial_task = self.async(*arguments, **options, &block) if block_given? + + while self.run_once + # Round and round we go! + end + + return initial_task + ensure + Console.logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."} end def close + # This is a critical step. Because tasks could be stored as instance variables, and since the reactor is (probably) going out of scope, we need to ensure they are stopped. Otherwise, the tasks will belong to a reactor that will never run again and are not stopped. + self.terminate + + raise "Closing scheduler with blocked operations!" if @blocked > 0 + + @guard.synchronize do + @interrupt.close + @interrupt = nil + + @selector.close + @selector = nil + end + end + + def closed? + @selector.nil? end def fiber(&block) - task = Task.new(@reactor, &block) - - fiber = task.fiber + task = Task.new(Task.current? || self, &block) task.run - return fiber + return task.fiber + end + + # Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception. + # @param duration [Numeric] The time in seconds, in which the task should complete. + def timeout_after(timeout, exception, message, &block) + fiber = Fiber.current + + timer = @timers.after(timeout) do + if fiber.alive? + fiber.raise(exception, message) + end + end + + yield timer + ensure + timer.cancel if timer end end end diff --git a/lib/async/semaphore.rb b/lib/async/semaphore.rb index 87ad49c9..c4687c37 100644 --- a/lib/async/semaphore.rb +++ b/lib/async/semaphore.rb @@ -89,7 +89,7 @@ def release while (@limit - @count) > 0 and fiber = @waiting.shift if fiber.alive? - fiber.resume + Fiber.scheduler.resume(fiber) end end end @@ -102,7 +102,7 @@ def wait if blocking? @waiting << fiber - Task.yield while blocking? + Fiber.scheduler.transfer while blocking? end rescue Exception @waiting.delete(fiber) diff --git a/lib/async/task.rb b/lib/async/task.rb index 6be28a05..3b43d122 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -21,7 +21,6 @@ # THE SOFTWARE. require 'fiber' -require 'forwardable' require_relative 'node' require_relative 'condition' @@ -38,55 +37,66 @@ def alive? true end - def resume + def transfer @task.stop end end end + # Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`. + class TimeoutError < StandardError + def initialize(message = "execution expired") + super + end + end + # A task represents the state associated with the execution of an asynchronous # block. class Task < Node - extend Forwardable - - # Yield the unerlying `result` for the task. If the result - # is an Exception, then that result will be raised an its - # exception. - # @return [Object] result of the task - # @raise [Exception] if the result is an exception - # @yield [result] result of the task if a block if given. def self.yield - if block_given? - result = yield - else - result = Fiber.yield - end - - if result.is_a? Exception - raise result + Fiber.scheduler.transfer + end + + # The preferred method to invoke asynchronous behavior at the top level. + # + # - When invoked within an existing reactor task, it will run the given block + # asynchronously. Will return the task once it has been scheduled. + # - When invoked at the top level, will create and run a reactor, and invoke + # the block as an asynchronous task. Will block until the reactor finishes + # running. + def self.run(*arguments, **options, &block) + if current = self.current? + return current.async(*arguments, **options, &block) else - return result + scheduler = Scheduler.new + scheduler.set! + + begin + Fiber.schedule(&block) + return self.run(*arguments, **options, &block) + ensure + scheduler.clear! + end end end # Create a new task. # @param reactor [Async::Reactor] the reactor this task will run within. # @param parent [Async::Task] the parent task. - def initialize(reactor, parent = Task.current?, logger: nil, finished: nil, **options, &block) - super(parent || reactor, **options) - - @reactor = reactor + def initialize(parent = Task.current?, finished: nil, **options, &block) + super(parent, **options) @status = :initialized @result = nil @finished = finished - @logger = logger || @parent.logger - - @fiber = make_fiber(&block) + @block = block + @fiber = nil end - attr :logger + def reactor + self.root + end if Fiber.current.respond_to?(:backtrace) def backtrace(*arguments) @@ -98,14 +108,17 @@ def to_s "\#<#{self.description} (#{@status})>" end - # @attr ios [Reactor] The reactor the task was created within. - attr :reactor + def sleep(duration = nil) + super + end - def_delegators :@reactor, :with_timeout, :sleep + def with_timeout(timeout, exception = TimeoutError, message = "execution expired", &block) + Fiber.scheduler.timeout_after(timeout, exception, message, &block) + end # Yield back to the reactor and allow other fibers to execute. def yield - Task.yield{reactor.yield} + Fiber.scheduler.yield end # @attr fiber [Fiber] The fiber which is being used for the execution of this task. @@ -123,14 +136,14 @@ def run(*arguments) if @status == :initialized @status = :running - @fiber.resume(*arguments) + schedule(arguments) else raise RuntimeError, "Task already running!" end end def async(*arguments, **options, &block) - task = Task.new(@reactor, self, **options, &block) + task = Task.new(self, **options, &block) task.run(*arguments) @@ -141,19 +154,25 @@ def async(*arguments, **options, &block) # @raise [RuntimeError] if the task's fiber is the current fiber. # @return [Object] the final expression/result of the task's block. def wait - raise RuntimeError, "Cannot wait on own fiber" if Fiber.current.equal?(@fiber) + raise "Cannot wait on own fiber" if Fiber.current.equal?(@fiber) if running? + raise "Cannot wait outside of reactor" unless Fiber.scheduler + @finished ||= Condition.new @finished.wait + end + + case @result + when Exception + raise @result else - Task.yield{@result} + return @result end end - # Deprecated. - alias result wait - # Soon to become attr :result + # Access the result of the task without waiting. May be nil if the task is not completed. + attr :result # Stop the task and all of its children. def stop(later = false) @@ -165,15 +184,16 @@ def stop(later = false) if self.running? if self.current? if later - @reactor << Stop::Later.new(self) + Fiber.scheduler << Stop::Later.new(self) else raise Stop, "Stopping current task!" end elsif @fiber&.alive? begin - @fiber.resume(Stop.new) + Fiber.scheduler << Fiber.current + @fiber.raise(Stop) rescue FiberError - @reactor << Stop::Later.new(self) + Fiber.scheduler << Stop::Later.new(self) end end else @@ -246,18 +266,18 @@ def fail!(exception = nil, propagate = true) end def stop! - # logger.debug(self) {"Task was stopped with #{@children&.size.inspect} children!"} + # Console.logger.info(self, self.annotation) {"Task was stopped with #{@children&.size.inspect} children!"} @status = :stopped stop_children(true) end - def make_fiber(&block) - Fiber.new do |*arguments| + def schedule(arguments) + @fiber = Fiber.new do set! begin - @result = yield(self, *arguments) + @result = @block.call(self, *arguments) @status = :complete # Console.logger.debug(self) {"Task was completed with #{@children.size} children!"} rescue Stop @@ -267,10 +287,12 @@ def make_fiber(&block) rescue Exception => exception fail!(exception, true) ensure - # Console.logger.debug(self) {"Task ensure $!=#{$!} with #{@children.size} children!"} + # Console.logger.info(self) {"Task ensure $! = #{$!} with #{@children&.size.inspect} children!"} finish! end end + + self.root.resume(@fiber) end # Finish the current task, and all bound bound IO objects. @@ -291,7 +313,6 @@ def finish! def set! # This is actually fiber-local: Thread.current[:async_task] = self - Console.logger = @logger if @logger end end end diff --git a/lib/async/wrapper.rb b/lib/async/wrapper.rb index 0eeec35c..08140f3b 100644 --- a/lib/async/wrapper.rb +++ b/lib/async/wrapper.rb @@ -20,220 +20,58 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -require 'nio' - module Async # Represents an asynchronous IO within a reactor. class Wrapper class Cancelled < StandardError - class From - def initialize - @backtrace = caller[5..-1] - end - - attr :backtrace - - def cause - nil - end - - def message - "Cancelled" - end - end - - def initialize - super "The operation has been cancelled!" - - @cause = From.new - end - - attr :cause - end - - # wait_readable, wait_writable and wait_any are not re-entrant, and will raise this failure. - class WaitError < StandardError - def initialize - super "A fiber is already waiting!" - end end # @param io the native object to wrap. # @param reactor [Reactor] the reactor that is managing this wrapper, or not specified, it's looked up by way of {Task.current}. def initialize(io, reactor = nil) @io = io - @reactor = reactor - @monitor = nil - @readable = nil - @writable = nil - @any = nil + @timeout = nil end - def dup - self.class.new(@io.dup, @reactor) - end + attr_accessor :reactor - def resume(*arguments) - # It's possible that the monitor was closed before calling resume. - return unless @monitor - - readiness = @monitor.readiness - - if @readable and (readiness == :r or readiness == :rw) - @readable.resume(*arguments) - end - - if @writable and (readiness == :w or readiness == :rw) - @writable.resume(*arguments) - end - - if @any - @any.resume(*arguments) - end + def dup + self.class.new(@io.dup) end # The underlying native `io`. attr :io - # The reactor this wrapper is associated with, if any. - attr :reactor - - # The monitor for this wrapper, if any. - attr :monitor - - # Bind this wrapper to a different reactor. Assign nil to convert to an unbound wrapper (can be used from any reactor/task but with slightly increased overhead.) - # Binding to a reactor is purely a performance consideration. Generally, I don't like APIs that exist only due to optimisations. This is borderline, so consider this functionality semi-private. - def reactor= reactor - return if @reactor.equal?(reactor) - - cancel_monitor - - @reactor = reactor + # Wait for the io to become readable. + def wait_readable(timeout = @timeout) + @io.to_io.wait_readable(timeout) or raise TimeoutError end - # Wait for the io to become readable. - def wait_readable(timeout = nil) - raise WaitError if @readable - - self.reactor = Task.current.reactor - - begin - @readable = Fiber.current - wait_for(timeout) - ensure - @readable = nil - @monitor.interests = interests if @monitor - end + # Wait for the io to become writable. + def wait_priority(timeout = @timeout) + @io.to_io.wait_priority(timeout) or raise TimeoutError end # Wait for the io to become writable. - def wait_writable(timeout = nil) - raise WaitError if @writable - - self.reactor = Task.current.reactor - - begin - @writable = Fiber.current - wait_for(timeout) - ensure - @writable = nil - @monitor.interests = interests if @monitor - end + def wait_writable(timeout = @timeout) + @io.to_io.wait_writable(timeout) or raise TimeoutError end # Wait fo the io to become either readable or writable. # @param duration [Float] timeout after the given duration if not `nil`. - def wait_any(timeout = nil) - raise WaitError if @any - - self.reactor = Task.current.reactor - - begin - @any = Fiber.current - wait_for(timeout) - ensure - @any = nil - @monitor.interests = interests if @monitor - end + def wait_any(timeout = @timeout) + @io.wait_any(timeout) or raise TimeoutError end # Close the io and monitor. def close - cancel_monitor - @io.close end def closed? @io.closed? end - - private - - # What an abomination. - def interests - if @any - return :rw - elsif @readable - if @writable - return :rw - else - return :r - end - elsif @writable - return :w - end - - return nil - end - - def cancel_monitor - if @readable - readable = @readable - @readable = nil - - readable.resume(Cancelled.new) - end - - if @writable - writable = @writable - @writable = nil - - writable.resume(Cancelled.new) - end - - if @any - any = @any - @any = nil - - any.resume(Cancelled.new) - end - - if @monitor - @monitor.close - @monitor = nil - end - end - - def wait_for(timeout) - if @monitor - @monitor.interests = interests - else - @monitor = @reactor.register(@io, interests, self) - end - - # If the user requested an explicit timeout for this operation: - if timeout - @reactor.with_timeout(timeout) do - Task.yield - end - else - Task.yield - end - - return true - end end end diff --git a/spec/async/condition_spec.rb b/spec/async/condition_spec.rb index b3a59275..7509a8ad 100644 --- a/spec/async/condition_spec.rb +++ b/spec/async/condition_spec.rb @@ -25,7 +25,7 @@ require_relative 'condition_examples' -RSpec.describe Async::Condition do +RSpec.describe Async::Condition, timeout: 1000 do include_context Async::RSpec::Reactor it 'should continue after condition is signalled' do @@ -39,8 +39,6 @@ subject.signal expect(task.status).to be :complete - - task.stop end it 'can stop nested task' do diff --git a/spec/async/logger_spec.rb b/spec/async/logger_spec.rb deleted file mode 100644 index 7d3d8498..00000000 --- a/spec/async/logger_spec.rb +++ /dev/null @@ -1,74 +0,0 @@ -# frozen_string_literal: true - -# Copyright, 2017, by Samuel G. D. Williams. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -require 'async' -require 'async/logger' -require 'console/capture' - -RSpec.describe 'Async.logger' do - let(:name) {"nested"} - let(:message) {"Talk is cheap. Show me the code."} - - let(:capture) {Console::Capture.new} - let(:logger) {Console::Logger.new(capture, name: name)} - - it "can use nested logger" do - Async(logger: logger) do |task| - expect(task.logger).to be == logger - logger.warn message - end.wait - - expect(capture.last).to include({ - severity: :warn, - name: name, - subject: message, - }) - end - - it "can change nested logger" do - Async do |parent| - parent.async(logger: logger) do |task| - expect(task.logger).to be == logger - expect(Async.logger).to be == logger - expect(Console.logger).to be == logger - end.wait - end.wait - end - - it "can use parent logger" do - current_logger = Console.logger - child = nil - - Async(logger: logger) do |parent| - child = parent.async{|task| task.yield} - - expect(parent.logger).to be == logger - expect(child.logger).to be == logger - expect(Async.logger).to be == logger - expect(Console.logger).to be == logger - end.wait - - expect(child.logger).to be == logger - - expect(Console.logger).to be == current_logger - end -end diff --git a/spec/async/performance_spec.rb b/spec/async/performance_spec.rb deleted file mode 100644 index 4bf99a0f..00000000 --- a/spec/async/performance_spec.rb +++ /dev/null @@ -1,72 +0,0 @@ -# frozen_string_literal: true - -# Copyright, 2019, by Samuel G. D. Williams. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -require 'benchmark/ips' -require 'async' - -RSpec.describe Async::Wrapper do - let(:pipe) {IO.pipe} - - after do - pipe.each(&:close) - end - - let(:input) {described_class.new(pipe.first)} - let(:output) {described_class.new(pipe.last)} - - it "should be fast to wait until readable" do - Benchmark.ips do |x| - x.report('Wrapper#wait_readable') do |repeats| - Async do |task| - input = Async::Wrapper.new(pipe.first, task.reactor) - output = pipe.last - - repeats.times do - output.write(".") - input.wait_readable - input.io.read(1) - end - - input.reactor = nil - end - end - - x.report('Reactor#register') do |repeats| - Async do |task| - input = pipe.first - monitor = task.reactor.register(input, :r) - output = pipe.last - - repeats.times do - output.write(".") - Async::Task.yield - input.read(1) - end - - monitor.close - end - end - - x.compare! - end - end -end diff --git a/spec/async/queue_spec.rb b/spec/async/queue_spec.rb index 1e62a00c..205dcb15 100644 --- a/spec/async/queue_spec.rb +++ b/spec/async/queue_spec.rb @@ -55,12 +55,9 @@ describe '#size' do it 'returns queue size' do - reactor.async do |task| - 10.times do |i| - subject.enqueue(i) - expect(subject.size).to be i + 1 - end - end + expect(subject.size).to be == 0 + subject.enqueue("Hello World") + expect(subject.size).to be == 1 end end diff --git a/spec/async/reactor_spec.rb b/spec/async/reactor_spec.rb index 46990da6..17865649 100644 --- a/spec/async/reactor_spec.rb +++ b/spec/async/reactor_spec.rb @@ -49,7 +49,7 @@ end end - describe '#run_once' do + describe '#run' do it "can run the reactor" do # Run the reactor for 1 second: task = subject.async do |task| @@ -59,23 +59,23 @@ expect(task).to be_running # This will resume the task, and then the reactor will be finished. - expect(subject.run_once).to be false + subject.run expect(task).to be_finished end it "can run one iteration" do - state = nil + state = :started subject.async do |task| - state = :started task.yield state = :finished end expect(state).to be :started - subject.run_once + subject.run + expect(state).to be :finished end end @@ -84,31 +84,35 @@ it "can print hierarchy" do subject.async do |parent| parent.async do |child| - child.sleep 1 + child.yield end - parent.sleep 1 + output = StringIO.new + subject.print_hierarchy(output, backtrace: false) + lines = output.string.lines + + expect(lines[0]).to be =~ /# producer.resume condition.signal(item) # (4) consumer.resume(value) @@ -290,7 +329,7 @@ producer.stop # (5) [producer is resumed already] producer.stop end - expect(items).to be == [1] + expect(items).to be == [1, 2] end end @@ -384,8 +423,7 @@ def sleep_forever expect{task.wait}.to raise_error(Async::TimeoutError) - # TODO replace this with task.result - task.wait rescue error = $! + error = task.result expect(error.backtrace).to include(/sleep_forever/) end end @@ -438,17 +476,19 @@ def sleep_forever it "will raise exceptions when checking result" do error_task = nil - error_task = reactor.async do |task| - raise RuntimeError, "brain not provided" + reactor.run do + error_task = reactor.async do |task| + raise RuntimeError, "brain not provided" + end + + expect do + error_task.wait + end.to raise_exception(RuntimeError, /brain/) end - - expect do - error_task.wait - end.to raise_exception(RuntimeError, /brain/) end it "will propagate exceptions after async operation" do - error_task = nil + error_task = innocent_task = nil error_task = reactor.async do |task| task.sleep(0.1) @@ -469,6 +509,30 @@ def sleep_forever end end + describe '#result' do + it 'does not raise exception' do + reactor.async do + task = reactor.async do + raise "The space time converter has failed." + end + + expect(task.result).to be_kind_of(RuntimeError) + end + end + + it 'does not wait for task completion' do + reactor.async do + task = reactor.async do |task| + task.sleep(1) + end + + expect(task.result).to be_nil + + task.stop + end + end + end + describe '#children' do it "enumerates children in same order they are created" do tasks = 10.times.map do |i| @@ -489,10 +553,12 @@ def sleep_forever end it "should show complete" do - apples_task = reactor.async do |task| + reactor.run do + apples_task = reactor.async do |task| + end + + expect(apples_task.to_s).to include "complete" end - - expect(apples_task.to_s).to include "complete" end end end diff --git a/spec/async/wrapper_spec.rb b/spec/async/wrapper_spec.rb deleted file mode 100644 index 8740e146..00000000 --- a/spec/async/wrapper_spec.rb +++ /dev/null @@ -1,203 +0,0 @@ -# frozen_string_literal: true - -# Copyright, 2017, by Samuel G. D. Williams. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -require "async/rspec" - -RSpec.describe Async::Wrapper do - include_context Async::RSpec::Reactor - - let(:pipe) {IO.pipe} - let(:input) {Async::Wrapper.new(pipe.last)} - let(:output) {Async::Wrapper.new(pipe.first)} - - after(:each) do - input.close unless input.closed? - output.close unless output.closed? - - expect(input.monitor).to be_nil - expect(output.monitor).to be_nil - end - - describe '#wait_readable' do - it "can wait to be readable" do - reader = reactor.async do - expect(output.wait_readable).to be_truthy - end - - input.io.write('Hello World') - reader.wait - end - - it "can timeout if no event occurs" do - expect do - output.wait_readable(0.1) - end.to raise_exception(Async::TimeoutError) - end - - it "can wait for readability in sequential tasks" do - reactor.async do - input.wait_writable(1) - input.io.write('Hello World') - end - - 2.times do - reactor.async do - expect(output.wait_readable(1)).to be_truthy - end.wait - end - end - - it "can be cancelled" do - reactor.async do - expect do - output.wait_readable - end.to raise_exception(Async::Wrapper::Cancelled) - end - - expect(output.monitor).to_not be_nil - end - end - - describe '#wait_writable' do - it "can wait to be writable" do - expect(input.wait_writable).to be_truthy - end - - it "can be cancelled while waiting to be readable" do - reactor.async do - expect do - input.wait_readable - end.to raise_exception(Async::Wrapper::Cancelled) - end - - # This reproduces the race condition that can occur if two tasks are resumed in sequence. - - # Resume task 1 which closes IO: - output.close - - # Resume task 2: - expect do - output.resume - end.to_not raise_exception - end - - it "can be cancelled" do - reactor.async do - expect do - input.wait_readable - end.to raise_exception(Async::Wrapper::Cancelled) - end - - expect(input.monitor).to_not be_nil - end - end - - describe "#wait_any" do - it "can wait for any events" do - reactor.async do - input.wait_any(1) - input.io.write('Hello World') - end - - expect(output.wait_readable(1)).to be_truthy - end - - it "can wait for readability in one task and writability in another" do - reactor.async do - expect do - input.wait_readable(1) - end.to raise_exception(Async::Wrapper::Cancelled) - end - - expect(input.monitor.interests).to be == :r - - reactor.async do - input.wait_writable - - input.close - output.close - end.wait - end - - it "fails if waiting on from multiple tasks" do - input.reactor = reactor - - reactor.async do - expect do - input.wait_readable - end.to raise_exception(Async::Wrapper::Cancelled) - end - - expect(input.monitor.interests).to be == :r - - reactor.async do - expect do - input.wait_readable - end.to raise_exception(Async::Wrapper::WaitError) - end - end - end - - describe '#reactor=' do - it 'can assign a wrapper to a reactor' do - input.reactor = reactor - - expect(input.reactor).to be == reactor - end - - it 'assigns current reactor when waiting for events' do - input.wait_writable - - expect(input.reactor).to be == reactor - end - end - - describe '#dup' do - let(:dup) {input.dup} - - it 'dups the underlying io' do - expect(dup.io).to_not eq input.io - - dup.close - - expect(input).to_not be_closed - end - end - - describe '#close' do - it "closes monitor when closing wrapper" do - input.wait_writable - expect(input.monitor).to_not be_nil - input.close - expect(input.monitor).to be_nil - end - - it "can't wait on closed wrapper" do - input.close - output.close - - expect do - output.wait_readable - end.to raise_exception(IOError, /closed stream/) - end - end -end diff --git a/spec/enumerator_spec.rb b/spec/enumerator_spec.rb index 1a8c5e47..a62068b3 100644 --- a/spec/enumerator_spec.rb +++ b/spec/enumerator_spec.rb @@ -27,57 +27,62 @@ def some_yielder(task) task.sleep(0.002) yield 2 end - + def enum(task) to_enum(:some_yielder, task) end - + it "should play well with Enumerator as internal iterator" do # no fiber really used in internal iterator, # but let this test be here for completness - ar = nil + result = nil + Async do |task| - ar = enum(task).to_a + result = enum(task).to_a end - expect(ar).to be == [1, 2] + + expect(result).to be == [1, 2] end - - it "should play well with Enumerator as external iterator", pending: "expected failure" do - ar = [] + + it "should play well with Enumerator as external iterator" do + result = [] + Async do |task| - en = enum(task) - ar << en.next - ar << en.next - ar << begin en.next rescue $! end + enumerator = enum(task) + result << enumerator.next + result << enumerator.next + result << begin enumerator.next rescue $! end end - expect(ar[0]).to be == 1 - expect(ar[1]).to be == 2 - expect(ar[2]).to be_a StopIteration + + expect(result[0]).to be == 1 + expect(result[1]).to be == 2 + expect(result[2]).to be_a StopIteration end - - it "should play well with Enumerator.zip(Enumerator) method", pending: "expected failure" do + + it "should play well with Enumerator.zip(Enumerator) method" do Async do |task| - ar = [:a, :b, :c, :d].each.zip(enum(task)) - expect(ar).to be == [[:a, 1], [:b, 2], [:c, nil], [:d, nil]] - end.wait + result = [:a, :b, :c, :d].each.zip(enum(task)) + expect(result).to be == [[:a, 1], [:b, 2], [:c, nil], [:d, nil]] + end end - - it "should play with explicit Fiber usage", pending: "expected failure" do - ar = [] + + it "should play well with explicit Fiber usage" do + result = [] + Async do |task| - fib = Fiber.new { + fiber = Fiber.new do Fiber.yield 1 task.sleep(0.002) Fiber.yield 2 - } - ar << fib.resume - ar << fib.resume - ar << fib.resume - ar << begin en.next rescue $! end + end + + result << fiber.resume + result << fiber.resume + result << fiber.resume end - expect(ar[0]).to be == 1 - expect(ar[1]).to be == 2 - expect(ar[2]).to be nil - expect(ar[3]).to be_a FiberError + + expect(result[0]).to be == 1 + expect(result[1]).to be == 2 + expect(result[2]).to be nil end end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 677bd84a..4c29aa44 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,6 +3,8 @@ require 'async/rspec' require 'covered/rspec' +require 'console' + if RUBY_PLATFORM =~ /darwin/ Q = 20 else From 3a5cd18fbdef3265e03f936e07c4428b2bc20d53 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 6 Jun 2021 16:15:07 +1200 Subject: [PATCH 2/9] Don't wait for blocking operations to complete as this can stall transient tasks from finishing. --- lib/async/scheduler.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 39236559..ba66a430 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -90,6 +90,7 @@ def kernel_sleep(duration) self.block(nil, duration) end + # Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue. # @reentrant Not thread safe. def block(blocker, timeout) # $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})" @@ -164,7 +165,7 @@ def process_wait(pid, flags) end def finished? - super && @ready.empty? && @running.empty? && @blocked.zero? + super && @ready.empty? && @running.empty? end # Run one iteration of the event loop. From 7be3ed2c7da686bcdac3ff85385587992f398835 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 7 Jun 2021 13:12:30 +1200 Subject: [PATCH 3/9] We should only need to onsider active tasks. --- lib/async/scheduler.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index ba66a430..fecfb9cb 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -164,10 +164,6 @@ def process_wait(pid, flags) end.value end - def finished? - super && @ready.empty? && @running.empty? - end - # Run one iteration of the event loop. # @param timeout [Float | nil] the maximum timeout, or if nil, indefinite. # @return [Boolean] whether there is more work to do. From 32698e331ff24da4ba1260a50b0199f07ff667e7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 12 Jun 2021 14:29:50 +1200 Subject: [PATCH 4/9] Use native implementation of `process_wait`. --- async.gemspec | 2 +- lib/async/scheduler.rb | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/async.gemspec b/async.gemspec index 964004dc..6c79a210 100644 --- a/async.gemspec +++ b/async.gemspec @@ -17,7 +17,7 @@ Gem::Specification.new do |spec| spec.add_dependency "console", "~> 1.10" - spec.add_dependency "event" + spec.add_dependency "event", "~> 0.5.0" spec.add_dependency "timers", "~> 4.1" spec.add_development_dependency "async-rspec", "~> 1.1" diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index fecfb9cb..64022e4d 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -159,9 +159,9 @@ def io_wait(io, events, timeout = nil) # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`. # @returns [Process::Status] A process status instance. def process_wait(pid, flags) - Thread.new do - ::Process::Status.wait(pid, flags) - end.value + fiber = Fiber.current + + return @selector.process_wait(fiber, pid, flags) end # Run one iteration of the event loop. From 6bcbc682c16dc39d6b78862c719ee20378152b11 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 23 Jun 2021 18:52:53 +1200 Subject: [PATCH 5/9] Better organisaion of methods. # Conflicts: # lib/async/scheduler.rb --- lib/async/scheduler.rb | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 64022e4d..446c2c1c 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -56,6 +56,16 @@ def initialize(parent = nil, selector: nil) end end + def set! + Fiber.set_scheduler(self) + @loop = Fiber.current + end + + def clear! + Fiber.set_scheduler(nil) + @loop = nil + end + def interrupt @interrupt.signal('!') end @@ -86,10 +96,6 @@ def transfer @loop.transfer end - def kernel_sleep(duration) - self.block(nil, duration) - end - # Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue. # @reentrant Not thread safe. def block(blocker, timeout) @@ -124,14 +130,8 @@ def unblock(blocker, fiber) end end - def set! - Fiber.set_scheduler(self) - @loop = Fiber.current - end - - def clear! - Fiber.set_scheduler(nil) - @loop = nil + def kernel_sleep(duration) + self.block(nil, duration) end def io_wait(io, events, timeout = nil) From f36d350d0e163e6bffbeb073acdb41f56ea793e8 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 23 Jun 2021 18:44:18 +1200 Subject: [PATCH 6/9] Basic support for `address_resolv` using `Resolv`. --- lib/async/scheduler.rb | 5 +++++ spec/async/scheduler/address_spec.rb | 33 ++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 spec/async/scheduler/address_spec.rb diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 446c2c1c..5381e285 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -25,6 +25,7 @@ require 'console' require 'event' require 'timers' +require 'resolv' module Async class Scheduler < Node @@ -134,6 +135,10 @@ def kernel_sleep(duration) self.block(nil, duration) end + def address_resolve(hostname) + ::Resolv.getaddresses(hostname) + end + def io_wait(io, events, timeout = nil) fiber = Fiber.current diff --git a/spec/async/scheduler/address_spec.rb b/spec/async/scheduler/address_spec.rb new file mode 100644 index 00000000..55274f9c --- /dev/null +++ b/spec/async/scheduler/address_spec.rb @@ -0,0 +1,33 @@ +# Copyright, 2021, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require 'async/scheduler' + +RSpec.describe Async::Scheduler, if: Async::Scheduler.supported? do + include_context Async::RSpec::Reactor + + describe ::Addrinfo do + it "can resolve addresses" do + addresses = Addrinfo.getaddrinfo("www.google.com", "80") + + expect(addresses).to_not be_empty + end + end +end From 63b657b3e3a13eeb1325c597aa88327d2112398f Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 15 Jul 2021 18:49:37 +1200 Subject: [PATCH 7/9] More robust handling of condition cancellation. --- lib/async/condition.rb | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/lib/async/condition.rb b/lib/async/condition.rb index 5da7400b..59e991ee 100644 --- a/lib/async/condition.rb +++ b/lib/async/condition.rb @@ -31,17 +31,31 @@ def initialize @waiting = [] end + Queue = Struct.new(:fiber) do + def transfer(*arguments) + fiber&.transfer(*arguments) + end + + def alive? + fiber&.alive? + end + + def nullify + self.fiber = nil + end + end + + private_constant :Queue + # Queue up the current fiber and wait on yielding the task. # @return [Object] def wait - fiber = Fiber.current - @waiting << fiber + queue = Queue.new(Fiber.current) + @waiting << queue Fiber.scheduler.transfer - rescue Exception - # It would be nice if there was a better construct for this. We only need to invoke #delete if the task was not resumed normally. This can only occur with `raise` and `throw`. But there is no easy way to detect this. - @waiting.delete(fiber) - raise + ensure + queue.nullify end # Is any fiber waiting on this notification? From 117904ad3a727b22b458a378ee6663c1c67b83b6 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 15 Jul 2021 18:50:00 +1200 Subject: [PATCH 8/9] Less verbose `Node#inspect`. --- lib/async/node.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/async/node.rb b/lib/async/node.rb index 0c331790..28c48013 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -219,6 +219,8 @@ def description if @annotation "#{@object_name} #{@annotation}" + elsif line = self.backtrace(0, 1)&.first + "#{@object_name} #{line}" else @object_name end @@ -232,6 +234,8 @@ def to_s "\#<#{self.description}>" end + alias inspect to_s + # Change the parent of this node. # @param parent [Node, nil] the parent to attach to, or nil to detach. # @return [self] From 9d2790b695afddee2d248c40acd783db81388593 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 15 Jul 2021 20:39:30 +1200 Subject: [PATCH 9/9] Event loop updates. --- async.gemspec | 4 +- lib/async/debug/selector.rb | 2 +- lib/async/scheduler.rb | 80 +++++++++++++---------------- lib/async/task.rb | 3 +- spec/async/barrier_spec.rb | 1 - spec/async/reactor_spec.rb | 5 +- spec/async/scheduler/thread_spec.rb | 1 + spec/async/scheduler_spec.rb | 2 - spec/async/task_spec.rb | 7 +++ 9 files changed, 50 insertions(+), 55 deletions(-) diff --git a/async.gemspec b/async.gemspec index 6c79a210..78a56019 100644 --- a/async.gemspec +++ b/async.gemspec @@ -13,11 +13,11 @@ Gem::Specification.new do |spec| spec.files = Dir.glob('{lib}/**/*', File::FNM_DOTMATCH, base: __dir__) - spec.required_ruby_version = ">= 3.1.0" + spec.required_ruby_version = ">= 3.0.2" spec.add_dependency "console", "~> 1.10" - spec.add_dependency "event", "~> 0.5.0" + spec.add_dependency "event", "~> 0.9.4" spec.add_dependency "timers", "~> 4.1" spec.add_development_dependency "async-rspec", "~> 1.1" diff --git a/lib/async/debug/selector.rb b/lib/async/debug/selector.rb index d3bed797..3954418a 100644 --- a/lib/async/debug/selector.rb +++ b/lib/async/debug/selector.rb @@ -27,7 +27,7 @@ module Async module Debug class Selector < Event::Debug::Selector def initialize(selector = nil) - super(selector || Event::Backend.new(Fiber.current)) + super(selector || Event::Selector.new(Fiber.current)) end end end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 5381e285..a4ea32df 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -36,19 +36,14 @@ def self.supported? def initialize(parent = nil, selector: nil) super(parent) - @selector = selector || Event::Backend.new(Fiber.current) - @timers = Timers::Group.new - - @ready = [] - @running = [] + @selector = selector || ::Event::Selector.new(Fiber.current) + @timers = ::Timers::Group.new @guard = Mutex.new @interrupted = false @blocked = 0 @unblocked = [] - @loop = nil - @interrupt = Interrupt.new(@selector) do |event| case event when '!' @@ -59,44 +54,47 @@ def initialize(parent = nil, selector: nil) def set! Fiber.set_scheduler(self) - @loop = Fiber.current end def clear! Fiber.set_scheduler(nil) - @loop = nil end + # Interrupt the event loop. def interrupt @interrupt.signal('!') end - # Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor. - # @param fiber [#resume] The object to be resumed on the next iteration of the run-loop. - def << fiber - @ready << fiber + # Transfer from the calling fiber to the event loop. + def transfer + @selector.transfer end # Yield the current fiber and resume it on the next iteration of the event loop. def yield - @ready << Fiber.current - @loop.transfer + @selector.yield + end + + # Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor. + # @param fiber [#resume] The object to be resumed on the next iteration of the run-loop. + def push(fiber) + @selector.push(fiber) + end + + alias << push + + def raise(*arguments) + @selector.raise(*arguments) end def resume(fiber, *arguments) - if @loop - @ready << Fiber.current - fiber.transfer(*arguments) + if Fiber.scheduler + @selector.resume(fiber, *arguments) else - @ready << fiber + @selector.push(fiber) end end - # Transfer from te calling fiber to the event loop. - def transfer - @loop.transfer - end - # Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue. # @reentrant Not thread safe. def block(blocker, timeout) @@ -113,7 +111,7 @@ def block(blocker, timeout) begin @blocked += 1 - @loop.transfer + @selector.transfer ensure @blocked -= 1 end @@ -159,33 +157,27 @@ def io_wait(io, events, timeout = nil) timer&.cancel end + # def io_read(io, buffer, length) + # @selector.io_read(Fiber.current, io, buffer, length) + # end + # + # def io_write(io, buffer, length) + # @selector.io_write(Fiber.current, io, buffer, length) + # end + # Wait for the specified process ID to exit. # @parameter pid [Integer] The process ID to wait for. # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`. # @returns [Process::Status] A process status instance. def process_wait(pid, flags) - fiber = Fiber.current - - return @selector.process_wait(fiber, pid, flags) + return @selector.process_wait(Fiber.current, pid, flags) end # Run one iteration of the event loop. # @param timeout [Float | nil] the maximum timeout, or if nil, indefinite. # @return [Boolean] whether there is more work to do. def run_once(timeout = nil) - raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? - # Console.logger.info(self) {"@ready = #{@ready} @running = #{@running}"} - - if @ready.any? - # running used to correctly answer on `finished?`, and to reuse Array object. - @running, @ready = @ready, @running - - @running.each do |fiber| - fiber.transfer if fiber.alive? - end - - @running.clear - end + Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? if @unblocked.any? unblocked = Array.new @@ -199,7 +191,7 @@ def run_once(timeout = nil) end end - if @ready.empty? and @unblocked.empty? + if !@selector.ready? and @unblocked.empty? interval = @timers.wait_interval else # if there are tasks ready to execute, don't sleep: @@ -244,7 +236,7 @@ def run_once(timeout = nil) # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. def run(*arguments, **options, &block) - raise RuntimeError, 'Reactor has been closed' if @selector.nil? + Kernel::raise RuntimeError, 'Reactor has been closed' if @selector.nil? initial_task = self.async(*arguments, **options, &block) if block_given? @@ -261,7 +253,7 @@ def close # This is a critical step. Because tasks could be stored as instance variables, and since the reactor is (probably) going out of scope, we need to ensure they are stopped. Otherwise, the tasks will belong to a reactor that will never run again and are not stopped. self.terminate - raise "Closing scheduler with blocked operations!" if @blocked > 0 + Kernel::raise "Closing scheduler with blocked operations!" if @blocked > 0 @guard.synchronize do @interrupt.close diff --git a/lib/async/task.rb b/lib/async/task.rb index 3b43d122..fbcc295d 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -190,8 +190,7 @@ def stop(later = false) end elsif @fiber&.alive? begin - Fiber.scheduler << Fiber.current - @fiber.raise(Stop) + Fiber.scheduler.raise(@fiber, Stop) rescue FiberError Fiber.scheduler << Stop::Later.new(self) end diff --git a/spec/async/barrier_spec.rb b/spec/async/barrier_spec.rb index ab21fc7e..d74df391 100644 --- a/spec/async/barrier_spec.rb +++ b/spec/async/barrier_spec.rb @@ -23,7 +23,6 @@ require 'async/barrier' require 'async/clock' require 'async/rspec' - require 'async/semaphore' require_relative 'chainable_async_examples' diff --git a/spec/async/reactor_spec.rb b/spec/async/reactor_spec.rb index 17865649..91903593 100644 --- a/spec/async/reactor_spec.rb +++ b/spec/async/reactor_spec.rb @@ -149,13 +149,12 @@ subject.async do |task| events << true + thread.join end subject.run - puts "join" - thread.join - + expect(thread).to_not be_alive expect(subject).to be_stopped end end diff --git a/spec/async/scheduler/thread_spec.rb b/spec/async/scheduler/thread_spec.rb index 9a73a150..867c1619 100644 --- a/spec/async/scheduler/thread_spec.rb +++ b/spec/async/scheduler/thread_spec.rb @@ -24,6 +24,7 @@ include_context Async::RSpec::Reactor describe ::Thread do + # I saw this hang. it "can wait for value" do value = Thread.new do sleep(0) diff --git a/spec/async/scheduler_spec.rb b/spec/async/scheduler_spec.rb index bb01add1..4c9062fa 100644 --- a/spec/async/scheduler_spec.rb +++ b/spec/async/scheduler_spec.rb @@ -116,8 +116,6 @@ expect(waiting).to be == 3 queue.close - - puts "Done." end end diff --git a/spec/async/task_spec.rb b/spec/async/task_spec.rb index 4c70ef2f..10383c01 100644 --- a/spec/async/task_spec.rb +++ b/spec/async/task_spec.rb @@ -22,6 +22,7 @@ require 'async' require 'async/clock' +require 'async/queue' RSpec.describe Async::Task do let(:reactor) {Async::Reactor.new} @@ -229,10 +230,13 @@ reactor.run do reactor.async do |task| parent_task = task + task.async do |task| child_task = task + task.sleep(10) end + task.sleep(10) end @@ -244,6 +248,9 @@ parent_task.stop + # We need to yield here to allow the tasks to be terminated. The parent task raises an exception in the child task and adds itself to the selector ready queue. It takes at least one iteration for the parent task to exit as well: + reactor.yield + expect(parent_task).to_not be_alive expect(child_task).to_not be_alive end