class Async::Reactor
An asynchronous, cooperatively scheduled event reactor.
def self.run(*args, &block)
the block as an asynchronous task. Will block until the reactor finishes
- When invoked at the top level, will create and run a reactor, and invoke
asynchronously. Will return the task once it has been scheduled.
- When invoked within an existing reactor task, it will run the given block
The preferred method to invoke asynchronous behavior.
def self.run(*args, &block) if current = Task.current? reactor = current.reactor reactor.async(*args, &block) else reactor = self.new begin reactor.run(*args, &block) ensure reactor.close end return reactor end end
def async(*ios, &block)
-
(Task)
-
def async(*ios, &block) task = Task.new(ios, self, &block) # I want to take a moment to explain the logic of this. # When calling an async block, we deterministically execute it until the # first blocking operation. We don't *have* to do this - we could schedule # it for later execution, but it's useful to: # - Fail at the point of call where possible. # - Execute determinstically where possible. # - Avoid overhead if no blocking operation is performed. task.run # Async.logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..." return task end
def close
-
(void)
-
def close @children.each(&:stop) @selector.close @selector = nil end
def closed?
-
(Boolean)
-
def closed? @selector.nil? end
def initialize(wrappers: IO)
-
wrappers
(Hash
) -- A mapping for wrapping pre-existing IO objects.
def initialize(wrappers: IO) super(nil) @wrappers = wrappers @selector = NIO::Selector.new @timers = Timers::Group.new @stopped = true end
def register(*args)
def register(*args) @selector.register(*args) end
def run(*args, &block)
Run the reactor until either all tasks complete or {#stop} is invoked.
def run(*args, &block) raise RuntimeError, 'Reactor has been closed' if @selector.nil? @stopped = false # Allow the user to kick of the initial async tasks. async(*args, &block) if block_given? @timers.wait do |interval| # - nil: no timers # - -ve: timers expired already # - 0: timers ready to fire # - +ve: timers waiting to fire interval = 0 if interval && interval < 0 Async.logger.debug{"[#{self} Pre] Updating #{@children.count} children..."} Async.logger.debug{@children.collect{|child| [child.to_s, child.alive?]}.inspect} # As timeouts may have been updated, and caused fibers to complete, we should check this. # If there is nothing to do, then finish: Async.logger.debug{"[#{self}] @children.empty? = #{@children.empty?} && interval #{interval.inspect}"} return if @children.empty? && interval.nil? Async.logger.debug{"Selecting with #{@children.count} fibers interval = #{interval}..."} if monitors = @selector.select(interval) monitors.each do |monitor| if fiber = monitor.value # Async.logger.debug "Resuming task #{task} due to IO..." fiber.resume # if fiber.alive? end end end end until @stopped return self ensure Async.logger.debug{"[#{self} Ensure] Exiting run-loop (stopped: #{@stopped} exception: #{$!})..."} Async.logger.debug{@children.collect{|child| [child.to_s, child.alive?]}.inspect} @stopped = true end
def sleep(duration)
-
duration
(Numeric
) -- The time in seconds, to sleep for.
def sleep(duration) task = Fiber.current timer = self.after(duration) do if task.alive? task.resume end end Task.yield ensure timer.cancel if timer end
def stop
-
(void)
-
def stop unless @stopped @stopped = true @selector.wakeup end end
def timeout(duration)
-
duration
(Integer
) -- The time in seconds, in which the task should
def timeout(duration) backtrace = caller task = Fiber.current timer = self.after(duration) do if task.alive? error = TimeoutError.new("execution expired") error.set_backtrace backtrace task.resume error end end yield ensure timer.cancel if timer end
def with(io, &block)
def with(io, &block) async do |task| task.with(io, &block) end end
def wrap(io, task)
-
(Wrapper)
-
Parameters:
-
task
(Task
) -- The task which manages the wrapper. -
io
() -- The `IO` instance to wrap.
def wrap(io, task) @wrappers[io].new(io, task) end