class Async::Container::Group

Manages a group of running processes.

def any?

@returns [Boolean]
Whether the group contains any running processes.
# Check whether at least one entry is registered in the running set.
# @returns [Boolean] True when `@running` is not empty.
def any?
	!@running.empty?
end

def each_running(&block)

# Invoke the block once for each value currently stored in the running set.
# @parameter block [Proc] Called with each running fiber.
def each_running(&block)
	# We create a copy of the values here, in case the block modifies the running set:
	@running.values.each(&block)
end

def empty?

@returns [Boolean]
Whether the group is empty.
# Check whether the running set has no entries.
# @returns [Boolean] True when `@running` contains nothing.
def empty?
	@running.size.zero?
end

def health_check!

Perform a health check on all running processes.
# Resume every running fiber with the `:health_check!` symbol.
#
# {wait_for} passes such a value on to the block it was given.
def health_check!
	each_running{|fiber| fiber.resume(:health_check!)}
end

def initialize(health_check_interval: 1.0)

@parameter health_check_interval [Numeric | Nil] The maximum interval at which health checks are performed.

Initialize an empty group.
# Set up a group with nothing running and no pending queue.
# @parameter health_check_interval [Numeric | Nil] Stored for use when pacing health checks.
def initialize(health_check_interval: 1.0)
	# Fibers that are waiting on a child, keyed by the IO they wait on:
	@running = {}
	
	@health_check_interval = health_check_interval
	
	# While this is an array, fibers calling {yield} are parked in it until {resume} is called. This lets us wait for processes to complete without spawning new processes as a result.
	@queue = nil
end

def inspect

@returns [String] A human-readable representation of the group.
def inspect
	"#<#{self.class} running=#{@running.size}>"
end

def interrupt

Interrupt all running processes.
This resumes each controlling fiber with the {Interrupt} class.
# Log the request, then resume every running fiber with the {Interrupt} constant.
#
# {wait_for} reacts to that value by calling `interrupt!` on its channel.
def interrupt
	Console.info(self, "Sending interrupt to #{@running.size} running processes...")
	
	each_running{|fiber| fiber.resume(Interrupt)}
end

def kill

Kill all running processes.
This resumes each controlling fiber with the {Kill} class.
# Log the request, then resume every running fiber with the {Kill} constant.
#
# {wait_for} reacts to that value by calling `kill!` on its channel.
def kill
	Console.info(self, "Sending kill to #{@running.size} running processes...")
	
	each_running{|fiber| fiber.resume(Kill)}
end

def resume

# Release every fiber parked by {yield} and leave the suspended state.
#
# Does nothing when no queue is present.
def resume
	return unless @queue
	
	# Clear the queue before resuming, so fibers resumed here are not parked again by {yield}:
	pending, @queue = @queue, nil
	
	pending.each(&:resume)
end

def running?

@returns [Boolean]
Whether the group contains any running processes.
# Check whether the running set currently holds at least one entry.
# @returns [Boolean] True when `@running` is not empty.
def running?
	@running.size.positive?
end

def select(duration)

Wait for a child process to exit OR a signal to be received.
# Block until one of the registered IOs becomes readable, or the duration elapses.
#
# Signal exceptions are handled immediately for the duration of the call.
# @parameter duration [Numeric | Nil] The timeout given to `IO.select`.
# @returns [Array | Nil] The readable IOs, or `nil` when `IO.select` returned `nil`.
def select(duration)
	::Thread.handle_interrupt(SignalException => :immediate) do
		ready = ::IO.select(@running.keys, nil, nil, duration)
		
		# Only the readable set is of interest:
		ready&.first
	end
end

def size

@returns [Integer] The number of running processes.
# Count the entries in the running set.
# @returns [Integer] The number of running processes.
def size
	@running.length
end

def sleep(duration)

Sleep for at most the specified duration until some state change occurs.
# Release any parked fibers, re-enter the suspended state, then wait for child activity.
# @parameter duration [Numeric | Nil] The longest time to wait, passed to {wait_for_children}.
def sleep(duration)
	# Let queued fibers run, then start collecting new ones again:
	resume
	suspend
	
	wait_for_children(duration)
end

def stop(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT)

@parameter terminate_timeout [Numeric | Nil] Time to wait after SIGTERM before escalating to SIGKILL.
@parameter interrupt_timeout [Numeric | Nil] Time to wait after SIGINT before escalating to SIGTERM.
@parameter graceful [Boolean] Whether to send SIGINT first or skip directly to SIGTERM.

If `graceful` is false, skips the SIGINT phase and goes directly to SIGTERM → SIGKILL.

A graceful shutdown performs the following sequence:
1. Send SIGINT and wait up to `interrupt_timeout` seconds
2. Send SIGTERM and wait up to `terminate_timeout` seconds
3. Send SIGKILL and wait indefinitely for process cleanup

