Skip to content

Allow queue to be used for task finished. #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 0 additions & 99 deletions fixtures/a_condition.rb

This file was deleted.

103 changes: 103 additions & 0 deletions fixtures/async/a_condition.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2022, by Samuel Williams.

require 'async/variable'

module Async
ACondition = Sus::Shared("a condition") do
let(:condition) {subject.new}

it 'can signal waiting task' do
state = nil

reactor.async do
state = :waiting
condition.wait
state = :resumed
end

expect(state).to be == :waiting

condition.signal

reactor.yield

expect(state).to be == :resumed
end

it 'should be able to signal stopped task' do
expect(condition).to be(:empty?)
expect(condition).not.to be(:waiting?)

task = reactor.async do
condition.wait
end

expect(condition).not.to be(:empty?)
expect(condition).to be(:waiting?)

task.stop

condition.signal
end

it 'resumes tasks in order' do
order = []

5.times do |i|
task = reactor.async do
condition.wait
order << i
end
end

condition.signal

reactor.yield

expect(order).to be == [0, 1, 2, 3, 4]
end

with "timeout" do
let(:ready) {Async::Variable.new(condition)}
let(:waiting) {Async::Variable.new(subject.new)}

def before
@state = nil

@task = reactor.async do |task|
task.with_timeout(0.01) do
begin
@state = :waiting
waiting.resolve

ready.wait
@state = :signalled
rescue Async::TimeoutError
@state = :timeout
end
end
end

super
end

it 'can timeout while waiting' do
@task.wait

expect(@state).to be == :timeout
end

it 'can signal while waiting' do
waiting.wait
ready.resolve

@task.wait

expect(@state).to be == :signalled
end
end
end
end
8 changes: 6 additions & 2 deletions lib/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ def wait
end
end

# Is any fiber waiting on this notification?
# @returns [Boolean]
# @deprecated Replaced by {#waiting?}
def empty?
@waiting.empty?
end

# @returns [Boolean] Is any fiber waiting on this notification?
def waiting?
@waiting.size > 0
end

# Signal to a given task that it should resume operations.
# @parameter value [Object | Nil] The value to return to the waiting fibers.
def signal(value = nil)
Expand Down
35 changes: 24 additions & 11 deletions lib/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@

module Async
# A queue which allows items to be processed in order.
#
# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
#
# @public Since `stable-v1`.
class Queue < Notification
class Queue
# Create a new queue.
#
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
def initialize(parent: nil)
super()

# @parameter available [Notification] The notification to use for signaling when items are available.
def initialize(parent: nil, available: Notification.new)
@items = []
@parent = parent
@available = available
end

# @attribute [Array] The items in the queue.
Expand All @@ -38,20 +41,20 @@ def empty?
def <<(item)
@items << item

self.signal unless self.empty?
@available.signal unless self.empty?
end

# Add multiple items to the queue.
def enqueue(*items)
@items.concat(items)

self.signal unless self.empty?
@available.signal unless self.empty?
end

# Remove and return the next item from the queue.
def dequeue
while @items.empty?
self.wait
@available.wait
end

@items.shift
Expand All @@ -77,6 +80,16 @@ def each
yield item
end
end

# Signal the queue with a value, the same as {#enqueue}.
def signal(value)
self.enqueue(value)
end

# Wait for an item to be available, the same as {#dequeue}.
def wait
self.dequeue
end
end

# A queue which limits the number of items that can be enqueued.
Expand All @@ -85,12 +98,12 @@ class LimitedQueue < Queue
# Create a new limited queue.
#
# @parameter limit [Integer] The maximum number of items that can be enqueued.
def initialize(limit = 1, **options)
# @parameter full [Notification] The notification to use for signaling when the queue is full.
def initialize(limit = 1, full: Notification.new, **options)
super(**options)

@limit = limit

@full = Notification.new
@full = full
end

# @attribute [Integer] The maximum number of items that can be enqueued.
Expand Down Expand Up @@ -128,7 +141,7 @@ def enqueue(*items)
available = @limit - @items.size
@items.concat(items.shift(available))

self.signal unless self.empty?
@available.signal unless self.empty?
end
end

Expand Down
4 changes: 2 additions & 2 deletions test/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
require 'sus/fixtures/async'
require 'async/condition'

require 'a_condition'
require 'async/a_condition'

describe Async::Condition do
include Sus::Fixtures::Async::ReactorContext
Expand Down Expand Up @@ -52,5 +52,5 @@
expect(consumer.status).to be == :completed
end

it_behaves_like ACondition
it_behaves_like Async::ACondition
end
4 changes: 2 additions & 2 deletions test/async/notification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
require 'sus/fixtures/async'
require 'async/notification'

require 'a_condition'
require 'async/a_condition'

describe Async::Notification do
include Sus::Fixtures::Async::ReactorContext
Expand Down Expand Up @@ -47,5 +47,5 @@
]
end

it_behaves_like ACondition
it_behaves_like Async::ACondition
end
32 changes: 32 additions & 0 deletions test/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,44 @@
end
end

with '#signal' do
it 'can signal with an item' do
queue.signal(:item)
expect(queue.dequeue).to be == :item
end
end

with '#wait' do
it 'can wait for an item' do
reactor.async do |task|
queue.enqueue(:item)
end

expect(queue.wait).to be == :item
end
end

with 'an empty queue' do
it "is expected to be empty" do
expect(queue).to be(:empty?)
end
end

with 'task finishing queue' do
it 'can signal task completion' do
3.times do
Async(finished: queue) do
:result
end
end

3.times do
task = queue.dequeue
expect(task.wait).to be == :result
end
end
end

with 'semaphore' do
let(:capacity) {2}
let(:semaphore) {Async::Semaphore.new(capacity)}
Expand Down
Loading