class Concurrent::RubyThreadPoolExecutor

@!macro thread_pool_executor

def create_worker_thread

Returns:
  • (RubyThreadPoolWorker) - the new worker; its thread is started as a side effect

Create a single worker thread to be added to the pool.
def create_worker_thread
  wrkr = RubyThreadPoolWorker.new(@queue, self)
  Thread.new(wrkr, self) do |worker, parent|
    Thread.current.abort_on_exception = false
    worker.run
    parent.on_worker_exit(worker)
  end
  return wrkr
end

def drain_pool

@!visibility private

Reclaim all threads in the pool.
def drain_pool
  @pool.each {|worker| worker.kill }
  @pool.clear
end

def ensure_capacity?

Returns:
  • (Boolean) - true if the pool has enough capacity to handle the task, else false

Check the pool and queue for capacity to handle a new task, spawning
additional worker threads when the configuration allows.
def ensure_capacity?
  additional = 0
  capacity = true
  if @pool.size < @min_length
    # below the configured minimum: grow back up to min_threads
    additional = @min_length - @pool.size
  elsif @queue.empty? && @queue.num_waiting >= 1
    # at least one idle worker is already blocked on the empty queue
    additional = 0
  elsif @pool.size == 0 && @min_length == 0
    # a zero-sized pool still needs one worker to process the incoming task
    additional = 1
  elsif @pool.size < @max_length || @max_length == 0
    # room to grow (or the pool is unbounded): add a single worker
    additional = 1
  elsif @max_queue == 0 || @queue.size < @max_queue
    # no room to grow, but the queue can still absorb the task
    additional = 0
  else
    # pool and queue are both at their limits
    capacity = false
  end
  additional.times do
    @pool << create_worker_thread
  end
  if additional > 0
    @largest_length = [@largest_length, @pool.length].max
  end
  capacity
end
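
A minimal sketch of the growth behaviour this produces, assuming tasks are
submitted through the executor mixin's public post method (not shown in this
section); the worker counts are illustrative, not guaranteed:

require 'concurrent'

pool = Concurrent::RubyThreadPoolExecutor.new(
  min_threads: 2,
  max_threads: 5,
  max_queue:   0    # zero means the work queue is unbounded
)

# Submitting more simultaneous work than min_threads can absorb lets the
# capacity check above add workers, one per submission, up to max_threads.
20.times { pool.post { sleep(0.1) } }
sleep(0.5)
puts pool.length    # somewhere between min_threads (2) and max_threads (5)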

def execute(*args, &task)

@!visibility private
def execute(*args, &task)
  # reclaim dead or long-idle workers before checking capacity
  prune_pool
  if ensure_capacity?
    # capacity available: count the task and enqueue it with its arguments
    @scheduled_task_count += 1
    @queue << [args, task]
  else
    # bounded queue is full: apply the configured overflow policy
    handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
  end
end
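
execute itself is private; tasks normally arrive through the executor mixin's
public post method (assumed here, it is not part of this section), which
forwards both the arguments and the block:

require 'concurrent'

pool = Concurrent::RubyThreadPoolExecutor.new(min_threads: 1, max_threads: 2)

# The arguments are enqueued alongside the block as [args, task] and handed
# back to the block when a worker picks the task up.
pool.post(3, 4) { |a, b| puts a + b }   # prints 7 from a worker thread
sleep(0.2)                              # give the worker time to run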

def handle_overflow(*args)

Parameters:
  • args (Array) -- the arguments to the task which is being handled

Handler that applies the configured `overflow_policy` when a task cannot be enqueued.
def handle_overflow(*args)
  case @overflow_policy
  when :abort
    # reject the task loudly
    raise RejectedExecutionError
  when :discard
    # silently drop the task
    false
  when :caller_runs
    # run the task immediately on the submitting thread
    begin
      yield(*args)
    rescue
      # let it fail
    end
    true
  end
end
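
A sketch of how the three policies behave with a bounded queue; submission goes
through the mixin's post method as in the earlier sketches, and the exact point
of overflow depends on how quickly the single worker drains the queue:

require 'concurrent'

pool = Concurrent::RubyThreadPoolExecutor.new(
  min_threads:     1,
  max_threads:     1,
  max_queue:       2,
  overflow_policy: :abort    # alternatives: :discard, :caller_runs
)

begin
  # With one slow worker and a max_queue of 2, later submissions overflow:
  # :abort raises, :discard drops them silently, :caller_runs executes them
  # on the submitting thread.
  10.times { pool.post { sleep(1) } }
rescue Concurrent::RejectedExecutionError
  puts 'task rejected'
end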

def initialize(opts = {})

Other tags:
    See: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

Raises:
  • (ArgumentError) - if `:overflow_policy` is not one of the values specified
  • (ArgumentError) - if `:min_threads` is less than zero
  • (ArgumentError) - if `:max_threads` is less than one

Options Hash: (**opts)
  • :overflow_policy (Symbol) -- the policy for handling new tasks that are received when the queue size has reached `max_queue`
  • :max_queue (Integer) -- the maximum number of tasks that may be waiting in the work queue at any one time; a value of zero means the queue may grow without bound
  • :idletime (Integer) -- the maximum number of seconds a thread may be idle before being reclaimed
  • :min_threads (Integer) -- the minimum number of threads to be retained in the pool
  • :max_threads (Integer) -- the maximum number of threads that may be created in the pool

Parameters:
  • opts (Hash) -- the options which configure the thread pool
def initialize(opts = {})
  @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @overflow_policy = opts.fetch(:overflow_policy, :abort)
  raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
  raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
  raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
  init_executor
  @pool = []
  @queue = Queue.new
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length = 0
  @gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
  @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
end
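
A construction sketch using the options documented above; the values are
arbitrary examples:

require 'concurrent'

pool = Concurrent::RubyThreadPoolExecutor.new(
  min_threads:     2,       # keep at least two workers alive
  max_threads:     10,      # never grow beyond ten workers
  idletime:        60,      # reclaim workers idle for more than a minute
  max_queue:       100,     # bound the work queue at one hundred tasks
  overflow_policy: :abort   # raise RejectedExecutionError when the queue is full
)

puts pool.remaining_capacity   # => 100 while the queue is empty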

def kill_execution

@!visibility private
def kill_execution
  @queue.clear
  drain_pool
end

def length

Returns:
  • (Integer) - the number of worker threads currently in the pool, or zero when the pool is not running
def length
  mutex.synchronize{ running? ? @pool.length : 0 }
end

def on_end_task

@!visibility private

Run on task completion.
def on_end_task
  mutex.synchronize do
    @completed_task_count += 1 #if success
    break unless running?
  end
end

def on_worker_exit(worker)

@!visibility private

Run when a thread worker exits.
def on_worker_exit(worker)
  mutex.synchronize do
    @pool.delete(worker)
    if @pool.empty? && ! running?
      stop_event.set
      stopped_event.set
    end
  end
end

def prune_pool

@!visibility private

Scan all threads in the pool and reclaim any that are dead or
have been idle too long. Will check the last time the pool was
pruned and only run if the configured garbage collection
interval has passed.
def prune_pool
  if Time.now.to_f - @gc_interval >= @last_gc_time
    # drop dead workers and, unless idletime is zero (disabled), any worker
    # whose last activity was more than @idletime seconds ago
    @pool.delete_if do |worker|
      worker.dead? ||
        (@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity)
    end
    @last_gc_time = Time.now.to_f
  end
end
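
Because pruning runs from execute, idle workers are only reclaimed the next
time a task is submitted after the garbage-collection interval has elapsed. A
sketch of that behaviour, assuming the mixin's post method and the undocumented
:gc_interval option seen in the constructor:

require 'concurrent'

pool = Concurrent::RubyThreadPoolExecutor.new(
  min_threads: 0,
  max_threads: 4,
  idletime:    1,   # reclaim workers idle for more than one second
  gc_interval: 1    # prune at most once per second
)

8.times { pool.post { sleep(0.1) } }
before = pool.length
sleep(3)                 # let every worker sit idle past :idletime
pool.post { :noop }      # this submission triggers prune_pool
puts [before, pool.length].inspect   # the second count is typically smaller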

def queue_length

Returns:
  • (Integer) - the number of tasks currently waiting in the queue, or zero when the pool is not running
def queue_length
  mutex.synchronize{ running? ? @queue.length : 0 }
end

def remaining_capacity

Returns:
  • (Integer) - the number of additional tasks the queue can accept before reaching `max_queue`, or -1 when the queue is unbounded
def remaining_capacity
  mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
end
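
A small illustration of the two cases:

require 'concurrent'

bounded   = Concurrent::RubyThreadPoolExecutor.new(max_queue: 10)
unbounded = Concurrent::RubyThreadPoolExecutor.new(max_queue: 0)

puts bounded.remaining_capacity    # => 10 while the queue is empty
puts unbounded.remaining_capacity  # => -1, the queue may grow without bound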

def shutdown_execution

@!visibility private
def shutdown_execution
  @queue.clear
  if @pool.empty?
    stopped_event.set
  else
    @pool.length.times{ @queue << :stop }
  end
end
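
shutdown_execution and kill_execution are template methods invoked by the
executor mixin; callers normally use its public shutdown, kill and
wait_for_termination methods (assumed here, they are not part of this section):

require 'concurrent'

pool = Concurrent::RubyThreadPoolExecutor.new(min_threads: 2, max_threads: 4)
100.times { pool.post { sleep(0.01) } }

pool.shutdown                 # runs shutdown_execution: clears the queue and tells each worker to stop
pool.wait_for_termination(5)  # block up to five seconds while the workers exit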

def status

This method is deprecated and will be removed soon.

Returns an array with the status of each thread in the pool.
def status
  warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
  mutex.synchronize { @pool.collect { |worker| worker.status } }
end