lib/async/container/forked.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2017-2024, by Samuel Williams.

require_relative "error"

require_relative "generic"
require_relative "channel"
require_relative "notify/pipe"

module Async
	module Container
		# A multi-process container which uses {Process.fork}.
		class Forked < Generic
			# Indicates that this is a multi-process container.
			def self.multiprocess?
				true
			end
			
			# Represents a running child process from the point of view of the parent container.
			class Child < Channel
				# Represents a running child process from the point of view of the child process.
				class Instance < Notify::Pipe
					# Wrap an instance around the {Process} instance from within the forked child.
					# @parameter process [Process] The process intance to wrap.
					def self.for(process)
						instance = self.new(process.out)
						
						# The child process won't be reading from the channel:
						process.close_read
						
						instance.name = process.name
						
						return instance
					end
					
					# Initialize the child process instance.
					#
					# @parameter io [IO] The IO object to use for communication.
					def initialize(io)
						super
						
						@name = nil
					end
					
					# Generate a hash representation of the process.
					#
					# @returns [Hash] The process as a hash, including `process_id` and `name`.
					def as_json(...)
						{
							process_id: ::Process.pid,
							name: @name,
						}
					end
					
					# Generate a JSON representation of the process.
					#
					# @returns [String] The process as JSON.
					def to_json(...)
						as_json.to_json(...)
					end
					
					# Set the process title to the specified value.
					#
					# @parameter value [String] The name of the process.
					def name= value
						@name = value
						
						# This sets the process title to an empty string if the name is nil:
						::Process.setproctitle(@name.to_s)
					end
					
					# @returns [String] The name of the process.
					def name
						@name
					end
					
					# Replace the current child process with a different one. Forwards arguments and options to {::Process.exec}.
					# This method replaces the child process with the new executable, thus this method never returns.
					#
					# @parameter arguments [Array] The arguments to pass to the new process.
					# @parameter ready [Boolean] If true, informs the parent process that the child is ready. Otherwise, the child process will need to use a notification protocol to inform the parent process that it is ready.
					# @parameter options [Hash] Additional options to pass to {::Process.exec}.
					def exec(*arguments, ready: true, **options)
						if ready
							self.ready!(status: "(exec)")
						else
							self.before_spawn(arguments, options)
						end
						
						::Process.exec(*arguments, **options)
					end
				end
				
				# Fork a child process appropriate for a container.
				#
				# @returns [Process]
				def self.fork(**options)
					# $stderr.puts fork: caller
					self.new(**options) do |process|
						::Process.fork do
							# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
							Signal.trap(:INT) {::Thread.current.raise(Interrupt)}
							Signal.trap(:TERM) {::Thread.current.raise(Terminate)}
							Signal.trap(:HUP) {::Thread.current.raise(Restart)}
							
							# This could be a configuration option:
							::Thread.handle_interrupt(SignalException => :immediate) do
								yield Instance.for(process)
							rescue Interrupt
								# Graceful exit.
							rescue Exception => error
								Console.error(self, error)
								
								exit!(1)
							end
						end
					end
				end
				
				# Spawn a child process using {::Process.spawn}.
				#
				# The child process will need to inform the parent process that it is ready using a notification protocol.
				#
				# @parameter arguments [Array] The arguments to pass to the new process.
				# @parameter name [String] The name of the process.
				# @parameter options [Hash] Additional options to pass to {::Process.spawn}.
				def self.spawn(*arguments, name: nil, **options)
					self.new(name: name) do |process|
						Notify::Pipe.new(process.out).before_spawn(arguments, options)
						
						::Process.spawn(*arguments, **options)
					end
				end
				
				# Initialize the process.
				# @parameter name [String] The name to use for the child process.
				def initialize(name: nil)
					super()
					
					@name = name
					@status = nil
					@pid = nil
					
					@pid = yield(self)
					
					# The parent process won't be writing to the channel:
					self.close_write
				end
				
				# Convert the child process to a hash, suitable for serialization.
				#
				# @returns [Hash] The request as a hash.
				def as_json(...)
					{
						name: @name,
						pid: @pid,
						status: @status&.to_i,
					}
				end
				
				# Convert the request to JSON.
				#
				# @returns [String] The request as JSON.
				def to_json(...)
					as_json.to_json(...)
				end
				
				# Set the name of the process.
				# Invokes {::Process.setproctitle} if invoked in the child process.
				def name= value
					@name = value
					
					# If we are the child process:
					::Process.setproctitle(@name) if @pid.nil?
				end
				
				# The name of the process.
				# @attribute [String]
				attr :name
				
				# @attribute [Integer] The process identifier.
				attr :pid
				
				# A human readable representation of the process.
				# @returns [String]
				def inspect
					"\#<#{self.class} name=#{@name.inspect} status=#{@status.inspect} pid=#{@pid.inspect}>"
				end
				
				alias to_s inspect
				
				# Invoke {#terminate!} and then {#wait} for the child process to exit.
				def close
					self.terminate!
					self.wait
				ensure
					super
				end
				
				# Send `SIGINT` to the child process.
				def interrupt!
					unless @status
						::Process.kill(:INT, @pid)
					end
				end
				
				# Send `SIGTERM` to the child process.
				def terminate!
					unless @status
						::Process.kill(:TERM, @pid)
					end
				end
				
				# Send `SIGKILL` to the child process.
				def kill!
					unless @status
						::Process.kill(:KILL, @pid)
					end
				end
				
				# Send `SIGHUP` to the child process.
				def restart!
					unless @status
						::Process.kill(:HUP, @pid)
					end
				end
				
				# Wait for the child process to exit.
				# @asynchronous This method may block.
				#
				# @returns [::Process::Status] The process exit status.
				def wait
					if @pid && @status.nil?
						Console.debug(self, "Waiting for process to exit...", pid: @pid)
						
						_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)

						while @status.nil?
							sleep(0.1)
							
							_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
							
							if @status.nil?
								Console.warn(self) {"Process #{@pid} is blocking, has it exited?"}
							end
						end
					end
					
					Console.debug(self, "Process exited.", pid: @pid, status: @status)
					
					return @status
				end
			end
			
			
			# Start a named child process and execute the provided block in it.
			# @parameter name [String] The name (title) of the child process.
			# @parameter block [Proc] The block to execute in the child process.
			def start(name, &block)
				Child.fork(name: name, &block)
			end
		end
	end
end