class Puma::ThreadPool
Internal docs for a simple thread pool management object.

Each Puma "worker" has a thread pool to process requests.

First a connection to a client is made in `Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator, where it is stored in a `@todo` array.

Each thread in the pool has an internal loop where it pulls a request from the `@todo` array and processes it.
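As a rough illustration of that flow, here is a minimal sketch of driving the pool directly, outside of `Puma::Server`. It assumes a Puma version whose API matches the source shown on this page; the pool name, thread counts and work items are arbitrary example values.

require 'puma'              # provides Puma.set_thread_name, used by #spawn_thread
require 'puma/thread_pool'

# The block is the work performed for every item pushed via #<<.
pool = Puma::ThreadPool.new("example", { min_threads: 1, max_threads: 4 }) do |work|
  $stdout.puts "processing #{work.inspect}"
end

5.times { |i| pool << i }   # stored in @todo; idle threads are woken via @not_empty
pool.shutdown               # no timeout: wait for threads to drain @todo and exit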
def self.clean_thread_locals
def self.clean_thread_locals
  Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
def <<(work)
def <<(work)
  with_mutex do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end
def auto_reap!(timeout=@reaping_time)
def auto_reap!(timeout=@reaping_time)
  @reaper = Automaton.new(self, timeout, "#{@name} threadpool reaper", :reap)
  @reaper.start!
end
def auto_trim!(timeout=@auto_trim_time)
def auto_trim!(timeout=@auto_trim_time)
  @auto_trim = Automaton.new(self, timeout, "#{@name} threadpool trimmer", :trim)
  @auto_trim.start!
end
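Both helpers are driven by the `:reaping_time` and `:auto_trim_time` options, and in turn call `#reap` and `#trim` periodically. A sketch of wiring them up (interval values are arbitrary; requires as in the earlier sketch):

pool = Puma::ThreadPool.new("example",
                            { min_threads: 1, max_threads: 4,
                              reaping_time: 1, auto_trim_time: 30 }) { |work| work.call }

pool.auto_reap!   # background Automaton calls #reap every 1 second
pool.auto_trim!   # background Automaton calls #trim every 30 seconds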
def backlog
How many objects have yet to be processed by the pool?
def backlog
  with_mutex { @todo.size }
end
def busy_threads
Version: 5.0.0
def busy_threads
  with_mutex { @spawned - @waiting + @todo.size }
end
def initialize(name, options = {}, &block)
Maintain a minimum of +min+ and maximum of +max+ threads in the pool.

The block passed is the work that will be performed in each thread.
def initialize(name, options = {}, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @name = name
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @block = block
  @out_of_band = options[:out_of_band]
  @clean_thread_locals = options[:clean_thread_locals]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]

  @shutdown = false

  @trim_requested = 0
  @out_of_band_pending = false

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end

  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end
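For reference, a sketch of the options the constructor reads above; the values shown are illustrative, not Puma's defaults:

options = {
  min_threads: 0,              # @min: threads kept alive even when the pool is idle
  max_threads: 5,              # @max: ceiling on spawned threads
  clean_thread_locals: false,  # wipe Thread.current locals before each unit of work
  reaping_time: 1,             # default interval for #auto_reap!
  auto_trim_time: 30,          # default interval for #auto_trim!
  out_of_band: [],             # callables invoked when the pool goes idle (see #trigger_out_of_band_hook)
  # pool_shutdown_grace_time: 5.0  # internal knob, used only in CI
}

pool = Puma::ThreadPool.new("example", options) { |work| work }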
def pool_capacity
def pool_capacity
  waiting + (@max - spawned)
end
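To make the bookkeeping formulas in #busy_threads and #pool_capacity concrete, a small worked example with made-up numbers:

spawned, waiting, todo_size, max = 5, 2, 1, 5

busy_threads  = spawned - waiting + todo_size   # 5 - 2 + 1 => 4 threads busy or about to be
pool_capacity = waiting + (max - spawned)       # 2 + (5 - 5) => room for 2 more requests
puts busy_threads, pool_capacity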
def reap
If there are dead threads in the pool, make them go away while decreasing the spawned counter.
def reap
  with_mutex do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end
def shutdown(timeout=-1)
Tell all threads in the pool to exit and wait for them to finish. Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads. Next, wait an extra +@shutdown_grace_time+ seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.
def shutdown(timeout=-1)
  threads = with_mutex do
    @shutdown = true
    @trim_requested = @spawned
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim&.stop
    @reaper&.stop
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
        t.join inner_timeout - elapsed
      end
    end

    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)

    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)

    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end

  @spawned = 0
  @workers = []
end
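Continuing the earlier construction sketch, the phases described above look like this when an explicit timeout is passed (the 5-second value is arbitrary):

pool.shutdown(5)
# Phase 1: up to 5 seconds for threads to drain @todo and exit on their own.
# Phase 2: ForceShutdown is raised in threads currently inside #with_force_shutdown,
#          which then get up to @shutdown_grace_time seconds to finish.
# Phase 3: any threads still alive are Thread#kill-ed and given 1 final second to join.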
def spawn_thread
:nodoc:

Must be called with @mutex held!
def spawn_thread
  @spawned += 1

  th = Thread.new(@spawned) do |spawned|
    Puma.set_thread_name '%s tp %03i' % [@name, spawned]
    todo  = @todo
    block = @block
    mutex = @mutex
    not_empty = @not_empty
    not_full = @not_full

    while true
      work = nil

      mutex.synchronize do
        while todo.empty?
          if @trim_requested > 0
            @trim_requested -= 1
            @spawned -= 1
            @workers.delete th
            not_full.signal
            Thread.exit
          end

          @waiting += 1
          if @out_of_band_pending && trigger_out_of_band_hook
            @out_of_band_pending = false
          end
          not_full.signal
          begin
            not_empty.wait mutex
          ensure
            @waiting -= 1
          end
        end

        work = todo.shift
      end

      if @clean_thread_locals
        ThreadPool.clean_thread_locals
      end

      begin
        @out_of_band_pending = true if block.call(work)
      rescue Exception => e
        STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
      end
    end
  end

  @workers << th

  th
end
def trigger_out_of_band_hook
Version: 5.0.0
def trigger_out_of_band_hook
  return false unless @out_of_band&.any?

  # we execute on idle hook when all threads are free
  return false unless @spawned == @waiting

  @out_of_band.each(&:call)
  true
rescue Exception => e
  STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
  true
end
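The hook is fed by the `:out_of_band` option: a collection of callables invoked once every spawned thread is idle and a previous unit of work flagged out-of-band work as pending. A sketch, with `GC.start` standing in for real out-of-band work (requires as in the earlier sketch):

pool = Puma::ThreadPool.new("example",
                            { min_threads: 1, max_threads: 2,
                              out_of_band: [-> { GC.start }] }) do |work|
  work.call
  true   # a truthy return from the work block sets @out_of_band_pending
end

pool << -> { sleep 0.05 }
sleep 0.2         # once the pool is idle again, the GC.start hook fires
pool.shutdown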
def trim(force=false)
If there are any free threads in the pool, tell one to go ahead and exit. If +force+ is true, a trim is requested even if all threads are being utilized.
def trim(force=false)
  with_mutex do
    free = @waiting - @todo.size
    if (force or free > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
def wait_for_less_busy_worker(delay_s)
Version: 5.0.0
def wait_for_less_busy_worker(delay_s)
  return unless delay_s && delay_s > 0

  # Ruby MRI has a GVL, which can result in processing contention
  # when multiple threads (requests) are running concurrently.
  return unless Puma.mri?

  with_mutex do
    return if @shutdown

    # do not delay if we are not busy
    return unless busy_threads > 0

    # this will be signaled once a request finishes,
    # which can happen earlier than delay
    @not_full.wait @mutex, delay_s
  end
end
def wait_until_not_full
This method is used by `Puma::Server` to let the server know when the thread pool can pull more requests from the socket and pass to the reactor.

The general idea is that the thread pool can only work on a fixed number of requests at the same time. If it is already processing that number of requests then it is at capacity. If another Puma process has spare capacity, then the request can be left on the socket so the other worker can pick it up and process it.

For example: if there are 5 threads, but only 4 working on requests, this method will not wait and the `Puma::Server` can pull a request right away.

If there are 5 threads and all 5 of them are busy, then it will pause here, and wait until the `not_full` condition variable is signaled; usually this indicates that a request has been processed.

It's important to note that even though the server might accept another request, it might not be added to the `@todo` array right away. For example, if a slow client has only sent a header, but not a body, then the `@todo` array would stay the same size as the reactor works to try to buffer the request. In that scenario the next call to this method would not block and another request would be added into the reactor by the server. This would continue until a fully buffered request is added to the `@todo` array.
def wait_until_not_full
  with_mutex do
    while true
      return if @shutdown

      # If we can still spin up new threads and there
      # is work queued that cannot be handled by waiting
      # threads, then accept more work until we would
      # spin up the max number of threads.
      return if busy_threads < @max

      @not_full.wait @mutex
    end
  end
end
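A sketch of the accept-loop shape this enables; it is a stand-in for what `Puma::Server` actually does, with a local array of fake requests instead of a socket (requires as in the earlier sketch):

requests = (1..10).to_a   # pretend these are fully buffered requests

pool = Puma::ThreadPool.new("accept-demo", { min_threads: 1, max_threads: 4 }) do |req|
  sleep 0.01              # stand-in for processing the request
end

until requests.empty?
  pool.wait_until_not_full   # block while busy_threads >= @max
  pool << requests.shift     # only pull more work once the pool has capacity again
end
pool.shutdown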
def with_force_shutdown
Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shut down during execution.
def with_force_shutdown
  t = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    t[:with_force_shutdown] = true
  end
  yield
ensure
  t[:with_force_shutdown] = false
end
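A sketch of how a worker block can opt a slow section into forced shutdown; `slow_call` is a hypothetical stand-in for work that may need to be interrupted (requires as in the earlier sketch):

slow_call = -> { sleep 60 }   # hypothetical long-running work

pool = Puma::ThreadPool.new("demo", { min_threads: 1, max_threads: 2 }) do |work|
  begin
    pool.with_force_shutdown { work.call }   # ForceShutdown may be raised in here
  rescue Puma::ThreadPool::ForceShutdown
    # the request was abandoned because the pool is shutting down; clean up here
  end
end

pool << slow_call
pool.shutdown(1)   # phase 2 raises ForceShutdown inside the block above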
def with_mutex(&block)
Version: 5.0.0
def with_mutex(&block)
  @mutex.owned? ? yield : @mutex.synchronize(&block)
end