class Async::Reactor
def run(*args, &block)
def run(*args, &block) raise RuntimeError, 'Reactor has been closed' if @selector.nil? @stopped = false # Allow the user to kick of the initial async tasks. async(*args, &block) if block_given? @timers.wait do |interval| # - nil: no timers # - -ve: timers expired already # - 0: timers ready to fire # - +ve: timers waiting to fire interval = 0 if interval && interval < 0 Async.logger.debug{"[#{self} Pre] Updating #{@children.count} children..."} Async.logger.debug{@children.collect{|child| [child.to_s, child.alive?]}.inspect} # As timeouts may have been updated, and caused fibers to complete, we should check this. # If there is nothing to do, then finish: Async.logger.debug{"[#{self}] @children.empty? = #{@children.empty?} && interval #{interval.inspect}"} return if @children.empty? && interval.nil? Async.logger.debug{"Selecting with #{@children.count} fibers interval = #{interval}..."} if monitors = @selector.select(interval) monitors.each do |monitor| if fiber = monitor.value # Async.logger.debug "Resuming task #{task} due to IO..." fiber.resume end end end end until @stopped return self ensure Async.logger.debug{"[#{self} Ensure] Exiting run-loop (stopped: #{@stopped} exception: #{$!})..."} Async.logger.debug{@children.collect{|child| [child.to_s, child.alive?]}.inspect} @stopped = true end