class Async::Reactor

An asynchronous, cooperatively scheduled event reactor.

def self.run(*arguments, **options, &block)

The preferred method to invoke asynchronous behavior at the top level.

- When invoked within an existing reactor task, it will run the given block
asynchronously. Will return the task once it has been scheduled.
- When invoked at the top level, will create and run a reactor, and invoke
the block as an asynchronous task. Will block until the reactor finishes
running.
# Run the given block asynchronously, reusing the current task's reactor when there is one.
#
# When `Task.current?` returns a task, the block is handed to that task's reactor via
# `async` and the result of `async` is returned. Otherwise a new reactor is built from
# `options`, `run` is invoked on it with the arguments and block, and the reactor is
# closed afterwards, even if `run` raises.
#
# @param arguments [Array] positional arguments forwarded to `async` or `run`.
# @param options [Hash] keyword options forwarded to `async` (nested) or `new` (top level).
# @return [Object] whatever `reactor.async` or `reactor.run` returns.
def self.run(*arguments, **options, &block)
	enclosing_task = Task.current?
	
	# Already inside a task: schedule onto the reactor that owns it.
	return enclosing_task.reactor.async(*arguments, **options, &block) if enclosing_task
	
	# Top level: own the reactor for the duration of the call.
	reactor = self.new(**options)
	
	begin
		reactor.run(*arguments, &block)
	ensure
		reactor.close
	end
end

def self.selector

# Build the selector used by a new reactor.
#
# If the `ASYNC_BACKEND` environment variable is set, it is converted to a symbol and
# used as the backend when `NIO::Selector.backends` includes it; otherwise a warning is
# emitted and the default selector is created instead.
#
# @return [NIO::Selector] a newly constructed selector.
def self.selector
	requested = ENV['ASYNC_BACKEND']&.to_sym
	
	if requested
		return NIO::Selector.new(requested) if NIO::Selector.backends.include?(requested)
		
		warn "Could not find ASYNC_BACKEND=#{requested}!"
	end
	
	NIO::Selector.new
end

def << fiber

