class Puma::ThreadPool

and processes it.
Each thread in the pool has an internal loop where it pulls a request from the `@todo` array
a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
the whole request is buffered into memory. Once the request is ready, it is passed into
`Puma::Client` instance and then passed to the `Puma::Reactor` to ensure
First a connection to a client is made in `Puma::Server`. It is wrapped in a
Each Puma “worker” has a thread pool to process requests.
Internal Docs for A simple thread pool management object.

def <<(work)

Add +work+ to the todo list for a Thread to pick up and process.
# Queues +work+ for a worker thread and wakes one waiter.
#
# While holding the pool mutex this appends +work+ to @todo, raises the
# recorded backlog high-water mark if the queue is now longer than any seen
# before, and spawns another thread when fewer threads are waiting than items
# are queued and can_spawn_processor? allows it.
#
# Raises RuntimeError when the pool is shutting down.
# Returns the pool itself so calls can be chained.
def <<(work)
  with_mutex do
    raise "Unable to add work while shutting down" if @shutdown

    @todo << work
    pending = @todo.size
    @backlog_max = pending if pending > @backlog_max
    spawn_thread if @waiting < pending && can_spawn_processor?
    @not_empty.signal
  end

  self
end

def auto_reap!(timeout=@reaping_time)

# Creates an Automaton that invokes :reap on this pool and starts it.
#
# timeout - value handed to the Automaton; defaults to @reaping_time.
#
# The Automaton is kept in @reaper so shutdown can stop it. Returns whatever
# Automaton#start! returns.
def auto_reap!(timeout=@reaping_time)
  label = "#{@name} tp reap"
  reaper = Automaton.new(self, timeout, label, :reap)
  @reaper = reaper
  reaper.start!
end

def auto_trim!(timeout=@auto_trim_time)

# Creates an Automaton that invokes :trim on this pool and starts it.
#
# timeout - value handed to the Automaton; defaults to @auto_trim_time.
#
# The Automaton is kept in @auto_trim so shutdown can stop it. Returns
# whatever Automaton#start! returns.
def auto_trim!(timeout=@auto_trim_time)
  label = "#{@name} tp trim"
  trimmer = Automaton.new(self, timeout, label, :trim)
  @auto_trim = trimmer
  trimmer.start!
end

def backlog


How many objects have yet to be processed by the pool?
# Number of queued work items not yet picked up by a thread, read while
# holding the pool mutex.
def backlog
  with_mutex do
    @todo.length
  end
end

def backlog_max


The maximum size of the backlog
# Largest backlog size recorded since the value was last reset, read while
# holding the pool mutex.
def backlog_max
  with_mutex do
    @backlog_max
  end
end

def busy_threads

Other tags:
    Version: - 5.0.0
# Spawned threads that are not waiting for work, plus the number of queued
# items, computed while holding the pool mutex.
def busy_threads
  with_mutex do
    occupied = @spawned - @waiting
    occupied + @todo.size
  end
end

def can_spawn_processor?


Must be called with @mutex held!

:nodoc:
# Whether another processor thread may be started.
#
# Threads marked as IO threads are excluded from the spawned count. IO
# threads beyond @max_io_threads reduce the @max limit one for one.
#
# Must be called with @mutex held!
def can_spawn_processor? # :nodoc:
  io_count = @processors.count { |processor| processor.marked_as_io_thread? }
  io_overflow = [io_count - @max_io_threads, 0].max
  regular_count = @spawned - io_count
  regular_count < @max - io_overflow
end

def initialize(name, options = {}, server: nil, &block)


thread.
The block passed is the work that will be performed in each

