lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb
require 'concurrent/utility/engine' require 'concurrent/executor/thread_pool_executor' module Concurrent # A thread pool that dynamically grows and shrinks to fit the current workload. # New threads are created as needed, existing threads are reused, and threads # that remain idle for too long are killed and removed from the pool. These # pools are particularly suited to applications that perform a high volume of # short-lived tasks. # # On creation a `CachedThreadPool` has zero running threads. New threads are # created on the pool as new operations are `#post`. The size of the pool # will grow until `#max_length` threads are in the pool or until the number # of threads exceeds the number of running and pending operations. When a new # operation is post to the pool the first available idle thread will be tasked # with the new operation. # # Should a thread crash for any reason the thread will immediately be removed # from the pool. Similarly, threads which remain idle for an extended period # of time will be killed and reclaimed. Thus these thread pools are very # efficient at reclaiming unused resources. # # The API and behavior of this class are based on Java's `CachedThreadPool` # # @!macro thread_pool_options class CachedThreadPool < ThreadPoolExecutor # @!macro cached_thread_pool_method_initialize # # Create a new thread pool. # # @param [Hash] opts the options defining pool behavior. # @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy # # @raise [ArgumentError] if `fallback_policy` is not a known policy # # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool-- def initialize(opts = {}) defaults = { idletime: DEFAULT_THREAD_IDLETIMEOUT } overrides = { min_threads: 0, max_threads: DEFAULT_MAX_POOL_SIZE, max_queue: DEFAULT_MAX_QUEUE_SIZE } super(defaults.merge(opts).merge(overrides)) end private # @!macro cached_thread_pool_method_initialize # @!visibility private def ns_initialize(opts) super(opts) if Concurrent.on_jruby? @max_queue = 0 @executor = java.util.concurrent.Executors.newCachedThreadPool( DaemonThreadFactory.new(ns_auto_terminate?)) @executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new) @executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS) end end end end