class Async::Scheduler
Handles scheduling of fibers. Implements the fiber scheduler interface.
def self.supported?
Whether the fiber scheduler is supported.
def self.supported? true end
def address_resolve(hostname)
def address_resolve(hostname) ::Resolv.getaddresses(hostname) end
def async(*arguments, **options, &block)
- With no replacement.
def async(*arguments, **options, &block) task = Task.new(Task.current? || self, **options, &block) # I want to take a moment to explain the logic of this. # When calling an async block, we deterministically execute it until the # first blocking operation. We don't *have* to do this - we could schedule # it for later execution, but it's useful to: # - Fail at the point of the method call where possible. # - Execute determinstically where possible. # - Avoid scheduler overhead if no blocking operation is performed. task.run(*arguments) # Console.logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..." return task end
def block(blocker, timeout)
Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
def block(blocker, timeout) # $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})" fiber = Fiber.current if timeout timer = @timers.after(timeout) do if fiber.alive? fiber.transfer(false) end end end begin @blocked += 1 @selector.transfer ensure @blocked -= 1 end ensure timer&.cancel end
def close
def close # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. until self.terminate self.run_once end Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0 # We depend on GVL for consistency: # @guard.synchronize do # We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it. selector = @selector @selector = nil selector&.close # end consume end
def closed?
@returns [Boolean] Whether the scheduler has been closed.
def closed? @selector.nil? end
def fiber(...)
def fiber(...) return async(...).fiber end
def initialize(parent = nil, selector: nil)
def initialize(parent = nil, selector: nil) super(parent) @selector = selector || ::IO::Event::Selector.new(Fiber.current) @interrupted = false @blocked = 0 @timers = ::Timers::Group.new end
def interrupt
Interrupt the event loop and cause it to exit.
def interrupt @interrupted = true @selector&.wakeup end
def io_read(io, buffer, length, offset = 0)
def io_read(io, buffer, length, offset = 0) @selector.io_read(Fiber.current, io, buffer, length, offset) end
def io_wait(io, events, timeout = nil)
def io_wait(io, events, timeout = nil) fiber = Fiber.current if timeout timer = @timers.after(timeout) do fiber.raise(TimeoutError) end end # Console.logger.info(self, "-> io_wait", fiber, io, events) events = @selector.io_wait(fiber, io, events) # Console.logger.info(self, "<- io_wait", fiber, io, events) return events rescue TimeoutError return false ensure timer&.cancel end
def io_write(io, buffer, length, offset = 0)
def io_write(io, buffer, length, offset = 0) @selector.io_write(Fiber.current, io, buffer, length, offset) end
def kernel_sleep(duration = nil)
def kernel_sleep(duration = nil) if duration self.block(nil, duration) else self.transfer end end
def process_wait(pid, flags)
@returns [Process::Status] A process status instance.
@parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
@parameter pid [Integer] The process ID to wait for.
Wait for the specified process ID to exit.
def process_wait(pid, flags) return @selector.process_wait(Fiber.current, pid, flags) end
def push(fiber)
Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
def push(fiber) @selector.push(fiber) end
def raise(*arguments)
def raise(*arguments) @selector.raise(*arguments) end
def resume(fiber, *arguments)
def resume(fiber, *arguments) @selector.resume(fiber, *arguments) end
def run(...)
def run(...) Kernel::raise RuntimeError, 'Reactor has been closed' if @selector.nil? initial_task = self.async(...) if block_given? @interrupted = false while self.run_once if @interrupted break end end return initial_task ensure Console.logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."} end
def run_once(timeout = nil)
@parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
Run one iteration of the event loop.
def run_once(timeout = nil) Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? # If we are finished, we stop the task tree and exit: if self.finished? return false end interval = @timers.wait_interval # If there is no interval to wait (thus no timers), and no tasks, we could be done: if interval.nil? # Allow the user to specify a maximum interval if we would otherwise be sleeping indefinitely: interval = timeout elsif interval < 0 # We have timers ready to fire, don't sleep in the selctor: interval = 0 elsif timeout and interval > timeout interval = timeout end begin @selector.select(interval) rescue Errno::EINTR # Ignore. end @timers.fire # The reactor still has work to do: return true end
def scheduler_close
def scheduler_close self.run ensure self.close end
def timeout_after(duration, exception, message, &block)
def timeout_after(duration, exception, message, &block) with_timeout(duration, exception, message) do |timer| yield duration end end
def to_s
def to_s "\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>" end
def transfer
def transfer @selector.transfer end
def unblock(blocker, fiber)
def unblock(blocker, fiber) # $stderr.puts "unblock(#{blocker}, #{fiber})" # This operation is protected by the GVL: if selector = @selector selector.push(fiber) selector.wakeup end end
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block) fiber = Fiber.current timer = @timers.after(duration) do if fiber.alive? fiber.raise(exception, message) end end yield timer ensure timer.cancel if timer end
def yield
def yield @selector.yield end