lib/async/container/supervisor/server.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

require_relative "connection"
require_relative "endpoint"
require_relative "dispatchable"

require "io/stream"

module Async
	module Container
		module Supervisor
			# The server represents the main supervisor process which is responsible for managing the lifecycle of other processes.
			#
			# There are various tasks that can be executed by the server, such as restarting the process group, and querying the status of the processes. The server is also responsible for managing the lifecycle of the monitors, which can be used to monitor the status of the connected workers.
			class Server
				# Initialize the server with the given monitors and endpoint.
				#
				# @parameter monitors [Array] The monitors to notify; each one is sent `run`, `register`, `remove` and `status` by this server.
				# @parameter endpoint [Object] The endpoint to accept connections on, by default `Supervisor.endpoint`.
				def initialize(monitors: [], endpoint: Supervisor.endpoint)
					@monitors = monitors
					@endpoint = endpoint
				end
				
				# @attribute [Array] The monitors given at initialization.
				attr :monitors
				
				include Dispatchable
				
				# Register the calling connection: merge the state carried by the message into the connection's state, then notify every monitor.
				#
				# A failure in one monitor is logged and does not prevent the remaining monitors from being notified. The call is always finished.
				#
				# @parameter call [Object] The incoming call; `call.message[:state]` is merged into `call.connection.state`.
				def do_register(call)
					call.connection.state.merge!(call.message[:state])
					
					@monitors.each do |monitor|
						begin
							monitor.register(call.connection)
						rescue => error
							Console.error(self, "Error while registering process!", monitor: monitor, exception: error)
						end
					end
				ensure
					call.finish
				end
				
				# Restart the current process group, usually including the supervisor and any other processes.
				#
				# The signal is delivered to the parent of this process (`Process.ppid`).
				# NOTE(review): presumably the parent is the container controller which then restarts the whole group - confirm.
				#
				# @parameter signal [Symbol] The signal to send, read from `call[:signal]`; defaults to `:INT`.
				def do_restart(call)
					signal = call[:signal] || :INT
					
					# We are going to terminate the process group, including *this* process, so finish the current RPC before that:
					call.finish
					
					::Process.kill(signal, ::Process.ppid)
				end
				
				# Ask every monitor to report its status on the given call, then finish the call.
				#
				# A failure in one monitor is logged and does not prevent the remaining monitors from reporting, nor the call from being finished (consistent with how registration and removal treat monitor errors).
				#
				# @parameter call [Object] The incoming call, passed to each monitor's `status` method.
				def do_status(call)
					@monitors.each do |monitor|
						begin
							monitor.status(call)
						rescue => error
							Console.error(self, "Error while querying monitor status!", monitor: monitor, exception: error)
						end
					end
					
					call.finish
				end
				
				# Notify every monitor that the given connection has gone away.
				#
				# A failure in one monitor is logged and does not prevent the remaining monitors from being notified.
				#
				# @parameter connection [Connection] The connection to remove.
				def remove(connection)
					@monitors.each do |monitor|
						begin
							monitor.remove(connection)
						rescue => error
							Console.error(self, "Error while removing process!", monitor: monitor, exception: error)
						end
					end
				end
				
				# Run the server: start every monitor, then accept connections on the endpoint and serve each one until it finishes.
				#
				# @parameter parent [Task] The task under which the server task is created, by default the current task.
				# @returns [Object] Whatever `parent.async` returns (the task running the server).
				def run(parent: Task.current)
					parent.async do |task|
						@monitors.each do |monitor|
							begin
								monitor.run
							rescue => error
								Console.error(self, "Error while starting monitor!", monitor: monitor, exception: error)
							end
						end
						
						@endpoint.accept do |peer|
							# NOTE(review): the meaning of the second argument (`1`) is defined by Connection - confirm there.
							connection = Connection.new(peer, 1)
							connection.run(self)
						ensure
							# If `Connection.new` raised, there is nothing to clean up; calling methods on `nil` here would mask the original error.
							if connection
								begin
									connection.close
								ensure
									# Always tell the monitors, even if closing the connection failed, so they do not keep a stale connection.
									remove(connection)
								end
							end
						end
						
						# Wait for any remaining child tasks before the server task completes.
						task.children&.each(&:wait)
					ensure
						task.stop
					end
				end
			end
		end
	end
end