class Puma::Server
# The HTTP Server itself. Serves out a single Rack app.
#
# This class is used by the `Puma::Single` and `Puma::Cluster` classes
# to generate one or more `Puma::Server` instances capable of handling requests.
# Each Puma process will contain one `Puma::Server` instance.
#
# The `Puma::Server` instance pulls requests from the socket, adds them to a
# `Puma::Reactor` where they are eventually passed to a `Puma::ThreadPool`.
#
# Each `Puma::Server` will have one reactor and one thread pool.
# Adds an SSL listener by delegating to the binder.
#
# @param host host/address to bind.
# @param port port to bind.
# @param ctx SSL context, passed through unchanged.
# @param optimize_for_latency passed through to the binder (default true).
# @param backlog listen backlog passed through to the binder (default 1024).
# @return whatever the binder's +add_ssl_listener+ returns.
def add_ssl_listener(host, port, ctx, optimize_for_latency = true, backlog = 1024)
  @binder.add_ssl_listener host, port, ctx, optimize_for_latency, backlog
end
# Adds a plain TCP listener by delegating to the binder.
#
# @param host host/address to bind.
# @param port port to bind.
# @param optimize_for_latency passed through to the binder (default true).
# @param backlog listen backlog passed through to the binder (default 1024).
# @return whatever the binder's +add_tcp_listener+ returns.
def add_tcp_listener(host, port, optimize_for_latency = true, backlog = 1024)
  @binder.add_tcp_listener host, port, optimize_for_latency, backlog
end
# Adds a UNIX-domain socket listener by delegating to the binder.
#
# @param path filesystem path of the socket.
# @param umask optional umask, passed through unchanged (default nil).
# @param mode optional mode, passed through unchanged (default nil).
# @param backlog listen backlog passed through to the binder (default 1024).
# @return whatever the binder's +add_unix_listener+ returns.
def add_unix_listener(path, umask = nil, mode = nil, backlog = 1024)
  @binder.add_unix_listener path, umask, mode, backlog
end
# The thread pool's backlog, or nil when no thread pool has been created yet.
def backlog
  @thread_pool&.backlog
end
# Requests a restart by writing RESTART_COMMAND to the notify pipe
# (handle_check turns it into @status = :restart).
#
# @param sync when true and a server thread exists, block until it finishes.
def begin_restart(sync = false)
  notify_safely(RESTART_COMMAND)
  @thread.join if @thread && sync
end
# Handles an error raised while reading/parsing a client's request: sends an
# error response (except for SSL errors) and logs it through @log_writer.
#
# ConnectionError and EOFError are swallowed silently. The match is on the
# exact class, so subclasses are not swallowed.
#
# @param e the exception raised.
# @param client the client whose request failed.
# @param requests request count forwarded to response_to_error (default 1).
def client_error(e, client, requests = 1)
  # Swallow, do not log
  return if [ConnectionError, EOFError].include?(e.class)

  case e
  when MiniSSL::SSLError
    lowlevel_error(e, client.env)
    @log_writer.ssl_error e, client.io
  when HttpParserError
    # The client may carry a more specific status code than the generic 400.
    response_to_error(client, requests, e, client.error_status_code || 400)
    @log_writer.parse_error e, client
  when HttpParserError501
    response_to_error(client, requests, e, 501)
    @log_writer.parse_error e, client
  else
    response_to_error(client, requests, e, 500)
    @log_writer.unknown_error e, nil, "Read"
  end
end
# Closes +client+ without letting close-time errors escape.
#
# IOError/SystemCallError (already closed) are ignored; SSL errors and any
# other StandardError are logged through @log_writer.
def close_client_safely(client)
  client.close
rescue IOError, SystemCallError
  # Already closed
rescue MiniSSL::SSLError => e
  @log_writer.ssl_error e, client.io
rescue StandardError => e
  @log_writer.unknown_error e, nil, "Client"