Stop all child processes with a multi-phase shutdown sequence.
# Stop all children, escalating through up to three phases.
#
# 1. If `interrupt_timeout` is set, call {interrupt} and wait up to that long for children to exit.
# 2. If `terminate_timeout` is set and children remain, call {terminate} and wait up to that long.
# 3. If children still remain, call {kill} and then {wait}.
#
# @parameter graceful [Boolean | Numeric] `false` disables the interrupt phase; a number replaces both timeouts; `true` leaves the timeouts as given.
# @parameter interrupt_timeout [Numeric | Nil] How long to wait after interrupting; `nil` skips the phase.
# @parameter terminate_timeout [Numeric | Nil] How long to wait after terminating; `nil` skips the phase.
def stop(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT)
	# Any other value (including `true`) keeps the timeouts that were passed in:
	case graceful
	when false
		interrupt_timeout = nil
	when Numeric
		interrupt_timeout = terminate_timeout = graceful
	end
	
	Console.debug(self, "Stopping all processes...", interrupt_timeout: interrupt_timeout, terminate_timeout: terminate_timeout)
	
	# Phase 1: interrupt, but only when an interrupt timeout is in effect:
	if interrupt_timeout
		interrupt_clock = Async::Clock.start
		
		self.interrupt
		self.wait_for_exit(interrupt_clock, interrupt_timeout)
	end
	
	# Phase 2: terminate whatever is still running:
	if terminate_timeout && self.any?
		terminate_clock = Async::Clock.start
		
		self.terminate
		self.wait_for_exit(terminate_clock, terminate_timeout)
	end
	
	# Phase 3: kill whatever is left and wait for it:
	return unless any?
	
	self.kill
	self.wait
end

def suspend

# Enter the suspended state by ensuring a queue exists for {yield} to park fibers in.
#
# An existing queue is kept as-is.
# @returns [Array] The current queue.
def suspend
	@queue = [] unless @queue
	
	@queue
end

def terminate

Terminate all running processes.
This resumes each controlling fiber with the {Terminate} class.
# Log the request, then resume every running fiber with the {Terminate} constant.
#
# {wait_for} reacts to that value by calling `terminate!` on its channel.
def terminate
	Console.info(self, "Sending terminate to #{@running.size} running processes...")
	
	each_running{|fiber| fiber.resume(Terminate)}
end

def wait

Begin any outstanding queued processes and wait for them indefinitely.
# Release any parked fibers, then keep waiting on children for as long as {with_health_checks} yields.
def wait
	resume
	
	# Each yielded duration bounds a single wait:
	with_health_checks{|duration| wait_for_children(duration)}
end

def wait_for(channel)

Wait for a message in the specified {Channel}.
# Register the current fiber against the channel's input and process events until the channel is done.
#
# Each time the fiber is resumed, the value it was resumed with decides what happens:
# - {Interrupt}, {Terminate} or {Kill}: forwarded to the channel as `interrupt!`, `terminate!` or `kill!`.
# - Any other truthy value: yielded to the block.
# - Nothing (`nil`): a message is read from the channel and yielded; if there is none, the result of `channel.wait` is returned.
#
# The registration is always removed on the way out.
# @parameter channel [Channel] The channel to wait on.
def wait_for(channel)
	io = channel.in
	
	# Make this fiber discoverable by the IO it is waiting on:
	@running[io] = Fiber.current
	
	while @running.key?(io)
		# Park here until somebody resumes us:
		signal = Fiber.yield
		
		if signal == Interrupt
			channel.interrupt!
		elsif signal == Terminate
			channel.terminate!
		elsif signal == Kill
			channel.kill!
		elsif (message = signal || channel.receive)
			# Either a value we were resumed with, or a message read from the channel:
			yield message
		else
			# No more messages, so wait for the channel to exit:
			return channel.wait
		end
	end
ensure
	@running.delete(io)
end

def wait_for_children(duration = nil)

# Wait until some registered IO is readable, then resume the fiber registered for each one.
#
# Returns without waiting when nothing is registered.
# @parameter duration [Numeric | Nil] The longest time to wait, passed to {select}.
def wait_for_children(duration = nil)
	# This log is a bit noisy and doesn't really provide a lot of useful information:
	Console.debug(self, "Waiting for children...", duration: duration, running: @running)
	
	return if @running.empty?
	
	# Maybe consider using a proper event loop here:
	ready = self.select(duration)
	
	ready&.each{|io| @running[io].resume}
end

def wait_for_exit(clock, timeout)

# Wait for children to exit until none remain or the timeout has elapsed.
# @parameter clock [Clock] Its `total` is read as the time elapsed so far.
# @parameter timeout [Numeric] The total time allowed, measured against the clock.
def wait_for_exit(clock, timeout)
	while self.any?
		duration = timeout - clock.total
		
		if duration >= 0
			# There is still time left, so wait for at most the remainder:
			self.wait_for_children(duration)
		else
			# Out of time: poll once without blocking, then give up so the caller can escalate.
			self.wait_for_children(0)
			break
		end
	end
end

def with_health_checks

# Yield repeatedly while the group is running, triggering a health check whenever the interval has passed.
#
# With a health check interval, the block receives the time remaining until the next check (never negative). Without one, the block receives `nil` each time.
# @yields {|duration| ...} Called once per iteration.
# 	@parameter duration [Numeric | Nil] The suggested maximum time to wait.
def with_health_checks
	if @health_check_interval
		health_check_clock = Clock.start
		
		while self.running?
			duration = [@health_check_interval - health_check_clock.total, 0].max
			
			yield duration
			
			# Once the interval has passed, check all children and start timing again:
			if health_check_clock.total > @health_check_interval
				self.health_check!
				health_check_clock.reset!
			end
		end
	else
		while self.running?
			yield nil
		end
	end
end

def yield

# Park the current fiber while the group is suspended.
#
# When a queue is present, the current fiber is appended to it and execution yields until the fiber is resumed (see {resume}). Without a queue this returns immediately.
def yield
	return unless @queue
	
	@queue << Fiber.current
	
	Fiber.yield
end