class Async::Reactor
def run(*args, &block)
Run the reactor until either all tasks complete or {#stop} is invoked.
# Run the reactor until either all tasks complete or {#stop} is invoked.
#
# If a block is given, it is first scheduled through {#async} (with `args`
# forwarded) and the value returned by {#async} becomes the return value.
#
# Each iteration of the loop:
# 1. resumes every fiber currently queued in `@ready`,
# 2. works out how long the selector is allowed to wait,
# 3. waits on the selector and resumes whatever each returned monitor holds.
#
# @param args [Array] arguments forwarded to {#async} together with the block.
# @yield the optional initial task body, handed to {#async}.
# @return [Object, nil] whatever {#async} returned for the block, or `nil`
#   when no block was given.
# @raise [RuntimeError] if the selector has already been released (`@selector` is `nil`).
def run(*args, &block)
  raise RuntimeError, 'Reactor has been closed' if @selector.nil?

  @stopped = false

  initial_task = self.async(*args, &block) if block_given?

  # `until` is a plain statement modifier here (not `begin ... end until`), so
  # `@stopped` is tested before every call to `@timers.wait`.
  @timers.wait do |interval|
    # logger.debug(self) {"@ready = #{@ready} @running = #{@running}"}

    if @ready.any?
      # running used to correctly answer on `finished?`, and to reuse Array object.
      @running, @ready = @ready, @running

      @running.each do |fiber|
        fiber.resume if fiber.alive?
      end

      @running.clear

      # If there are tasks ready to execute, don't sleep.
      if @ready.any?
        interval = 0
      else
        # The above tasks may schedule, cancel or affect timers in some way. We need
        # to compute a new wait interval for the blocking selector call below:
        interval = @timers.wait_interval
      end
    end

    # As timeouts may have been updated, and caused fibers to complete, we should check this.
    if interval.nil?
      if self.finished?
        # If there is nothing to do, then finish:
        return initial_task
      end
    elsif interval < 0
      # We have timers ready to fire, don't sleep in the selector:
      interval = 0
    end

    # logger.debug(self) {"Selecting with #{@children&.count} children with interval = #{interval.inspect}..."}
    if (monitors = @selector.select(interval))
      monitors.each do |monitor|
        monitor.value.resume
      end
    end
  end until @stopped

  return initial_task
ensure
  # Runs on every exit path (normal return, early return, or exception); `$!` is
  # the in-flight exception, if any.
  logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
  @stopped = true
end