end
# Only defined where TCP_INFO can be queried (same check as
# closed_socket_supported?). The guard keeps this version and the no-op
# `closed_socket?` fallback mutually exclusive; otherwise whichever is defined
# last would win.
if Socket.const_defined?(:TCP_INFO) && Socket.const_defined?(:IPPROTO_TCP)
  # Reports whether the kernel considers +socket+'s TCP connection to be
  # closing or closed, based on the TCP state read via TCP_INFO.
  #
  # Returns false for non-TCP sockets, or when prechecking has been disabled
  # (@precheck_closing is false). The first failure to read TCP_INFO disables
  # prechecking for subsequent calls.
  def closed_socket?(socket)
    skt = socket.to_io
    return false unless skt.kind_of?(TCPSocket) && @precheck_closing

    begin
      tcp_info = skt.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
    rescue IOError, SystemCallError
      @precheck_closing = false
      false
    else
      state = tcp_info.unpack1(UNPACK_TCP_STATE_FROM_TCP_INFO)
      # TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11
      state.between?(6, 9) || state == 11
    end
  end
end
# Fallback for platforms without TCP_INFO: closed sockets cannot be detected,
# so every socket is reported as open. Guarded by the inverse of
# closed_socket_supported?'s check so that it never replaces the TCP_INFO-based
# `closed_socket?`.
unless Socket.const_defined?(:TCP_INFO) && Socket.const_defined?(:IPPROTO_TCP)
  def closed_socket?(socket)
    false
  end
end
# Whether this platform defines the socket constants that closed_socket?
# needs (Socket::TCP_INFO and Socket::IPPROTO_TCP).
#
# @version 5.0.0
def closed_socket_supported?
  %i[TCP_INFO IPPROTO_TCP].all? { |name| Socket.const_defined?(name) }
end
# The ports the binder is connected to (delegates to Binder#connected_ports).
def connected_ports
  @binder.connected_ports
end
# Only defined where TCP_CORK exists (same check as tcp_cork_supported?). The
# guard keeps this version and the no-op `cork_socket` fallback mutually
# exclusive; otherwise whichever is defined last would win.
if Socket.const_defined?(:TCP_CORK) && Socket.const_defined?(:IPPROTO_TCP)
  # Sets TCP_CORK to 1 on +socket+ when its underlying IO is a TCPSocket
  # (uncork_socket sets it back to 0). Other IOs are left alone, and
  # IOError/SystemCallError from setsockopt are ignored.
  #
  # 6 == Socket::IPPROTO_TCP
  # 3 == TCP_CORK
  def cork_socket(socket)
    skt = socket.to_io
    begin
      skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1) if skt.kind_of? TCPSocket
    rescue IOError, SystemCallError
      # Best effort: a socket that can't be corked is simply not corked.
    end
  end
end
# No-op fallback for platforms without TCP_CORK. Guarded by the inverse of
# tcp_cork_supported?'s check so it never replaces the corking implementation.
unless Socket.const_defined?(:TCP_CORK) && Socket.const_defined?(:IPPROTO_TCP)
  def cork_socket(socket)
  end
end
# The server associated with the current thread, read from
# Thread.current.puma_server (that accessor is defined outside this file —
# NOTE(review): confirm where it is set).
def current
  Thread.current.puma_server
end
# Wait for all outstanding requests to finish.
#
# Closes the binder's listeners unless the server is restarting (presumably so
# a restart can keep them), then shuts the thread pool down, passing
# options[:force_shutdown_after] to ThreadPool#shutdown.
def graceful_shutdown
  @binder.close unless @status == :restart
  @thread_pool.shutdown(options[:force_shutdown_after])
end
# Requests an immediate halt by writing HALT_COMMAND to the notify pipe
# (handle_check turns it into @status = :halt).
#
# @param sync when true and a server thread exists, block until it finishes.
def halt(sync = false)
  notify_safely(HALT_COMMAND)
  @thread.join if @thread && sync
