class Async::Reactor

An asynchronous, cooperatively scheduled event reactor.

def self.run(*args, **options, &block)

The preferred method to invoke asynchronous behavior at the top level.

- When invoked within an existing reactor task, it will run the given block
  asynchronously. Will return the task once it has been scheduled.
- When invoked at the top level, will create and run a reactor, and invoke
  the block as an asynchronous task. Will block until the reactor finishes
  running.
# Entry point for running a block asynchronously.
#
# Inside an existing task (`Task.current?` is truthy) the block is scheduled on
# that task's reactor via `async` and the result of that call is returned.
# Otherwise a new reactor is built from `options`, run with `args` and the
# block, and closed afterwards even if running raised.
#
# @param args [Array] Positional arguments forwarded to the reactor.
# @param options [Hash] Forwarded to `async` when nested, or to `new` otherwise.
# @yield The work to perform.
# @return The result of `reactor.async` (nested) or `reactor.run` (top level).
def self.run(*args, **options, &block)
	current = Task.current?
	
	# Nested call: reuse the reactor that owns the current task.
	return current.reactor.async(*args, **options, &block) if current
	
	reactor = self.new(**options)
	
	begin
		reactor.run(*args, &block)
	ensure
		# Release the reactor's resources regardless of how the run ended.
		reactor.close
	end
end

def self.selector

# Build the selector used by new reactors.
#
# If the `ASYNC_BACKEND` environment variable is set and names one of
# `NIO::Selector.backends`, a selector for that backend is returned. If it is
# set but not recognised, a warning is printed and a selector with no explicit
# backend is returned instead.
#
# @return [NIO::Selector]
def self.selector
	requested = ENV['ASYNC_BACKEND']
	
	if requested
		backend = requested.to_sym
		
		return NIO::Selector.new(backend) if NIO::Selector.backends.include?(backend)
		
		warn "Could not find ASYNC_BACKEND=#{backend}!"
	end
	
	NIO::Selector.new
end

def << fiber

