Skip to content

Use a linked list for the barrier implementation. #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions lib/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Released under the MIT License.
# Copyright, 2019-2022, by Samuel Williams.

require_relative 'list'
require_relative 'task'

module Async
Expand All @@ -13,25 +14,33 @@ class Barrier
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
# @public Since `stable-v1`.
def initialize(parent: nil)
@tasks = []
@tasks = List.new

@parent = parent
end

# All tasks which have been invoked into the barrier.
attr :tasks
class Waiting < List::Node
def initialize(task)
@task = task
end

attr :task
end

# The number of tasks currently held by the barrier.
# Number of tasks being held by the barrier.
def size
@tasks.size
end

# All tasks which have been invoked into the barrier.
attr :tasks

# Execute a child task and add it to the barrier.
# @asynchronous Executes the given block concurrently.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
task = parent.async(*arguments, **options, &block)

@tasks << task
@tasks.append(Waiting.new(task))

return task
end
Expand All @@ -42,31 +51,22 @@ def empty?
@tasks.empty?
end

# Wait for all tasks.
# Wait for all tasks to complete. You will still want to wait for individual tasks to complete if you want to handle errors.
# @asynchronous Will wait for tasks to finish executing.
def wait
# TODO: This would be better with linked list.
while @tasks.any?
task = @tasks.first

begin
task.wait
ensure
# We don't know for sure that the exception was due to the task completion.
unless task.running?
# Remove the task from the waiting list if it's finished:
@tasks.shift if @tasks.first == task
end
end
while waiting = @tasks.first
task = waiting.task
task.join
@tasks.remove?(waiting)
end
end

# Stop all tasks held by the barrier.
# @asynchronous May wait for tasks to finish executing.
def stop
# We have to be careful to avoid enumerating tasks while adding/removing to it:
tasks = @tasks.dup
tasks.each(&:stop)
@tasks.each do |waiting|
waiting.task.stop
end
end
end
end
9 changes: 3 additions & 6 deletions lib/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ def alive?
# Queue up the current fiber and wait on yielding the task.
# @returns [Object]
def wait
waiter = Waiter.new(Fiber.current)
@waiting.append(waiter)

Fiber.scheduler.transfer
ensure
waiter.delete!
@waiting.stack(Waiter.new(Fiber.current)) do
Fiber.scheduler.transfer
end
end

# Is any fiber waiting on this notification?
Expand Down
64 changes: 44 additions & 20 deletions lib/async/list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class List
def initialize
@head = self
@tail = self
@size = 0
end

# @private
Expand All @@ -16,6 +17,13 @@ def initialize
# @private
attr_accessor :tail

attr :size

def added(node)
@size += 1
return node
end

# Append a node to the end of the list.
def append(node)
if node.head
Expand All @@ -27,7 +35,7 @@ def append(node)
node.head = @head
@head = node

return node
return added(node)
end

def prepend(node)
Expand All @@ -40,24 +48,48 @@ def prepend(node)
node.tail = @tail
@tail = node

return added(node)
end

# Add the node, yield, and the remove the node.
def stack(node, &block)
append(node)
yield
ensure
remove!(node)
end

def removed(node)
@size -= 1
return node
end

def delete(node)
# One downside of this interface is we don't actually check if the node is part of the list defined by `self`. This means that there is a potential for a node to be deleted from a different list using this method, which in can throw off book-keeping when lists track size, etc.

# Remove the node if it is in the list.
def remove?(node)
if node.head
remove!(node)
end
end

# Remove the node.
def remove(node)
# One downside of this interface is we don't actually check if the node is part of the list defined by `self`. This means that there is a potential for a node to be removed from a different list using this method, which in can throw off book-keeping when lists track size, etc.
unless node.head
raise ArgumentError, "Node is not in a list!"
end

remove!(node)
end

private def remove!(node)
node.head.tail = node.tail
node.tail.head = node.head

# This marks the node as being deleted, and causes delete to fail if called a 2nd time.
# This marks the node as being removed, and causes remove to fail if called a 2nd time.
node.head = nil
# node.tail = nil

return node
return removed(node)
end

def empty?
Expand Down Expand Up @@ -92,28 +124,20 @@ def include?(needle)
end

