# lib/puma/server.rb



# frozen_string_literal: true

require 'stringio'

require_relative 'thread_pool'
require_relative 'const'
require_relative 'log_writer'
require_relative 'events'
require_relative 'null_io'
require_relative 'reactor'
require_relative 'client'
require_relative 'binder'
require_relative 'util'
require_relative 'response'
require_relative 'configuration'
require_relative 'cluster_accept_loop_delay'

require 'socket'
require 'io/wait' unless Puma::HAS_NATIVE_IO_WAIT

module Puma
  # The HTTP Server itself. Serves out a single Rack app.
  #
  # This class is used by the `Puma::Single` and `Puma::Cluster` classes
  # to generate one or more `Puma::Server` instances capable of handling requests.
  # Each Puma process will contain one `Puma::Server` instance.
  #
  # The `Puma::Server` instance pulls requests from the socket, adds them to a
  # `Puma::Reactor` where they get eventually passed to a `Puma::ThreadPool`.
  #
  # Each `Puma::Server` will have one reactor and one thread pool.
  class Server
    # Mixin that runs every +handle_request+ call inside its own Fiber.
    # It is prepended onto a server's singleton class by #initialize when
    # +options[:fiber_per_request]+ is set.
    module FiberPerRequest
      # Delegates to the next +handle_request+ in the ancestor chain (same
      # arguments) from within a fresh Fiber and returns whatever the first
      # +resume+ of that Fiber returns.
      def handle_request(processor, client, requests)
        request_fiber = Fiber.new { super }
        request_fiber.resume
      end
    end

    include Const
    include Response

    # Option set built in #initialize (a UserFileDefaultOptions instance).
    attr_reader :options
    # Thread created by #run when called with +background+ true; nil otherwise.
    attr_reader :thread
    # Log writer taken from +options[:log_writer]+ (defaults to LogWriter.stdio).
    attr_reader :log_writer
    # Events object passed to #initialize, or a new Events when none was given.
    attr_reader :events
    attr_reader :min_threads, :max_threads  # for #stats
    attr_reader :requests_count             # @version 5.0.0

    # @todo the following may be deprecated in the future
    # NOTE(review): @auto_trim_time and @reaping_time are not assigned anywhere
    # in this file's visible code, so those two readers return nil unless the
    # variables are set elsewhere — confirm before relying on them.
    attr_reader :auto_trim_time, :early_hints, :first_data_timeout,
      :leak_stack_on_error,
      :persistent_timeout, :reaping_time

    # The Rack app served by this server.
    attr_accessor :app
    # The Binder holding the listening sockets (see #handle_servers).
    attr_accessor :binder

    # Create a server for the rack app +app+.
    #
    # +options[:log_writer]+ is a Puma::LogWriter object used to log info and error messages
    # (defaults to +LogWriter.stdio+).
    #
    # +events+ is a Puma::Events object used to notify application status events.
    #
    # Server#run returns a thread that you can join on to wait for the server
    # to do its work.
    #
    # @note Several instance variables exist so they are available for testing,
    #   and have default values set via +fetch+.  Normally the values are set via
    #   `::Puma::Configuration.puma_default_options`.
    #
    # @note The `events` parameter is set to nil, and set to `Events.new` in code.
    #   Often `options` needs to be passed, but `events` does not.  Using nil allows
    #   calling code to not require events.rb.
    #
    def initialize(app, events = nil, options = {})
      @app    = app
      @events = events || Events.new

      # Runtime state, filled in by #run / #handle_servers.
      @check = @notify = nil
      @status      = :stop
      @thread      = nil
      @thread_pool = nil
      @reactor     = nil

      @env_set_http_version = nil

      # Plain hashes are wrapped together with Configuration::DEFAULTS.
      @options = options.is_a?(UserFileDefaultOptions) ? options : UserFileDefaultOptions.new(options, Configuration::DEFAULTS)

      @clustered                 = (@options.fetch :workers, 0) > 0
      @worker_write              = @options[:worker_write]
      @log_writer                = @options.fetch :log_writer, LogWriter.stdio
      @early_hints               = @options[:early_hints]
      @first_data_timeout        = @options[:first_data_timeout]
      @persistent_timeout        = @options[:persistent_timeout]
      @idle_timeout              = @options[:idle_timeout]
      @min_threads               = @options[:min_threads]
      @max_threads               = @options[:max_threads]
      @queue_requests            = @options[:queue_requests]
      @max_keep_alive            = @options[:max_keep_alive]
      # Keep-alives are only enabled when requests are queued as well.
      @enable_keep_alives        = @options[:enable_keep_alives] && @queue_requests
      @io_selector_backend       = @options[:io_selector_backend]
      @http_content_length_limit = @options[:http_content_length_limit]
      @cluster_accept_loop_delay = ClusterAcceptLoopDelay.new(
        workers: @options[:workers],
        max_delay: @options[:wait_for_less_busy_worker] || 0 # Real default is in Configuration::DEFAULTS, this is for unit testing
      )

      singleton_class.prepend(FiberPerRequest) if @options[:fiber_per_request]

      # Stored as a frozen Hash (method name => nil) since we prefer `key?`
      # over `include?`; the symbol :any is passed through untouched.
      configured_methods = @options[:supported_http_methods]
      @supported_http_methods =
        if configured_methods == :any
          :any
        else
          (configured_methods || SUPPORTED_HTTP_METHODS).sort.product([nil]).to_h.freeze
        end

      # Stack traces are included in error responses for the development and
      # test environments, and when no environment is configured at all.
      environment = @options[:environment]
      dev_or_test = !!(environment =~ /\A(development|test)\z/)
      @leak_stack_on_error = environment ? dev_or_test : true

      @binder = Binder.new(log_writer, @options)

      ENV['RACK_ENV'] ||= "development"

      @mode = :http

      @precheck_closing = true

      @requests_count = 0

      @idle_timeout_reached = false
    end

    # Replaces the Binder created in #initialize with +bind+.
    #
    # @param bind the binder object to use from now on; stored as-is
    # @return the +bind+ argument
    def inherit_binder(bind)
      @binder = bind
    end

    class << self
      # The server associated with the calling thread, as exposed by
      # +Thread#puma_server+.
      # @!attribute [r] current
      def current
        Thread.current.puma_server
      end

      # Whether Socket defines both TCP_CORK and IPPROTO_TCP.
      # :nodoc:
      # @version 5.0.0
      def tcp_cork_supported?
        %i[TCP_CORK IPPROTO_TCP].all? { |const_name| Socket.const_defined?(const_name) }
      end

      # Whether Socket defines both TCP_INFO and IPPROTO_TCP.
      # :nodoc:
      # @version 5.0.0
      def closed_socket_supported?
        %i[TCP_INFO IPPROTO_TCP].all? { |const_name| Socket.const_defined?(const_name) }
      end

      private :tcp_cork_supported?, :closed_socket_supported?
    end

    # On Linux, use TCP_CORK to better control how the TCP stack
    # packetizes our stream. This improves both latency and throughput.
    # The socket parameter may be a MiniSSL::Socket, so use to_io.
    #
    if tcp_cork_supported?
      # 6 == Socket::IPPROTO_TCP
      # 3 == TCP_CORK
      # 1/0 == turn on/off

      # Turns TCP_CORK on for +socket+ when its underlying IO is a TCPSocket.
      # IOError and SystemCallError raised while setting the option are ignored.
      def cork_socket(socket)
        skt = socket.to_io
        return unless skt.kind_of?(TCPSocket)

        begin
          skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1)
        rescue IOError, SystemCallError
          # best effort only
        end
      end

      # Turns TCP_CORK off for +socket+ when its underlying IO is a TCPSocket.
      # IOError and SystemCallError raised while setting the option are ignored.
      def uncork_socket(socket)
        skt = socket.to_io
        return unless skt.kind_of?(TCPSocket)

        begin
          skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0)
        rescue IOError, SystemCallError
          # best effort only
        end
      end
    else
      # TCP_CORK is unavailable: corking is a no-op.
      def cork_socket(socket)
      end

      # TCP_CORK is unavailable: uncorking is a no-op.
      def uncork_socket(socket)
      end
    end

    if closed_socket_supported?
      # Unpack directive for a single unsigned byte; the first byte of the
      # TCP_INFO data is read as the connection state.
      UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze

      # Reports whether the TCP state of +socket+ is one of the closing or
      # closed states. Non-TCP sockets always yield false. If reading
      # TCP_INFO fails once, the check is switched off for this server
      # (+@precheck_closing+ becomes false) and false is returned.
      def closed_socket?(socket)
        skt = socket.to_io
        return false unless skt.kind_of?(TCPSocket) && @precheck_closing

        tcp_info =
          begin
            skt.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
          rescue IOError, SystemCallError
            @precheck_closing = false
            return false
          end

        state = tcp_info.unpack(UNPACK_TCP_STATE_FROM_TCP_INFO).first
        # TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11
        state.between?(6, 9) || state == 11
      end
    else
      # TCP_INFO is unavailable: a socket is never reported as closed.
      def closed_socket?(socket)
        false
      end
    end

    # @!attribute [r] backlog
    def backlog
      # nil until #run has created the thread pool
      pool = @thread_pool
      pool.backlog unless pool.nil?
    end

    # @!attribute [r] running
    def running
      # nil until #run has created the thread pool
      pool = @thread_pool
      pool.spawned unless pool.nil?
    end

    # This number represents the number of requests that
    # the server is capable of taking right now.
    #
    # For example if the number is 5 then it means
    # there are 5 threads sitting idle ready to take
    # a request. If one request comes in, then the
    # value would be 4 until it finishes processing.
    # @!attribute [r] pool_capacity
    def pool_capacity
      # nil until #run has created the thread pool
      pool = @thread_pool
      pool.pool_capacity unless pool.nil?
    end

    # Runs the server.
    #
    # If +background+ is true (the default) then a thread is spun
    # up in the background to handle requests. Otherwise requests
    # are handled synchronously.
    #
    def run(background=true, thread_name: 'srv')
      BasicSocket.do_not_reverse_lookup = true

      @events.fire :state, :booting
      @status = :run

      # The pool hands each client back to #process_client.
      @thread_pool = ThreadPool.new(thread_name, options, server: self) { |processor, client|
        process_client(processor, client)
      }

      if @queue_requests
        # Inversion of control, the reactor is calling a method on the server when it
        # is done buffering a request or receives a new request from a keepalive connection.
        @reactor = Reactor.new(@io_selector_backend) do |queued_client|
          reactor_wakeup(queued_client)
        end
        @reactor.run
      end

      @thread_pool.auto_reap! if options[:reaping_time]
      @thread_pool.auto_trim! if @min_threads != @max_threads && options[:auto_trim_time]

      # Reuse an existing notify pipe if one is already set.
      @check, @notify = Puma::Util.pipe unless @notify

      @events.fire :state, :running

      # Foreground mode: serve on the calling thread and return its result.
      return handle_servers unless background

      # Background mode: serve on a named thread and return that thread.
      @thread = Thread.new do
        Puma.set_thread_name thread_name
        handle_servers
      end
    end

    # This method is called from the Reactor thread when a queued Client receives data,
    # times out, or when the Reactor is shutting down.
    #
    # While the code lives in the Server, the logic is executed on the reactor thread, independently
    # from the server.
    #
    # It is responsible for ensuring that a request has been completely received
    # before it starts to be processed by the ThreadPool. This may be known as read buffering.
    # If read buffering is not done, and no other read buffering is performed (such as by an application server
    # such as nginx) then the application would be subject to a slow client attack.
    #
    # For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/main/docs/architecture.md).
    #
    # The method checks to see if it has the full header and body with
    # the `Puma::Client#try_to_finish` method. If the full request has been sent,
    # then the request is passed to the ThreadPool (`@thread_pool << client`)
    # so that a "processor thread" can pick up the request and begin to execute application logic.
    # The Client is then removed from the reactor (return `true`).
    #
    # If a client object times out, a 408 response is written, its connection is closed,
    # and the object is removed from the reactor (return `true`).
    #
    # If the Reactor is shutting down, all Clients are either timed out or passed to the
    # ThreadPool, depending on their current state (#can_close?).
    #
    # Otherwise, if the full request is not ready then the client will remain in the reactor
    # (return `false`). When the client sends more data to the socket the `Puma::Client` object
    # will wake up and again be checked to see if it's ready to be passed to the thread pool.
    def reactor_wakeup(client)
      # Request queueing is switched off by #handle_servers once shutdown starts.
      shutting_down = !@queue_requests

      # Fully received, or shutting down with a client that cannot be closed
      # yet: let a pool thread deal with it.
      ready_for_pool = client.try_to_finish || (shutting_down && !client.can_close?)
      return @thread_pool << client if ready_for_pool

      # Shutting down or out of time: time the client out.
      return client.timeout! if shutting_down || client.timeout == 0

      # Still waiting for data: re-arm the timeout and stay in the reactor.
      client.set_timeout(@first_data_timeout)
      false
    rescue StandardError => e
      client_error(e, client)
      close_client_safely(client)
      true
    end

    # The listen loop. Selects on the check pipe plus the binder's listening
    # sockets, accepts new connections, wraps each in a Client and pushes it
    # onto the thread pool. When the loop ends it shuts down the reactor (if
    # any), performs a graceful shutdown for the :stop and :restart statuses,
    # closes the check/notify pipe and fires the :done state event.
    def handle_servers
      # Truthy only when Rack is loaded, responds to +release+, and reports a
      # version older than 3.1.0. Passed to every new client in #new_client.
      @env_set_http_version = Object.const_defined?(:Rack) && ::Rack.respond_to?(:release) &&
        Gem::Version.new(::Rack.release) < Gem::Version.new('3.1.0')

      begin
        check = @check
        sockets = [check] + @binder.ios
        pool = @thread_pool
        queue_requests = @queue_requests
        # Counts connections accepted after shutdown began; nil when
        # +drain_on_shutdown+ is off, which also disables the drain phase below.
        drain = options[:drain_on_shutdown] ? 0 : nil

        # Setter name and value applied to each new client, chosen by the
        # +remote_address+ option. Nothing is sent when the value is nil/false.
        addr_send_name, addr_value = case options[:remote_address]
        when :value
          [:peerip=, options[:remote_address_value]]
        when :header
          [:remote_addr_header=, options[:remote_address_header]]
        when :proxy_protocol
          [:expect_proxy_proto=, options[:remote_address_proxy_protocol]]
        else
          [nil, nil]
        end

        # Keep looping while running, or — when draining — while shutting down
        # and connections are still pending on the listeners.
        while @status == :run || (drain && shutting_down?)
          begin
            # While shutting down, poll without blocking; otherwise wait up to
            # @idle_timeout.
            ios = IO.select sockets, nil, nil, (shutting_down? ? 0 : @idle_timeout)
            unless ios
              # select returned nothing: either the idle timeout elapsed or,
              # during a drain, no more connections are waiting.
              unless shutting_down?
                @idle_timeout_reached = true

                if @clustered
                  # Report idleness over the worker pipe and keep listening.
                  @worker_write << "#{PipeRequest::PIPE_IDLE}#{Process.pid}\n" rescue nil
                  next
                else
                  @log_writer.log "- Idle timeout reached"
                  @status = :stop
                end
              end

              break
            end

            # Activity after an idle timeout in cluster mode: clear the flag and
            # write the same idle message again.
            # NOTE(review): presumably the receiver treats this as a toggle — confirm.
            if @idle_timeout_reached && @clustered
              @idle_timeout_reached = false
              @worker_write << "#{PipeRequest::PIPE_IDLE}#{Process.pid}\n" rescue nil
            end

            ios.first.each do |sock|
              if sock == check
                # A stop/halt/restart command ends processing of this batch.
                break if handle_check
              else
                # if ThreadPool out_of_band code is running, we don't want to add
                # clients until the code is finished.
                pool.wait_while_out_of_band_running

                # A well rested herd (cluster) runs faster
                if @cluster_accept_loop_delay.on? && (busy_threads_plus_todo = pool.busy_threads) > 0
                  delay = @cluster_accept_loop_delay.calculate(
                    max_threads: @max_threads,
                    busy_threads_plus_todo: busy_threads_plus_todo
                  )
                  sleep(delay)
                end

                # Nothing to accept after all: move on to the next ready socket.
                io = begin
                  sock.accept_nonblock
                rescue IO::WaitReadable
                  next
                end
                drain += 1 if shutting_down?

                client = new_client(io, sock)
                client.send(addr_send_name, addr_value) if addr_value
                pool << client
              end
            end
          rescue IOError, Errno::EBADF
            # In the case that any of the sockets are unexpectedly closed.
            # Re-raised so the outer rescue logs it and the loop is abandoned.
            raise
          rescue StandardError => e
            # Any other error is logged and the loop carries on.
            @log_writer.unknown_error e, nil, "Listen loop"
          end
        end

        @log_writer.debug { "Drained #{drain} additional connections." } if drain
        @events.fire :state, @status

        if queue_requests
          # Turning the flag off makes #reactor_wakeup treat clients as shutting down.
          @queue_requests = false
          @reactor.shutdown
        end

        # A :halt status skips the graceful shutdown entirely.
        graceful_shutdown if @status == :stop || @status == :restart
      rescue Exception => e
        # Deliberately broad: everything is logged rather than propagated.
        @log_writer.unknown_error e, nil, "Exception handling servers"
      ensure
        # Errno::EBADF is infrequently raised
        [@check, @notify].each do |io|
          begin
            io.close unless io.closed?
          rescue Errno::EBADF
          end
        end
        @notify = nil
        @check = nil
      end

      @events.fire :state, :done
    end

    # :nodoc:
    def new_client(io, sock)
      # Build the client with the listener's env, then copy the per-server
      # settings onto it; the configured client is returned.
      Client.new(io, @binder.env(sock)).tap do |c|
        c.listener = sock
        c.env_set_http_version = @env_set_http_version
        c.http_content_length_limit = @http_content_length_limit
        c.supported_http_methods = @supported_http_methods
      end
    end

    # :nodoc:
    def handle_check
      # Read one command byte from the check pipe and map it to a status.
      requested_status =
        case @check.read(1)
        when STOP_COMMAND    then :stop
        when HALT_COMMAND    then :halt
        when RESTART_COMMAND then :restart
        end

      # Unknown bytes leave the status untouched.
      return false unless requested_status

      @status = requested_status
      true
    end

    # Given a connection on +client+, handle the incoming requests,
    # or queue the connection in the Reactor if no request is available.
    #
    # This method is called from a ThreadPool processor thread.
    #
    # This method supports HTTP Keep-Alive so it may, depending on if the client
    # indicates that it supports keep alive, wait for another request before
    # returning.
    #
    # Return true if one or more requests were processed.
    def process_client(processor, client)
      # The ensure clause closes the client unless it has been handed over to
      # the reactor, back to the thread pool, or to an async response.
      close_socket = true

      # Number of requests completed on this connection during this call.
      requests = 0

      begin
        # With request queueing on, a client whose request is not yet complete
        # is parked in the reactor instead of tying up this thread.
        if @queue_requests && !client.eagerly_finish

          client.set_timeout(@first_data_timeout)
          if @reactor.add client
            close_socket = false
            return false
          end
        end

        # Reached when queueing is off, the request was already complete, or
        # the reactor did not accept the client.
        with_force_shutdown(client) do
          client.finish(@first_data_timeout)
        end

        can_loop = true
        while can_loop
          can_loop = false
          @requests_count += 1
          case handle_request(processor, client, requests + 1)
          when :close
            # Nothing to do: the ensure clause closes the socket.
          when :async
            # The connection stays open beyond this call.
            close_socket = false
          when :keep_alive
            requests += 1

            client.reset

            # This indicates data exists in the client read buffer and there may be
            # additional requests on it, so process them
            next_request_ready = if client.has_back_to_back_requests?
              with_force_shutdown(client) { client.process_back_to_back_requests }
            else
              with_force_shutdown(client) { client.eagerly_finish }
            end

            if next_request_ready
              # When Puma has spare threads, allow this one to be monopolized
              # Perf optimization for https://github.com/puma/puma/issues/3788
              if @thread_pool.waiting > 0
                can_loop = true
              else
                # No idle threads: requeue the client behind other pending work.
                @thread_pool << client
                close_socket = false
              end
            elsif @queue_requests
              # No further request yet: wait for one in the reactor, bounded by
              # the persistent (keep-alive) timeout.
              client.set_timeout @persistent_timeout
              if @reactor.add client
                close_socket = false
              end
            end
          end
        end
        true
      rescue StandardError => e
        client_error(e, client, requests)
        # The ensure tries to close +client+ down
        requests > 0
      ensure
        client.io_buffer.reset

        close_client_safely(client) if close_socket
      end
    end

    # :nodoc:
    # Closes +client+, handling errors raised by the close itself:
    # IOError / SystemCallError are ignored, MiniSSL::SSLError is reported via
    # +ssl_error+, and any other StandardError via +unknown_error+.
    def close_client_safely(client)
      client.close
    rescue IOError, SystemCallError
      # Already closed
    rescue MiniSSL::SSLError => e
      @log_writer.ssl_error e, client.io
    rescue StandardError => e
      @log_writer.unknown_error e, nil, "Client"
    end

    # Triggers a client timeout if the thread-pool shuts down
    # during execution of the provided block.
    def with_force_shutdown(client, &block)
      # The block is forwarded unchanged; its result is returned on success.
      @thread_pool.with_force_shutdown(&block)
    rescue ThreadPool::ForceShutdown
      # On a forced shutdown the result of client.timeout! is returned instead.
      client.timeout!
    end

    # :nocov:

    # Handle various error types thrown by Client I/O operations.
    def client_error(e, client, requests = 1)
      # Swallow, do not log (exact classes only, subclasses are still handled)
      return if e.instance_of?(ConnectionError) || e.instance_of?(EOFError)

      case e
      when MiniSSL::SSLError
        # SSL failures run the low-level handler but write no response.
        lowlevel_error(e, client.env)
        return @log_writer.ssl_error(e, client.io)
      when HttpParserError
        # The client may carry a more specific status; 400 is the fallback.
        parse_status = client.error_status_code || 400
      when HttpParserError501
        parse_status = 501
      else
        response_to_error(client, requests, e, 500)
        return @log_writer.unknown_error(e, nil, "Read")
      end

      # Both parser error kinds respond and log the same way.
      response_to_error(client, requests, e, parse_status)
      @log_writer.parse_error e, client
    end

    # A fallback rack response if +@app+ raises an exception.
    #
    def lowlevel_error(e, env, status=500)
      # A configured handler takes over completely; it is called with as many
      # arguments as its arity asks for (anything other than 1 or 2 gets all three).
      custom_handler = options[:lowlevel_error_handler]
      if custom_handler
        case custom_handler.arity
        when 1 then return custom_handler.call(e)
        when 2 then return custom_handler.call(e, env)
        else        return custom_handler.call(e, env, status)
        end
      end

      # Without a handler, build a Rack triplet; the body carries the error
      # details only when stack leaking is enabled.
      body =
        if @leak_stack_on_error
          backtrace = e.backtrace&.join("\n") || '<no backtrace available>'
          "Puma caught this error: #{e.message} (#{e.class})\n#{backtrace}"
        else
          ""
        end

      [status, {}, [body]]
    end

    # Writes an error response for +err+ to +client+ via +prepare_response+.
    # A 413 status gets a fixed "Payload Too Large" body; every other status
    # takes its response from #lowlevel_error.
    def response_to_error(client, requests, err, status_code)
      status, headers, res_body =
        if status_code == 413
          # @todo remove sometime later
          # 17 is the byte length of the fixed body below.
          [413, { CONTENT_LENGTH2 => 17 }, ["Payload Too Large"]]
        else
          lowlevel_error(err, client.env, status_code)
        end

      prepare_response(status, headers, res_body, requests, client)
    end
    private :response_to_error

    # Wait for all outstanding requests to finish.
    #
    def graceful_shutdown
      # The listeners stay open across a restart.
      @binder.close unless @status == :restart

      # Hand the optional force-shutdown timeout to the pool; its return
      # value is passed through.
      @thread_pool.shutdown(options[:force_shutdown_after])
    end

    # Writes +message+ to the notify pipe, tolerating the pipe having been
    # closed or cleared by a concurrent shutdown.
    def notify_safely(message)
      @notify << message
    rescue IOError, NoMethodError, Errno::EPIPE, Errno::EBADF
      # The server, in another thread, is shutting down
    rescue RuntimeError => e
      # Temporary workaround for https://bugs.ruby-lang.org/issues/13239
      # Only errors mentioning IOError are ignored; all others propagate.
      raise unless e.message.include?('IOError')
    end
    private :notify_safely

    # Stops the acceptor thread and then causes the processor threads to finish
    # off the request queue before finally exiting.

    def stop(sync=false)
      notify_safely(STOP_COMMAND)
      return unless sync

      # Wait for the background server thread, if there is one.
      @thread.join if @thread
    end

    # Sends the halt command to the listen loop. With +sync+ true, waits for
    # the background server thread (if any) to finish.
    def halt(sync=false)
      notify_safely(HALT_COMMAND)
      return unless sync

      @thread.join if @thread
    end

    # Sends the restart command to the listen loop. With +sync+ true, waits
    # for the background server thread (if any) to finish.
    def begin_restart(sync=false)
      notify_safely(RESTART_COMMAND)
      return unless sync

      @thread.join if @thread
    end

    # True while the status is :stop or :restart (a :halt does not count).
    def shutting_down?
      case @status
      when :stop, :restart then true
      else false
      end
    end

    # List of methods invoked by #stats.
    # @version 5.0.0
    # NOTE(review): #stats as written in this file does not iterate this list;
    # it merges ThreadPool#stats with a few instance variables. This file also
    # defines no #busy_threads, #backlog_max or #reactor_max method on Server —
    # they may come from elsewhere; confirm before calling these by name.
    STAT_METHODS = [
      :backlog,
      :running,
      :pool_capacity,
      :busy_threads,
      :backlog_max,
      :max_threads,
      :requests_count,
      :reactor_max,
    ].freeze

    # Returns a hash of stats about the running server for reporting purposes.
    # @version 5.0.0
    # @!attribute [r] stats
    # @return [Hash] hash containing stat info from `Server` and `ThreadPool`
    def stats
      # Start from the pool's own stats (empty before #run), then add the
      # server-level figures.
      snapshot = @thread_pool&.stats || {}
      snapshot[:max_threads]    = @max_threads
      snapshot[:requests_count] = @requests_count
      snapshot[:reactor_max]    = @reactor.reactor_max if @reactor

      # Reading the stats resets the recorded maximums.
      reset_max
      snapshot
    end

    # Zeroes the reactor's recorded maximum and asks the thread pool to reset
    # its own; either step is skipped when that component does not exist yet.
    def reset_max
      reactor = @reactor
      reactor.reactor_max = 0 if reactor

      pool = @thread_pool
      pool.reset_max unless pool.nil?
    end

    # below are 'delegations' to binder
    # remove in Puma 7?


    # Delegates to the binder's +add_tcp_listener+ with the same arguments.
    def add_tcp_listener(host, port, optimize_for_latency = true, backlog = 1024)
      @binder.add_tcp_listener(host, port, optimize_for_latency, backlog)
    end

    # Delegates to the binder's +add_ssl_listener+ with the same arguments.
    def add_ssl_listener(host, port, ctx, optimize_for_latency = true,
                         backlog = 1024)
      @binder.add_ssl_listener(host, port, ctx, optimize_for_latency, backlog)
    end

    # Delegates to the binder's +add_unix_listener+ with the same arguments.
    def add_unix_listener(path, umask = nil, mode = nil, backlog = 1024)
      @binder.add_unix_listener(path, umask, mode, backlog)
    end

    # Updates the minimum and maximum number of threads in the thread pool.
    #
    # This method allows dynamic adjustment of the thread pool size while the server
    # is running. It validates the provided values and updates both the thread pool
    # and the server's thread configuration.
    #
    # @param min [Integer] The minimum number of threads to maintain in the pool.
    #   Defaults to the current minimum if not specified. Must be greater than or
    #   equal to 0 and less than or equal to max.
    # @param max [Integer] The maximum number of threads allowed in the pool.
    #   Defaults to the current maximum if not specified. Must be greater than or
    #   equal to min.
    #
    # @return [void]
    #
    # @note If validation fails, a warning message is logged and no changes are made.
    #
    # @example Update both min and max threads
    #   server.update_thread_pool_min_max(min: 2, max: 8)
    #
    # @example Update only the minimum threads
    #   server.update_thread_pool_min_max(min: 4)
    #
    # @example Update only the maximum threads
    #   server.update_thread_pool_min_max(max: 16)
    #
    def update_thread_pool_min_max(min: @min_threads, max: @max_threads)
      # Validate first; the ordering check takes precedence over the sign check.
      rejection =
        if min > max
          "`min' value cannot be greater than `max' value."
        elsif min < 0
          "`min' value cannot be less than 0"
        end

      if rejection
        @log_writer.log rejection
        return
      end

      # Nothing is changed when the thread pool has not been created yet.
      @thread_pool&.with_mutex do
        @thread_pool.min = min
        @thread_pool.max = max
        @min_threads, @max_threads = min, max
      end
    end

    # @!attribute [r] connected_ports
    def connected_ports
      # Pure delegation: returns whatever the binder reports.
      @binder.connected_ports
    end
  end
end