class Async::Reactor
An asynchronous, cooperatively scheduled event reactor.
def self.run(*arguments, **options, &block)
The preferred method to invoke asynchronous behavior at the top level.
- When invoked within an existing reactor task, it will run the given block
  asynchronously. Will return the task once it has been scheduled.
- When invoked at the top level, will create and run a reactor, and invoke
  the block as an asynchronous task. Will block until the reactor finishes running.
# Entry point for running asynchronous code.
#
# If `Task.current?` reports a task, the block is scheduled on that task's
# reactor via `#async` and the result of that call is returned immediately.
# Otherwise a new reactor is built from `options`, run with `arguments` and
# the block, and closed afterwards (even if running raised).
#
# @param arguments [Array] positional arguments forwarded to `#async` / `#run`.
# @param options [Hash] keyword options forwarded to `#async` (nested case) or to `new` (top-level case).
# @yield the block to execute.
# @return whatever `reactor.async` (nested) or `reactor.run` (top level) returns.
def self.run(*arguments, **options, &block)
  current = Task.current?

  # Already inside a reactor: just schedule onto it.
  return current.reactor.async(*arguments, **options, &block) if current

  reactor = self.new(**options)

  begin
    reactor.run(*arguments, &block)
  ensure
    reactor.close
  end
end
def self.selector
# Build the selector used for I/O multiplexing.
#
# Reads the `ASYNC_BACKEND` environment variable; when it names one of
# `NIO::Selector.backends`, that backend is requested explicitly. An unknown
# backend produces a warning and falls back to the default selector.
#
# @return [NIO::Selector]
def self.selector
  backend = ENV['ASYNC_BACKEND']&.to_sym

  # No override requested: use the default backend.
  return NIO::Selector.new unless backend

  return NIO::Selector.new(backend) if NIO::Selector.backends.include?(backend)

  warn "Could not find ASYNC_BACKEND=#{backend}!"

  NIO::Selector.new
