class Async::Container::Generic
A base class for implementing containers.
def self.run(...)
# Build a new container and run it, forwarding every argument (positional,
# keyword and block) to the instance-level `run`.
# @returns [Object] Whatever the instance-level `run` returns.
def self.run(...)
  container = new
  container.run(...)
end
def [] key
Look up a child process by key.
# Look up a child process by key.
# @parameter key [Symbol] The key the child was registered under.
# @returns [Object | Nil] The `value` of the keyed entry, or `nil` when nothing is registered for the key.
def [](key)
  # Safe navigation: an unknown key yields nil instead of raising.
  @keyed[key]&.value
end
def async(**options, &block)
@deprecated Please use {spawn} or {run} instead.
# Spawn a child via {spawn} and run the given block inside an `Async` task,
# passing the child instance to `Async`.
# @deprecated Please use {spawn} or {run} instead.
# @parameter options [Hash] Keyword options forwarded unchanged to {spawn}.
def async(**options, &block)
  # warn "#{self.class}##{__method__} is deprecated, please use `spawn` or `run` instead.", uplevel: 1

  # Required at call time rather than at file level; presumably so that only
  # users of this deprecated method need the dependency (assumption - confirm).
  require "async"

  spawn(**options) do |instance|
    Async(instance, &block)
  end
end
def delete(key, child)
# Forget a child: remove its keyed entry (when a key was given) and its state.
# @parameter key [Symbol | Nil] The key the child was registered under, if any.
# @parameter child [Object] The child whose state should be removed.
# @returns [Hash | Nil] The removed state, or `nil` if the child had none.
def delete(key, child)
  @keyed.delete(key) if key

  @state.delete(child)
end
def failed?
Whether any failures have occurred within the container.
# Whether any failures have occurred within the container, as reported by
# the container's statistics object.
# @returns [Boolean]
def failed?
  @statistics.failed?
end
def fiber(&block)
# Create (but do not resume) a blocking fiber that will execute the given block.
# @returns [Fiber]
def fiber(&block)
  Fiber.new(blocking: true, &block)
end
def fiber(&block)
# Create (but do not resume) a fiber that will execute the given block,
# using the default fiber options.
# @returns [Fiber]
def fiber(&block)
  Fiber.new(&block)
end
def health_check_failed!(child, age_clock, health_check_timeout)
# Handle a child that did not pass its health check: log a warning and kill it.
# @parameter child [Object] The child that failed; must respond to `kill!`.
# @parameter age_clock [Object] Clock whose `total` is logged as the child's age.
# @parameter health_check_timeout [Numeric] The timeout that was exceeded (logged only).
def health_check_failed!(child, age_clock, health_check_timeout)
  Console.warn(self, "Child failed health check!", child: child, age: age_clock.total, health_check_timeout: health_check_timeout)

  # If the child has failed the health check, we assume the worst and kill it immediately:
  child.kill!
end
def initialize(**options)
Initialize the container.
# Initialize the container.
# @parameter options [Hash] Keyword options passed through to `Group.new`.
def initialize(**options)
  @group = Group.new(**options)

  # Read by the child management loop in {spawn}; toggled by {stop}.
  @running = true

  # Per-child state hashes, keyed by child.
  @state = {}

  @statistics = Statistics.new

  # Keyed entries, used to find and reuse children by key.
  @keyed = {}
end
def insert(key, child)
# Register a child: record a keyed entry (when a key is given) and create a
# fresh, empty state hash for it.
# @parameter key [Symbol | Nil] Optional key to register the child under.
# @parameter child [Object] The child being registered.
# @returns [Hash] The new state hash stored for the child (the same object that is kept in `@state`).
def insert(key, child)
  @keyed[key] = Keyed.new(key, child) if key

  state = {}
  @state[child] = state

  return state
end
def key?(key)
# Whether a keyed entry exists for the given key.
# @parameter key [Symbol | Nil] The key to check.
# @returns [Boolean | Nil] `true`/`false` for a non-nil key; `nil` when no key is given.
def key?(key)
  @keyed.key?(key) if key
end
def mark?(key)
# Mark the keyed entry for the given key, if there is one.
# @parameter key [Symbol | Nil] The key to look up.
# @returns [Boolean] `true` if an entry existed and `mark!` was called on it, `false` otherwise.
def mark?(key)
  return false unless key

  entry = @keyed[key]
  return false unless entry

  entry.mark!
  return true
end
def reload
# Reload the keyed children around the given block.
#
# Calls `clear!` on every keyed entry, yields to the caller, then removes each
# entry whose `stop?` returns a truthy value.
# @yields {} Called once, between clearing the entries and removing them.
# @returns [Boolean] `true` if at least one keyed entry was removed, `false` otherwise.
def reload
  @keyed.each_value(&:clear!)

  yield

  dirty = false

  @keyed.delete_if do |_key, value|
    # `stop?` is evaluated for every entry; the flag is only set when it is truthy.
    value.stop? && (dirty = true)
  end

  return dirty
end
def run(count: Container.processor_count, **options, &block)
Run multiple instances of the same block in the container.
# Run multiple instances of the same block in the container.
# @parameter count [Integer] How many instances to spawn; defaults to `Container.processor_count`.
# @parameter options [Hash] Keyword options forwarded unchanged to each {spawn} call.
# @returns [self] The container, to allow chaining.
def run(count: Container.processor_count, **options, &block)
  count.times do
    spawn(**options, &block)
  end

  return self
