lib/puma/reactor.rb
# frozen_string_literal: true module Puma class UnsupportedBackend < StandardError; end # Monitors a collection of IO objects, calling a block whenever # any monitored object either receives data or times out, or when the Reactor shuts down. # # The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev, # Java NIO or just plain IO#select). The call to `NIO::Selector#select` will # 'wakeup' any IO object that receives data. # # This class additionally tracks a timeout for every added object, # and wakes up any object when its timeout elapses. # # The implementation uses a Queue to synchronize adding new objects from the internal select loop. class Reactor # @!attribute [rw] reactor_max # Maximum number of clients in the selector. Reset with calls to `Server.stats`. attr_accessor :reactor_max attr_reader :reactor_size # Create a new Reactor to monitor IO objects added by #add. # The provided block will be invoked when an IO has data available to read, # its timeout elapses, or when the Reactor shuts down. def initialize(backend, &block) require 'nio' valid_backends = [:auto, *::NIO::Selector.backends] unless valid_backends.include?(backend) raise ArgumentError.new("unsupported IO selector backend: #{backend} (available backends: #{valid_backends.join(', ')})") end @selector = ::NIO::Selector.new(NIO::Selector.backends.delete(backend)) @input = Queue.new @timeouts = [] @block = block @reactor_size = 0 @reactor_max = 0 end # Run the internal select loop, using a background thread by default. def run(background=true) if background @thread = Thread.new do Puma.set_thread_name "reactor" select_loop end else select_loop end end # Add a new client to monitor. # The object must respond to #timeout and #timeout_at. # Returns false if the reactor is already shut down. def add(client) @input << client @selector.wakeup true rescue ClosedQueueError, IOError # Ignore if selector is already closed false end # Shutdown the reactor, blocking until the background thread is finished. def shutdown @input.close begin @selector.wakeup rescue IOError # Ignore if selector is already closed end @thread&.join end private def select_loop begin until @input.closed? && @input.empty? # Wakeup any registered object that receives incoming data. # Block until the earliest timeout or Selector#wakeup is called. timeout = (earliest = @timeouts.first) && earliest.timeout @selector.select(timeout) do |monitor| wakeup!(monitor.value) end # Wakeup all objects that timed out. timed_out = @timeouts.take_while { |client| client.timeout == 0 } timed_out.each { |client| wakeup!(client) } unless @input.empty? until @input.empty? client = @input.pop register(client) if client.io_ok? end @timeouts.sort_by!(&:timeout_at) end end rescue StandardError => e STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})" STDERR.puts e.backtrace retry end # Wakeup all remaining objects on shutdown. @timeouts.each(&@block) @selector.close end # Start monitoring the object. def register(client) @selector.register(client.to_io, :r).value = client @reactor_size += 1 @reactor_max = @reactor_size if @reactor_max < @reactor_size @timeouts << client rescue ArgumentError # unreadable clients raise error when processed by NIO end # 'Wake up' a monitored object by calling the provided block. # Stop monitoring the object if the block returns `true`. def wakeup!(client) if @block.call client @selector.deregister client.to_io @reactor_size -= 1 @timeouts.delete client end end end end