class Concurrent::RubyThreadPoolExecutor
@!macro thread_pool_executor
def create_worker_thread
-
(Thread)
- the new thread.
def create_worker_thread wrkr = RubyThreadPoolWorker.new(@queue, self) Thread.new(wrkr, self) do |worker, parent| Thread.current.abort_on_exception = false worker.run parent.on_worker_exit(worker) end return wrkr end
def drain_pool
Reclaim all threads in the pool.
def drain_pool @pool.each {|worker| worker.kill } @pool.clear end
def ensure_capacity?
-
(Boolean)
- true if the pool has enough capacity else false
def ensure_capacity? additional = 0 capacity = true if @pool.size < @min_length additional = @min_length - @pool.size elsif @queue.empty? && @queue.num_waiting >= 1 additional = 0 elsif @pool.size == 0 && @min_length == 0 additional = 1 elsif @pool.size < @max_length || @max_length == 0 additional = 1 elsif @max_queue == 0 || @queue.size < @max_queue additional = 0 else capacity = false end additional.times do @pool << create_worker_thread end if additional > 0 @largest_length = [@largest_length, @pool.length].max end capacity end
def execute(*args, &task)
def execute(*args, &task) prune_pool if ensure_capacity? @scheduled_task_count += 1 @queue << [args, task] else handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue end end
def handle_overflow(*args)
-
args
(Array
) -- the arguments to the task which is being handled.
def handle_overflow(*args) case @overflow_policy when :abort raise RejectedExecutionError when :discard false when :caller_runs begin yield(*args) rescue # let it fail end true end end
def initialize(opts = {})
- See: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html -
Raises:
-
(ArgumentError)
- if `:overflow_policy` is not one of the values specified -
(ArgumentError)
- if `:min_threads` is less than zero -
(ArgumentError)
- if `:max_threads` is less than one
Options Hash:
(**opts)
-
:overflow_policy
(Symbol
) -- the policy for handling new -
:max_queue
(Integer
) -- the maximum -
:idletime
(Integer
) -- the maximum -
:min_threads
(Integer
) -- the minimum -
:max_threads
(Integer
) -- the maximum
Parameters:
-
opts
(Hash
) -- the options which configure the thread pool
def initialize(opts = {}) @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i @overflow_policy = opts.fetch(:overflow_policy, :abort) raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0 raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0 raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy) init_executor @pool = [] @queue = Queue.new @scheduled_task_count = 0 @completed_task_count = 0 @largest_length = 0 @gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max end
def kill_execution
def kill_execution @queue.clear drain_pool end
def length
-
(Integer)
- the length
def length mutex.synchronize{ running? ? @pool.length : 0 } end
def on_end_task
Run on task completion.
def on_end_task mutex.synchronize do @completed_task_count += 1 #if success break unless running? end end
def on_worker_exit(worker)
Run when a thread worker exits.
def on_worker_exit(worker) mutex.synchronize do @pool.delete(worker) if @pool.empty? && ! running? stop_event.set stopped_event.set end end end
def prune_pool
interval has passed.
pruned and only run if the configured garbage collection
have been idle too long. Will check the last time the pool was
Scan all threads in the pool and reclaim any that are dead or
def prune_pool if Time.now.to_f - @gc_interval >= @last_gc_time @pool.delete_if do |worker| worker.dead? || (@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity) end @last_gc_time = Time.now.to_f end end
def queue_length
-
(Integer)
- the queue_length
def queue_length mutex.synchronize{ running? ? @queue.length : 0 } end
def remaining_capacity
-
(Integer)
- the remaining_capacity
def remaining_capacity mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length } end
def shutdown_execution
def shutdown_execution @queue.clear if @pool.empty? stopped_event.set else @pool.length.times{ @queue << :stop } end end
def status
Returns an array with the status of each thread in the pool
def status warn '[DEPRECATED] `status` is deprecated and will be removed soon.' mutex.synchronize { @pool.collect { |worker| worker.status } } end