in the pool.
Maintain a minimum of +min+ and maximum of +max+ threads
# Sets up pool state and starts the minimum number of worker threads.
#
# name    - label interpolated into the names of worker and helper threads.
# options - Hash of settings. Keys read here: :min_threads, :max_threads,
#           :max_io_threads, :pool_shutdown_grace_time, :shutdown_debug,
#           :out_of_band, :before_thread_start, :before_thread_exit,
#           :reaping_time and :auto_trim_time.
# server  - stored in @server; spawn_thread advertises it to each worker
#           thread and the before_thread_start hooks receive it wrapped in a
#           ServerPluginControl.
# block   - the work performed for each queued item; the worker loop in
#           spawn_thread calls it with the processor and the work item.
#
# :min_threads and :max_threads are converted with Integer(), so a missing or
# non-numeric value raises here.
def initialize(name, options = {}, server: nil, &block)
  @server = server
  # @not_empty is signalled when work is queued or a trim is requested;
  # @not_full is signalled by worker threads (see spawn_thread).
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new
  @todo = Queue.new
  # Counters guarded by @mutex.
  @backlog_max = 0
  @spawned = 0
  @waiting = 0
  @name = name
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  # Defaults to 0 when the option is absent.
  @max_io_threads = Integer(options[:max_io_threads] || 0)
  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @shutdown_debug = options[:shutdown_debug]
  @block = block
  @out_of_band = options[:out_of_band]
  @out_of_band_running = false
  @out_of_band_condvar = ConditionVariable.new
  @before_thread_start = options[:before_thread_start]
  @before_thread_exit = options[:before_thread_exit]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]
  @shutdown = false
  @trim_requested = 0
  @out_of_band_pending = false
  @processors = []
  @auto_trim = nil
  @reaper = nil
  # Start @min threads one at a time, waiting on @not_full after each spawn.
  # A worker signals @not_full just before it starts waiting for work.
  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end
  # State used by shutdown and with_force_shutdown; assigned after the
  # initial threads have been spawned.
  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end

def pool_capacity

@!attribute [r] pool_capacity
# Waiting threads plus the number of threads that could still be spawned
# before reaching @max, never less than zero.
#
# @!attribute [r] pool_capacity
def pool_capacity
  unspawned = @max - spawned
  capacity = waiting + unspawned
  capacity.negative? ? 0 : capacity
end

def reap

spawned counter so that new healthy threads could be created again.
If there are dead threads in the pool make them go away while decreasing
# Drops processors that are no longer alive from @processors, calls kill on
# each of them and decrements @spawned once per dropped processor, all while
# holding the pool mutex.
#
# Returns the array of processors that were dropped.
def reap
  with_mutex do
    survivors, casualties = @processors.partition { |processor| processor.alive? }
    @processors = survivors
    casualties.each do |casualty|
      casualty.kill
      @spawned -= 1
    end
  end
end

def reset_max

# Resets the recorded backlog high-water mark to zero while holding the pool
# mutex.
def reset_max
  with_mutex do
    @backlog_max = 0
  end
end

def shutdown(timeout)


threads. Finally, wait 1 second for remaining threads to exit.
Next, wait an extra +@shutdown_grace_time+ seconds then force-kill remaining
Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
Tell all threads in the pool to exit and wait for them to finish.
# Tells every thread in the pool to exit and waits for them to finish.
#
# timeout - seconds to wait for threads to finish on their own before
#           ForceShutdown is raised in them. -1 means wait indefinitely and
#           never force.
#
# When timeout is not -1 the sequence is: wait up to +timeout+ seconds, raise
# ForceShutdown in threads that opted in through with_force_shutdown, wait up
# to @shutdown_grace_time seconds, kill whatever is left, then wait up to one
# more second.
def shutdown(timeout)
  threads = with_mutex do
    @shutdown = true
    # Ask every spawned thread to exit once it finds the queue empty.
    @trim_requested = @spawned
    # Wake all waiters so they observe the shutdown state.
    @not_empty.broadcast
    @not_full.broadcast
    @auto_trim&.stop
    @reaper&.stop
    # dup processors so that we join them all safely
    @processors.dup
  end
  if @shutdown_debug == true
    shutdown_debug("Shutdown initiated")
  end
  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    # Joins the remaining threads against one shared deadline and removes the
    # ones that finished from +threads+. Once the deadline has passed the
    # remaining threads are kept without being joined.
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        remaining = inner_timeout - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start)
        remaining > 0 && t.join(remaining)
      end
    end
    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)
    if @shutdown_debug == :on_force && !threads.empty?
      shutdown_debug("Shutdown timeout exceeded")
    end
    # If threads are still running, raise ForceShutdown and wait to finish.
    # Holding @shutdown_mutex pairs this with with_force_shutdown, which
    # checks @force_shutdown and sets the thread flag under the same mutex.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)
    if @shutdown_debug == :on_force && !threads.empty?
      shutdown_debug("Shutdown grace timeout exceeded")
    end
    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end
  # Reset bookkeeping; note this happens without holding @mutex.
  @spawned = 0
  @processors = []
end

def shutdown_debug(message)

