# frozen_string_literal: true# Released under the MIT License.# Copyright, 2019-2024, by Samuel Williams.require"etc"require"async/clock"require_relative"group"require_relative"keyed"require_relative"statistics"moduleAsyncmoduleContainer# An environment variable key to override {.processor_count}.ASYNC_CONTAINER_PROCESSOR_COUNT="ASYNC_CONTAINER_PROCESSOR_COUNT"# The processor count which may be used for the default number of container threads/processes. You can override the value provided by the system by specifying the `ASYNC_CONTAINER_PROCESSOR_COUNT` environment variable.# @returns [Integer] The number of hardware processors which can run threads/processes simultaneously.# @raises [RuntimeError] If the process count is invalid.defself.processor_count(env=ENV)count=env.fetch(ASYNC_CONTAINER_PROCESSOR_COUNT)doEtc.nprocessorsrescue1end.to_iifcount<1raiseRuntimeError,"Invalid processor count #{count}!"endreturncountend# A base class for implementing containers.classGeneric# Run a new container.defself.run(...)self.new.run(...)endUNNAMED="Unnamed"# Initialize the container.## @parameter options [Hash] Options passed to the {Group} instance.definitialize(**options)@group=Group.new(**options)@running=true@state={}@statistics=Statistics.new@keyed={}end# @attribute [Group] The group of running children instances.attr:group# @returns [Integer] The number of running children instances.defsize@group.sizeend# @attribute [Hash(Child, Hash)] The state of each child instance.attr:state# A human readable representation of the container.# @returns [String]defto_s"#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures."end# Look up a child process by key.# A key could be a symbol, a file path, or something else which the child instance represents.def[]key@keyed[key]&.valueend# Statistics relating to the behavior of children instances.# @attribute [Statistics]attr:statistics# Whether any failures have occurred within the container.# @returns [Boolean]deffailed?@statistics.failed?end# Whether the container has running children instances.defrunning?@group.running?end# Sleep until some state change occurs.# @parameter duration [Numeric] the maximum amount of time to sleep for.defsleep(duration=nil)@group.sleep(duration)end# Wait until all spawned tasks are completed.defwait@group.waitend# Returns true if all children instances have the specified status flag set.# e.g. `:ready`.# This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details.# @returns [Boolean]defstatus?(flag)# This also returns true if all processes have exited/failed:@state.all?{|_,state|state[flag]}end# Wait until all the children instances have indicated that they are ready.# @returns [Boolean] The children all became ready.defwait_until_readywhiletrueConsole.debug(self)do|buffer|buffer.puts"Waiting for ready:"@state.eachdo|child,state|buffer.puts"\t#{child.inspect}: #{state}"endendself.sleepifself.status?(:ready)Console.logger.debug(self)do|buffer|buffer.puts"All ready:"@state.eachdo|child,state|buffer.puts"\t#{child.inspect}: #{state}"endendreturntrueendendend# Stop the children instances.# @parameter timeout [Boolean | Numeric] Whether to stop gracefully, or a specific timeout.defstop(timeout=true)@running=false@group.stop(timeout)if@group.running?Console.warn(self){"Group is still running after stopping it!"}endensure@running=trueendprotecteddefhealth_check_failed!(child,age_clock,health_check_timeout)Console.warn(self,"Child failed health check!",child: child,age: age_clock.total,health_check_timeout: health_check_timeout)# If the child has failed the health check, we assume the worst and kill it immediately:child.kill!end# Spawn a child instance into the container.# @parameter name [String] The name of the child instance.# @parameter restart [Boolean] Whether to restart the child instance if it fails.# @parameter key [Symbol] A key used for reloading child instances.# @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy.defspawn(name: nil,restart: false,key: nil,health_check_timeout: nil,&block)name||=UNNAMEDifmark?(key)Console.debug(self){"Reusing existing child for #{key}: #{name}"}returnfalseend@statistics.spawn!fiberdowhile@runningchild=self.start(name,&block)state=insert(key,child)# If a health check is specified, we will monitor the child process and terminate it if it does not update its state within the specified time.ifhealth_check_timeoutage_clock=state[:age]=Clock.startendbeginstatus=@group.wait_for(child)do|message|casemessagewhen:health_check!ifhealth_check_timeout&.<(age_clock.total)health_check_failed!(child,age_clock,health_check_timeout)endelsestate.update(message)age_clock&.reset!endendensuredelete(key,child)endifstatus.success?Console.debug(self){"#{child} exited with #{status}"}else@statistics.failure!Console.error(self,status: status)endifrestart@statistics.restart!elsebreakendendend.resumereturntrueend# Run multiple instances of the same block in the container.# @parameter count [Integer] The number of instances to start.defrun(count: Container.processor_count,**options,&block)count.timesdospawn(**options,&block)endreturnselfend# @deprecated Please use {spawn} or {run} instead.defasync(**options,&block)# warn "#{self.class}##{__method__} is deprecated, please use `spawn` or `run` instead.", uplevel: 1require"async"spawn(**options)do|instance|Async(instance,&block)endend# Reload the container's keyed instances.defreload@keyed.each_value(&:clear!)yielddirty=false@keyed.delete_ifdo|key,value|value.stop?&&(dirty=true)endreturndirtyend# Mark the container's keyed instance which ensures that it won't be discarded.defmark?(key)ifkeyifvalue=@keyed[key]value.mark!returntrueendendreturnfalseend# Whether a child instance exists for the given key.defkey?(key)ifkey@keyed.key?(key)endendprotected# Register the child (value) as running.definsert(key,child)ifkey@keyed[key]=Keyed.new(key,child)endstate={}@state[child]=statereturnstateend# Clear the child (value) as running.defdelete(key,child)ifkey@keyed.delete(key)end@state.delete(child)endprivateifFiber.respond_to?(:blocking?)deffiber(&block)Fiber.new(blocking: true,&block)endelsedeffiber(&block)Fiber.new(&block)endendendendend