class Async::Reactor

An asynchronous, cooperatively scheduled event reactor.

def self.run(*args, &block)

running.
the block as an asynchronous task. Will block until the reactor finishes
- When invoked at the top level, will create and run a reactor, and invoke
asynchronously. Will return the task once it has been scheduled.
- When invoked within an existing reactor task, it will run the given block

The preferred method to invoke asynchronous behavior at the top level.
def self.run(*args, &block)
	if current = Task.current?
		reactor = current.reactor
		
		return reactor.async(*args, &block)
	else
		reactor = self.new
		
		begin
			return reactor.run(*args, &block)
		ensure
			reactor.close
		end
	end
end

def << fiber

Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
def << fiber
	@ready << fiber
end

def async(*args, &block)

Returns:
  • (Task) - The task that was

Other tags:
    Yield: - Executed within the asynchronous task.
def async(*args, &block)
	task = Task.new(self, &block)
	
	# I want to take a moment to explain the logic of this.
	# When calling an async block, we deterministically execute it until the
	# first blocking operation. We don't *have* to do this - we could schedule
	# it for later execution, but it's useful to:
	# - Fail at the point of call where possible.
	# - Execute determinstically where possible.
	# - Avoid overhead if no blocking operation is performed.
	task.run(*args)
	
	# Async.logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
	return task
end

def close

Returns:
  • (void) -
def close
	@children.each(&:stop)
	
	# TODO Should we also clear all timers?
	
	@selector.close
	@selector = nil
end

def closed?

Returns:
  • (Boolean) -
def closed?
	@selector.nil?
end

def initialize

def initialize
	super
	
	@selector = NIO::Selector.new
	@timers = Timers::Group.new
	
	@ready = []
	
	@stopped = true
end

def register(*args)

def register(*args)
	monitor = @selector.register(*args)
	
	monitor.value = Fiber.current
	
	return monitor
end

def run(*args, &block)

Proxies arguments to {#async} immediately before entering the loop.
Run the reactor until either all tasks complete or {#stop} is invoked.
def run(*args, &block)
	raise RuntimeError, 'Reactor has been closed' if @selector.nil?
	
	@stopped = false
	
	# Allow the user to kick of the initial async tasks.
	initial_task = async(*args, &block) if block_given?
	
	@timers.wait do |interval|
		# - nil: no timers
		# - -ve: timers expired already
		# -   0: timers ready to fire
		# - +ve: timers waiting to fire
		interval = 0 if interval && interval < 0
		
		# Async.logger.debug{"[#{self} Pre] Updating #{@children.count} children..."}
		# As timeouts may have been updated, and caused fibers to complete, we should check this.
		
		@ready.each do |fiber|
			fiber.resume if fiber.alive?
		end; @ready.clear
		
		# If there is nothing to do, then finish:
		# Async.logger.debug{"[#{self}] @children.empty? = #{@children.empty?} && interval #{interval.inspect}"}
		return initial_task if @children.empty? && interval.nil?
		
		# Async.logger.debug{"Selecting with #{@children.count} fibers interval = #{interval.inspect}..."}
		if monitors = @selector.select(interval)
			monitors.each do |monitor|
				if fiber = monitor.value
					fiber.resume # if fiber.alive?
				end
			end
		end
	end until @stopped
	
	return initial_task
ensure
	Async.logger.debug{"[#{self} Ensure] Exiting run-loop (stopped: #{@stopped} exception: #{$!.inspect})..."}
	@stopped = true
end

def sleep(duration)

Parameters:
  • duration (Numeric) -- The time in seconds, to sleep for.
def sleep(duration)
	fiber = Fiber.current
	
	timer = self.after(duration) do
		if fiber.alive?
			fiber.resume
		end
	end
	
	Task.yield
ensure
	timer.cancel if timer
end

def stop

Returns:
  • (void) -
def stop
	unless @stopped
		@stopped = true
		@selector.wakeup
	end
end

def timeout(duration)

Parameters:
  • duration (Integer) -- The time in seconds, in which the task should
def timeout(duration)
	backtrace = caller
	task = Fiber.current
	
	timer = self.after(duration) do
		if task.alive?
			error = TimeoutError.new("execution expired")
			error.set_backtrace backtrace
			task.resume error
		end
	end
	
	yield
ensure
	timer.cancel if timer
end

def to_s

def to_s
	"<#{self.description} stopped=#{@stopped}>"
end

def yield(fiber = Fiber.current)

Yield the current fiber and resume it on the next iteration of the event loop.
def yield(fiber = Fiber.current)
	@ready << fiber
	
	Fiber.yield
end