end
# Reads one command byte from the check pipe and updates @status to match.
#
# @return [Boolean] true when a STOP, HALT or RESTART command was read
#   (handle_servers stops processing ready sockets on true); false for any
#   other byte, including end-of-stream.
def handle_check
  new_status =
    case @check.read(1)
    when STOP_COMMAND    then :stop
    when HALT_COMMAND    then :halt
    when RESTART_COMMAND then :restart
    end
  return false unless new_status

  @status = new_status
  true
end
# The accept loop.
#
# Waits on the check pipe plus every listener from the binder, hands accepted
# connections to the thread pool and reacts to control commands read by
# handle_check.
#
# It loops while @status is :run. With options[:drain_on_shutdown] it keeps
# accepting after a stop/restart until IO.select reports nothing ready,
# counting the extra connections.
#
# When @idle_timeout elapses with nothing ready:
# - a clustered worker reports PIPE_IDLE on @worker_write;
# - a single server logs and stops itself.
#
# On exit it:
# - shuts down the reactor (when requests were queued);
# - performs graceful_shutdown for :stop/:restart;
# - closes both control pipes;
# - fires the :done state event.
def handle_servers
  # True only for Rack releases older than 3.1.0; forwarded to every Client
  # through new_client.
  @env_set_http_version = Object.const_defined?(:Rack) && ::Rack.respond_to?(:release) &&
    Gem::Version.new(::Rack.release) < Gem::Version.new('3.1.0')

  begin
    check = @check
    sockets = [check] + @binder.ios
    pool = @thread_pool
    queue_requests = @queue_requests
    drain = options[:drain_on_shutdown] ? 0 : nil

    # Setter (and value) applied to each new client to configure how its
    # remote address is determined.
    addr_send_name, addr_value =
      case options[:remote_address]
      when :value
        [:peerip=, options[:remote_address_value]]
      when :header
        [:remote_addr_header=, options[:remote_address_header]]
      when :proxy_protocol
        [:expect_proxy_proto=, options[:remote_address_proxy_protocol]]
      else
        [nil, nil]
      end

    while @status == :run || (drain && shutting_down?)
      begin
        # Never block while draining: a 0 timeout ends the drain as soon as
        # nothing is ready.
        ios = IO.select sockets, nil, nil, (shutting_down? ? 0 : @idle_timeout)

        unless ios
          unless shutting_down?
            @idle_timeout_reached = true

            if @clustered
              @worker_write << "#{PipeRequest::PIPE_IDLE}#{Process.pid}\n" rescue nil
              next
            else
              @log_writer.log "- Idle timeout reached"
              @status = :stop
            end
          end

          break
        end

        # Activity resumed after an idle report: notify the parent again.
        if @idle_timeout_reached && @clustered
          @idle_timeout_reached = false
          @worker_write << "#{PipeRequest::PIPE_IDLE}#{Process.pid}\n" rescue nil
        end

        ios.first.each do |sock|
          if sock == check
            break if handle_check
          else
            # if ThreadPool out_of_band code is running, we don't want to add
            # clients until the code is finished.
            pool.wait_while_out_of_band_running

            # A well rested herd (cluster) runs faster
            if @cluster_accept_loop_delay.on? && (busy_threads_plus_todo = pool.busy_threads) > 0
              delay = @cluster_accept_loop_delay.calculate(
                max_threads: @max_threads,
                busy_threads_plus_todo: busy_threads_plus_todo
              )
              sleep(delay)
            end

            io = begin
              sock.accept_nonblock
            rescue IO::WaitReadable
              next
            end
            drain += 1 if shutting_down?
            client = new_client(io, sock)
            client.send(addr_send_name, addr_value) if addr_value
            pool << client
          end
        end
      rescue IOError, Errno::EBADF
        # In the case that any of the sockets are unexpectedly closed.
        # Re-raised so the outer `rescue Exception` logs it and ends the loop.
        raise
      rescue StandardError => e
        @log_writer.unknown_error e, nil, "Listen loop"
      end
    end

    @log_writer.debug { "Drained #{drain} additional connections." } if drain
    @events.fire :state, @status

    if queue_requests
      @queue_requests = false
      @reactor.shutdown
    end

    graceful_shutdown if @status == :stop || @status == :restart
  rescue Exception => e
    # Deliberately broad: anything escaping the loop (including
    # non-StandardError exceptions) is logged before the ensure cleanup runs.
    @log_writer.unknown_error e, nil, "Exception handling servers"
  ensure
    # Errno::EBADF is infrequently raised
    [@check, @notify].each do |io|
      begin
        io.close unless io.closed?
      rescue Errno::EBADF
      end
    end
    @notify = nil
    @check = nil
  end

  @events.fire :state, :done
