class Async::Container::Group
Manages a group of running processes.
def any?
Whether the group contains any running processes.
def any? @running.any? end
def empty?
Whether the group is empty.
def empty? @running.empty? end
def health_check!
def health_check! @running.each_value do |fiber| fiber.resume(:health_check!) end end
def initialize(health_check_interval: 1.0)
Initialize an empty group.
def initialize(health_check_interval: 1.0) @health_check_interval = health_check_interval # The running fibers, indexed by IO: @running = {} # This queue allows us to wait for processes to complete, without spawning new processes as a result. @queue = nil end
def inspect
def inspect "#<#{self.class} running=#{@running.size}>" end
def interrupt
Interrupt all running processes.
def interrupt Console.info(self, "Sending interrupt to #{@running.size} running processes...") @running.each_value do |fiber| fiber.resume(Interrupt) end end
def resume
def resume if @queue queue = @queue @queue = nil queue.each(&:resume) end end
def running?
Whether the group contains any running processes.
def running? @running.any? end
def select(duration)
def select(duration) ::Thread.handle_interrupt(SignalException => :immediate) do readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) return readable end end
def size
def size @running.size end
def sleep(duration)
def sleep(duration) self.resume self.suspend self.wait_for_children(duration) end
def stop(timeout = 1)
Stop all child processes using {#terminate}.
def stop(timeout = 1) Console.debug(self, "Stopping all processes...", timeout: timeout) # Use a default timeout if not specified: timeout = 1 if timeout == true if timeout start_time = Async::Clock.now self.interrupt while self.any? duration = Async::Clock.now - start_time remaining = timeout - duration if remaining >= 0 self.wait_for_children(duration) else self.wait_for_children(0) break end end end # Terminate all children: self.terminate if any? # Wait for all children to exit: self.wait end
def suspend
def suspend @queue ||= [] end
def terminate
Terminate all running processes.
def terminate Console.info(self, "Sending terminate to #{@running.size} running processes...") @running.each_value do |fiber| fiber.resume(Terminate) end end
def wait
def wait self.resume with_health_checks do |duration| self.wait_for_children(duration) end end
def wait_for(channel)
def wait_for(channel) io = channel.in @running[io] = Fiber.current while @running.key?(io) # Wait for some event on the channel: result = Fiber.yield if result == Interrupt channel.interrupt! elsif result == Terminate channel.terminate! elsif result yield result elsif message = channel.receive yield message else # Wait for the channel to exit: return channel.wait end end ensure @running.delete(io) end
def wait_for_children(duration = nil)
def wait_for_children(duration = nil) # This log is a big noisy and doesn't really provide a lot of useful information. # Console.debug(self, "Waiting for children...", duration: duration, running: @running) if !@running.empty? # Maybe consider using a proper event loop here: if ready = self.select(duration) ready.each do |io| @running[io].resume end end end end
def with_health_checks
def with_health_checks lth_check_interval _check_clock = Clock.start self.running? ion = [@health_check_interval - health_check_clock.total, 0].max duration alth_check_clock.total > @health_check_interval .health_check! th_check_clock.reset! self.running? nil
def yield
def yield if @queue fiber = Fiber.current @queue << fiber Fiber.yield end end