class Puma::ThreadPool

Experimental RBS support (using type sampling data from the type_fusion project).

# sig/puma/thread_pool.rbs

class Puma::ThreadPool
  def reap: () -> untyped
  def with_mutex: () -> untyped
end

and processes it.
Each thread in the pool has an internal loop where it pulls a request from the `@todo` array
a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
the whole request is buffered into memory. Once the request is ready, it is passed into
`Puma::Client` instance and then passed to the `Puma::Reactor` to ensure
First a connection to a client is made in `Puma::Server`. It is wrapped in a
Each Puma “worker” has a thread pool to process requests.
Internal docs for a simple thread pool management object.

def self.clean_thread_locals

# Reset every thread/fiber-local variable on the current thread to nil,
# preserving only the interpreter's :__recursive_key__ bookkeeping entry.
def self.clean_thread_locals
  current = Thread.current
  current.keys.each do |key|
    next if key == :__recursive_key__
    current[key] = nil
  end
end

def <<(work)

Add +work+ to the todo list for a Thread to pickup and process.
# Add +work+ to the todo list for a worker thread to pick up and process.
# Raises if the pool is already shutting down. Spawns an extra thread when
# the queue outgrows the number of idle threads and the pool is below @max.
def <<(work)
  with_mutex do
    raise "Unable to add work while shutting down" if @shutdown

    @todo << work

    # Grow the pool only when waiting threads cannot absorb the queue.
    spawn_thread if @waiting < @todo.size && @spawned < @max

    @not_empty.signal
  end
end

def auto_reap!(timeout=@reaping_time)

# Start a background Automaton that periodically invokes #reap to remove
# dead threads from the pool. +timeout+ is the interval between passes.
def auto_reap!(timeout=@reaping_time)
  reaper = Automaton.new(self, timeout, "#{@name} threadpool reaper", :reap)
  @reaper = reaper
  reaper.start!
end

def auto_trim!(timeout=@auto_trim_time)

# Start a background Automaton that periodically invokes #trim to shrink
# the pool toward @min. +timeout+ is the interval between passes.
def auto_trim!(timeout=@auto_trim_time)
  trimmer = Automaton.new(self, timeout, "#{@name} threadpool trimmer", :trim)
  @auto_trim = trimmer
  trimmer.start!
end

def backlog


How many objects have yet to be processed by the pool?
# How many objects have yet to be processed by the pool?
def backlog
  with_mutex { @todo.length }
end

def busy_threads

Other tags:
    Version: - 5.0.0
# Number of threads currently occupied: spawned threads minus the idle
# ones, plus queued items that will claim a thread as soon as one frees up.
#
# @version 5.0.0
def busy_threads
  with_mutex { @todo.size + @spawned - @waiting }
end

def initialize(name, options = {}, &block)


thread.
The block passed is the work that will be performed in each

in the pool.
Maintain a minimum of +min+ and maximum of +max+ threads
# Maintain a minimum of +min+ and maximum of +max+ threads
# in the pool.
#
# The block passed is the work that will be performed in each
# thread.
def initialize(name, options = {}, &block)
  @not_empty = ConditionVariable.new # signaled when work is pushed onto @todo
  @not_full = ConditionVariable.new # signaled when a worker thread goes idle
  @mutex = Mutex.new
  @todo = [] # queued work items, shifted off by the worker threads
  @spawned = 0 # number of threads currently created
  @waiting = 0 # number of threads idle, blocked on @not_empty
  @name = name
  # Integer() raises on nil/garbage, so both options are effectively required.
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @block = block
  @out_of_band = options[:out_of_band]
  @clean_thread_locals = options[:clean_thread_locals]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]
  @shutdown = false
  @trim_requested = 0
  @out_of_band_pending = false
  @workers = []
  @auto_trim = nil
  @reaper = nil
  @mutex.synchronize do
    # Eagerly spawn the minimum thread count, waiting for each thread to
    # report ready (a worker signals @not_full when it first goes idle).
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end
  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end