end
# Replaces this server's binder with +bind+ and returns it.
def inherit_binder(bind)
  @binder = bind
end
# Creates a server for the Rack +app+.
#
# @param app the Rack application to serve.
# @param events events object; `Events.new` is used when nil.
# @param options a UserFileDefaultOptions, or a Hash that is wrapped in
#   UserFileDefaultOptions with Configuration::DEFAULTS.
# @note The `events` parameter is set to nil, and set to `Events.new` in code.
# @note Several instance variables exist so they are available for testing.
# @note Side effect: sets ENV['RACK_ENV'] to "development" when it is unset.
def initialize(app, events = nil, options = {})
  @app = app
  @events = events || Events.new

  @check, @notify = nil
  @status = :stop

  @thread = nil
  @thread_pool = nil
  @reactor = nil

  @env_set_http_version = nil

  @options =
    if options.is_a?(UserFileDefaultOptions)
      options
    else
      UserFileDefaultOptions.new(options, Configuration::DEFAULTS)
    end

  @clustered = (@options.fetch :workers, 0) > 0
  @worker_write = @options[:worker_write]
  @log_writer = @options.fetch :log_writer, LogWriter.stdio
  @early_hints = @options[:early_hints]
  @first_data_timeout = @options[:first_data_timeout]
  @persistent_timeout = @options[:persistent_timeout]
  @idle_timeout = @options[:idle_timeout]
  @min_threads = @options[:min_threads]
  @max_threads = @options[:max_threads]
  @queue_requests = @options[:queue_requests]
  @max_keep_alive = @options[:max_keep_alive]
  @enable_keep_alives = @options[:enable_keep_alives]
  # Keep-alives stay enabled only when requests are queued.
  @enable_keep_alives &&= @queue_requests
  @io_selector_backend = @options[:io_selector_backend]
  @http_content_length_limit = @options[:http_content_length_limit]
  @cluster_accept_loop_delay = ClusterAcceptLoopDelay.new(
    workers: @options[:workers],
    # Real default is in Configuration::DEFAULTS, this is for unit testing
    max_delay: @options[:wait_for_less_busy_worker] || 0
  )

  if @options[:fiber_per_request]
    singleton_class.prepend(FiberPerRequest)
  end

  # make this a hash, since we prefer `key?` over `include?`
  @supported_http_methods =
    if @options[:supported_http_methods] == :any
      :any
    else
      (@options[:supported_http_methods] || SUPPORTED_HTTP_METHODS)
        .sort.product([nil]).to_h.freeze
    end

  # Stack traces are exposed in error responses only in development/test, or
  # when no environment is configured at all.
  dev_or_test = !!(@options[:environment] =~ /\A(development|test)\z/)
  @leak_stack_on_error = @options[:environment] ? dev_or_test : true

  @binder = Binder.new(log_writer, @options)

  ENV['RACK_ENV'] ||= "development"

  @mode = :http

  @precheck_closing = true

  @requests_count = 0

  @idle_timeout_reached = false
