lib/async/barrier.rb
# frozen_string_literal: true # Released under the MIT License. # Copyright, 2019-2022, by Samuel Williams. require_relative 'task' module Async # A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}. # @public Since `stable-v1`. class Barrier # Initialize the barrier. # @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks. # @public Since `stable-v1`. def initialize(parent: nil) @tasks = [] @parent = parent end # All tasks which have been invoked into the barrier. attr :tasks # The number of tasks currently held by the barrier. def size @tasks.size end # Execute a child task and add it to the barrier. # @asynchronous Executes the given block concurrently. def async(*arguments, parent: (@parent or Task.current), **options, &block) task = parent.async(*arguments, **options, &block) @tasks << task return task end # Whether there are any tasks being held by the barrier. # @returns [Boolean] def empty? @tasks.empty? end # Wait for all tasks. # @asynchronous Will wait for tasks to finish executing. def wait # TODO: This would be better with linked list. while @tasks.any? task = @tasks.first begin task.wait ensure # We don't know for sure that the exception was due to the task completion. unless task.running? # Remove the task from the waiting list if it's finished: @tasks.shift if @tasks.first == task end end end end # Stop all tasks held by the barrier. # @asynchronous May wait for tasks to finish executing. def stop # We have to be careful to avoid enumerating tasks while adding/removing to it: tasks = @tasks.dup tasks.each(&:stop) end end end