lib/async/scheduler.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2020-2024, by Samuel Williams.
# Copyright, 2020, by Jun Jiang.
# Copyright, 2021, by Julien Portalier.

require_relative 'clock'
require_relative 'task'

require 'io/event'

require 'console'
require 'resolv'

module Async
	# Handles scheduling of fibers. Implements the fiber scheduler interface.
	class Scheduler < Node
		# Raised when an operation is attempted on a closed scheduler.
		class ClosedError < RuntimeError
			# Create a new error.
			#
			# @parameter message [String] The error message.
			def initialize(message = "Scheduler is closed!")
				super
			end
		end
		
		# Whether the fiber scheduler is supported.
		# @public Since `stable-v1`.
		def self.supported?
			true
		end
		
		# Create a new scheduler.
		#
		# @public Since `stable-v1`.
		# @parameter parent [Node | Nil] The parent node to use for task hierarchy.
		# @parameter selector [IO::Event::Selector] The selector to use for event handling.
		def initialize(parent = nil, selector: nil)
			super(parent)
			
			@selector = selector || ::IO::Event::Selector.new(Fiber.current)
			@interrupted = false
			
			@blocked = 0
			
			@busy_time = 0.0
			@idle_time = 0.0
			
			@timers = ::IO::Event::Timers.new
		end
		
		# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
		# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
		def load
			total_time = @busy_time + @idle_time
			
			# If the total time is zero, then the load is zero:
			return 0.0 if total_time.zero?
			
			# We normalize to a 1 second window:
			if total_time > 1.0
				ratio = 1.0 / total_time
				@busy_time *= ratio
				@idle_time *= ratio
				
				# We don't need to divide here as we've already normalised it to a 1s window:
				return @busy_time
			else
				return @busy_time / total_time
			end
		end
	
		# Invoked when the fiber scheduler is being closed.
		#
		# Executes the run loop until all tasks are finished, then closes the scheduler.
		def scheduler_close(error = $!)
			# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
			unless error
				self.run
			end
		ensure
			self.close
		end
		
		# Terminate all child tasks.
		def terminate
			# If that doesn't work, take more serious action:
			@children&.each do |child|
				child.terminate
			end
			
			return @children.nil?
		end
		
		# Terminate all child tasks and close the scheduler.
		# @public Since `stable-v1`.
		def close
			self.run_loop do
				until self.terminate
					self.run_once!
				end
			end
			
			Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
		ensure
			# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
			selector = @selector
			@selector = nil
			
			selector&.close
			
			consume
		end
		
		# @returns [Boolean] Whether the scheduler has been closed.
		# @public Since `stable-v1`.
		def closed?
			@selector.nil?
		end
		
		# @returns [String] A description of the scheduler.
		def to_s
			"\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
		end
		
		# Interrupt the event loop and cause it to exit.
		# @asynchronous May be called from any thread.
		def interrupt
			@interrupted = true
			@selector&.wakeup
		end
		
		# Transfer from the calling fiber to the event loop.
		def transfer
			@selector.transfer
		end
		
		# Yield the current fiber and resume it on the next iteration of the event loop.
		def yield
			@selector.yield
		end
		
		# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
		# @parameter fiber [Fiber | Object] The object to be resumed on the next iteration of the run-loop.
		def push(fiber)
			@selector.push(fiber)
		end
		
		# Raise an exception on a specified fiber with the given arguments.
		#
		# This internally schedules the current fiber to be ready, before raising the exception, so that it will later resume execution.
		#
		# @parameter fiber [Fiber] The fiber to raise the exception on.
		# @parameter *arguments [Array] The arguments to pass to the fiber.
		def raise(...)
			@selector.raise(...)
		end
		
		# Resume execution of the specified fiber.
		#
		# @parameter fiber [Fiber] The fiber to resume.
		# @parameter arguments [Array] The arguments to pass to the fiber.
		def resume(fiber, *arguments)
			@selector.resume(fiber, *arguments)
		end
		
		# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
		# @asynchronous May only be called on same thread as fiber scheduler.
		def block(blocker, timeout)
			# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
			fiber = Fiber.current
			
			if timeout
				timer = @timers.after(timeout) do
					if fiber.alive?
						fiber.transfer(false)
					end
				end
			end
			
			begin
				@blocked += 1
				@selector.transfer
			ensure
				@blocked -= 1
			end
		ensure
			timer&.cancel!
		end
		
		# @asynchronous May be called from any thread.
		def unblock(blocker, fiber)
			# $stderr.puts "unblock(#{blocker}, #{fiber})"
			
			# This operation is protected by the GVL:
			if selector = @selector
				selector.push(fiber)
				selector.wakeup
			end
		end
		
		# @asynchronous May be non-blocking..
		def kernel_sleep(duration = nil)
			if duration
				self.block(nil, duration)
			else
				self.transfer
			end
		end
		
		# @asynchronous May be non-blocking..
		def address_resolve(hostname)
			# On some platforms, hostnames may contain a device-specific suffix (e.g. %en0). We need to strip this before resolving.
			# See <https://github.com/socketry/async/issues/180> for more details.
			hostname = hostname.split("%", 2).first
			::Resolv.getaddresses(hostname)
		end
		
		
		if IO.method_defined?(:timeout)
			private def get_timeout(io)
				io.timeout
			end
		else
			private def get_timeout(io)
				nil
			end
		end
		
		# @asynchronous May be non-blocking..
		def io_wait(io, events, timeout = nil)
			fiber = Fiber.current
			
			if timeout
				# If an explicit timeout is specified, we expect that the user will handle it themselves:
				timer = @timers.after(timeout) do
					fiber.transfer
				end
			elsif timeout = get_timeout(io)
				# Otherwise, if we default to the io's timeout, we raise an exception:
				timer = @timers.after(timeout) do
					fiber.raise(::IO::TimeoutError, "Timeout while waiting for IO to become ready!")
				end
			end
			
			return @selector.io_wait(fiber, io, events)
		ensure
			timer&.cancel!
		end
		
		if ::IO::Event::Support.buffer?
			def io_read(io, buffer, length, offset = 0)
				fiber = Fiber.current
				
				if timeout = get_timeout(io)
					timer = @timers.after(timeout) do
						fiber.raise(::IO::TimeoutError, "Timeout while waiting for IO to become readable!")
					end
				end
				
				@selector.io_read(fiber, io, buffer, length, offset)
			ensure
				timer&.cancel!
			end
			
			if RUBY_ENGINE != "ruby" || RUBY_VERSION >= "3.3.1"
				def io_write(io, buffer, length, offset = 0)
					fiber = Fiber.current
					
					if timeout = get_timeout(io)
						timer = @timers.after(timeout) do
							fiber.raise(::IO::TimeoutError, "Timeout while waiting for IO to become writable!")
						end
					end
					
					@selector.io_write(fiber, io, buffer, length, offset)
				ensure
					timer&.cancel!
				end
			end
		end
		
		# Wait for the specified process ID to exit.
		# @parameter pid [Integer] The process ID to wait for.
		# @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
		# @returns [Process::Status] A process status instance.
		# @asynchronous May be non-blocking..
		def process_wait(pid, flags)
			return @selector.process_wait(Fiber.current, pid, flags)
		end
		
		# Run one iteration of the event loop.
		#
		# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
		#
		# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
		# @returns [Boolean] Whether there is more work to do.
		private def run_once!(timeout = nil)
			start_time = Async::Clock.now
			
			interval = @timers.wait_interval
			
			# If there is no interval to wait (thus no timers), and no tasks, we could be done:
			if interval.nil?
				# Allow the user to specify a maximum interval if we would otherwise be sleeping indefinitely:
				interval = timeout
			elsif interval < 0
				# We have timers ready to fire, don't sleep in the selctor:
				interval = 0
			elsif timeout and interval > timeout
				interval = timeout
			end
			
			begin
				@selector.select(interval)
			rescue Errno::EINTR
				# Ignore.
			end
			
			@timers.fire
			
			# Compute load:
			end_time = Async::Clock.now
			total_duration = end_time - start_time
			idle_duration = @selector.idle_duration
			busy_duration = total_duration - idle_duration
			
			@busy_time += busy_duration
			@idle_time += idle_duration
			
			# The reactor still has work to do:
			return true
		end
		
		# Run one iteration of the event loop.
		# Does not handle interrupts.
		# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
		# @returns [Boolean] Whether there is more work to do.
		def run_once(timeout = nil)
			Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
			
			if self.finished?
				self.stop
			end
			
			# If we are finished, we stop the task tree and exit:
			if @children.nil?
				return false
			end
			
			return run_once!(timeout)
		end
		
		# Checks and clears the interrupted state of the scheduler.
		# @returns [Boolean] Whether the reactor has been interrupted.
		private def interrupted?
			if @interrupted
				@interrupted = false
				return true
			end
			
			if Thread.pending_interrupt?
				return true
			end
			
			return false
		end
		
		# Stop all children, including transient children, ignoring any signals.
		def stop
			@children&.each do |child|
				child.stop
			end
		end
		
		private def run_loop(&block)
			interrupt = nil
			
			begin
				# In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
				Thread.handle_interrupt(::SignalException => :never) do
					until self.interrupted?
						# If we are finished, we need to exit:
						break unless yield
					end
				end
			rescue Interrupt => interrupt
				Thread.handle_interrupt(::SignalException => :never) do
					self.stop
				end
				
				retry
			end
			
			# If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
			Kernel.raise(interrupt) if interrupt
		end
		
		# Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
		def run(...)
			Kernel.raise ClosedError if @selector.nil?
			
			initial_task = self.async(...) if block_given?
			
			self.run_loop do
				run_once
			end
			
			return initial_task
		end
		
		# Start an asynchronous task within the specified reactor. The task will be
		# executed until the first blocking call, at which point it will yield and
		# and this method will return.
		#
		# This is the main entry point for scheduling asynchronus tasks.
		#
		# @yields {|task| ...} Executed within the task.
		# @returns [Task] The task that was scheduled into the reactor.
		# @deprecated With no replacement.
		def async(*arguments, **options, &block)
			Kernel.raise ClosedError if @selector.nil?
			
			task = Task.new(Task.current? || self, **options, &block)
			
			# I want to take a moment to explain the logic of this.
			# When calling an async block, we deterministically execute it until the
			# first blocking operation. We don't *have* to do this - we could schedule
			# it for later execution, but it's useful to:
			# - Fail at the point of the method call where possible.
			# - Execute determinstically where possible.
			# - Avoid scheduler overhead if no blocking operation is performed.
			task.run(*arguments)
			
			# Console.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
			return task
		end
		
		def fiber(...)
			return async(...).fiber
		end
		
		# Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
		# @parameter duration [Numeric] The time in seconds, in which the task should complete.
		def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
			fiber = Fiber.current
			
			timer = @timers.after(duration) do
				if fiber.alive?
					fiber.raise(exception, message)
				end
			end
			
			yield timer
		ensure
			timer&.cancel!
		end
		
		def timeout_after(duration, exception, message, &block)
			with_timeout(duration, exception, message) do |timer|
				yield duration
			end
		end
	end
end