require 'thread'
require 'concurrent/atomic/event'
require 'concurrent/concern/logging'
require 'concurrent/executor/ruby_executor_service'
require 'concurrent/utility/monotonic_time'

module Concurrent

  # @!macro thread_pool_executor
  # @!macro thread_pool_options
  # @!visibility private
  class RubyThreadPoolExecutor < RubyExecutorService

    # @!macro thread_pool_executor_constant_default_max_pool_size
    DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE

    # @!macro thread_pool_executor_constant_default_min_pool_size
    DEFAULT_MIN_POOL_SIZE = 0

    # @!macro thread_pool_executor_constant_default_max_queue_size
    DEFAULT_MAX_QUEUE_SIZE = 0

    # @!macro thread_pool_executor_constant_default_thread_timeout
    DEFAULT_THREAD_IDLETIMEOUT = 60

    # @!macro thread_pool_executor_constant_default_synchronous
    DEFAULT_SYNCHRONOUS = false

    # @!macro thread_pool_executor_attr_reader_max_length
    attr_reader :max_length

    # @!macro thread_pool_executor_attr_reader_min_length
    attr_reader :min_length

    # @!macro thread_pool_executor_attr_reader_idletime
    attr_reader :idletime

    # @!macro thread_pool_executor_attr_reader_max_queue
    attr_reader :max_queue

    # @!macro thread_pool_executor_attr_reader_synchronous
    attr_reader :synchronous

    # @!macro thread_pool_executor_method_initialize
    def initialize(opts = {})
      super(opts)
    end

    # @!macro thread_pool_executor_attr_reader_largest_length
    def largest_length
      synchronize { @largest_length }
    end

    # @!macro thread_pool_executor_attr_reader_scheduled_task_count
    def scheduled_task_count
      synchronize { @scheduled_task_count }
    end

    # @!macro thread_pool_executor_attr_reader_completed_task_count
    def completed_task_count
      synchronize { @completed_task_count }
    end

    # @!macro executor_service_method_can_overflow_question
    def can_overflow?
      synchronize { ns_limited_queue? }
    end

    # @!macro thread_pool_executor_attr_reader_length
    def length
      synchronize { @pool.length }
    end

    # @!macro thread_pool_executor_attr_reader_queue_length
    def queue_length
      synchronize { @queue.length }
    end

    # @!macro thread_pool_executor_attr_reader_remaining_capacity
    def remaining_capacity
      synchronize do
        # -1 signals "no limit": the queue is unbounded when max_queue is 0.
        ns_limited_queue? ? @max_queue - @queue.length : -1
      end
    end

    # Called by a worker thread once it has received `:stop`.
    #
    # @!visibility private
    def remove_busy_worker(worker)
      synchronize { ns_remove_busy_worker worker }
    end

    # Called by a worker thread after it has finished running a task.
    #
    # @!visibility private
    def ready_worker(worker, last_message)
      synchronize { ns_ready_worker worker, last_message }
    end

    # Called by a worker thread whose task raised a non-StandardError exception.
    #
    # @!visibility private
    def worker_died(worker)
      synchronize { ns_worker_died worker }
    end

    # Called by a worker thread after a task returned without raising.
    #
    # @!visibility private
    def worker_task_completed
      synchronize { @completed_task_count += 1 }
    end

    # @!macro thread_pool_executor_method_prune_pool
    def prune_pool
      synchronize { ns_prune_pool }
    end

    private

    # Reads the pool options, validates them and sets up the bookkeeping state.
    #
    # @param opts [Hash] recognised keys: :min_threads, :max_threads, :idletime,
    #   :max_queue, :synchronous, :fallback_policy and (undocumented) :gc_interval
    # @raise [ArgumentError] if any option fails the checks below
    #
    # @!visibility private
    def ns_initialize(opts)
      @min_length      = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
      @max_length      = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
      @idletime        = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
      @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
      @synchronous     = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
      @fallback_policy = opts.fetch(:fallback_policy, :abort)

      raise ArgumentError, "`synchronous` cannot be set unless `max_queue` is 0" if @synchronous && @max_queue > 0
      raise ArgumentError, "#{@fallback_policy} is not a valid fallback policy" unless FALLBACK_POLICIES.include?(@fallback_policy)
      raise ArgumentError, "`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}" if @max_length < DEFAULT_MIN_POOL_SIZE
      raise ArgumentError, "`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}" if @max_length > DEFAULT_MAX_POOL_SIZE
      raise ArgumentError, "`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}" if @min_length < DEFAULT_MIN_POOL_SIZE
      raise ArgumentError, "`min_threads` cannot be more than `max_threads`" if min_length > max_length

      @pool                 = [] # all workers
      @ready                = [] # used as a stash (most idle worker is at the start)
      @queue                = [] # used as queue
      # @ready or @queue is empty at all times
      @scheduled_task_count = 0
      @completed_task_count = 0
      @largest_length       = 0
      @workers_counter      = 0
      @ruby_pid             = $$ # detects if Ruby has forked

      @gc_interval  = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
      @next_gc_time = Concurrent.monotonic_time + @gc_interval
    end

    # @return [true, false] true when the queue has a maximum size (max_queue is not 0)
    #
    # @!visibility private
    def ns_limited_queue?
      @max_queue != 0
    end

    # Hands the task to a worker or enqueues it; when neither is possible the
    # result of `fallback_action` is returned instead.
    #
    # @return [nil] when the task was scheduled
    #
    # @!visibility private
    def ns_execute(*args, &task)
      ns_reset_if_forked

      if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
        @scheduled_task_count += 1
      else
        return fallback_action(*args, &task)
      end

      # pruning is piggy-backed on task submission, at most once per gc interval
      ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
      nil
    end

    # @!visibility private
    def ns_shutdown_execution
      ns_reset_if_forked

      # nothing to do
      stopped_event.set if @pool.empty?

      # no more tasks will be accepted, just stop all workers
      @pool.each(&:stop) if @queue.empty?
    end

    # @!visibility private
    def ns_kill_execution
      # TODO log out unprocessed tasks in queue
      # TODO try to shutdown first?
      @pool.each(&:kill)
      @pool.clear
      @ready.clear
    end

    # tries to assign task to a worker, tries to get one from @ready or to create new one
    # @return [true, false] if task is assigned to a worker
    #
    # @!visibility private
    def ns_assign_worker(*args, &task)
      # keep growing if the pool is not at the minimum yet
      # (@ready holds [worker, last_message] pairs, ns_add_busy_worker returns a bare worker)
      worker, _ = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
      return false unless worker

      worker << [task, args]
      true
    rescue ThreadError
      # Raised when the operating system refuses to create the new thread
      false
    end

    # tries to enqueue task
    # @return [true, false] if enqueued
    #
    # @!visibility private
    def ns_enqueue(*args, &task)
      return false if @synchronous
      return false if ns_limited_queue? && @queue.size >= @max_queue

      @queue << [task, args]
      true
    end

    # Drops the dead worker and, if the pool has room, starts a replacement that
    # immediately picks up queued work or joins @ready.
    #
    # @!visibility private
    def ns_worker_died(worker)
      ns_remove_busy_worker worker
      replacement_worker = ns_add_busy_worker
      ns_ready_worker replacement_worker, Concurrent.monotonic_time, false if replacement_worker
    end

    # creates new worker which has to receive work to do after it's added
    # @return [nil, Worker] nil if max capacity is reached
    #
    # @!visibility private
    def ns_add_busy_worker
      return if @pool.size >= @max_length

      @workers_counter += 1
      @pool << (worker = Worker.new(self, @workers_counter))
      @largest_length = @pool.length if @pool.length > @largest_length
      worker
    end

    # handle ready worker, giving it new job or assigning back to @ready
    #
    # @param worker [Worker] the worker that is free to take work
    # @param last_message [Numeric] monotonic timestamp stored with the worker in
    #   @ready and later compared against idletime when pruning
    # @param success [true, false] accepted for compatibility; not used here
    # @raise [RuntimeError] if the worker would be stashed without a timestamp
    #
    # @!visibility private
    def ns_ready_worker(worker, last_message, success = true)
      task_and_args = @queue.shift
      if task_and_args
        worker << task_and_args
      elsif running?
        # An explicit error is raised here: a bare `raise` would re-raise whatever
        # exception is currently being handled (this method is reachable from a
        # rescue handler via worker_died) or a message-less RuntimeError.
        raise 'last_message is required to return a worker to @ready' unless last_message
        @ready.push([worker, last_message])
      else
        # stop workers when !running?, do not return them to @ready
        worker.stop
      end
    end

    # removes a worker which is not tracked in @ready
    #
    # @!visibility private
    def ns_remove_busy_worker(worker)
      @pool.delete(worker)
      stopped_event.set if @pool.empty? && !running?
      true
    end

    # try oldest worker if it is idle for enough time, it's returned back at the start
    #
    # @!visibility private
    def ns_prune_pool
      now = Concurrent.monotonic_time
      # stopped workers stay in @pool until their thread processes :stop, so they
      # are counted here to avoid shrinking below @min_length
      stopped_workers = 0
      while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
        worker, last_message = @ready.first
        # @ready is ordered oldest-first, so the first fresh worker ends the scan
        break unless now - last_message > idletime

        stopped_workers += 1
        @ready.shift
        worker << :stop
      end

      @next_gc_time = Concurrent.monotonic_time + @gc_interval
    end

    # Discards all workers, queued tasks and counters when the current process id
    # differs from the one recorded at initialization (i.e. after a fork).
    def ns_reset_if_forked
      return if $$ == @ruby_pid

      @queue.clear
      @ready.clear
      @pool.clear
      @scheduled_task_count = 0
      @completed_task_count = 0
      @largest_length       = 0
      @workers_counter      = 0
      @ruby_pid             = $$
    end

    # A single pool thread together with the queue used to send it messages.
    #
    # @!visibility private
    class Worker
      include Concern::Logging

      # @param pool [RubyThreadPoolExecutor] the owning pool
      # @param id [Integer] sequence number used in the thread name
      def initialize(pool, id)
        # instance variables accessed only under pool's lock so no need to sync here again
        @queue  = Queue.new
        @pool   = pool
        @thread = create_worker @queue, pool, pool.idletime

        if @thread.respond_to?(:name=)
          @thread.name = [pool.name, 'worker', id].compact.join('-')
        end
      end

      # Sends a message to the worker thread: either `[task, args]` or `:stop`.
      def <<(message)
        @queue << message
      end

      # Asks the thread to exit once it reaches this message in its queue.
      def stop
        @queue << :stop
      end

      # Terminates the thread immediately.
      def kill
        @thread.kill
      end

      private

      # Starts the thread that pops messages until it receives `:stop`.
      #
      # @return [Thread]
      def create_worker(queue, pool, idletime)
        # the idletime argument is passed through to the thread but not used by it
        Thread.new(queue, pool, idletime) do |my_queue, my_pool, _my_idletime|
          catch(:stop) do
            loop do
              case message = my_queue.pop
              when :stop
                my_pool.remove_busy_worker(self)
                throw :stop
              else
                task, args = message
                run_task my_pool, task, args
                my_pool.ready_worker(self, Concurrent.monotonic_time)
              end
            end
          end
        end
      end

      # Runs one task. A StandardError is logged and the worker keeps going; any
      # other Exception is logged, reported to the pool and ends this thread.
      def run_task(pool, task, args)
        task.call(*args)
        pool.worker_task_completed
      rescue => ex
        # let it fail
        log DEBUG, ex
      rescue Exception => ex
        log ERROR, ex
        pool.worker_died(self)
        throw :stop
      end
    end

    private_constant :Worker
  end
end