class Async::Container::Generic

A base class for implementing containers.

def self.run(...)

Run a new container.
def self.run(...)
	self.new.run(...)
end

def [] key

A key could be a symbol, a file path, or something else which the child instance represents.
Look up a child process by key.
def [] key
	@keyed[key]&.value
end

def async(**options, &block)

Deprecated:
  • Please use {spawn} or {run} instead.
def async(**options, &block)
	# warn "#{self.class}##{__method__} is deprecated, please use `spawn` or `run` instead.", uplevel: 1
	
	require "async"
	
	spawn(**options) do |instance|
		Async(instance, &block)
	end
end

def delete(key, child)

Clear the child (value) as running.
def delete(key, child)
	if key
		@keyed.delete(key)
	end
	
	@state.delete(child)
end

def failed?

@returns [Boolean]
Whether any failures have occurred within the container.
def failed?
	@statistics.failed?
end

def fiber(&block)

def fiber(&block)
	Fiber.new(blocking: true, &block)
end

def fiber(&block)

def fiber(&block)
	Fiber.new(&block)
end

def health_check_failed!(child, age_clock, health_check_timeout)

def health_check_failed!(child, age_clock, health_check_timeout)
arn(self, "Child failed health check!", child: child, age: age_clock.total, health_check_timeout: health_check_timeout)
child has failed the health check, we assume the worst and kill it immediately:
l!

def initialize(**options)

@parameter options [Hash] Options passed to the {Group} instance.

Initialize the container.
def initialize(**options)
	@group = Group.new(**options)
	@running = true
	
	@state = {}
	
	@statistics = Statistics.new
	@keyed = {}
end

def insert(key, child)

Register the child (value) as running.
def insert(key, child)
	if key
		@keyed[key] = Keyed.new(key, child)
	end
	
	state = {}
	
	@state[child] = state
	
	return state
end

def key?(key)

Whether a child instance exists for the given key.
def key?(key)
	if key
		@keyed.key?(key)
	end
end

def mark?(key)

Mark the container's keyed instance which ensures that it won't be discarded.
def mark?(key)
	if key
		if value = @keyed[key]
			value.mark!
			
			return true
		end
	end
	
	return false
end

def reload

Reload the container's keyed instances.
def reload
	@keyed.each_value(&:clear!)
	
	yield
	
	dirty = false
	
	@keyed.delete_if do |key, value|
		value.stop? && (dirty = true)
	end
	
	return dirty
end

def run(count: Container.processor_count, **options, &block)

@parameter count [Integer] The number of instances to start.
Run multiple instances of the same block in the container.
def run(count: Container.processor_count, **options, &block)
	count.times do
		spawn(**options, &block)
	end
	
	return self
end

def running?

Whether the container has running children instances.
def running?
	@group.running?
end

def size

@returns [Integer] The number of running children instances.
def size
	@group.size
end

def sleep(duration = nil)

@parameter duration [Numeric] the maximum amount of time to sleep for.
Sleep until some state change occurs.
def sleep(duration = nil)
	@group.sleep(duration)
end

def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block)

@parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy.
@parameter key [Symbol] A key used for reloading child instances.
@parameter restart [Boolean] Whether to restart the child instance if it fails.
@parameter name [String] The name of the child instance.
Spawn a child instance into the container.
def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block)
	name ||= UNNAMED
	
	if mark?(key)
		Console.debug(self, "Reusing existing child.", child: {key: key, name: name})
		return false
	end
	
	@statistics.spawn!
	
	fiber do
		while @running
			Console.info(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)
			
			child = self.start(name, &block)
			state = insert(key, child)
			
			Console.info(self, "Started child.", child: child, spawn: {key: key, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)
			
			# If a health check is specified, we will monitor the child process and terminate it if it does not update its state within the specified time.
			if health_check_timeout
				age_clock = state[:age] = Clock.start
			end
			
			begin
				status = @group.wait_for(child) do |message|
					case message
					when :health_check!
						if health_check_timeout&.<(age_clock.total)
							health_check_failed!(child, age_clock, health_check_timeout)
						end
					else
						state.update(message)
						age_clock&.reset!
					end
				end
			ensure
				delete(key, child)
			end
			
			if status.success?
				Console.info(self, "Child exited successfully.", status: status, running: @running)
			else
				@statistics.failure!
				Console.error(self, "Child exited with error!", status: status, running: @running)
			end
			
			if restart
				@statistics.restart!
			else
				break
			end
		end
	rescue => error
		Console.error(self, "Failure during child process management!", exception: error, running: @running)
		raise
	ensure
		Console.info(self, "Child process management loop exited.", running: @running)
	end.resume
	
	return true
end

def status?(flag)

@returns [Boolean]
This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details.
e.g. `:ready`.
Returns true if all children instances have the specified status flag set.
def status?(flag)
	# This also returns true if all processes have exited/failed:
	@state.all?{|_, state| state[flag]}
end

def stop(timeout = true)

@parameter timeout [Boolean | Numeric] Whether to stop gracefully, or a specific timeout.
Stop the children instances.
def stop(timeout = true)
	Console.info(self, "Stopping container...", timeout: timeout, caller: caller_locations)
	@running = false
	@group.stop(timeout)
	
	if @group.running?
		Console.warn(self, "Group is still running after stopping it!")
	else
		Console.info(self, "Group has stopped.")
	end
rescue => error
	Console.error(self, "Error while stopping container!", exception: error)
	raise
ensure
	@running = true
end

def to_s

@returns [String]
A human readable representation of the container.
def to_s
	"#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures."
end

def wait

Wait until all spawned tasks are completed.
def wait
	@group.wait
end

def wait_until_ready

@returns [Boolean] The children all became ready.
Wait until all the children instances have indicated that they are ready.
def wait_until_ready
	while true
		Console.debug(self) do |buffer|
			buffer.puts "Waiting for ready:"
			@state.each do |child, state|
				buffer.puts "\t#{child.inspect}: #{state}"
			end
		end
		
		self.sleep
		
		if self.status?(:ready)
			Console.logger.debug(self) do |buffer|
				buffer.puts "All ready:"
				@state.each do |child, state|
					buffer.puts "\t#{child.inspect}: #{state}"
				end
			end
			
			return true
		end
	end
end