class Async::Container::Supervisor::MemoryMonitor
def add(process_id)
Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
def add(process_id) @cluster.add(process_id, **@options) end
def initialize(interval: 10, total_size_limit: nil, **options)
@parameter total_size_limit [Integer] The total size limit of all processes, or nil for no limit.
@parameter interval [Integer] The interval at which to check for memory leaks.
Create a new memory monitor.
def initialize(interval: 10, total_size_limit: nil, **options) @interval = interval @cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit) # We use these options when adding processes to the cluster: @options = options @processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity} end
def memory_leak_detected(process_id, monitor)
@parameter monitor [Memory::Leak::Monitor] The monitor that detected the memory leak.
@parameter process_id [Integer] The process ID of the process that has a memory leak.
Invoked when a memory leak is detected.
def memory_leak_detected(process_id, monitor) Console.info(self, "Killing process:", process_id: process_id) Process.kill(:INT, process_id) true end
def register(connection)
def register(connection) Console.debug(self, "Registering connection:", connection: connection, state: connection.state) if process_id = connection.state[:process_id] connections = @processes[process_id] if connections.empty? Console.debug(self, "Registering process:", process_id: process_id) self.add(process_id) end connections.add(connection) end end
def remove(connection)
def remove(connection) if process_id = connection.state[:process_id] connections = @processes[process_id] connections.delete(connection) if connections.empty? Console.debug(self, "Removing process:", process_id: process_id) @cluster.remove(process_id) end end end
def run
Run the memory monitor.
def run Async do while true # This block must return true if the process was killed. @cluster.check! do |process_id, monitor| Console.error(self, "Memory leak detected in process:", process_id: process_id, monitor: monitor) memory_leak_detected(process_id, monitor) end sleep(@interval) end end end
def status(call)
Dump the current status of the memory monitor.
def status(call) call.push(memory_monitor: @cluster) end