class Async::Reactor
def run(*args, &block)
Run the reactor until either all tasks complete or {#stop} is invoked.
def run(*args, &block) raise RuntimeError, 'Reactor has been closed' if @selector.nil? @stopped = false # Allow the user to kick of the initial async tasks. initial_task = async(*args, &block) if block_given? @timers.wait do |interval| if @ready.any? @ready.each do |fiber| fiber.resume if fiber.alive? end @ready.clear # The above tasks may schedule, cancel or affect timers in some way. We need to compute a new wait interval for the blocking selector call below: interval = @timers.wait_interval end # - nil: no timers # - -ve: timers expired already # - 0: timers ready to fire # - +ve: timers waiting to fire if interval && interval < 0 interval = 0 end # Async.logger.debug(self) {"Updating #{@children.count} children..."} # As timeouts may have been updated, and caused fibers to complete, we should check this. # If there is nothing to do, then finish: if !interval && self.finished? return initial_task end # Async.logger.debug(self) {"Selecting with #{@children.count} fibers interval = #{interval.inspect}..."} if monitors = @selector.select(interval) monitors.each do |monitor| monitor.value.resume end end end until @stopped return initial_task ensure Async.logger.debug(self) {"Exiting run-loop because #{$! ? $!.inspect : 'finished'}."} @stopped = true end