class Memory::Leak::Cluster
Manages a cluster of processes and detects memory leaks in each one. It can also apply a total memory limit to the cluster and, when that limit is exceeded, yields the largest processes so that the caller can terminate and/or remove them.
Detects memory leaks in a cluster of processes.
def add(process_id, **options)
# Start tracking a process, replacing any monitor already stored under the same ID.
#
# @param process_id [Object] key the monitor is stored under (presumably an operating-system PID — confirm with callers).
# @param options [Hash] keyword options forwarded unchanged to `Monitor.new`.
# @return [Monitor] the newly created monitor.
def add(process_id, **options)
  monitor = Monitor.new(process_id, **options)
  @processes[process_id] = monitor
end
def apply_limit!(total_size_limit = @total_size_limit)
Apply the memory limit to the cluster. If the total memory usage exceeds the limit, yields each process ID and monitor in descending order of current memory usage, so that they can be terminated and/or removed, stopping once the projected total is back within the limit.
# Apply a total memory limit across every process in the cluster.
#
# Recomputes `@total_size` as the sum of each monitor's `current_size`. When that total exceeds
# the limit, a warning is logged and processes are yielded largest-first until the projected
# total is no longer above the limit.
#
# @param total_size_limit [Numeric, nil] limit to enforce; defaults to the cluster's configured limit. `nil` means "no limit".
# @yield [process_id, monitor, total_size] each process the caller should terminate and/or remove, with the projected total at that moment.
# @return [false] when no limit is set or the total is within the limit; otherwise the result of the underlying iteration, which carries no meaning.
def apply_limit!(total_size_limit = @total_size_limit)
  @total_size = @processes.values.sum(&:current_size)

  # Without a limit there is nothing to enforce, and comparing a number against nil would raise:
  return false if total_size_limit.nil?
  return false unless @total_size > total_size_limit

  Console.warn(self, "Total memory usage exceeded limit.", total_size: @total_size, total_size_limit: total_size_limit)

  # `sort_by` returns a new array, so the block may remove processes from the cluster while we iterate:
  largest_first = @processes.sort_by { |_process_id, monitor| -monitor.current_size }

  largest_first.each do |process_id, monitor|
    break unless @total_size > total_size_limit

    yield(process_id, monitor, @total_size)

    # For the sake of the calculation, we assume that the process has been terminated:
    @total_size -= monitor.current_size
  end
end
def as_json(...)
# Build a plain Hash snapshot of the cluster's state.
#
# Any arguments are accepted but not used.
#
# @return [Hash] `:total_size`, `:total_size_limit`, and `:processes` (each monitor converted with its own `as_json`, keyed by process ID).
def as_json(...)
  processes = @processes.transform_values { |monitor| monitor.as_json }

  {
    total_size: @total_size,
    total_size_limit: @total_size_limit,
    processes: processes,
  }
end
def check!(&block)
Check all processes in the cluster for memory leaks.
# Check all processes in the cluster for memory leaks.
#
# Samples memory usage first, then yields every process whose monitor reports a leak, and
# finally enforces the cluster-wide limit when one is configured.
#
# @yield [process_id, monitor] each leaking process. The limit pass reuses the same block and passes a third `total_size` argument.
# @return [Enumerator] when called without a block; otherwise the result of the limit pass, or `nil` when no limit is configured.
def check!(&block)
  return to_enum(__method__) unless block_given?

  sample!

  # Collect first and yield afterwards, so the block never runs while `@processes` is being iterated:
  leaking = []

  @processes.each do |process_id, monitor|
    next unless monitor.leaking?

    Console.debug(self, "Memory Leak Detected!", process_id: process_id, monitor: monitor)
    leaking << [process_id, monitor]
  end

  leaking.each(&block)

  # Finally, apply any per-cluster memory limits:
  apply_limit!(@total_size_limit, &block) if @total_size_limit
end
def initialize(total_size_limit: nil)
Create a new cluster.
# Create a new, empty cluster.
#
# @param total_size_limit [Numeric, nil] optional limit on the combined memory usage of all tracked processes; `nil` means no cluster-wide limit.
def initialize(total_size_limit: nil)
  # Not known until a limit pass has summed the monitors:
  @total_size = nil
  @total_size_limit = total_size_limit

  # Maps each process ID to its monitor:
  @processes = {}
end
def remove(process_id)
# Stop tracking a process.
#
# @param process_id [Object] key the process was stored under.
# @return [Object, nil] the monitor that was stored for `process_id`, or `nil` when it was not tracked.
def remove(process_id)
  @processes.delete(process_id)
end
def sample!
# Refresh the recorded memory usage of every tracked process.
#
# Passes all tracked process IDs to `System.memory_usages` in one call and stores each
# reported usage on the matching monitor as its `current_size`.
#
# @return [Object] whatever `System.memory_usages` returns.
def sample!
  System.memory_usages(@processes.keys) do |process_id, memory_usage|
    # Skip any reported ID that is not (or is no longer) tracked:
    monitor = @processes[process_id]
    monitor.current_size = memory_usage if monitor
  end
end
def to_json(...)
# Serialize the cluster to JSON.
#
# Builds the `as_json` representation and forwards every argument to its `to_json`.
#
# @return [Object] the result of calling `to_json` on the `as_json` representation.
def to_json(...)
  representation = as_json
  representation.to_json(...)
end