# frozen_string_literal: true

require 'thread'

require_relative 'io_buffer'

module Puma
  # Internal Docs for A simple thread pool management object.
  #
  # Each Puma "worker" has a thread pool to process requests.
  #
  # First a connection to a client is made in `Puma::Server`. It is wrapped in a
  # `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure
  # the whole request is buffered into memory. Once the request is ready, it is passed into
  # a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
  #
  # Each thread in the pool has an internal loop where it pulls a request from the `@todo` array
  # and processes it.
  class ThreadPool
    class ForceShutdown < RuntimeError
    end

    # How long, after raising the ForceShutdown of a thread during
    # forced shutdown mode, to wait for the thread to try and finish
    # up its work before leaving the thread to die on the vine.
    SHUTDOWN_GRACE_TIME = 5 # seconds

    # Maintain a minimum of +min+ and maximum of +max+ threads
    # in the pool.
    #
    # The block passed is the work that will be performed in each
    # thread.
    #
    def initialize(name, options = {}, &block)
      @not_empty = ConditionVariable.new
      @not_full = ConditionVariable.new
      @mutex = Mutex.new

      @todo = []

      @spawned = 0
      @waiting = 0

      @name = name
      @min = Integer(options[:min_threads])
      @max = Integer(options[:max_threads])
      # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
      # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME.  Parallel CI
      # makes stubbing constants difficult.
      @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
      @block = block
      @out_of_band = options[:out_of_band]
      @clean_thread_locals = options[:clean_thread_locals]
      @before_thread_start = options[:before_thread_start]
      @before_thread_exit = options[:before_thread_exit]
      @reaping_time = options[:reaping_time]
      @auto_trim_time = options[:auto_trim_time]

      @shutdown = false

      @trim_requested = 0
      @out_of_band_pending = false

      @workers = []

      @auto_trim = nil
      @reaper = nil

      # Spawn the minimum number of threads up front; each spawned thread
      # signals @not_full once it is idle, which wakes this wait.
      @mutex.synchronize do
        @min.times do
          spawn_thread
          @not_full.wait(@mutex)
        end
      end

      @force_shutdown = false
      @shutdown_mutex = Mutex.new
    end

    attr_reader :spawned, :trim_requested, :waiting

    def self.clean_thread_locals
      Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
        Thread.current[key] = nil unless key == :__recursive_key__
      end
    end

    # generate stats hash so as not to perform multiple locks
    # @return [Hash] hash containing stat info from ThreadPool
    def stats
      with_mutex do
        { backlog: @todo.size,
          running: @spawned,
          pool_capacity: @waiting + (@max - @spawned),
          busy_threads: @spawned - @waiting + @todo.size
        }
      end
    end

    # How many objects have yet to be processed by the pool?
    #
    def backlog
      with_mutex { @todo.size }
    end

    # @!attribute [r] pool_capacity
    def pool_capacity
      waiting + (@max - spawned)
    end

    # @!attribute [r] busy_threads
    # @version 5.0.0
    def busy_threads
      with_mutex { @spawned - @waiting + @todo.size }
    end

    # :nodoc:
    #
    # Must be called with @mutex held!
    #
    def spawn_thread
      @spawned += 1

      trigger_before_thread_start_hooks
      th = Thread.new(@spawned) do |spawned|
        Puma.set_thread_name '%s tp %03i' % [@name, spawned]
        todo  = @todo
        block = @block
        mutex = @mutex
        not_empty = @not_empty
        not_full = @not_full

        while true
          work = nil

          mutex.synchronize do
            while todo.empty?
              if @trim_requested > 0
                @trim_requested -= 1
                @spawned -= 1
                @workers.delete th
                not_full.signal
                trigger_before_thread_exit_hooks
                Thread.exit
              end

              @waiting += 1
              if @out_of_band_pending && trigger_out_of_band_hook
                @out_of_band_pending = false
              end
              not_full.signal
              begin
                not_empty.wait mutex
              ensure
                @waiting -= 1
              end
            end

            work = todo.shift
          end

          if @clean_thread_locals
            ThreadPool.clean_thread_locals
          end

          begin
            @out_of_band_pending = true if block.call(work)
          rescue Exception => e
            STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
          end
        end
      end

      @workers << th

      th
    end

    private :spawn_thread

    def trigger_before_thread_start_hooks
      return unless @before_thread_start&.any?

      @before_thread_start.each do |b|
        begin
          b.call
        rescue Exception => e
          STDERR.puts "WARNING before_thread_start hook failed with exception (#{e.class}) #{e.message}"
        end
      end
      nil
    end

    private :trigger_before_thread_start_hooks

    def trigger_before_thread_exit_hooks
      return unless @before_thread_exit&.any?

      @before_thread_exit.each do |b|
        begin
          b.call
        rescue Exception => e
          STDERR.puts "WARNING before_thread_exit hook failed with exception (#{e.class}) #{e.message}"
        end
      end
      nil
    end

    private :trigger_before_thread_exit_hooks

    # @version 5.0.0
    def trigger_out_of_band_hook
      return false unless @out_of_band&.any?

      # we execute on idle hook when all threads are free
      return false unless @spawned == @waiting

      @out_of_band.each(&:call)
      true
    rescue Exception => e
      STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
      true
    end

    private :trigger_out_of_band_hook

    # @version 5.0.0
    def with_mutex(&block)
      @mutex.owned? ?
        yield :
        @mutex.synchronize(&block)
    end

    # Add +work+ to the todo list for a Thread to pickup and process.
    def <<(work)
      with_mutex do
        if @shutdown
          raise "Unable to add work while shutting down"
        end

        @todo << work

        if @waiting < @todo.size and @spawned < @max
          spawn_thread
        end

        @not_empty.signal
      end
    end

    # This method is used by `Puma::Server` to let the server know when
    # the thread pool can pull more requests from the socket and
    # pass to the reactor.
    #
    # The general idea is that the thread pool can only work on a fixed
    # number of requests at the same time. If it is already processing that
    # number of requests then it is at capacity. If another Puma process has
    # spare capacity, then the request can be left on the socket so the other
    # worker can pick it up and process it.
    #
    # For example: if there are 5 threads, but only 4 working on
    # requests, this method will not wait and the `Puma::Server`
    # can pull a request right away.
    #
    # If there are 5 threads and all 5 of them are busy, then it will
    # pause here, and wait until the `not_full` condition variable is
    # signaled, usually this indicates that a request has been processed.
    #
    # It's important to note that even though the server might accept another
    # request, it might not be added to the `@todo` array right away.
    # For example if a slow client has only sent a header, but not a body
    # then the `@todo` array would stay the same size as the reactor works
    # to try to buffer the request. In that scenario the next call to this
    # method would not block and another request would be added into the reactor
    # by the server. This would continue until a fully buffered request
    # makes it through the reactor and can then be processed by the thread pool.
    def wait_until_not_full
      with_mutex do
        while true
          return if @shutdown

          # If we can still spin up new threads and there
          # is work queued that cannot be handled by waiting
          # threads, then accept more work until we would
          # spin up the max number of threads.
          return if busy_threads < @max

          @not_full.wait @mutex
        end
      end
    end

    # @version 5.0.0
    def wait_for_less_busy_worker(delay_s)
      return unless delay_s && delay_s > 0

      # Ruby MRI does GVL, this can result
      # in processing contention when multiple threads
      # (requests) are running concurrently
      return unless Puma.mri?

      with_mutex do
        return if @shutdown

        # do not delay, if we are not busy
        return unless busy_threads > 0

        # this will be signaled once a request finishes,
        # which can happen earlier than delay
        @not_full.wait @mutex, delay_s
      end
    end

    # If there are any free threads in the pool, tell one to go ahead
    # and exit. If +force+ is true, then a trim request is requested
    # even if all threads are being utilized.
    #
    def trim(force = false)
      with_mutex do
        free = @waiting - @todo.size
        if (force or free > 0) and @spawned - @trim_requested > @min
          @trim_requested += 1
          @not_empty.signal
        end
      end
    end

    # If there are dead threads in the pool make them go away while decreasing
    # spawned counter so that new healthy threads could be created again.
    def reap
      with_mutex do
        dead_workers = @workers.reject(&:alive?)

        dead_workers.each do |worker|
          worker.kill
          @spawned -= 1
        end

        @workers.delete_if do |w|
          dead_workers.include?(w)
        end
      end
    end

    # Periodically invokes a single pool method (+message+) on +pool+ from a
    # dedicated background thread, sleeping +timeout+ seconds between calls.
    # Used by auto_trim! and auto_reap! below.
    class Automaton
      def initialize(pool, timeout, thread_name, message)
        @pool = pool
        @timeout = timeout
        @thread_name = thread_name
        @message = message
        @running = false
      end

      def start!
        @running = true

        @thread = Thread.new do
          Puma.set_thread_name @thread_name
          while @running
            @pool.public_send(@message)
            sleep @timeout
          end
        end
      end

      def stop
        @running = false
        @thread.wakeup
      end
    end

    def auto_trim!(timeout = @auto_trim_time)
      @auto_trim = Automaton.new(self, timeout, "#{@name} tp trim", :trim)
      @auto_trim.start!
    end

    def auto_reap!(timeout = @reaping_time)
      @reaper = Automaton.new(self, timeout, "#{@name} tp reap", :reap)
      @reaper.start!
    end

    # Allows ThreadPool::ForceShutdown to be raised within the
    # provided block if the thread is forced to shutdown during execution.
    def with_force_shutdown
      t = Thread.current
      @shutdown_mutex.synchronize do
        raise ForceShutdown if @force_shutdown
        t[:with_force_shutdown] = true
      end
      yield
    ensure
      t[:with_force_shutdown] = false
    end

    # Tell all threads in the pool to exit and wait for them to finish.
    # Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
    # Next, wait an extra +@shutdown_grace_time+ seconds then force-kill remaining
    # threads. Finally, wait 1 second for remaining threads to exit.
    #
    def shutdown(timeout = -1)
      threads = with_mutex do
        @shutdown = true
        @trim_requested = @spawned
        @not_empty.broadcast
        @not_full.broadcast

        @auto_trim&.stop
        @reaper&.stop
        # dup workers so that we join them all safely
        @workers.dup
      end

      if timeout == -1
        # Wait for threads to finish without force shutdown.
        threads.each(&:join)
      else
        join = ->(inner_timeout) do
          start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          threads.reject! do |t|
            elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
            t.join inner_timeout - elapsed
          end
        end

        # Wait +timeout+ seconds for threads to finish.
        join.call(timeout)

        # If threads are still running, raise ForceShutdown and wait to finish.
        @shutdown_mutex.synchronize do
          @force_shutdown = true
          threads.each do |t|
            t.raise ForceShutdown if t[:with_force_shutdown]
          end
        end
        join.call(@shutdown_grace_time)

        # If threads are _still_ running, forcefully kill them and wait to finish.
        threads.each(&:kill)
        join.call(1)
      end

      @spawned = 0
      @workers = []
    end
  end
end