def pool_capacity

@!attribute [r] pool_capacity
# @!attribute [r] pool_capacity
# Remaining request capacity: idle threads plus threads not yet spawned.
def pool_capacity
  (@max - spawned) + waiting
end

def reap

Experimental RBS support (using type sampling data from the type_fusion project).

def reap: () -> untyped

This signature was generated using 2 samples from 1 application.

spawned counter so that new healthy threads could be created again.
If there are dead threads in the pool make them go away while decreasing
# If there are dead threads in the pool make them go away while decreasing
# spawned counter so that new healthy threads could be created again.
def reap
  with_mutex do
    dead = @workers.select { |w| !w.alive? }
    dead.each do |w|
      w.kill
      @spawned -= 1
    end
    @workers -= dead
  end
end

def shutdown(timeout=-1)


threads. Finally, wait 1 second for remaining threads to exit.
Next, wait an extra +@shutdown_grace_time+ seconds then force-kill remaining
Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
Tell all threads in the pool to exit and wait for them to finish.
# Tell all threads in the pool to exit and wait for them to finish.
# Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
# Next, wait an extra +@shutdown_grace_time+ seconds then force-kill remaining
# threads. Finally, wait 1 second for remaining threads to exit.
# A +timeout+ of -1 (the default) joins indefinitely with no forced shutdown.
def shutdown(timeout=-1)
  threads = with_mutex do
    @shutdown = true
    # Request every spawned thread to trim itself away.
    @trim_requested = @spawned
    # Wake all threads so each one can observe @shutdown / the trim request.
    @not_empty.broadcast
    @not_full.broadcast
    @auto_trim&.stop
    @reaper&.stop
    # dup workers so that we join them all safely
    @workers.dup
  end
  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    # Join the remaining threads, sharing +inner_timeout+ seconds across the
    # whole set; each thread that finishes in time is dropped from +threads+.
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
        t.join inner_timeout - elapsed
      end
    end
    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)
    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        # Only interrupt threads that opted in via #with_force_shutdown.
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)
    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end
  @spawned = 0
  @workers = []
end

def spawn_thread


Must be called with @mutex held!

:nodoc:
# :nodoc:
#
# Spawn a new worker thread running the pool's work loop and register it
# in @workers. Returns the new Thread.
#
# Must be called with @mutex held!
def spawn_thread
  @spawned += 1
  th = Thread.new(@spawned) do |spawned|
    Puma.set_thread_name '%s tp %03i' % [@name, spawned]
    # Cache hot instance variables in locals for the loop below.
    todo  = @todo
    block = @block
    mutex = @mutex
    not_empty = @not_empty
    not_full = @not_full
    while true
      work = nil
      mutex.synchronize do
        while todo.empty?
          # Exit this thread if a trim was requested while we are idle.
          if @trim_requested > 0
            @trim_requested -= 1
            @spawned -= 1
            @workers.delete th
            not_full.signal
            Thread.exit
          end
          @waiting += 1
          # Run the out-of-band hook (if pending) now that we are idle.
          if @out_of_band_pending && trigger_out_of_band_hook
            @out_of_band_pending = false
          end
          not_full.signal
          begin
            # Sleep until work arrives; @waiting is corrected even if the
            # wait is interrupted (e.g. by ForceShutdown).
            not_empty.wait mutex
          ensure
            @waiting -= 1
          end
        end
        work = todo.shift
      end
      if @clean_thread_locals
        ThreadPool.clean_thread_locals
      end
      begin
        # A truthy return from the work block schedules the out-of-band hook.
        @out_of_band_pending = true if block.call(work)
      rescue Exception => e
        STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
      end
    end
  end
  @workers << th
  th
end

def trigger_out_of_band_hook

Other tags:
    Version: - 5.0.0