# Writes +message+ followed by the backtrace of every thread in Thread.list
# to $stdout using syswrite. Every emitted line is prefixed with the process
# id.
def shutdown_debug(message)
  process_id = Process.pid
  all_threads = Thread.list
  $stdout.syswrite "#{process_id}: #{message}\n"
  $stdout.syswrite "#{process_id}: === Begin thread backtrace dump ===\n"
  all_threads.each.with_index(1) do |thread, position|
    $stdout.syswrite "#{process_id}: Thread #{position}/#{all_threads.size}: #{thread.inspect}\n"
    # A thread without a backtrace contributes an empty frame list.
    frames = Array(thread.backtrace)
    $stdout.syswrite "#{process_id}: #{frames.join("\n#{process_id}: ")}\n\n"
  end
  $stdout.syswrite "#{process_id}: === End thread backtrace dump ===\n"
end

def spawn_thread


Must be called with @mutex held!

:nodoc:
# Starts one worker thread, registers its processor in @processors and
# returns the processor.
#
# The worker loops forever: under @mutex it waits until @todo has an item
# (exiting instead if a trim was requested), takes the item, then runs @block
# with the processor and the item outside the mutex.
#
# Must be called with @mutex held!
def spawn_thread
  @spawned += 1
  trigger_before_thread_start_hooks
  processor = ProcessorThread.new(self)
  processor.thread = Thread.new(processor, @spawned) do |processor, spawned|
    Puma.set_thread_name '%s tp %03i' % [@name, spawned]
    # Advertise server into the thread
    Thread.current.puma_server = @server
    # Local copies of the instance state used by the loop below.
    todo  = @todo
    block = @block
    mutex = @mutex
    not_empty = @not_empty
    not_full = @not_full
    while true
      work = nil
      mutex.synchronize do
        if processor.marked_as_io_thread?
          if @processors.count { |t| !t.marked_as_io_thread? } < @max
            # We're not at max processor threads, so the io thread can rejoin the normal population.
            processor.marked_as_io_thread = false
          else
            # We're already at max threads, so we exit the extra io thread.
            # NOTE(review): unlike the trim exit path below, this path does not
            # decrement @spawned or signal not_full — confirm this is intended.
            @processors.delete(processor)
            trigger_before_thread_exit_hooks
            Thread.exit
          end
        end
        while todo.empty?
          # A pending trim request is honoured only when there is no work.
          if @trim_requested > 0
            @trim_requested -= 1
            @spawned -= 1
            @processors.delete(processor)
            not_full.signal
            trigger_before_thread_exit_hooks
            Thread.exit
          end
          @waiting += 1
          # Clear the pending flag only when the hook reports that it ran.
          if @out_of_band_pending && trigger_out_of_band_hook
            @out_of_band_pending = false
          end
          # Signal not_full before waiting; initialize waits on it after
          # each spawn.
          not_full.signal
          begin
            not_empty.wait mutex
          ensure
            @waiting -= 1
          end
        end
        work = todo.shift
      end
      begin
        # A truthy result from the block marks out-of-band hooks as pending.
        # This assignment happens outside the mutex.
        @out_of_band_pending = true if block.call(processor, work)
      rescue Exception => e
        # Any exception is reported and the worker keeps looping.
        STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
      end
    end
  end
  @processors << processor
  processor
end

def spawn_thread_if_needed # :nodoc:

:nodoc:
# Spawns one more thread when fewer threads are waiting than items are queued
# and can_spawn_processor? allows it. Runs under the pool mutex.
#
# Returns the result of spawn_thread, or nil when no thread was spawned.
def spawn_thread_if_needed # :nodoc:
  with_mutex do
    spawn_thread if @waiting < @todo.size && can_spawn_processor?
  end
end

def stats

Returns:
  • (Hash) - hash containing stat info from ThreadPool
# Takes a snapshot of pool statistics under the pool mutex.
#
# Reading the stats resets @backlog_max to zero; the value it held before the
# reset is reported as :backlog_max.
#
# Returns a Hash with the keys :backlog, :running, :pool_capacity,
# :busy_threads, :io_threads and :backlog_max.
def stats
  with_mutex do
    peak_backlog, @backlog_max = @backlog_max, 0
    {
      backlog: @todo.size,
      running: @spawned,
      pool_capacity: pool_capacity,
      busy_threads: @spawned - @waiting + @todo.size,
      io_threads: @processors.count { |processor| processor.marked_as_io_thread? },
      backlog_max: peak_backlog
    }
  end
end

def trigger_before_thread_exit_hooks

