class Async::Reactor

An asynchronous, cooperatively scheduled event reactor.

def self.run(*args, &block)

The preferred method to invoke asynchronous behavior.

- When invoked within an existing reactor task, it will run the given block
  asynchronously. Will return the task once it has been scheduled.
- When invoked at the top level, will create and run a reactor, and invoke
  the block as an asynchronous task. Will block until the reactor finishes
  running.
# Entry point for running asynchronous code.
#
# - Called from inside a task: the block is handed to that task's reactor via
#   `async`, and whatever `async` returns is returned.
# - Called outside any task: a fresh reactor is created, run with the given
#   arguments and block, closed (even if running raised), and returned.
#
# @param args arguments forwarded to `async` / `run` unchanged.
# @return the result of `reactor.async` in the nested case, otherwise the
#   reactor that was created (already closed).
def self.run(*args, &block)
	current = Task.current?
	
	# Already inside a reactor: piggy-back on it rather than nesting a new one.
	return current.reactor.async(*args, &block) if current
	
	reactor = self.new
	
	begin
		reactor.run(*args, &block)
	ensure
		# Always release the reactor's resources, including on error.
		reactor.close
	end
	
	reactor
end

def async(*ios, &block)

Returns:
  • (Task) -
# Create a task bound to this reactor and start it straight away.
#
# The task is run eagerly, up to its first blocking operation, instead of
# being queued for a later tick. This is a choice rather than a requirement:
# - errors surface at the call site where possible;
# - execution order stays deterministic where possible;
# - a task that never blocks incurs no scheduling overhead.
#
# @param ios the objects passed to `Task.new` as its first argument.
# @return [Task] the task, after its initial run.
def async(*ios, &block)
	Task.new(ios, self, &block).tap(&:run)
end

def close

Returns:
  • (void) -
# Stop all children and release the selector.
#
# Safe to call more than once: after the first call `@selector` is nil, so
# later calls return immediately instead of raising NoMethodError on
# `nil.close`.
#
# @return [void]
def close
	# Already closed; nothing left to release.
	return if @selector.nil?
	
	@children.each(&:stop)
	
	@selector.close
	@selector = nil
end

def closed?

Returns:
  • (Boolean) -
# Whether the reactor has been closed, i.e. it no longer holds a selector.
#
# @return [Boolean]
def closed?
	return @selector.equal?(nil)
end

def initialize(wrappers: IO)

Parameters:
  • wrappers (Hash) -- A mapping for wrapping pre-existing IO objects.
# Set up a reactor in the stopped state, with its own selector and timer group.
#
# @param wrappers the object consulted by {#wrap}, which calls `wrappers[io]`
#   to pick the wrapper class for an IO. Defaults to whatever the constant `IO`
#   resolves to in this scope.
def initialize(wrappers: IO)
	super(nil)
	
	# Not running until {#run} flips this to false.
	@stopped = true
	
	@timers = Timers::Group.new
	@selector = NIO::Selector.new
	
	@wrappers = wrappers
end

def register(*args)

# Register with the reactor's selector.
#
# @param args arguments handed to `@selector.register` unchanged.
# @return whatever the selector's `register` returns.
def register(*args)
	registration = @selector.register(*args)
	
	return registration
end

def run(*args, &block)