end
def running?
# Whether the underlying group reports that it is running.
# @returns [Boolean]
def running?
  @group.running?
end
def size
# The size reported by the underlying group.
# @returns [Integer]
def size
  @group.size
end
def sleep(duration = nil)
Sleep until some state change occurs.
# Sleep until some state change occurs, by delegating to the group.
# @parameter duration [Numeric | Nil] Passed through to the group's `sleep`; `nil` by default.
def sleep(duration = nil)
  @group.sleep(duration)
end
def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block)
@parameter key [Symbol] A key used for reloading child instances.
@parameter restart [Boolean] Whether to restart the child instance if it fails.
@parameter name [String] The name of the child instance.
Spawn a child instance into the container.
# Spawn a child instance into the container.
#
# If a child is already registered under `key`, it is marked for reuse and
# nothing new is started. Otherwise a management fiber is created and resumed
# which starts the child, waits for it, and optionally restarts it.
# @parameter name [String] The name of the child instance; defaults to `UNNAMED`.
# @parameter restart [Boolean] Whether to restart the child instance after it exits.
# @parameter key [Symbol] A key used for reloading child instances.
# @parameter health_check_timeout [Numeric | Nil] If given, the child is reported via {health_check_failed!}
#   when a `:health_check!` message arrives and the time since its last state update exceeds this value.
# @returns [Boolean] `false` if an existing keyed child was reused, `true` otherwise.
def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block)
  name ||= UNNAMED

  if mark?(key)
    Console.debug(self, "Reusing existing child.", child: {key: key, name: name})
    return false
  end

  @statistics.spawn!

  fiber do
    while @running
      Console.info(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)

      child = self.start(name, &block)
      state = insert(key, child)

      Console.info(self, "Started child.", child: child, spawn: {key: key, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)

      # If a health check is specified, we will monitor the child process and terminate it if it does not update its state within the specified time.
      if health_check_timeout
        age_clock = state[:age] = Clock.start
      end

      begin
        status = @group.wait_for(child) do |message|
          case message
          when :health_check!
            # Only meaningful when a timeout was configured (age_clock is nil otherwise).
            if health_check_timeout&.<(age_clock.total)
              health_check_failed!(child, age_clock, health_check_timeout)
            end
          else
            # Any other message is a state update, which also resets the age clock.
            state.update(message)
            age_clock&.reset!
          end
        end
      ensure
        # Always forget the child, even if waiting raised.
        delete(key, child)
      end

      if status.success?
        Console.info(self, "Child exited successfully.", status: status, running: @running)
      else
        @statistics.failure!
        Console.error(self, "Child exited with error!", status: status, running: @running)
      end

      if restart
        @statistics.restart!
      else
        break
      end
    end
  rescue => error
    Console.error(self, "Failure during child process management!", exception: error, running: @running)
    raise
  ensure
    Console.info(self, "Child process management loop exited.", running: @running)
  end.resume

  return true
end
def status?(flag)
This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details.
e.g. `:ready`.
Returns true if all child instances have the specified status flag set.
# Whether every tracked child has a truthy value for the given status flag.
# @parameter flag [Symbol] The state key to check, e.g. `:ready`.
# @returns [Boolean]
def status?(flag)
  # This also returns true if all processes have exited/failed:
  @state.all? { |_, state| state[flag] }
end
def stop(timeout = true)
Stop the child instances.
# Stop the child instances.
#
# `@running` is cleared for the duration of the stop, so the management loop
# in {spawn} does not restart children, and is set back to `true` afterwards
# whether or not stopping raised.
# @parameter timeout [Boolean | Numeric] Passed through to the group's `stop`.
# @raises [StandardError] Any error raised while stopping is logged and re-raised.
def stop(timeout = true)
  Console.info(self, "Stopping container...", timeout: timeout, caller: caller_locations)
  @running = false
  @group.stop(timeout)

  if @group.running?
    Console.warn(self, "Group is still running after stopping it!")
  else
    Console.info(self, "Group has stopped.")
  end
rescue => error
  Console.error(self, "Error while stopping container!", exception: error)
  raise
ensure
  @running = true
end
def to_s
A human readable representation of the container.
def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end
def wait
# Wait on the underlying group.
# @returns [Object] Whatever the group's `wait` returns.
def wait
  @group.wait
end
def wait_until_ready
Wait until all the child instances have indicated that they are ready.
# Wait until all the child instances have indicated that they are ready.
#
# Repeatedly logs the current per-child state, sleeps until something
# changes, and returns once every child has the `:ready` flag set.
# @returns [Boolean] Always `true`, once all children are ready.
def wait_until_ready
  # Writes a heading followed by one line per child with its current state.
  dump_state = lambda do |buffer, heading|
    buffer.puts heading

    @state.each do |child, state|
      buffer.puts "\t#{child.inspect}: #{state}"
    end
  end

  while true
    Console.debug(self) do |buffer|
      dump_state.call(buffer, "Waiting for ready:")
    end

    self.sleep

    if self.status?(:ready)
      Console.debug(self) do |buffer|
        dump_state.call(buffer, "All ready:")
      end

      return true
    end
  end
end