# Run the configured out-of-band hooks, but only when every spawned thread
# is idle. Returns true when the hooks ran (or raised), false otherwise.
#
# @version 5.0.0
def trigger_out_of_band_hook
  hooks = @out_of_band
  return false unless hooks && hooks.any?

  # we execute on idle hook when all threads are free
  return false unless @spawned == @waiting

  hooks.each(&:call)
  true
rescue Exception => e
  STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
  true
end

def trim(force=false)


even if all threads are being utilized.
and exit. If +force+ is true, then a trim request is requested
If there are any free threads in the pool, tell one to go ahead
# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
def trim(force=false)
  with_mutex do
    idle = @waiting - @todo.size
    above_min = (@spawned - @trim_requested) > @min
    if above_min && (force || idle > 0)
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

def wait_for_less_busy_worker(delay_s)

Other tags:
    Version: - 5.0.0
# Pause up to +delay_s+ seconds so that other, less busy workers get a
# chance to pick up incoming connections. No-op when +delay_s+ is nil or
# non-positive, or when not running on MRI.
#
# @version 5.0.0
def wait_for_less_busy_worker(delay_s)
  return unless delay_s && delay_s > 0

  # Under MRI's GVL, concurrently running request threads contend with
  # each other, so briefly yielding spreads the load across workers.
  return unless Puma.mri?

  with_mutex do
    return if @shutdown

    # Nothing in flight — no reason to delay.
    return unless busy_threads > 0

    # Woken early when a request finishes (@not_full is signaled), which
    # can happen before the full delay elapses.
    @not_full.wait @mutex, delay_s
  end
end

def wait_until_not_full

makes it through the reactor and can then be processed by the thread pool.
by the server. This would continue until a fully buffered request
method would not block and another request would be added into the reactor
to try to buffer the request. In that scenario the next call to this
then the `@todo` array would stay the same size as the reactor works
For example if a slow client has only sent a header, but not a body
request, it might not be added to the `@todo` array right away.
It's important to note that even though the server might accept another

signaled, usually this indicates that a request has been processed.
pause here, and wait until the `not_full` condition variable is
If there are 5 threads and all 5 of them are busy, then it will

can pull a request right away.
requests, this method will not wait and the `Puma::Server`
For example: if there are 5 threads, but only 4 working on

worker can pick it up and process it.
spare capacity, then the request can be left on the socket so the other
number of requests then it is at capacity. If another Puma process has
number of requests at the same time. If it is already processing that
The general idea is that the thread pool can only work on a fixed

pass to the reactor.
the thread pool can pull more requests from the socket and
This method is used by `Puma::Server` to let the server know when
# Block the caller (Puma::Server) until the pool has spare capacity to
# accept another request, i.e. until busy_threads drops below @max.
# Returns immediately once the pool is shutting down.
def wait_until_not_full
  with_mutex do
    until @shutdown
      # If we can still spin up new threads and there
      # is work queued that cannot be handled by waiting
      # threads, then accept more work until we would
      # spin up the max number of threads.
      return if busy_threads < @max

      # Signaled when a thread goes idle, usually after finishing a request.
      @not_full.wait @mutex
    end
  end
end

def with_force_shutdown

provided block if the thread is forced to shutdown during execution.
Allows ThreadPool::ForceShutdown to be raised within the
# Allows ThreadPool::ForceShutdown to be raised within the
# provided block if the thread is forced to shutdown during execution.
def with_force_shutdown
  thread = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    # Mark this thread as interruptible; #shutdown checks this flag.
    thread[:with_force_shutdown] = true
  end
  yield
ensure
  thread[:with_force_shutdown] = false
end

def with_mutex(&block)

Experimental RBS support (using type sampling data from the type_fusion project).

def with_mutex: () -> untyped

This signature was generated using 3 samples from 1 application.

Other tags:
    Version: - 5.0.0
# Run +block+ while holding @mutex. Re-entrant: when the calling thread
# already owns the mutex, the block is yielded directly instead of
# attempting a (deadlocking) second lock.
#
# @version 5.0.0
def with_mutex(&block)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&block)
  end
end