end
def << fiber
-
fiber(#resume) -- The object to be resumed on the next iteration of the run-loop.
# Queue an object to be resumed on the next iteration of the run-loop.
#
# @param fiber [#resume] the object to append to the ready queue.
# @return the ready queue itself.
def <<(fiber)
  @ready.push(fiber)
end
def async(*arguments, **options, &block)
-
(Task)- The task that was scheduled into the reactor.
Other tags:
- Yield: - Executed within the task.
# Create a task owned by this reactor and start it straight away.
#
# The task is deliberately executed immediately, up to its first blocking
# operation, rather than being queued for later. This is not required, but:
# - failures surface at the point of the method call where possible;
# - execution is deterministic where possible;
# - scheduler overhead is avoided if no blocking operation is performed.
#
# @param arguments [Array] passed to `Task#run`.
# @param options [Hash] passed to `Task.new`.
# @yield executed within the task.
# @return [Task] the task that was created and run.
def async(*arguments, **options, &block)
  Task.new(self, **options, &block).tap do |task|
    task.run(*arguments)
  end
end
def close
-
(void)-
# Stop the reactor and release its selector.
#
# Calls `#stop`, closes the selector and clears the reference so that
# `@selector.nil?` subsequently reports the reactor as closed. Calling
# `close` again on an already-closed reactor is a no-op for the selector
# (previously it raised NoMethodError on nil).
#
# @return [void]
def close
  self.stop

  # TODO Should we also clear all timers?
  # Safe navigation keeps a repeated close from failing on a nil selector.
  @selector&.close
  @selector = nil
end
def closed?
-
(Boolean)-
# Whether the reactor's selector has been released (set to nil).
#
# @return [Boolean]
def closed?
  nil.equal?(@selector)
end
def finished?
# Whether the reactor has nothing left to do: the superclass reports
# finished and both the ready and running queues are empty.
#
# @return [Boolean, nil] falsy as soon as the superclass check is falsy.
def finished?
  # TODO I'm not sure if checking `@running.empty?` is really required.
  super && [@ready, @running].all?(&:empty?)
end
def initialize(parent = nil, selector: self.class.selector, logger: nil)
# Set up the reactor's state: selector, timer group, logger, scheduling
# queues, and the interrupt flag with the mutex that guards it.
#
# @param parent passed unchanged to the superclass initializer.
# @param selector the selector to use; defaults to `self.class.selector`.
# @param logger stored as-is in `@logger` (may be nil).
def initialize(parent = nil, selector: self.class.selector, logger: nil)
  super(parent)

  # I/O readiness and timers.
  @selector = selector
  @timers = Timers::Group.new

  @logger = logger

  # Fibers waiting to be resumed, and the batch currently being resumed.
  @ready, @running = [], []

  # Interrupt flag plus the lock used when setting/clearing it.
  @interrupted, @guard = false, Mutex.new
end
def interrupt
# Flag the reactor as interrupted and wake the selector.
#
# Under `@guard`, the flag is set and `@selector.wakeup` is called only if
# the reactor was not already marked interrupted, so repeated calls before
# the flag is cleared wake the selector once.
#
# @return the result of `@selector.wakeup`, or nil if already interrupted.
def interrupt
  @guard.synchronize do
    next if @interrupted

    @interrupted = true
    @selector.wakeup
  end
end
def logger
# The logger for this reactor; falls back to (and memoizes) `Console.logger`
# when none has been assigned.
def logger
  return @logger if @logger

  @logger = Console.logger
end
def register(io, interest, value = Fiber.current)
# Register an IO with the selector and attach a value to the resulting monitor.
#
# @param io the IO object to monitor.
# @param interest the interest passed through to `@selector.register`.
# @param value the object stored on the monitor; defaults to the current fiber.
# @return the monitor returned by the selector.
def register(io, interest, value = Fiber.current)
  @selector.register(io, interest).tap do |monitor|
    monitor.value = value
  end
end
def run(*arguments, &block)
Run the reactor until either all tasks complete or {#pause} or {#stop} is
invoked. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
# Drive the event loop by calling `#run_once` until it returns a falsy value.
#
# When a block is given it is first scheduled through `#async` with the
# supplied arguments. On exit (normal or via exception) a debug message is
# logged describing why the loop ended.
#
# @param arguments [Array] forwarded to `#async` when a block is given.
# @yield the initial task body (optional).
# @return the value of `#async` for the block, or nil when no block was given.
# @raise [RuntimeError] if the selector has already been released.
def run(*arguments, &block)
  raise RuntimeError, 'Reactor has been closed' if @selector.nil?

  initial_task = block_given? ? self.async(*arguments, &block) : nil

  # Keep iterating for as long as the previous iteration reported more work.
  more_work = true
  more_work = self.run_once while more_work

  initial_task
ensure
  logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end
def run_once(timeout = nil)
-
(Boolean)- whether there is more work to do.
Parameters:
-
timeout(Float | nil) -- the maximum timeout, or if nil, indefinite.
# Run a single iteration of the event loop.
#
# Steps, in order:
# 1. Resume every live fiber that was queued in `@ready`.
# 2. Work out how long the selector may block (0 if more fibers became
#    ready, otherwise the timers' wait interval, capped by `timeout`).
# 3. Select on IO and resume the value attached to each returned monitor.
# 4. Fire timers.
# 5. If an interrupt was flagged, clear it, stop, and report no more work.
#
# @param timeout [Float, nil] the maximum time to block, or nil for indefinite.
# @return [Boolean] whether there is more work to do.
def run_once(timeout = nil)
  logger.debug(self) {"@ready = #{@ready} @running = #{@running}"}

  if @ready.any?
    # Swap the queues: `@running` lets `finished?` answer correctly while
    # fibers are being resumed, and the emptied Array object is reused.
    @running, @ready = @ready, @running

    @running.each { |fiber| fiber.resume if fiber.alive? }
    @running.clear
  end

  # If fibers became ready during the step above, don't sleep at all.
  interval = @ready.empty? ? @timers.wait_interval : 0

  if interval.nil?
    # No timers and no ready fibers; if nothing else is pending we are done.
    return false if self.finished?

    # Otherwise we would sleep indefinitely, so honour the caller's cap.
    interval = timeout
  elsif interval < 0
    # Timers are already due: don't sleep in the selector.
    interval = 0
  elsif timeout && interval > timeout
    interval = timeout
  end

  logger.debug(self) {"Selecting with #{@children&.size} children with interval = #{interval ? interval.round(2) : 'infinite'}..."}

  monitors = @selector.select(interval)
  monitors.each { |monitor| monitor.value.resume } if monitors

  @timers.fire

  return true unless @interrupted

  # The interrupted flag is checked above and cleared here.
  @guard.synchronize { @interrupted = false }
  self.stop

  false
end
def sleep(duration)
-
duration(Numeric) -- The time in seconds, to sleep for.
# Suspend the current fiber for the given duration.
#
# Schedules a timer via `#after` that resumes the calling fiber (if it is
# still alive) and then yields through `Task.yield`. The timer is cancelled
# on the way out, whether the fiber was woken by the timer or otherwise.
#
# @param duration [Numeric] the time in seconds to sleep for.
def sleep(duration)
  fiber = Fiber.current

  timer = self.after(duration) do
    fiber.resume if fiber.alive?
  end

  Task.yield
ensure
  timer&.cancel
end
def stopped?
# Whether the reactor has no children (either no collection at all, or an
# empty one).
#
# @return [Boolean]
def stopped?
  return true if @children.nil?

  @children.empty?
end
def to_s
def to_s "\#<#{self.description} #{@children&.size || 0} children #{stopped? ? 'stopped' : 'running'}>" end
def with_timeout(timeout, exception = TimeoutError)
-
timeout(Numeric) -- The time in seconds, in which the task should complete.
# Run the block with a deadline.
#
# Schedules a timer via `#after`; if it fires while the calling fiber is
# still alive, the fiber is resumed with a new `exception` instance whose
# message is "execution expired". The timer is yielded to the block and is
# always cancelled when the method exits.
#
# @param timeout [Numeric] the time in seconds before the timer fires.
# @param exception [Class] the class instantiated when the timer fires.
# @yield [timer] the timer guarding the block.
# @return the value of the block.
def with_timeout(timeout, exception = TimeoutError)
  fiber = Fiber.current

  timer = self.after(timeout) do
    next unless fiber.alive?

    error = exception.new("execution expired")
    fiber.resume error
  end

  yield timer
ensure
  timer&.cancel
end
def yield(fiber = Fiber.current)
# Put the given fiber on the ready queue and yield control from the
# currently running fiber.
#
# @param fiber the object to queue for resumption; defaults to the current fiber.
# @return whatever `Fiber.yield` returns when this fiber is resumed.
def yield(fiber = Fiber.current)
  @ready.push(fiber)

  Fiber.yield
end