class Async::Barrier
@public Since *Async v1*.
A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
Execute a child task and add it to the barrier.
def async(*arguments, parent: (@parent or Task.current), **options, &block) raise "Barrier is stopped!" if @finished.closed? waiting = nil parent.async(*arguments, **options) do |task, *arguments| waiting = TaskNode.new(task) @tasks.append(waiting) block.call(task, *arguments) ensure @finished.signal(waiting) unless @finished.closed? end end
def empty?
Whether there are any tasks being held by the barrier.
def empty? @tasks.empty? end
def initialize(parent: nil)
@parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
Initialize the barrier.
def initialize(parent: nil) @tasks = List.new @finished = Queue.new @parent = parent end
def size
def size @tasks.size end
def stop
Stop all tasks held by the barrier.
def stop @tasks.each do |waiting| waiting.task.stop end @finished.close end
def wait
@yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results.
Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
def wait while !@tasks.empty? # Wait for a task to finish (we get the task node): return unless waiting = @finished.wait # Remove the task as it is now finishing: @tasks.remove?(waiting) # Get the task: task = waiting.task # If a block is given, the user can implement their own behaviour: if block_given? yield task else # Wait for it to either complete or raise an error: task.wait end end end