Skip to content

Take advantage of Ruby 3 scheduler. #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .github/workflows/development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ jobs:
- macos

ruby:
- 2.5
- 2.6
- 2.7
- 3.0
- head

experimental: [false]
env: [""]
Expand All @@ -34,7 +31,7 @@ jobs:
ruby: head
experimental: true
- os: ubuntu
ruby: 2.6
ruby: head
env: COVERAGE=PartialSummary,Coveralls
experimental: true

Expand Down
5 changes: 3 additions & 2 deletions async.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ Gem::Specification.new do |spec|

spec.files = Dir.glob('{lib}/**/*', File::FNM_DOTMATCH, base: __dir__)

spec.required_ruby_version = ">= 2.5.0"
spec.required_ruby_version = ">= 3.0.2"

spec.add_dependency "console", "~> 1.10"
spec.add_dependency "nio4r", "~> 2.3"

spec.add_dependency "event", "~> 0.9.4"
spec.add_dependency "timers", "~> 4.1"

spec.add_development_dependency "async-rspec", "~> 1.1"
Expand Down
2 changes: 1 addition & 1 deletion examples/fibers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def timeout(duration)

timer = reactor.add_timer(duration) do
if self.alive?
error = Fiber::TimeoutError.new("execution expired")
error = Fiber::TimeoutError.new
error.set_backtrace backtrace
self.resume error
end
Expand Down
3 changes: 3 additions & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

gemspec

# gem "event", path: "../event"
# gem "async-rspec", path: "../async-rspec"

group :maintenance, optional: true do
gem "bake-bundler"
gem "bake-modernize"
Expand Down
5 changes: 2 additions & 3 deletions lib/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
# THE SOFTWARE.

require_relative "async/version"
require_relative "async/logger"
require_relative "async/reactor"

require_relative "kernel/async"
require_relative "kernel/sync"

module Async
# Invoke `Reactor.run` with all arguments/block.
def self.run(*arguments, &block)
Reactor.run(*arguments, &block)
def self.run(...)
Reactor.run(...)
end
end
32 changes: 22 additions & 10 deletions lib/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,31 @@ def initialize
@waiting = []
end

Queue = Struct.new(:fiber) do
def transfer(*arguments)
fiber&.transfer(*arguments)
end

def alive?
fiber&.alive?
end

def nullify
self.fiber = nil
end
end

private_constant :Queue

# Queue up the current fiber and wait on yielding the task.
# @return [Object]
def wait
fiber = Fiber.current
@waiting << fiber

Task.yield
queue = Queue.new(Fiber.current)
@waiting << queue

# It would be nice if there was a better construct for this. We only need to invoke #delete if the task was not resumed normally. This can only occur with `raise` and `throw`. But there is no easy way to detect this.
# ensure when not return or ensure when raise, throw
rescue Exception
@waiting.delete(fiber)
raise
Fiber.scheduler.transfer
ensure
queue.nullify
end

# Is any fiber waiting on this notification?
Expand All @@ -61,7 +73,7 @@ def signal(value = nil)
@waiting = []

waiting.each do |fiber|
fiber.resume(value) if fiber.alive?
Fiber.scheduler.resume(fiber, value) if fiber.alive?
end

return nil
Expand Down
60 changes: 6 additions & 54 deletions lib/async/debug/selector.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
# Copyright, 2021, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
Expand All @@ -20,62 +20,14 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'monitor'
require_relative '../logger'

require 'nio'
require 'set'
require 'fiber'
require 'event/debug/selector'

module Async
module Debug
class LeakError < RuntimeError
def initialize(monitors)
super "Trying to close selector with active monitors: #{monitors.inspect}! This may cause your socket or file descriptor to leak."
end
end

class Selector
def initialize(selector = NIO::Selector.new)
@selector = selector
@monitors = Set.new
end

def register(object, interests)
Async.logger.debug(self) {"Registering #{object.inspect} for #{interests}."}

unless io = ::IO.try_convert(object)
raise RuntimeError, "Could not convert #{io} into IO!"
end

monitor = Monitor.new(@selector.register(object, interests), self)

@monitors.add(monitor)

return monitor
end

def deregister(monitor)
Async.logger.debug(self) {"Deregistering #{monitor.inspect}."}

unless @monitors.delete?(monitor)
raise RuntimeError, "Trying to remove monitor for #{monitor.inspect} but it was not registered!"
end
end

def wakeup
@selector.wakeup
end

def close
if @monitors.any?
raise LeakError, @monitors
end
ensure
@selector.close
end

def select(*arguments)
@selector.select(*arguments)
class Selector < Event::Debug::Selector
def initialize(selector = nil)
super(selector || Event::Selector.new(Fiber.current))
end
end
end
Expand Down
45 changes: 24 additions & 21 deletions lib/async/debug/monitor.rb → lib/async/interrupt.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# frozen_string_literal: true

# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
Expand All @@ -20,28 +18,33 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'delegate'

module Async
module Debug
class Monitor < Delegator
def initialize(monitor, selector)
@monitor = monitor
@selector = selector
end

def __getobj__
@monitor
end
# A thread safe synchronisation primative.
class Interrupt
def initialize(scheduler, &block)
@scheduler = scheduler
@input, @output = IO.pipe

def close
@selector.deregister(self)
@monitor.close
@fiber = Fiber.new do
while true
@scheduler.io_wait(@fiber, @input, ::Event::READABLE)
block.call(@input.read_nonblock(1))
end
end

def inspect
"\#<#{self.class} io=#{@monitor.io.inspect} interests=#{@monitor.interests.inspect} readiness=#{@monitor.readiness.inspect}>"
end
@fiber.transfer
end

def signal(event = '.')
@output.write('.')
@output.flush
end

def close
@input.close
@output.close
end
end

private_constant :Interrupt
end
14 changes: 13 additions & 1 deletion lib/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ def initialize(parent = nil, annotation: nil, transient: false)
end
end

def root
@parent&.root || self
end

# You should not directly rely on these pointers but instead use `#children`.
# List pointers:
attr_accessor :head
Expand Down Expand Up @@ -215,6 +219,8 @@ def description

if @annotation
"#{@object_name} #{@annotation}"
elsif line = self.backtrace(0, 1)&.first
"#{@object_name} #{line}"
else
@object_name
end
Expand All @@ -225,9 +231,11 @@ def backtrace(*arguments)
end

def to_s
"\#<#{description}>"
"\#<#{self.description}>"
end

alias inspect to_s

# Change the parent of this node.
# @param parent [Node, nil] the parent to attach to, or nil to detach.
# @return [self]
Expand Down Expand Up @@ -329,6 +337,10 @@ def stop(later = false)
end
end

def stopped?
@children.nil?
end

def print_hierarchy(out = $stdout, backtrace: true)
self.traverse do |node, level|
indent = "\t" * level
Expand Down
6 changes: 3 additions & 3 deletions lib/async/notification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Notification < Condition
def signal(value = nil, task: Task.current)
return if @waiting.empty?

task.reactor << Signal.new(@waiting, value)
Fiber.scheduler << Signal.new(@waiting, value)

@waiting = []

Expand All @@ -42,9 +42,9 @@ def alive?
true
end

def resume
def transfer
waiting.each do |fiber|
fiber.resume(value) if fiber.alive?
fiber.transfer(value) if fiber.alive?
end
end
end
Expand Down
Loading