lib/async/container/threaded.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2017-2025, by Samuel Williams.

require_relative "generic"
require_relative "channel"
require_relative "notify/pipe"

module Async
	module Container
		# A multi-thread container which uses {Thread.fork}.
		class Threaded < Generic
			# Indicates that this is not a multi-process container.
			def self.multiprocess?
				false
			end
			
			# Represents a running child thread from the point of view of the parent container.
			class Child < Channel
				# Used to propagate the exit status of a child process invoked by {Instance#exec}.
				class Exit < Exception
					# Initialize the exit status.
					# @parameter status [::Process::Status] The process exit status.
					def initialize(status)
						@status = status
					end
					
					# The process exit status.
					# @attribute [::Process::Status]
					attr :status
					
					# The process exit status if it was an error.
					# @returns [::Process::Status | Nil]
					def error
						unless status.success?
							status
						end
					end
				end
				
				# Represents a running child thread from the point of view of the child thread.
				class Instance < Notify::Pipe
					# Wrap an instance around the {Thread} instance from within the threaded child.
					# @parameter thread [Thread] The thread intance to wrap.
					def self.for(thread)
						instance = self.new(thread.out)
						
						return instance
					end
					
					# Initialize the child thread instance.
					#
					# @parameter io [IO] The IO object to use for communication with the parent.
					def initialize(io)
						@thread = ::Thread.current
						
						super
					end
					
					# Generate a hash representation of the thread.
					#
					# @returns [Hash] The thread as a hash, including `process_id`, `thread_id`, and `name`.
					def as_json(...)
						{
							process_id: ::Process.pid,
							thread_id: @thread.object_id,
							name: @thread.name,
						}
					end
					
					# Generate a JSON representation of the thread.
					#
					# @returns [String] The thread as JSON.
					def to_json(...)
						as_json.to_json(...)
					end
					
					# Set the name of the thread.
					# @parameter value [String] The name to set.
					def name= value
						@thread.name = value
					end
					
					# Get the name of the thread.
					# @returns [String]
					def name
						@thread.name
					end
					
					# Execute a child process using {::Process.spawn}. In order to simulate {::Process.exec}, an {Exit} instance is raised to propagage exit status.
					# This creates the illusion that this method does not return (normally).
					def exec(*arguments, ready: true, **options)
						if ready
							self.ready!(status: "(spawn)")
						else
							self.before_spawn(arguments, options)
						end
						
						begin
							pid = ::Process.spawn(*arguments, **options)
						ensure
							_, status = ::Process.wait2(pid)
							
							raise Exit, status
						end
					end
				end
				
				# Start a new child thread and execute the provided block in it.
				#
				# @parameter options [Hash] Additional options to to the new child instance.
				def self.fork(**options)
					self.new(**options) do |thread|
						::Thread.new do
							# This could be a configuration option (see forked implementation too):
							::Thread.handle_interrupt(SignalException => :immediate) do
								yield Instance.for(thread)
							end
						end
					end
				end
				
				# Initialize the thread.
				#
				# @parameter name [String] The name to use for the child thread.
				def initialize(name: nil)
					super()
					
					@status = nil
					
					@thread = yield(self)
					@thread.report_on_exception = false
					@thread.name = name
					
					@waiter = ::Thread.new do
						begin
							@thread.join
						rescue Exit => exit
							finished(exit.error)
						rescue Interrupt
							# Graceful shutdown.
							finished
						rescue Exception => error
							finished(error)
						else
							finished
						end
					end
				end
				
				# Convert the child process to a hash, suitable for serialization.
				#
				# @returns [Hash] The request as a hash.
				def as_json(...)
					{
						name: @thread.name,
						status: @status&.as_json,
					}
				end
				
				# Convert the request to JSON.
				#
				# @returns [String] The request as JSON.
				def to_json(...)
					as_json.to_json(...)
				end
				
				# Set the name of the thread.
				# @parameter value [String] The name to set.
				def name= value
					@thread.name = value
				end
				
				# Get the name of the thread.
				# @returns [String]
				def name
					@thread.name
				end
				
				# A human readable representation of the thread.
				# @returns [String]
				def to_s
					"\#<#{self.class} #{@thread.name}>"
				end
				
				# Invoke {#terminate!} and then {#wait} for the child thread to exit.
				def close
					self.terminate!
					self.wait
				ensure
					super
				end
				
				# Raise {Interrupt} in the child thread.
				def interrupt!
					@thread.raise(Interrupt)
				end
				
				# Raise {Terminate} in the child thread.
				def terminate!
					@thread.raise(Terminate)
				end
				
				# Invoke {Thread#kill} on the child thread.
				def kill!
					# Killing a thread does not raise an exception in the thread, so we need to handle the status here:
					@status = Status.new(:killed)
					
					@thread.kill
				end
				
				# Raise {Restart} in the child thread.
				def restart!
					@thread.raise(Restart)
				end
				
				# Wait for the thread to exit and return he exit status.
				# @returns [Status]
				def wait
					if @waiter
						@waiter.join
						@waiter = nil
					end
					
					return @status
				end
				
				# A pseudo exit-status wrapper.
				class Status
					# Initialise the status.
					# @parameter error [::Process::Status] The exit status of the child thread.
					def initialize(error = nil)
						@error = error
					end
					
					# Whether the status represents a successful outcome.
					# @returns [Boolean]
					def success?
						@error.nil?
					end
					
					# Convert the status to a hash, suitable for serialization.
					#
					# @returns [Boolean | String] If the status is an error, the error message is returned, otherwise `true`.
					def as_json(...)
						if @error
							@error.inspect
						else
							true
						end
					end
					
					# A human readable representation of the status.
					def to_s
						"\#<#{self.class} #{success? ? "success" : "failure"}>"
					end
				end
				
				protected
				
				# Invoked by the @waiter thread to indicate the outcome of the child thread.
				def finished(error = nil)
					if error
						Console.error(self) {error}
					end
					
					@status ||= Status.new(error)
					self.close_write
				end
			end
			
			# Start a named child thread and execute the provided block in it.
			# @parameter name [String] The name (title) of the child process.
			# @parameter block [Proc] The block to execute in the child process.
			def start(name, &block)
				Child.fork(name: name, &block)
			end
		end
	end
end