end
# A fallback rack response if +@app+ raises an exception.
#
# When options[:lowlevel_error_handler] is set, its result is returned. It is
# called with (e), (e, env) or (e, env, status) depending on its arity (1, 2,
# anything else).
#
# Otherwise a [status, headers, body] triple is built. The body contains the
# message and backtrace only when @leak_stack_on_error is set; otherwise it is
# empty.
#
# @param e the exception raised.
# @param env the request env.
# @param status HTTP status to respond with (default 500).
def lowlevel_error(e, env, status = 500)
  if (handler = options[:lowlevel_error_handler])
    if handler.arity == 1
      return handler.call(e)
    elsif handler.arity == 2
      return handler.call(e, env)
    else
      return handler.call(e, env, status)
    end
  end

  if @leak_stack_on_error
    backtrace = e.backtrace.nil? ? '<no backtrace available>' : e.backtrace.join("\n")
    [status, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{backtrace}"]]
  else
    [status, {}, [""]]
  end
end
# Wraps an accepted +io+ in a Client.
#
# The client's env comes from the binder for the listener +sock+, and the
# client is configured with this server's per-client settings.
def new_client(io, sock)
  client = Client.new(io, @binder.env(sock))
  client.listener = sock
  client.env_set_http_version = @env_set_http_version
  client.http_content_length_limit = @http_content_length_limit
  client.supported_http_methods = @supported_http_methods
  client
end
# Writes a control +message+ to the notify pipe.
#
# Errors caused by the pipe already being closed or gone are ignored. A
# RuntimeError is re-raised unless its message mentions 'IOError'.
def notify_safely(message)
  @notify << message
rescue IOError, NoMethodError, Errno::EPIPE, Errno::EBADF
  # The server, in another thread, is shutting down
rescue RuntimeError => e
  # Temporary workaround for https://bugs.ruby-lang.org/issues/13239
  raise e unless e.message.include?('IOError')
end
# This number represents the number of requests that the server is capable of
# taking right now.
#
# For example if the number is 5 then it means there are 5 threads sitting idle
# ready to take a request. If one request comes in, then the value would be 4
# until it finishes processing.
#
# Returns nil when no thread pool has been created yet.
def pool_capacity
  @thread_pool&.pool_capacity
end
# Given a connection on +client+, handle the incoming requests, or queue the
# connection in the Reactor if no request is available.
#
# This method is called from a ThreadPool processor thread.
#
# This method supports HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
# @return [Boolean]
#   - false when the connection was handed to the reactor before any request
#     was processed;
#   - true after normal processing;
#   - after a StandardError, whether at least one request had completed.
def process_client(processor, client)
  close_socket = true

  requests = 0

  begin
    if @queue_requests && !client.eagerly_finish
      client.set_timeout(@first_data_timeout)
      if @reactor.add client
        close_socket = false
        return false
      end
    end

    with_force_shutdown(client) do
      client.finish(@first_data_timeout)
    end

    can_loop = true
    while can_loop
      can_loop = false
      @requests_count += 1
      case handle_request(processor, client, requests + 1)
      when :close
        # Nothing more to do; the ensure clause closes the socket.
      when :async
        close_socket = false
      when :keep_alive
        requests += 1

        client.reset

        # This indicates data exists in the client read buffer and there may be
        # additional requests on it, so process them
        next_request_ready =
          if client.has_back_to_back_requests?
            with_force_shutdown(client) { client.process_back_to_back_requests }
          else
            with_force_shutdown(client) { client.eagerly_finish }
          end

        if next_request_ready
          # When Puma has spare threads, allow this one to be monopolized
          # Perf optimization for https://github.com/puma/puma/issues/3788
          if @thread_pool.waiting > 0
            can_loop = true
          else
            @thread_pool << client
            close_socket = false
          end
        elsif @queue_requests
          client.set_timeout @persistent_timeout
          if @reactor.add client
            close_socket = false
          end
        end
      end
    end
    true
  rescue StandardError => e
    client_error(e, client, requests)
    # The ensure tries to close +client+ down
    requests > 0
  ensure
    client.io_buffer.reset

    close_client_safely(client) if close_socket
  end
end
# This method is called from the Reactor thread when a queued Client receives
# data, times out, or when the Reactor is shutting down.
#
# While the code lives in the Server, the logic is executed on the reactor
# thread, independently from the server.
#
# It is responsible for ensuring that a request has been completely received
# before it starts to be processed by the ThreadPool. This may be known as read
# buffering. If read buffering is not done, and no other read buffering is
# performed (such as by a reverse proxy like nginx), then the application would
# be subject to a slow client attack.
#
# For a graphical representation of how the request buffer works see
# [architecture.md](https://github.com/puma/puma/blob/main/docs/architecture.md).
#
# The method checks whether the full header and body have arrived using
# `Puma::Client#try_to_finish`. If the full request has been sent, it is passed
# to the ThreadPool (`@thread_pool << client`). A "processor thread" can then
# pick up the request and begin to execute application logic. The Client is
# then removed from the reactor (return `true`).
#
# If a client object times out (`client.timeout == 0`), a 408 response is
# written, its connection is closed, and the object is removed from the
# reactor (return `true`).
#
# If the Reactor is shutting down (`@queue_requests` is false), all Clients are
# either timed out or passed to the ThreadPool, depending on their current
# state (#can_close?).
#
# Otherwise, if the full request is not ready, the client remains in the
# reactor (return `false`) with its timeout reset to @first_data_timeout. It is
# checked again when it sends more data to the socket.
#
# Any StandardError is reported via client_error, the client is closed, and it
# is removed from the reactor (return `true`).
def reactor_wakeup(client)
  shutdown = !@queue_requests
  if client.try_to_finish || (shutdown && !client.can_close?)
    @thread_pool << client
  elsif shutdown || client.timeout == 0
    client.timeout!
  else
    client.set_timeout(@first_data_timeout)
    false
  end
rescue StandardError => e
  client_error(e, client)
  close_client_safely(client)
  true
end
# Resets the max counters: zeroes the reactor's reactor_max (when a reactor
# exists) and calls ThreadPool#reset_max (when a pool exists).
def reset_max
  @reactor.reactor_max = 0 if @reactor
  @thread_pool&.reset_max
end
# Builds and writes an error response for +client+ through prepare_response.
#
# A 413 gets a fixed "Payload Too Large" body. Any other status is rendered by
# lowlevel_error.
def response_to_error(client, requests, err, status_code)
  # @todo remove sometime later
  if status_code == 413
    status = 413
    res_body = ["Payload Too Large"]
    # Derived from the body so the header can't drift from it (17 bytes).
    headers = { CONTENT_LENGTH2 => res_body.first.bytesize }
  else
    status, headers, res_body = lowlevel_error(err, client.env, status_code)
  end
  prepare_response(status, headers, res_body, requests, client)
end
# Runs the server.
#
# If +background+ is true (the default) then a thread is spun up in the
# background to handle requests, and that thread is returned. Otherwise
# requests are handled synchronously and the result of handle_servers is
# returned.
#
# @param thread_name name used for the thread pool and the background thread.
def run(background = true, thread_name: 'srv')
  BasicSocket.do_not_reverse_lookup = true

  @events.fire :state, :booting

  @status = :run

  @thread_pool = ThreadPool.new(thread_name, options, server: self) do |processor, client|
    process_client(processor, client)
  end

  if @queue_requests
    @reactor = Reactor.new(@io_selector_backend) { |c|
      # Inversion of control, the reactor is calling a method on the server when it
      # is done buffering a request or receives a new request from a keepalive connection.
      self.reactor_wakeup(c)
    }
    @reactor.run
  end

  @thread_pool.auto_reap! if options[:reaping_time]
  @thread_pool.auto_trim! if @min_threads != @max_threads && options[:auto_trim_time]

  @check, @notify = Puma::Util.pipe unless @notify

  @events.fire :state, :running

  if background
    @thread = Thread.new do
      Puma.set_thread_name thread_name
      handle_servers
    end
    return @thread
  else
    handle_servers
  end
end
# Number of threads spawned by the thread pool, or nil before the pool exists.
def running
  @thread_pool&.spawned
end
# True while the server is stopping or restarting.
def shutting_down?
  @status == :stop || @status == :restart
end
# Server statistics: the thread pool's stats (when a pool exists) plus
# :max_threads, :requests_count and, when a reactor exists, :reactor_max.
#
# Note: this also calls reset_max, so the reactor's reactor_max (and the
# pool's counters reset by ThreadPool#reset_max) start over after each call.
#
# @return [Hash] hash containing stat info from `Server` and `ThreadPool`
# @version 5.0.0
def stats
  result = @thread_pool&.stats || {}
  result[:max_threads] = @max_threads
  result[:requests_count] = @requests_count
  result[:reactor_max] = @reactor.reactor_max if @reactor
  reset_max
  result
end
# Requests a graceful stop by writing STOP_COMMAND to the notify pipe
# (handle_check turns it into @status = :stop).
#
# @param sync when true and a server thread exists, block until it finishes.
def stop(sync = false)
  notify_safely(STOP_COMMAND)
  @thread.join if @thread && sync
end
# Whether this platform defines the socket constants cork_socket and
# uncork_socket need (Socket::TCP_CORK and Socket::IPPROTO_TCP).
#
# @version 5.0.0
def tcp_cork_supported?
  %i[TCP_CORK IPPROTO_TCP].all? { |name| Socket.const_defined?(name) }
end
# Only defined where TCP_CORK exists (same check as tcp_cork_supported?). The
# guard keeps this version and the no-op `uncork_socket` fallback mutually
# exclusive; otherwise whichever is defined last would win.
if Socket.const_defined?(:TCP_CORK) && Socket.const_defined?(:IPPROTO_TCP)
  # Sets TCP_CORK back to 0 on +socket+ when its underlying IO is a TCPSocket.
  # Other IOs are left alone, and IOError/SystemCallError from setsockopt are
  # ignored.
  def uncork_socket(socket)
    skt = socket.to_io
    begin
      skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0) if skt.kind_of? TCPSocket
    rescue IOError, SystemCallError
      # Best effort: a socket that can't be uncorked is left as is.
    end
  end
