diff --git a/lib/async/limited_barrier.rb b/lib/async/limited_barrier.rb new file mode 100644 index 00000000..ee11b526 --- /dev/null +++ b/lib/async/limited_barrier.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2022, by Samuel Williams. + +module Async + # A composable synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore} and/or {Barrier}. + class LimitedBarrier + def initialize(parent: nil, finished: Async::Condition.new) + @finished = finished + @done = [] + + @parent = parent + end + + def async(parent: (@parent or Task.current), &block) + parent.async do |task| + yield(task) + ensure + @done << task + @finished.signal + end + end + + def wait_for(count) + while @done.size < count + @finished.wait + end + + return @done.shift(count) + end + + def wait(count) + wait_for(count).map(&:result) + end + end +end diff --git a/test/async/limited_barrier.rb b/test/async/limited_barrier.rb new file mode 100644 index 00000000..49965301 --- /dev/null +++ b/test/async/limited_barrier.rb @@ -0,0 +1,23 @@ + +require 'async/limited_barrier' +require 'sus/fixtures/async' + +describe Async::LimitedBarrier do + include Sus::Fixtures::Async::ReactorContext + + let(:limited_barrier) {subject.new} + + it "can wait for a subset of tasks" do + 3.times do + limited_barrier.async do + sleep(rand * 0.01) + end + end + + done = limited_barrier.wait(2) + expect(done.size).to be == 2 + + done = limited_barrier.wait(1) + expect(done.size).to be == 1 + end +end