class Memory::Leak::Cluster
This class manages a cluster of processes and detects memory leaks in each one. It can also apply a memory limit to the whole cluster: when the limit is exceeded, it yields processes to the caller so that they can be terminated.
Detects memory leaks in a cluster of processes.
def add(process_id, **options)
def add(process_id, **options)
  @processes[process_id] = Monitor.new(process_id, **options)
end
def apply_limit!(limit = @limit)
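Apply the memory limit to the cluster. If the total memory usage exceeds the limit, yields each process ID, its monitor, and the running total, in descending order of current memory usage, so that the processes can be terminated and/or removed. When the block returns a truthy value, that process's usage is subtracted from the running total; iteration stops as soon as the total is within the limit.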
def apply_limit!(limit = @limit)
  total = @processes.values.map(&:current).sum

  if total > limit
    Console.warn(self, "Total memory usage exceeded limit.", total: total, limit: limit)
  end

  sorted = @processes.sort_by do |process_id, monitor|
    -monitor.current
  end

  sorted.each do |process_id, monitor|
    if total > limit
      if yield process_id, monitor, total
        total -= monitor.current
      end
    else
      break
    end
  end
end
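The eviction order described above can be illustrated with a standalone sketch. It does not load the gem: StubMonitor and evict_over_limit are hypothetical stand-ins that mirror the loop in apply_limit!, assuming each monitor exposes its latest usage as current.

```ruby
# Hypothetical stand-in for a monitor: only the `current` reading is modelled.
StubMonitor = Struct.new(:current)

# Mirrors the loop in apply_limit!: visit processes from largest to smallest
# and stop as soon as the running total is back within the limit.
def evict_over_limit(processes, limit)
  total = processes.values.sum(&:current)
  evicted = []

  processes.sort_by { |_process_id, monitor| -monitor.current }.each do |process_id, monitor|
    break unless total > limit

    # A truthy block result means "this process was terminated",
    # so its usage is subtracted from the running total.
    if yield(process_id, monitor, total)
      total -= monitor.current
      evicted << process_id
    end
  end

  evicted
end

processes = {
  101 => StubMonitor.new(300),
  102 => StubMonitor.new(500),
  103 => StubMonitor.new(200),
}

# Total is 1000 against a limit of 600; terminating 102 (500) is enough.
evicted = evict_over_limit(processes, 600) { |_process_id, _monitor, _total| true }
puts evicted.inspect # prints [102]
```

Because the largest consumer is visited first, a single termination is often enough to bring the cluster back under its limit. If the block returns a falsy value, the total is left unchanged and the next process is offered.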
def as_json(...)
def as_json(...)
  {
    limit: @limit,
    processes: @processes.transform_values(&:as_json),
  }
end
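The shape of the serialized cluster can be sketched without the gem. StubJsonMonitor and cluster_as_json are hypothetical; the fields returned by the real Monitor#as_json are not shown in this section, so the single current field below is an assumption made only for illustration.

```ruby
require "json"

# Hypothetical monitor payload; the real Monitor#as_json fields may differ.
StubJsonMonitor = Struct.new(:current) do
  def as_json(*)
    { current: current }
  end
end

# Mirrors Cluster#as_json: the limit plus one entry per process ID.
def cluster_as_json(limit, processes)
  {
    limit: limit,
    processes: processes.transform_values(&:as_json),
  }
end

puts cluster_as_json(1024, { 301 => StubJsonMonitor.new(256) }).to_json
# prints {"limit":1024,"processes":{"301":{"current":256}}}
```

Note that integer process IDs become string keys once the hash is encoded as JSON, so a consumer that parses the output has to convert them back if it needs integers.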
def check!(&block)
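Check all processes in the cluster for memory leaks. Samples every monitor, yields the process ID and monitor of each leaking process to the block, and then applies the cluster-wide memory limit if one is set.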
def check!(&block)
  leaking = []

  @processes.each do |process_id, monitor|
    monitor.sample!

    if monitor.leaking?
      Console.debug(self, "Memory Leak Detected!", process_id: process_id, monitor: monitor)
      leaking << [process_id, monitor]
    end
  end

  leaking.each(&block)

  # Finally, apply any per-cluster memory limits:
  apply_limit!(@limit, &block) if @limit
end
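The sample-then-report pass in check! can be sketched in isolation. StubLeakMonitor and check_all are hypothetical, and the stub's leak rule (three successive increases) is invented for this example; it is not the heuristic used by the real Monitor.

```ruby
# Hypothetical stub: replays a fixed list of readings and flags a leak after
# three successive increases. This rule is invented for the example only.
class StubLeakMonitor
  attr_reader :current

  def initialize(readings)
    @readings = readings.dup
    @current = nil
    @increases = 0
  end

  # Take the next reading and count consecutive increases.
  def sample!
    reading = @readings.shift
    return if reading.nil?

    @increases = (@current && reading > @current) ? @increases + 1 : 0
    @current = reading
  end

  def leaking?
    @increases >= 3
  end
end

# Mirrors the first half of check!: sample every monitor, collect the
# leaking ones, and only then hand them to the caller's block.
def check_all(processes)
  leaking = []

  processes.each do |process_id, monitor|
    monitor.sample!
    leaking << [process_id, monitor] if monitor.leaking?
  end

  leaking.each { |process_id, monitor| yield process_id, monitor }
  leaking.map(&:first)
end

processes = {
  201 => StubLeakMonitor.new([100, 110, 120, 130]),
  202 => StubLeakMonitor.new([100, 100, 100, 100]),
}

# Four passes stand in for four periodic checks by a supervisor.
4.times do
  check_all(processes) { |process_id, _monitor| puts "leak: #{process_id}" }
end
# prints "leak: 201" once, on the fourth pass
```

In the real class the same block is also passed on to apply_limit!, so a block given to check! should be prepared to receive the running total as a third argument.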
def initialize(limit: nil)
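Create a new cluster. The optional limit is the total memory limit applied across all processes; when it is nil, no limit is enforced.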
def initialize(limit: nil)
  @limit = limit
  @processes = {}
end
def remove(process_id)
def remove(process_id)
  @processes.delete(process_id)
end
def to_json(...)
def to_json(...)
  as_json.to_json(...)
end