lib/async/reactor.rb



# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'logger'
require_relative 'task'
require_relative 'wrapper'

require 'nio'
require 'timers'
require 'forwardable'

module Async
	# Raised if a timeout occurs on a specific Fiber. Handled gracefully by {Task}.
	class TimeoutError < Exception
	end
	
	# An asynchronous, cooperatively scheduled event reactor.
	class Reactor < Node
		extend Forwardable
		
		# The preferred method to invoke asynchronous behavior at the top level.
		#
		# - When invoked within an existing reactor task, it will run the given block
		# asynchronously. Will return the task once it has been scheduled.
		# - When invoked at the top level, will create and run a reactor, and invoke
		# the block as an asynchronous task. Will block until the reactor finishes
		# running.
		def self.run(*args, &block)
			if current = Task.current?
				reactor = current.reactor
				
				return reactor.async(*args, &block)
			else
				reactor = self.new
				
				begin
					return reactor.run(*args, &block)
				ensure
					reactor.close
				end
			end
		end
		
		def initialize(parent = nil, selector: NIO::Selector.new)
			super(parent)
			
			@selector = selector
			@timers = Timers::Group.new
			
			@ready = []
			@running = []
			
			@stopped = true
		end
		
		def to_s
			"<#{self.description} stopped=#{@stopped}>"
		end
		
		# @attr stopped [Boolean]
		attr :stopped
		
		def stopped?
			@stopped
		end
		
		def_delegators :@timers, :every, :after
		
		# Start an asynchronous task within the specified reactor. The task will be
		# executed until the first blocking call, at which point it will yield and
		# and this method will return.
		#
		# This is the main entry point for scheduling asynchronus tasks.
		#
		# @yield [Task] Executed within the asynchronous task.
		# @return [Task] The task that was 
		def async(*args, &block)
			task = Task.new(self, &block)
			
			# I want to take a moment to explain the logic of this.
			# When calling an async block, we deterministically execute it until the
			# first blocking operation. We don't *have* to do this - we could schedule
			# it for later execution, but it's useful to:
			# - Fail at the point of call where possible.
			# - Execute determinstically where possible.
			# - Avoid overhead if no blocking operation is performed.
			task.run(*args)
			
			# Async.logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
			return task
		end
		
		def register(io, interest, value = Fiber.current)
			monitor = @selector.register(io, interest)
			monitor.value = value
			
			return monitor
		end
	
		# Stop the reactor at the earliest convenience. Can be called from a different thread safely.
		# @return [void]
		def stop
			unless @stopped
				@stopped = true
				@selector.wakeup
			end
		end
		
		# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
		def << fiber
			@ready << fiber
		end
		
		# Yield the current fiber and resume it on the next iteration of the event loop.
		def yield(fiber = Fiber.current)
			@ready << fiber
			
			Fiber.yield
		end
		
		def finished?
			super && @ready.empty? && @running.empty?
		end
		
		# Run the reactor until either all tasks complete or {#stop} is invoked.
		# Proxies arguments to {#async} immediately before entering the loop.
		def run(*args, &block)
			raise RuntimeError, 'Reactor has been closed' if @selector.nil?
			
			@stopped = false
			
			# Allow the user to kick of the initial async tasks.
			initial_task = async(*args, &block) if block_given?
			
			@timers.wait do |interval|
				# running used to correctly answer on `finished?`, and to reuse Array object.
				@running, @ready = @ready, @running
				if @running.any?
					@running.each do |fiber|
						fiber.resume if fiber.alive?
					end
					@running.clear

					# if there are tasks ready to execute, don't sleep.
					if @ready.any?
						interval = 0
					else
						# The above tasks may schedule, cancel or affect timers in some way. We need to compute a new wait interval for the blocking selector call below:
						interval = @timers.wait_interval
					end
				end
				
				# - nil: no timers
				# - -ve: timers expired already
				# -   0: timers ready to fire
				# - +ve: timers waiting to fire
				if interval && interval < 0
					interval = 0
				end
				
				# Async.logger.debug(self) {"Updating #{@children.count} children..."}
				# As timeouts may have been updated, and caused fibers to complete, we should check this.
				
				# If there is nothing to do, then finish:
				if !interval && self.finished?
					return initial_task
				end
				
				# Async.logger.debug(self) {"Selecting with #{@children.count} fibers interval = #{interval.inspect}..."}
				if monitors = @selector.select(interval)
					monitors.each do |monitor|
						monitor.value.resume
					end
				end
			end until @stopped
			
			return initial_task
		ensure
			Async.logger.debug(self) {"Exiting run-loop because #{$! ? $!.inspect : 'finished'}."}
			@stopped = true
		end
	
		# Stop each of the children tasks and close the selector.
		# 
		# @return [void]
		def close
			@children.each(&:stop)
			
			# TODO Should we also clear all timers?
			
			@selector.close
			@selector = nil
		end
		
		# Check if the selector has been closed.
		# @return [Boolean]
		def closed?
			@selector.nil?
		end
	
		# Put the calling fiber to sleep for a given ammount of time.
		# @param duration [Numeric] The time in seconds, to sleep for.
		def sleep(duration)
			fiber = Fiber.current
			
			timer = self.after(duration) do
				if fiber.alive?
					fiber.resume
				end
			end
			
			Task.yield
		ensure
			timer.cancel if timer
		end
		
		# Invoke the block, but after the timeout, raise {TimeoutError} in any
		# currenly blocking operation.
		# @param duration [Integer] The time in seconds, in which the task should 
		#   complete.
		def timeout(duration, exception = TimeoutError)
			backtrace = caller
			fiber = Fiber.current
			
			timer = self.after(duration) do
				if fiber.alive?
					error = exception.new("execution expired")
					error.set_backtrace backtrace
					fiber.resume error
				end
			end
			
			yield
		ensure
			timer.cancel if timer
		end
	end
end