def first
@tail
unless @tail.equal?(self)
@tail
end
end

def last
@head
unless @head.equal?(self)
@head
end
end
end

class List::Node
attr_accessor :head
attr_accessor :tail

# Delete the node from the list.
def delete!
@head.tail = @tail
@tail.head = @head
@head = nil

# See above deletion implementation for more details:
# @tail = nil

return self
end
end
end
34 changes: 10 additions & 24 deletions lib/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,14 @@ module Async
class Children < List
def initialize
super

@size = 0
@transient_count = 0
end

attr :size

# Does this node have (direct) transient children?
def transients?
@transient_count > 0
end

def append(node)
added(super)
end

undef prepend

def delete(node)
removed(super)
end

def finished?
@size == @transient_count
end
Expand All @@ -49,15 +35,15 @@ def added(node)
@transient_count += 1
end

@size += 1
return super
end

def removed(node)
if node.transient?
@transient_count -= 1
end

@size -= 1
return super
end
end

Expand Down Expand Up @@ -152,7 +138,7 @@ def parent=(parent)
return if @parent.equal?(parent)

if @parent
@parent.delete_child(self)
@parent.remove_child(self)
@parent = nil
end

Expand All @@ -173,8 +159,8 @@ def parent=(parent)
child.set_parent(self)
end

protected def delete_child(child)
@children.delete(child)
protected def remove_child(child)
@children.remove(child)
child.set_parent(nil)
end

Expand All @@ -189,15 +175,15 @@ def finished?
# the parent.
def consume
if parent = @parent and finished?
parent.delete_child(self)
parent.remove_child(self)

if @children
@children.each do |child|
if child.finished?
delete_child(child)
remove_child(child)
else
# In theory we don't need to do this... because we are throwing away the list. However, if you don't correctly update the list when moving the child to the parent, it foobars the enumeration, and subsequent nodes will be skipped, or in the worst case you might start enumerating the parents nodes.
delete_child(child)
remove_child(child)
parent.add_child(child)
end
end
Expand All @@ -209,14 +195,14 @@ def consume
end
end

# Traverse the task tree.
# @yields {|node, level| ...} The node and the level relative to the given root.
def traverse(&block)
return enum_for(:traverse) unless block_given?

self.traverse_recurse(&block)
end

# Traverse the tree.
# @yields {|node, level| ...} The node and the level relative to the given root.
protected def traverse_recurse(level = 0, &block)
yield self, level

Expand Down
18 changes: 11 additions & 7 deletions lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,26 @@ def async(*arguments, **options, &block)
return task
end

# Retrieve the current result of the task. Will cause the caller to wait until result is available. If the result was an exception, raise that exception.
# @raises [RuntimeError] If the task's fiber is the current fiber.
# @returns [Object] The final expression/result of the task's block.
def wait
def join
raise "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)

if running?
@finished ||= Condition.new
@finished.wait
end

case @result
return @result
end

# Retrieve the current result of the task. Will cause the caller to wait until result is available. If the result was an exception, raise that exception.
# @raises [RuntimeError] If the task's fiber is the current fiber.
# @returns [Object] The final expression/result of the task's block.
def wait
case result = self.join
when Exception
raise @result
raise result
else
return @result
return result
end
end

Expand Down
4 changes: 2 additions & 2 deletions test/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
expect(task1).to be(:failed?)
expect(task2).to be(:finished?)

expect{barrier.wait}.to raise_exception(RuntimeError, message: be =~ /Boom/)
barrier.wait

barrier.wait until barrier.empty?
expect{task1.wait}.to raise_exception(RuntimeError, message: be =~ /Boom/)

expect(barrier).to be(:empty?)
end
Expand Down
10 changes: 5 additions & 5 deletions test/async/children.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@
expect(children).not.to be(:empty?)
end

it "can't delete a child that hasn't been inserted" do
it "can't remove a child that hasn't been inserted" do
child = Async::Node.new

expect{children.delete(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
end

it "can't delete the child twice" do
it "can't remove the child twice" do
child = Async::Node.new
children.append(child)

children.delete(child)
children.remove(child)

expect{children.delete(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
end
end
end
Loading