Parameters:
  • fiber (#resume) -- The object to be resumed on the next iteration of the run-loop.
# Append an object to the ready queue so that `run_once` resumes it on a later pass.
#
# @param fiber [#resume] the object to be resumed.
# @return [Array] the ready queue.
def <<(fiber)
	@ready.push(fiber)
end

def async(*arguments, **options, &block)

Returns:
  • (Task) - The task that was scheduled into the reactor.

Other tags:
    Yield: - Executed within the task.
# Create a task bound to this reactor and start running it immediately.
#
# @param arguments [Array] passed to `Task#run`.
# @param options [Hash] passed to `Task.new`.
# @yield executed within the task.
# @return [Task] the task that was created and started.
def async(*arguments, **options, &block)
	Task.new(self, **options, &block).tap do |task|
		# The task is run right away rather than being queued for a later pass.
		# Scheduling it for later would also work, but running eagerly:
		# - surfaces failures at the call site where possible,
		# - keeps execution deterministic where possible,
		# - avoids scheduler overhead when nothing blocks.
		task.run(*arguments)
	end
end

def close

Returns:
  • (void) -
# Stop the reactor and release its selector.
#
# Safe to call more than once: `stop` is invoked every time, but the selector is only
# closed while a reference to it is still held. Previously a second call raised
# NoMethodError because `@selector` had already been set to nil.
#
# @return [void]
def close
	self.stop
	
	# Already closed: there is no selector left to release.
	return if @selector.nil?
	
	# TODO Should we also clear all timers?
	@selector.close
	@selector = nil
end

def closed?

Returns:
  • (Boolean) -
# Whether the reactor no longer holds a selector, which is the state `close` leaves it in.
#
# @return [Boolean] true when `@selector` is nil.
def closed?
	@selector.nil?
end

def finished?

# Whether the reactor has no outstanding work.
#
# @return [Boolean] truthy only when the inherited check passes and neither the ready
#   queue nor the batch currently being resumed holds anything.
def finished?
	# TODO I'm not sure if checking `@running.empty?` is really required.
	super && [@ready, @running].all?(&:empty?)
end

def initialize(parent = nil, selector: self.class.selector, logger: nil)

# Set up a reactor with its selector, timer group, queues and interrupt state.
#
# @param parent [Object, nil] forwarded unchanged to the superclass initializer.
# @param selector [Object] the selector to poll; defaults to `self.class.selector`.
# @param logger [Object, nil] optional logger; when nil, `logger` falls back lazily.
def initialize(parent = nil, selector: self.class.selector, logger: nil)
	super(parent)
	
	@selector = selector
	@timers = Timers::Group.new
	@logger = logger
	
	# Fibers waiting to be resumed, and the batch currently being resumed by `run_once`.
	@ready, @running = [], []
	
	# Interrupt flag and the mutex that guards writes to it.
	@interrupted, @guard = false, Mutex.new
end

def interrupt

Interrupt the reactor at the earliest convenience. Can be called from a different thread safely.
# Request that the reactor stop at the end of its current `run_once` pass.
#
# The flag is set and the selector woken while holding `@guard`. Repeated calls before
# the flag is cleared do nothing further.
#
# @return [Object, nil] the result of `@selector.wakeup`, or nil if already interrupted.
def interrupt
	@guard.synchronize do
		next if @interrupted
		
		@interrupted = true
		@selector.wakeup
	end
end

def logger

# The logger used by this reactor.
#
# @return [Object] the logger given at construction, or `Console.logger`, which is
#   stored on first use when none was given.
def logger
	return @logger if @logger
	
	@logger = Console.logger
end

def register(io, interest, value = Fiber.current)

# Register an IO with the selector and attach a value to the resulting monitor.
#
# @param io [Object] the IO object passed to the selector.
# @param interest [Object] the interest passed to the selector.
# @param value [Object] stored on the monitor; `run_once` calls `resume` on it when the
#   monitor is selected. Defaults to the calling fiber.
# @return [Object] the monitor returned by the selector.
def register(io, interest, value = Fiber.current)
	@selector.register(io, interest).tap do |monitor|
		monitor.value = value
	end
end

def run(*arguments, &block)

Run the reactor until either all tasks complete or {#pause} or {#stop} is
invoked. Proxies arguments to {#async} immediately before entering the
loop, if a block is provided.
# Drive the reactor by calling `run_once` repeatedly until it returns a falsy value.
#
# When a block is given it is first passed to `async` together with the arguments.
# A debug message is logged on the way out, whether the loop ended normally or an
# exception is propagating.
#
# @param arguments [Array] forwarded to `async` when a block is given.
# @return [Object, nil] the result of `async`, or nil when no block was given.
# @raise [RuntimeError] if the selector has already been released.
def run(*arguments, &block)
	raise RuntimeError, 'Reactor has been closed' if @selector.nil?
	
	initial_task = block_given? ? self.async(*arguments, &block) : nil
	
	# Round and round we go!
	keep_going = true
	keep_going = self.run_once while keep_going
	
	initial_task
ensure
	logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end

def run_once(timeout = nil)

Returns:
  • (Boolean) - whether there is more work to do.

Parameters:
  • timeout (Float | nil) -- the maximum timeout, or if nil, indefinite.
# Perform a single pass of the event loop.
#
# A pass resumes every live fiber in the ready queue, works out how long the selector
# may wait, resumes the value attached to each selected monitor, fires timers, and
# finally honours a pending interrupt.
#
# @param timeout [Float, nil] upper bound on the selector wait; nil means no bound.
# @return [Boolean] false when there is nothing left to do or the reactor was
#   interrupted, true otherwise.
def run_once(timeout = nil)
	logger.debug(self) {"@ready = #{@ready} @running = #{@running}"}
	
	if @ready.any?
		# Swap the two arrays so fibers queued during this pass land in a fresh @ready,
		# while @running keeps `finished?` accurate and lets the Array be reused.
		@running, @ready = @ready, @running
		
		@running.each {|pending| pending.resume if pending.alive?}
		
		@running.clear
	end
	
	# With work already queued we must not wait in the selector; otherwise defer to the timers.
	interval = @ready.empty? ? @timers.wait_interval : 0
	
	if interval.nil?
		# No timers and nothing queued: stop if the reactor reports it is finished.
		return false if self.finished?
		
		# Otherwise the caller's timeout caps what would be an indefinite wait.
		interval = timeout
	elsif interval < 0
		# A timer is already due, so poll without waiting.
		interval = 0
	elsif timeout && interval > timeout
		interval = timeout
	end
	
	logger.debug(self) {"Selecting with #{@children&.size} children with interval = #{interval ? interval.round(2) : 'infinite'}..."}
	monitors = @selector.select(interval)
	monitors.each {|monitor| monitor.value.resume} if monitors
	
	@timers.fire
	
	return true unless @interrupted
	
	# An interrupt was requested: clear the flag under the guard, then stop.
	@guard.synchronize {@interrupted = false}
	
	self.stop
	
	false
end

def sleep(duration)

Parameters:
  • duration (Numeric) -- The time in seconds, to sleep for.
# Suspend the calling fiber via `Task.yield` and schedule a timer to resume it.
#
# The timer only resumes the fiber if it is still alive, and is always cancelled once
# this method exits, however it exits.
#
# @param duration [Numeric] the time in seconds, to sleep for.
# @return [Object] whatever `Task.yield` returns.
def sleep(duration)
	sleeper = Fiber.current
	
	timer = self.after(duration) {sleeper.resume if sleeper.alive?}
	
	Task.yield
ensure
	timer.cancel if timer
end

def stopped?

# Whether the reactor has no children.
#
# @return [Boolean] true when `@children` is nil or empty.
def stopped?
	return true if @children.nil?
	
	@children.empty?
end

def to_s

def to_s
	"\#<#{self.description} #{@children&.size || 0} children #{stopped? ? 'stopped' : 'running'}>"
end

def with_timeout(timeout, exception = TimeoutError)

Parameters:
  • timeout (Numeric) -- The time in seconds, in which the task should complete.
# Run the block with a timer that, on firing, resumes the calling fiber with an error.
#
# When the timer fires and the fiber is still alive, an instance of `exception` with
# the message "execution expired" is passed to `Fiber#resume`. The timer is always
# cancelled when this method exits.
#
# @param timeout [Numeric] delay in seconds passed to `after`.
# @param exception [Class] class instantiated when the timer fires.
# @yield [timer] the timer returned by `after`.
# @return [Object] the value of the block.
def with_timeout(timeout, exception = TimeoutError)
	waiting = Fiber.current
	
	timer = self.after(timeout) do
		waiting.resume(exception.new("execution expired")) if waiting.alive?
	end
	
	yield timer
ensure
	timer.cancel if timer
end

def yield(fiber = Fiber.current)

Yield the current fiber and resume it on the next iteration of the event loop.
# Queue a fiber on the ready list and suspend the calling fiber.
#
# @param fiber [#resume] the object to queue; defaults to the calling fiber.
# @return [Object] the value passed to `resume` when the calling fiber is resumed.
def yield(fiber = Fiber.current)
	@ready.push(fiber)
	
	Fiber.yield
end