lib/concurrent/cached_thread_pool.rb



require 'thread'

require 'concurrent/event'
require 'concurrent/cached_thread_pool/worker'

module Concurrent

  class CachedThreadPool

    MIN_POOL_SIZE = 1
    MAX_POOL_SIZE = 256

    DEFAULT_THREAD_IDLETIME = 60

    attr_accessor :max_threads

    def initialize(opts = {})
      @idletime = (opts[:idletime] || DEFAULT_THREAD_IDLETIME).to_i
      raise ArgumentError.new('idletime must be greater than zero') if @idletime <= 0

      @max_threads = opts[:max_threads] || opts[:max] || MAX_POOL_SIZE
      if @max_threads < MIN_POOL_SIZE || @max_threads > MAX_POOL_SIZE
        raise ArgumentError.new("size must be from #{MIN_POOL_SIZE} to #{MAX_POOL_SIZE}")
      end

      @state = :running
      @pool = []
      @terminator = Event.new
      @mutex = Mutex.new

      @busy = []
      @idle = []
    end

    def <<(block)
      self.post(&block)
      return self
    end

    def post(*args, &block)
      raise ArgumentError.new('no block given') if block.nil?
      @mutex.synchronize do
        break false unless @state == :running

        if @idle.empty?
          if @idle.length + @busy.length < @max_threads
            worker = create_worker_thread
          else
            worker = @busy.shift
          end
        else
          worker = @idle.pop
        end

        @busy.push(worker)
        worker.signal(*args, &block)

        prune_stale_workers
        true
      end
    end

    def running?
      return @state == :running
    end

    def wait_for_termination(timeout = nil)
      return @terminator.wait(timeout)
    end

    def shutdown
      @mutex.synchronize do
        break unless @state == :running
        if @idle.empty? && @busy.empty?
          @state = :shutdown
          @terminator.set
        else
          @state = :shuttingdown
          @idle.each{|worker| worker.stop }
          @busy.each{|worker| worker.stop }
        end
      end
    end

    def kill
      @mutex.synchronize do
        break if @state == :shutdown
        @state = :shutdown
          @idle.each{|worker| worker.kill }
          @busy.each{|worker| worker.kill }
        @terminator.set
      end
    end

    def length
      @mutex.synchronize do
        @state == :running ? @busy.length + @idle.length : 0
      end
    end

    def on_worker_exit(worker)
      @mutex.synchronize do
        @idle.delete(worker)
        @busy.delete(worker)
        if @idle.empty? && @busy.empty? && @state != :running
          @state = :shutdown
          @terminator.set
        end
      end
    end

    def on_end_task(worker)
      @mutex.synchronize do
        break unless @state == :running
        @busy.delete(worker)
        @idle.push(worker)
      end
    end

    protected

    def create_worker_thread
      wrkr = Worker.new(self)
      Thread.new(wrkr, self) do |worker, parent|
        Thread.current.abort_on_exception = false
        worker.run
        parent.on_worker_exit(worker)
      end
      return wrkr
    end

    def prune_stale_workers
      @idle.reject! do |worker|
        if worker.idletime > @idletime
          worker.stop
          true
        else
          worker.dead?
        end
      end
    end
  end
end