lib/concurrent/executor/java_thread_pool_executor.rb



if RUBY_PLATFORM == 'java'
  require_relative 'executor'

  module Concurrent

    # @!macro thread_pool_executor
    class JavaThreadPoolExecutor
      include JavaExecutor

      # Default maximum number of threads that will be created in the pool.
      DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647

      # Default minimum number of threads that will be retained in the pool.
      DEFAULT_MIN_POOL_SIZE = 0

      # Default maximum number of tasks that may be added to the task queue.
      DEFAULT_MAX_QUEUE_SIZE = 0

      # Default maximum number of seconds a thread in the pool may remain idle
      # before being reclaimed.
      DEFAULT_THREAD_IDLETIMEOUT = 60

      # The set of possible overflow policies that may be set at thread pool creation.
      OVERFLOW_POLICIES = {
        abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
        discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
        caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
      }.freeze

      # The maximum number of tasks that may be waiting in the work queue at any one time.
      # When the queue size reaches `max_queue` subsequent tasks will be rejected in
      # accordance with the configured `overflow_policy`.
      attr_reader :max_queue

      # The policy defining how rejected tasks (tasks received once the queue size reaches
      # the configured `max_queue`) are handled. Must be one of the values specified in
      # `OVERFLOW_POLICIES`.
      attr_reader :overflow_policy

      # Create a new thread pool.
      #
      # @param [Hash] opts the options which configure the thread pool
      #
      # @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
      #   number of threads to be created
      # @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) the minimum
      #   number of threads to be retained
      # @option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT) the maximum
      #   number of seconds a thread may be idle before being reclaimed
      # @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
      #   number of tasks allowed in the work queue at any one time; a value of
      #   zero means the queue may grow without bound
      # @option opts [Symbol] :overflow_policy (:abort) the policy for handling new
      #   tasks that are received when the queue size has reached `max_queue`
      #
      # @raise [ArgumentError] if `:max_threads` is less than one
      # @raise [ArgumentError] if `:min_threads` is less than zero
      # @raise [ArgumentError] if `:min_threads` is greater than `:max_threads`
      # @raise [ArgumentError] if `:max_queue` is less than zero
      # @raise [ArgumentError] if `:overflow_policy` is not one of the values specified
      #   in `OVERFLOW_POLICIES`
      #
      # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
      def initialize(opts = {})
        min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
        max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
        idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
        @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
        @overflow_policy = opts.fetch(:overflow_policy, :abort)

        # Validate everything up front so that bad options surface as a Ruby
        # ArgumentError instead of a Java IllegalArgumentException thrown by
        # the queue or executor constructors below.
        raise ArgumentError, 'max_threads must be greater than zero' if max_length <= 0
        raise ArgumentError, 'min_threads cannot be less than zero' if min_length < 0
        raise ArgumentError, 'min_threads cannot be more than max_threads' if min_length > max_length
        raise ArgumentError, 'max_queue cannot be less than zero' if @max_queue < 0
        unless OVERFLOW_POLICIES.key?(@overflow_policy)
          raise ArgumentError, "#{@overflow_policy} is not a valid overflow policy"
        end

        queue =
          if @max_queue > 0
            # Bounded queue: at most `max_queue` tasks may wait.
            java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
          elsif min_length == 0
            # No retained threads and no queue bound: hand each task straight
            # to a thread instead of buffering it.
            java.util.concurrent.SynchronousQueue.new
          else
            # Unbounded queue.
            java.util.concurrent.LinkedBlockingQueue.new
          end

        @executor = java.util.concurrent.ThreadPoolExecutor.new(
          min_length, max_length,
          idletime, java.util.concurrent.TimeUnit::SECONDS,
          queue, OVERFLOW_POLICIES[@overflow_policy].new)

        set_shutdown_hook
      end

      # The minimum number of threads that may be retained in the pool.
      #
      # @return [Integer] the min_length
      def min_length
        @executor.getCorePoolSize
      end

      # The maximum number of threads that may be created in the pool.
      #
      # @return [Integer] the max_length
      def max_length
        @executor.getMaximumPoolSize
      end

      # The number of threads currently in the pool.
      #
      # @return [Integer] the length
      def length
        @executor.getPoolSize
      end
      alias_method :current_length, :length

      # The largest number of threads that have been created in the pool since construction.
      #
      # @return [Integer] the largest_length
      def largest_length
        @executor.getLargestPoolSize
      end

      # The number of tasks that have been scheduled for execution on the pool since construction.
      #
      # @return [Integer] the scheduled_task_count
      def scheduled_task_count
        @executor.getTaskCount
      end

      # The number of tasks that have been completed by the pool since construction.
      #
      # @return [Integer] the completed_task_count
      def completed_task_count
        @executor.getCompletedTaskCount
      end

      # The number of seconds that a thread may be idle before being reclaimed.
      #
      # @return [Integer] the idletime
      def idletime
        @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
      end

      # The number of tasks in the queue awaiting execution.
      #
      # @return [Integer] the queue_length
      def queue_length
        @executor.getQueue.size
      end

      # Number of tasks that may be enqueued before reaching `max_queue` and rejecting
      # new tasks. A value of -1 indicates that the queue may grow without bound.
      #
      # @return [Integer] the remaining_capacity
      def remaining_capacity
        @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
      end

      # This method is deprecated and will be removed soon.
      # This method is supposed to return the status of each thread, but the
      # Java API doesn't provide a way to get it, so an empty Array is
      # returned instead.
      #
      # @return [Array] always empty
      def status
        warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
        warn "Calls to `status` return an empty Array. Java ThreadPoolExecutor does not provide thread's status."
        []
      end

      # Is the thread pool running?
      #
      # @return [Boolean] `true` when running, `false` when shutting down or shutdown
      def running?
        super && !@executor.isTerminating
      end

      # Begin a shutdown of the pool by delegating to the inherited `shutdown`
      # and then clearing the work queue. Because the queue is cleared, tasks
      # that are still waiting in it when this method is called are discarded
      # rather than executed.
      #
      # @return [nil]
      def shutdown
        super
        @executor.getQueue.clear
        nil
      end
    end
  end
end