end
# No-op fallback for platforms without TCP_CORK. Guarded by the inverse of
# tcp_cork_supported?'s check so it never replaces the uncorking implementation.
unless Socket.const_defined?(:TCP_CORK) && Socket.const_defined?(:IPPROTO_TCP)
  def uncork_socket(socket)
  end
end
# Updates the minimum and maximum thread counts of the thread pool, under the
# pool's mutex.
#
# When no thread pool exists yet (@thread_pool is nil), nothing is changed,
# including @min_threads/@max_threads.
#
# @param min [Integer] The minimum number of threads to maintain in the pool.
# @param max [Integer] The maximum number of threads allowed in the pool.
# @return [void]
# @note If validation fails, a warning message is logged and no changes are made.
# @example Update both min and max threads
#   server.update_thread_pool_min_max(min: 2, max: 8)
# @example Update only the minimum threads
#   server.update_thread_pool_min_max(min: 2)
# @example Update only the maximum threads
#   server.update_thread_pool_min_max(max: 8)
def update_thread_pool_min_max(min: @min_threads, max: @max_threads)
  if min > max
    @log_writer.log "`min' value cannot be greater than `max' value."
    return
  end

  if min < 0
    @log_writer.log "`min' value cannot be less than 0"
    return
  end

  @thread_pool&.with_mutex do
    @thread_pool.min, @thread_pool.max = min, max
    @min_threads, @max_threads = min, max
  end
end
# Runs +block+ through ThreadPool#with_force_shutdown and returns its result.
#
# Triggers a client timeout (client.timeout!) if the thread pool force-shuts
# down while the block runs, i.e. if ThreadPool::ForceShutdown is raised.
def with_force_shutdown(client, &block)
  @thread_pool.with_force_shutdown(&block)
rescue ThreadPool::ForceShutdown
  client.timeout!
end