lib/async/container/supervisor/memory_monitor.rb
# frozen_string_literal: true # Released under the MIT License. # Copyright, 2025, by Samuel Williams. require "memory/leak/cluster" require "set" module Async module Container module Supervisor class MemoryMonitor # Create a new memory monitor. # # @parameter interval [Integer] The interval at which to check for memory leaks. # @parameter total_size_limit [Integer] The total size limit of all processes, or nil for no limit. # @parameter options [Hash] Options to pass to the cluster when adding processes. def initialize(interval: 10, total_size_limit: nil, **options) @interval = interval @cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit) # We use these options when adding processes to the cluster: @options = options @processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity} end # Add a process to the memory monitor. You may override this to control how processes are added to the cluster. # # @parameter process_id [Integer] The process ID to add. def add(process_id) @cluster.add(process_id, **@options) end # Register the connection (worker) with the memory monitor. def register(connection) Console.debug(self, "Registering connection:", connection: connection, state: connection.state) if process_id = connection.state[:process_id] connections = @processes[process_id] if connections.empty? Console.debug(self, "Registering process:", process_id: process_id) self.add(process_id) end connections.add(connection) end end # Remove the connection (worker) from the memory monitor. def remove(connection) if process_id = connection.state[:process_id] connections = @processes[process_id] connections.delete(connection) if connections.empty? Console.debug(self, "Removing process:", process_id: process_id) @cluster.remove(process_id) end end end # Dump the current status of the memory monitor. # # @parameter call [Connection::Call] The call to respond to. def status(call) call.push(memory_monitor: @cluster) end # Invoked when a memory leak is detected. # # @parameter process_id [Integer] The process ID of the process that has a memory leak. # @parameter monitor [Memory::Leak::Monitor] The monitor that detected the memory leak. # @returns [Boolean] True if the process was killed. def memory_leak_detected(process_id, monitor) Console.info(self, "Killing process:", process_id: process_id) Process.kill(:INT, process_id) true end # Run the memory monitor. # # @returns [Async::Task] The task that is running the memory monitor. def run Async do while true # This block must return true if the process was killed. @cluster.check! do |process_id, monitor| Console.error(self, "Memory leak detected in process:", process_id: process_id, monitor: monitor) memory_leak_detected(process_id, monitor) end sleep(@interval) end end end end end end end