class Async::Reactor
An asynchronous, cooperatively scheduled event reactor.
def self.run(*args, &block)
The preferred method to invoke asynchronous behavior at the top level.
- When invoked within an existing reactor task, it will run the given block asynchronously. Will return the task once it has been scheduled.
- When invoked at the top level, will create and run a reactor, and invoke the block as an asynchronous task. Will block until the reactor finishes running.
# Preferred entry point for starting asynchronous work.
#
# - Called from inside a running task (`Task.current?` is truthy): the block
#   is handed to that task's reactor via `async` and the result of `async` is
#   returned.
# - Called anywhere else: a new reactor is created, `run` is invoked on it
#   with the block, and the reactor is closed afterwards, even if `run` raises.
#
# @param args arguments forwarded unchanged to `async` / `run`.
# @yield the block forwarded to `async` / `run`.
# @return whatever the reactor's `async` or `run` call returns.
def self.run(*args, &block)
  current = Task.current?

  # Already inside a reactor: reuse it rather than nesting a new one.
  return current.reactor.async(*args, &block) if current

  reactor = new

  begin
    reactor.run(*args, &block)
  ensure
    reactor.close
  end
end
def << fiber
- fiber (#resume) -- The object to be resumed on the next iteration of the run-loop.
# Queue an object so that the run loop resumes it on its next iteration.
#
# @param fiber [#resume] the object to append to the ready queue.
# @return the result of appending to the ready queue.
def <<(fiber)
  @ready << fiber
end
def async(*args, &block)
- (Task) -- The task that was scheduled into the reactor.
Other tags:
- Yield: - Executed within the task.
# Create a task owned by this reactor and start executing it immediately.
#
# The task is run straight away rather than being queued for a later
# iteration, so it executes up to its first blocking operation. That is not
# strictly required, but it is useful to:
# - fail at the point of call where possible,
# - execute deterministically where possible,
# - avoid scheduling overhead when no blocking operation is performed.
#
# @param args arguments passed to the task's `run`.
# @yield executed within the task.
# @return [Task] the task that was created and started.
def async(*args, &block)
  Task.new(self, &block).tap { |task| task.run(*args) }
end
def close
-
(void)
-
# Stop every child and release the selector.
#
# Safe to call more than once: after the first call the selector reference is
# nil, and later calls skip the selector teardown instead of raising
# NoMethodError. This matters when the reactor is closed explicitly and then
# closed again by an `ensure` clause.
#
# @return [void]
def close
  @children.each(&:stop)

  # Already closed: there is no selector left to release.
  return if @selector.nil?

  # TODO Should we also clear all timers?
  @selector.close
  @selector = nil
end
def closed?
-
(Boolean)
-
# Whether the selector has been released, i.e. the selector reference is nil.
#
# @return [Boolean]
def closed?
  @selector.nil?
end
def finished?
# Whether the reactor has nothing left to do: the inherited check passes and
# both the ready queue and the currently-running queue are empty.
#
# @return a truthy value when finished; otherwise the falsy result of the
#   first failing check.
def finished?
  super && [@ready, @running].all?(&:empty?)
end
def initialize(parent = nil, selector: NIO::Selector.new)
# Build a reactor.
#
# @param parent passed through to the superclass constructor.
# @param selector the I/O selector to use; defaults to a new `NIO::Selector`.
def initialize(parent = nil, selector: NIO::Selector.new)
  super(parent)

  # I/O multiplexing and timer bookkeeping.
  @selector = selector
  @timers = Timers::Group.new

  # Fibers queued for the next iteration / being resumed in this iteration.
  @ready = []
  @running = []

  # The reactor starts in the stopped state; `run` clears this flag.
  @stopped = true
end
def register(io, interest, value = Fiber.current)
# Register an IO object with the selector and attach a value to the
# resulting monitor.
#
# @param io the object to register with the selector.
# @param interest the interest passed through to the selector.
# @param value the object stored on the monitor; defaults to the current
#   fiber. The run loop calls `resume` on it when the monitor is selected.
# @return the monitor returned by the selector.
def register(io, interest, value = Fiber.current)
  @selector.register(io, interest).tap { |monitor| monitor.value = value }
end
def run(*args, &block)
Run the reactor until either all tasks complete or {#stop} is invoked.
# Run the reactor until either all tasks complete or {#stop} is invoked.
#
# @param args arguments forwarded to `async` when a block is given.
# @yield optional initial task body, scheduled through `async`.
# @return the task created for the block, or nil when no block was given.
# @raise [RuntimeError] if the reactor has already been closed.
def run(*args, &block)
  raise RuntimeError, 'Reactor has been closed' if @selector.nil?

  @stopped = false

  initial_task = block_given? ? async(*args, &block) : nil

  until @stopped
    @timers.wait do |interval|
      # Swap the queues: `@running` lets `finished?` answer correctly while
      # fibers are being resumed, and the two Array objects are reused.
      @running, @ready = @ready, @running

      if @running.any?
        @running.each { |fiber| fiber.resume if fiber.alive? }
        @running.clear

        # If fibers became ready again, don't sleep. Otherwise the resumed
        # fibers may have scheduled, cancelled or changed timers, so the wait
        # interval for the selector call below must be recomputed.
        interval = @ready.any? ? 0 : @timers.wait_interval
      end

      # - nil: no timers
      # - -ve: timers expired already
      # - 0: timers ready to fire
      # - +ve: timers waiting to fire
      interval = 0 if interval && interval < 0

      # Timers may have caused fibers to complete. With no timers pending and
      # nothing left to do, finish.
      return initial_task if !interval && finished?

      selected = @selector.select(interval)
      selected.each { |monitor| monitor.value.resume } if selected
    end
  end

  initial_task
ensure
  Async.logger.debug(self) {"Exiting run-loop because #{$! ? $!.inspect : 'finished'}."}
  @stopped = true
end
def sleep(duration)
- duration (Numeric) -- The time in seconds, to sleep for.
# Suspend the calling fiber for the given duration.
#
# Schedules a timer via `after` that resumes the calling fiber if it is still
# alive, then yields through `Task.yield`. The timer is cancelled on the way
# out, whether this returns normally or an exception is raised.
#
# @param duration [Numeric] the time in seconds to sleep for.
# @return whatever `Task.yield` returns.
def sleep(duration)
  sleeper = Fiber.current
  timer = after(duration) { sleeper.resume if sleeper.alive? }

  Task.yield
ensure
  timer.cancel if timer
end
def stop
-
(void)
-
# Ask the run loop to exit.
#
# Does nothing if the reactor is already stopped. Otherwise sets the stopped
# flag and wakes the selector so that a blocking select call returns.
#
# @return [void]
def stop
  return if @stopped

  @stopped = true
  @selector.wakeup
end
def stopped?
# Current value of the stopped flag. It is set when the reactor is
# constructed, cleared by `run`, and set again by `stop` or when `run` exits.
#
# @return the stopped flag.
def stopped?
  @stopped
end
def timeout(duration, exception = TimeoutError)
- duration (Numeric) -- The time in seconds, in which the task should complete.
# Run the block with a deadline.
#
# Schedules a timer via `after`. When it fires and the calling fiber is still
# alive, the fiber is resumed with a new instance of `exception` whose
# backtrace is that of the `timeout` call site. The timer is yielded to the
# block and is cancelled once the block returns or raises.
#
# @param duration [Numeric] the time in seconds before the timer fires.
# @param exception [Class] the exception class to instantiate on expiry.
# @yield [timer] the timer object returned by `after`.
# @return the block's result.
def timeout(duration, exception = TimeoutError)
  call_site = caller
  waiting = Fiber.current

  timer = after(duration) do
    next unless waiting.alive?

    expired = exception.new("execution expired")
    expired.set_backtrace(call_site)

    waiting.resume(expired)
  end

  yield timer
ensure
  timer.cancel if timer
end
def to_s
# Human-readable summary: the object's `description` followed by the current
# value of the stopped flag.
#
# @return [String]
def to_s
  "<#{self.description} stopped=#{@stopped}>"
end
def yield(fiber = Fiber.current)
# Put a fiber on the ready queue and suspend the calling fiber. The run loop
# resumes queued fibers on a later iteration.
#
# @param fiber the fiber to queue; defaults to the calling fiber.
# @return whatever `Fiber.yield` returns when the fiber is resumed.
def yield(fiber = Fiber.current)
  @ready << fiber

  Fiber.yield
end