Run the reactor until either all tasks complete or {#stop} is invoked.
Proxies arguments to {#async} immediately before entering the loop.
# Run the event loop until there is nothing left to do or {#stop} is invoked.
#
# If a block is given, it (and `args`) are passed to {#async} before the loop
# starts. Each pass waits on the selector for at most the interval supplied by
# the timer group, then resumes the fiber stored as the value of every monitor
# the selector returned.
#
# @param args arguments forwarded to {#async} along with the block.
# @return [self] when the loop ends because `@stopped` became true.
# @return [nil] when the loop ends because there are no children and no timers.
#   NOTE(review): the bare `return` below yields nil rather than self — confirm
#   whether callers rely on the difference.
# @raise [RuntimeError] if the reactor has already been closed.
def run(*args, &block)
	raise RuntimeError, 'Reactor has been closed' if @selector.nil?
	
	@stopped = false
	
	# Allow the user to kick off the initial async tasks.
	async(*args, &block) if block_given?
	
	# Modifier `until` on a plain call: `@stopped` is tested before each pass, so
	# the loop is skipped entirely if the initial task already called #stop.
	@timers.wait do |interval|
		# - nil: no timers
		# - -ve: timers expired already
		# -   0: timers ready to fire
		# - +ve: timers waiting to fire
		# Clamp an overdue interval to zero so the select below does not block.
		interval = 0 if interval && interval < 0
		
		Async.logger.debug{"[#{self} Pre] Updating #{@children.count} children..."}
		Async.logger.debug{@children.collect{|child| [child.to_s, child.alive?]}.inspect}
		# As timeouts may have been updated, and caused fibers to complete, we should check this.
		
		# If there is nothing to do, then finish:
		Async.logger.debug{"[#{self}] @children.empty? = #{@children.empty?} && interval #{interval.inspect}"}
		# This `return` exits the whole method (not just the block) with nil.
		return if @children.empty? && interval.nil?
		
		Async.logger.debug{"Selecting with #{@children.count} fibers interval = #{interval}..."}
		# A falsy result from select means there are no monitors to service this pass.
		if monitors = @selector.select(interval)
			monitors.each do |monitor|
				# The monitor's value is treated as the fiber waiting on that IO.
				if fiber = monitor.value
					# Async.logger.debug "Resuming task #{task} due to IO..."
					fiber.resume # if fiber.alive?
				end
			end
		end
	end until @stopped
	
	return self
ensure
	# Runs on every exit path (normal return, early return, exception) and
	# leaves the reactor marked as stopped.
	Async.logger.debug{"[#{self} Ensure] Exiting run-loop (stopped: #{@stopped} exception: #{$!})..."}
	Async.logger.debug{@children.collect{|child| [child.to_s, child.alive?]}.inspect}
	@stopped = true
end

def sleep(duration)

Parameters:
  • duration (Numeric) -- The time in seconds, to sleep for.
# Suspend the calling fiber for the given duration.
#
# A timer is scheduled via `after` to resume the fiber (if it is still alive),
# then control is handed back through `Task.yield`. The timer is cancelled on
# the way out, however the method exits.
#
# @param duration [Numeric] the time in seconds to sleep for.
# @return whatever `Task.yield` returns.
def sleep(duration)
	fiber = Fiber.current
	
	timer = self.after(duration) { fiber.resume if fiber.alive? }
	
	Task.yield
ensure
	# `timer` is still nil if scheduling itself raised.
	timer.cancel if timer
end

def stop

Returns:
  • (void) -
# Ask the run loop to finish.
#
# Marks the reactor as stopped and wakes the selector. Does nothing if the
# reactor is already stopped.
#
# @return [void]
def stop
	return if @stopped
	
	@stopped = true
	@selector.wakeup
end

def timeout(duration)

Parameters:
  • duration (Integer) -- The time in seconds, in which the task should complete.
# Run the given block with a deadline.
#
# A timer is scheduled via `after`; if it fires while the calling fiber is
# still alive, the fiber is resumed with a `TimeoutError` whose backtrace is
# the call site of this method. The timer is cancelled on the way out,
# whether the block finished or raised.
#
# @param duration [Integer] the time in seconds allowed for the block.
# @yield the work to run under the deadline.
# @return the value of the block.
def timeout(duration)
	# Captured up front so the error points at the caller, not the timer callback.
	origin = caller
	fiber = Fiber.current
	
	timer = self.after(duration) do
		next unless fiber.alive?
		
		expired = TimeoutError.new("execution expired")
		expired.set_backtrace(origin)
		fiber.resume(expired)
	end
	
	yield
ensure
	# `timer` is still nil if scheduling itself raised.
	timer.cancel if timer
end

def with(io, &block)

# Start a new task and, inside it, call the task's `with` for the given IO
# and block.
#
# @param io the IO object passed on to the task's `with`.
# @return whatever {#async} returns.
def with(io, &block)
	async { |task| task.with(io, &block) }
end

def wrap(io, task)

Returns:
  • (Wrapper) -

Parameters:
  • task (Task) -- The task which manages the wrapper.
  • io () -- The `IO` instance to wrap.
# Build a wrapper for an existing IO object.
#
# The wrapper class is looked up with `@wrappers[io]` and instantiated with
# the IO and the task.
#
# @param io the `IO` instance to wrap.
# @param task [Task] the task which manages the wrapper.
# @return [Wrapper] the new wrapper instance.
def wrap(io, task)
	wrapper_class = @wrappers[io]
	
	return wrapper_class.new(io, task)
end