lib/memory/leak/cluster.rb
# frozen_string_literal: true # Released under the MIT License. # Copyright, 2025, by Samuel Williams. require "console" require_relative "monitor" module Memory module Leak # Detects memory leaks in a cluster of processes. # # This class is used to manage a cluster of processes and detect memory leaks in each process. It can also apply a memory limit to the cluster, and terminate processes if the memory limit is exceeded. class Cluster # Create a new cluster. # # @parameter total_size_limit [Numeric | Nil] The total memory limit for the cluster. def initialize(total_size_limit: nil) @total_size = nil @total_size_limit = total_size_limit @processes = {} end # @returns [Hash] A serializable representation of the cluster. def as_json(...) { total_size: @total_size, total_size_limit: @total_size_limit, processes: @processes.transform_values(&:as_json), } end # @returns [String] The JSON representation of the cluster. def to_json(...) as_json.to_json(...) end # @attribute [Numeric | Nil] The total size of the cluster. attr :total_size # @attribute [Numeric | Nil] The total size limit for the cluster, in bytes, if which is exceeded, the cluster will terminate processes. attr_accessor :total_size_limit # @attribute [Hash(Integer, Monitor)] The process IDs and monitors in the cluster. attr :processes # Add a new process ID to the cluster. def add(process_id, **options) @processes[process_id] = Monitor.new(process_id, **options) end # Remove a process ID from the cluster. def remove(process_id) @processes.delete(process_id) end # Apply the memory limit to the cluster. If the total memory usage exceeds the limit, yields each process ID and monitor in order of maximum memory usage, so that they could be terminated and/or removed. # # @yields {|process_id, monitor| ...} each process ID and monitor in order of maximum memory usage, return true if it was terminated to adjust memory usage. def apply_limit!(total_size_limit = @total_size_limit) @total_size = @processes.values.map(&:current_size).sum if @total_size > total_size_limit Console.warn(self, "Total memory usage exceeded limit.", total_size: @total_size, total_size_limit: total_size_limit) else return false end sorted = @processes.sort_by do |process_id, monitor| -monitor.current_size end sorted.each do |process_id, monitor| if @total_size > total_size_limit yield(process_id, monitor, @total_size) # For the sake of the calculation, we assume that the process has been terminated: @total_size -= monitor.current_size else break end end end # Sample the memory usage of all processes in the cluster. def sample! System.memory_usages(@processes.keys) do |process_id, memory_usage| if monitor = @processes[process_id] monitor.current_size = memory_usage end end end # Check all processes in the cluster for memory leaks. # # @yields {|process_id, monitor| ...} each process ID and monitor that is leaking or exceeds the memory limit. def check!(&block) return to_enum(__method__) unless block_given? self.sample! leaking = [] @processes.each do |process_id, monitor| if monitor.leaking? Console.debug(self, "Memory Leak Detected!", process_id: process_id, monitor: monitor) leaking << [process_id, monitor] end end leaking.each(&block) # Finally, apply any per-cluster memory limits: apply_limit!(@total_size_limit, &block) if @total_size_limit end end end end