# Runs every configured before_thread_exit hook.
#
# Each entry in @before_thread_exit is expected to respond to [:block] with a
# callable. An exception raised by one hook is reported on STDERR and does
# not stop the remaining hooks.
#
# Always returns nil.
def trigger_before_thread_exit_hooks
  hooks = @before_thread_exit
  return if hooks.nil? || hooks.none?

  hooks.each do |hook|
    hook[:block].call
  rescue Exception => error
    STDERR.puts "WARNING before_thread_exit hook failed with exception (#{error.class}) #{error.message}"
  end

  nil
end

def trigger_before_thread_start_hooks

# Runs every configured before_thread_start hook.
#
# Each entry in @before_thread_start is expected to respond to [:block] with
# a callable, which is invoked with a new ServerPluginControl wrapping
# @server. An exception raised by one hook is reported on STDERR and does not
# stop the remaining hooks.
#
# Always returns nil.
def trigger_before_thread_start_hooks
  hooks = @before_thread_start
  return if hooks.nil? || hooks.none?

  hooks.each do |hook|
    hook[:block].call(ServerPluginControl.new(@server))
  rescue Exception => error
    STDERR.puts "WARNING before_thread_start hook failed with exception (#{error.class}) #{error.message}"
  end

  nil
end

def trigger_out_of_band_hook

Other tags:
    Version: - 5.0.0
# Runs the out-of-band hooks when every spawned thread is waiting.
#
# Returns false when no hooks are configured or some thread is not waiting.
# Returns true when the hooks were invoked, including when one of them
# raised; the exception is reported on STDERR and the remaining hooks are
# skipped.
#
# On every exit path @out_of_band_running is set back to false and
# @out_of_band_condvar is broadcast.
def trigger_out_of_band_hook
  hooks = @out_of_band
  return false if hooks.nil? || hooks.none?
  # Hooks only run while the pool is idle: every spawned thread is waiting.
  return false if @spawned != @waiting

  @out_of_band_running = true
  hooks.each do |hook|
    hook[:block].call
  end
  true
rescue Exception => error
  STDERR.puts "Exception calling out_of_band_hook: #{error.message} (#{error.class})"
  true
ensure
  @out_of_band_running = false
  @out_of_band_condvar.broadcast
end

def trim(force=false)


even if all threads are being utilized.
and exit. If +force+ is true, then a trim is requested
If there are any free threads in the pool, tell one to go ahead
# Asks one thread to exit when the pool has threads to spare.
#
# force - when true, request the trim even if no thread is free.
#
# A trim is only requested while the spawned count minus the trims already
# requested stays above @min. Requesting a trim increments @trim_requested
# and signals @not_empty so a waiting thread wakes up and sees the request.
#
# Returns the result of the signal call, or nil when no trim was requested.
def trim(force=false)
  with_mutex do
    idle = @waiting - @todo.size
    wanted = force || idle > 0
    if wanted && @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

def wait_while_out_of_band_running

# Blocks the caller for as long as @out_of_band_running is set.
#
# The flag is first checked without taking the mutex, so the common case of
# no hook running returns immediately. Otherwise the caller waits on
# @out_of_band_condvar under the pool mutex until the flag is cleared.
#
# Returns nil.
def wait_while_out_of_band_running
  if @out_of_band_running
    with_mutex do
      while @out_of_band_running
        @out_of_band_condvar.wait(@mutex)
      end
    end
  end
end

def with_force_shutdown

provided block if the thread is forced to shutdown during execution.
Allows ThreadPool::ForceShutdown to be raised within the
# Runs the given block with the current thread marked as willing to receive
# ForceShutdown.
#
# Raises ForceShutdown straight away, without running the block, if a forced
# shutdown has already begun. The check and the marking happen under
# @shutdown_mutex. The mark is reset to false when the method exits, whether
# normally or by exception.
#
# Returns the value of the block.
def with_force_shutdown
  worker = Thread.current
  begin
    @shutdown_mutex.synchronize do
      if @force_shutdown
        raise ForceShutdown
      end
      worker[:with_force_shutdown] = true
    end
    yield
  ensure
    worker[:with_force_shutdown] = false
  end
end

def with_mutex(&block)

Other tags:
    Version: - 5.0.0
# Runs the block while holding @mutex.
#
# When the calling thread already owns the mutex the block is yielded to
# directly, which lets methods that use this helper call one another.
#
# Returns the value of the block.
def with_mutex(&block)
  return yield if @mutex.owned?

  @mutex.synchronize(&block)
end