Parameters:
  • fiber (#resume) -- The object to be resumed on the next iteration of the run-loop.
# Queue an object to be resumed by the run-loop.
#
# @param fiber [#resume] Appended to the `@ready` list.
# @return The `@ready` list.
def <<(fiber)
	@ready.push(fiber)
end

def async(*args, **options, &block)

Returns:
  • (Task) - The task that was scheduled into the reactor.

Other tags:
    Yield: - Executed within the task.
# Create a task bound to this reactor and start it immediately.
#
# The task is run right away rather than being queued for later. Running it
# eagerly is not required, but it is useful because it lets us:
# - fail at the point of call where possible;
# - execute deterministically where possible;
# - avoid overhead if no blocking operation is performed.
#
# @param args [Array] Passed to the task's `run`.
# @param options [Hash] Passed to `Task.new`.
# @yield Executed within the task.
# @return [Task] The task that was created and started.
def async(*args, **options, &block)
	Task.new(self, **options, &block).tap do |task|
		task.run(*args)
	end
end

def close

Returns:
  • (void) -
# Stop every child and release the selector.
#
# Each entry of `@children` (if there are any) is sent `stop`, then the
# selector is closed and the reference dropped, which is what {#closed?}
# reports on.
#
# Safe to call more than once: on an already-closed reactor `@selector` is
# `nil`, so the selector close is skipped instead of raising `NoMethodError`.
#
# @return [void]
def close
	@children&.each(&:stop)
	
	# TODO Should we also clear all timers?
	@selector&.close
	@selector = nil
end

def closed?

Returns:
  • (Boolean) -
# Whether the selector has been released.
#
# @return [Boolean] `true` when `@selector` is `nil`.
def closed?
	return @selector.nil?
end

def finished?

# Whether there is nothing left for the reactor to do.
#
# Requires the superclass's `finished?` to be truthy and both the `@ready`
# and `@running` lists to be empty. When the superclass answers falsy, that
# value is returned as-is.
def finished?
	inherited = super
	return inherited unless inherited
	
	# NOTE: it is unclear whether the `@running.empty?` check is really required.
	@ready.empty? && @running.empty?
end

def initialize(parent = nil, selector: self.class.selector, logger: nil)

# Set up a reactor in the stopped state.
#
# @param parent Passed straight to the superclass constructor.
# @param selector The selector to register IO with; defaults to `self.class.selector`.
# @param logger The logger to use; when `nil`, {#logger} falls back to `Console.logger`.
def initialize(parent = nil, selector: self.class.selector, logger: nil)
	super(parent)
	
	# Collaborators: IO selector, timer group and (optional) logger.
	@selector = selector
	@timers = Timers::Group.new
	@logger = logger
	
	# Fibers waiting to be resumed, and the batch currently being resumed.
	@ready, @running = [], []
	
	# Nothing is running until #run is invoked.
	@stopped = true
end

def logger

# The logger for this reactor.
#
# Returns the logger supplied at construction if there was one; otherwise
# fetches `Console.logger` once and remembers it.
def logger
	return @logger if @logger
	
	@logger = Console.logger
end

def register(io, interest, value = Fiber.current)

# Register an IO with the selector and attach a value to the resulting monitor.
#
# @param io The IO object to watch.
# @param interest The interest passed through to the selector's `register`.
# @param value Stored as the monitor's `value`; defaults to the current fiber.
# @return The monitor returned by the selector.
def register(io, interest, value = Fiber.current)
	@selector.register(io, interest).tap do |monitor|
		monitor.value = value
	end
end

def run(*args, &block)

Run the reactor until either all tasks complete or {#stop} is invoked.
Proxies arguments to {#async} immediately before entering the loop.
# Run the reactor's loop until either it is finished or it has been stopped.
#
# When a block is given it is first handed to {#async} together with `args`,
# before the loop is entered.
#
# @param args [Array] Forwarded to {#async} along with the block.
# @yield Scheduled through {#async} as the initial task, when a block is given.
# @return [Task, nil] The task created for the block, or `nil` when no block was given.
# @raise [RuntimeError] If the selector has already been released (`@selector` is `nil`).
def run(*args, &block)
	raise RuntimeError, 'Reactor has been closed' if @selector.nil?
	
	@stopped = false
	
	initial_task = self.async(*args, &block) if block_given?
	
	# Each pass calls `@timers.wait`, which yields the interval to wait for.
	# Presumably it also fires due timers around the block (Timers::Group#wait) — TODO confirm.
	# The trailing `until @stopped` repeats the call until the reactor is stopped.
	@timers.wait do |interval|
		# logger.debug(self) {"@ready = #{@ready} @running = #{@running}"}
		
		if @ready.any?
			# running used to correctly answer on `finished?`, and to reuse Array object.
			@running, @ready = @ready, @running
			
			# Fibers resumed here may append to `@ready` again; those are handled on the next pass.
			@running.each do |fiber|
				fiber.resume if fiber.alive?
			end
			
			@running.clear
			
			# if there are tasks ready to execute, don't sleep.
			if @ready.any?
				interval = 0
			else
				# The above tasks may schedule, cancel or affect timers in some way. We need to compute a new wait interval for the blocking selector call below:
				interval = @timers.wait_interval
			end
		end
		
		# As timeouts may have been updated, and caused fibers to complete, we should check this.
		if interval.nil?
			if self.finished?
				# If there is nothing to do, then finish:
				# (`return` inside this block exits #run itself, not just the current pass.)
				return initial_task
			end
		elsif interval < 0
			# We have timers ready to fire, don't sleep in the selector:
			interval = 0
		end
		
		# When `interval` is still nil here the reactor is not finished; presumably
		# the selector then waits without a timeout — TODO confirm against NIO::Selector#select.
		# logger.debug(self) {"Selecting with #{@children&.count} children with interval = #{interval.inspect}..."}
		if monitors = @selector.select(interval)
			# Resume whatever was attached to each ready monitor (see #register).
			monitors.each do |monitor|
				monitor.value.resume
			end
		end
	end until @stopped
	
	return initial_task
ensure
	# `$!` holds the in-flight exception, if the loop is being left because of one.
	logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
	
	# Always leave the reactor marked as stopped, however the loop ended.
	@stopped = true
end

def sleep(duration)

Parameters:
  • duration (Numeric) -- The time in seconds, to sleep for.
# Suspend the calling fiber for roughly `duration` seconds.
#
# Schedules a timer via `after` that resumes the calling fiber (if it is still
# alive), then yields through `Task.yield`. The timer is cancelled on the way
# out, whether the fiber was woken by the timer or left for another reason.
#
# @param duration [Numeric] The time in seconds, to sleep for.
# @return The result of `Task.yield`.
def sleep(duration)
	sleeper = Fiber.current
	
	timer = self.after(duration) do
		sleeper.resume if sleeper.alive?
	end
	
	Task.yield
ensure
	# `timer` is nil when `after` itself raised.
	timer.cancel if timer
end

def stop

Returns:
  • (void) -
# Ask the run-loop to exit.
#
# Does nothing when the reactor is already stopped. Otherwise marks it as
# stopped and calls `wakeup` on the selector.
#
# @return [void]
def stop
	return if @stopped
	
	@stopped = true
	@selector.wakeup
end

def stopped?

# Whether the reactor is currently marked as stopped.
#
# @return The value of `@stopped`.
def stopped?
	return @stopped
end

def timeout(*args, &block)

TODO remove
# Deprecated alias for `with_timeout`.
#
# Prints a deprecation warning and then forwards every argument and the block.
#
# @deprecated Use `with_timeout` instead.
# TODO remove
def timeout(*args, &block)
	owner = self.class
	warn "#{owner}\#timeout(...) is deprecated, use #{owner}\#with_timeout(...) instead."
	
	with_timeout(*args, &block)
end

def to_s

# A short, human-readable summary: the description followed by the stopped flag.
#
# @return [String]
def to_s
	state = "stopped=#{@stopped}"
	
	"<#{self.description} #{state}>"
end

def with_timeout(timeout, exception = TimeoutError)

Parameters:
  • timeout (Numeric) -- The time in seconds within which the given block should complete.
# Run the given block with a deadline.
#
# A timer is scheduled via `after`; if it fires while the calling fiber is
# still alive, that fiber is resumed with a new `exception` instance whose
# message is "execution expired". The timer is cancelled once the block has
# finished, however it finished.
#
# @param timeout [Numeric] The time in seconds after which the timer fires.
# @param exception [Class] The exception class to instantiate on expiry.
# @yield [timer] The timer guarding the block.
# @return The value of the block.
def with_timeout(timeout, exception = TimeoutError)
	waiter = Fiber.current
	
	timer = self.after(timeout) do
		waiter.resume(exception.new("execution expired")) if waiter.alive?
	end
	
	yield timer
ensure
	# `timer` is nil when `after` itself raised.
	timer.cancel if timer
end

def yield(fiber = Fiber.current)

Yield the current fiber and resume it on the next iteration of the event loop.
# Yield the current fiber and have `fiber` resumed on the next iteration of the event loop.
#
# @param fiber Appended to the `@ready` list; defaults to the current fiber.
# @return Whatever the fiber is later resumed with.
def yield(fiber = Fiber.current)
	@ready.push(fiber)
	
	Fiber.yield
end