lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
require 'concurrent/utility/engine' require 'concurrent/executor/thread_pool_executor' module Concurrent # @!macro thread_pool_executor_constant_default_max_pool_size # Default maximum number of threads that will be created in the pool. # @!macro thread_pool_executor_constant_default_min_pool_size # Default minimum number of threads that will be retained in the pool. # @!macro thread_pool_executor_constant_default_max_queue_size # Default maximum number of tasks that may be added to the task queue. # @!macro thread_pool_executor_constant_default_thread_timeout # Default maximum number of seconds a thread in the pool may remain idle # before being reclaimed. # @!macro thread_pool_executor_constant_default_synchronous # Default value of the :synchronous option. # @!macro thread_pool_executor_attr_reader_max_length # The maximum number of threads that may be created in the pool. # @return [Integer] The maximum number of threads that may be created in the pool. # @!macro thread_pool_executor_attr_reader_min_length # The minimum number of threads that may be retained in the pool. # @return [Integer] The minimum number of threads that may be retained in the pool. # @!macro thread_pool_executor_attr_reader_largest_length # The largest number of threads that have been created in the pool since construction. # @return [Integer] The largest number of threads that have been created in the pool since construction. # @!macro thread_pool_executor_attr_reader_scheduled_task_count # The number of tasks that have been scheduled for execution on the pool since construction. # @return [Integer] The number of tasks that have been scheduled for execution on the pool since construction. # @!macro thread_pool_executor_attr_reader_completed_task_count # The number of tasks that have been completed by the pool since construction. # @return [Integer] The number of tasks that have been completed by the pool since construction. # @!macro thread_pool_executor_attr_reader_idletime # The number of seconds that a thread may be idle before being reclaimed. # @return [Integer] The number of seconds that a thread may be idle before being reclaimed. # @!macro thread_pool_executor_attr_reader_synchronous # Whether or not a value of 0 for :max_queue option means the queue must perform direct hand-off or rather unbounded queue. # @return [true, false] # @!macro thread_pool_executor_attr_reader_max_queue # The maximum number of tasks that may be waiting in the work queue at any one time. # When the queue size reaches `max_queue` subsequent tasks will be rejected in # accordance with the configured `fallback_policy`. # # @return [Integer] The maximum number of tasks that may be waiting in the work queue at any one time. # When the queue size reaches `max_queue` subsequent tasks will be rejected in # accordance with the configured `fallback_policy`. # @!macro thread_pool_executor_attr_reader_length # The number of threads currently in the pool. # @return [Integer] The number of threads currently in the pool. # @!macro thread_pool_executor_attr_reader_queue_length # The number of tasks in the queue awaiting execution. # @return [Integer] The number of tasks in the queue awaiting execution. # @!macro thread_pool_executor_attr_reader_remaining_capacity # Number of tasks that may be enqueued before reaching `max_queue` and rejecting # new tasks. A value of -1 indicates that the queue may grow without bound. # # @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting # new tasks. A value of -1 indicates that the queue may grow without bound. # @!macro thread_pool_executor_method_prune_pool # Prune the thread pool of unneeded threads # # What is being pruned is controlled by the min_threads and idletime # parameters passed at pool creation time # # This is a no-op on some pool implementation (e.g. the Java one). The Ruby # pool will auto-prune each time a new job is posted. You will need to call # this method explicitely in case your application post jobs in bursts (a # lot of jobs and then nothing for long periods) # @!macro thread_pool_executor_public_api # # @!macro abstract_executor_service_public_api # # @!attribute [r] max_length # @!macro thread_pool_executor_attr_reader_max_length # # @!attribute [r] min_length # @!macro thread_pool_executor_attr_reader_min_length # # @!attribute [r] largest_length # @!macro thread_pool_executor_attr_reader_largest_length # # @!attribute [r] scheduled_task_count # @!macro thread_pool_executor_attr_reader_scheduled_task_count # # @!attribute [r] completed_task_count # @!macro thread_pool_executor_attr_reader_completed_task_count # # @!attribute [r] idletime # @!macro thread_pool_executor_attr_reader_idletime # # @!attribute [r] max_queue # @!macro thread_pool_executor_attr_reader_max_queue # # @!attribute [r] length # @!macro thread_pool_executor_attr_reader_length # # @!attribute [r] queue_length # @!macro thread_pool_executor_attr_reader_queue_length # # @!attribute [r] remaining_capacity # @!macro thread_pool_executor_attr_reader_remaining_capacity # # @!method can_overflow? # @!macro executor_service_method_can_overflow_question # # @!method prune_pool # @!macro thread_pool_executor_method_prune_pool # @!macro thread_pool_options # # **Thread Pool Options** # # Thread pools support several configuration options: # # * `idletime`: The number of seconds that a thread may be idle before being reclaimed. # * `name`: The name of the executor (optional). Printed in the executor's `#to_s` output and # a `<name>-worker-<id>` name is given to its threads if supported by used Ruby # implementation. `<id>` is uniq for each thread. # * `max_queue`: The maximum number of tasks that may be waiting in the work queue at # any one time. When the queue size reaches `max_queue` and no new threads can be created, # subsequent tasks will be rejected in accordance with the configured `fallback_policy`. # * `auto_terminate`: When true (default), the threads started will be marked as daemon. # * `fallback_policy`: The policy defining how rejected tasks are handled. # # Three fallback policies are supported: # # * `:abort`: Raise a `RejectedExecutionError` exception and discard the task. # * `:discard`: Discard the task and return false. # * `:caller_runs`: Execute the task on the calling thread. # # **Shutting Down Thread Pools** # # Killing a thread pool while tasks are still being processed, either by calling # the `#kill` method or at application exit, will have unpredictable results. There # is no way for the thread pool to know what resources are being used by the # in-progress tasks. When those tasks are killed the impact on those resources # cannot be predicted. The *best* practice is to explicitly shutdown all thread # pools using the provided methods: # # * Call `#shutdown` to initiate an orderly termination of all in-progress tasks # * Call `#wait_for_termination` with an appropriate timeout interval an allow # the orderly shutdown to complete # * Call `#kill` *only when* the thread pool fails to shutdown in the allotted time # # On some runtime platforms (most notably the JVM) the application will not # exit until all thread pools have been shutdown. To prevent applications from # "hanging" on exit, all threads can be marked as daemon according to the # `:auto_terminate` option. # # ```ruby # pool1 = Concurrent::FixedThreadPool.new(5) # threads will be marked as daemon # pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon # ``` # # @note Failure to properly shutdown a thread pool can lead to unpredictable results. # Please read *Shutting Down Thread Pools* for more information. # # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html Java Tutorials: Thread Pools # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface # @see https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#setDaemon-boolean- # @!macro fixed_thread_pool # # A thread pool that reuses a fixed number of threads operating off an unbounded queue. # At any point, at most `num_threads` will be active processing tasks. When all threads are busy new # tasks `#post` to the thread pool are enqueued until a thread becomes available. # Should a thread crash for any reason the thread will immediately be removed # from the pool and replaced. # # The API and behavior of this class are based on Java's `FixedThreadPool` # # @!macro thread_pool_options class FixedThreadPool < ThreadPoolExecutor # @!macro fixed_thread_pool_method_initialize # # Create a new thread pool. # # @param [Integer] num_threads the number of threads to allocate # @param [Hash] opts the options defining pool behavior. # @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy # # @raise [ArgumentError] if `num_threads` is less than or equal to zero # @raise [ArgumentError] if `fallback_policy` is not a known policy # # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int- def initialize(num_threads, opts = {}) raise ArgumentError.new('number of threads must be greater than zero') if num_threads.to_i < 1 defaults = { max_queue: DEFAULT_MAX_QUEUE_SIZE, idletime: DEFAULT_THREAD_IDLETIMEOUT } overrides = { min_threads: num_threads, max_threads: num_threads } super(defaults.merge(opts).merge(overrides)) end end end