class Concurrent::CachedThreadPool
def <<(block)
def <<(block) self.post(&block) return self end
def create_worker_thread
def create_worker_thread wrkr = Worker.new(self) Thread.new(wrkr, self) do |worker, parent| Thread.current.abort_on_exception = false worker.run parent.on_worker_exit(worker) end return wrkr end
def initialize(opts = {})
def initialize(opts = {}) @idletime = (opts[:idletime] || DEFAULT_THREAD_IDLETIME).to_i raise ArgumentError.new('idletime must be greater than zero') if @idletime <= 0 @max_threads = opts[:max_threads] || opts[:max] || MAX_POOL_SIZE if @max_threads < MIN_POOL_SIZE || @max_threads > MAX_POOL_SIZE raise ArgumentError.new("size must be from #{MIN_POOL_SIZE} to #{MAX_POOL_SIZE}") end @state = :running @pool = [] @terminator = Event.new @mutex = Mutex.new @busy = [] @idle = [] end
def kill
def kill @mutex.synchronize do break if @state == :shutdown @state = :shutdown @idle.each{|worker| worker.kill } @busy.each{|worker| worker.kill } @terminator.set end end
def length
def length @mutex.synchronize do @state == :running ? @busy.length + @idle.length : 0 end end
def on_end_task(worker)
def on_end_task(worker) @mutex.synchronize do break unless @state == :running @busy.delete(worker) @idle.push(worker) end end
def on_worker_exit(worker)
def on_worker_exit(worker) @mutex.synchronize do @idle.delete(worker) @busy.delete(worker) if @idle.empty? && @busy.empty? && @state != :running @state = :shutdown @terminator.set end end end
def post(*args, &block)
def post(*args, &block) raise ArgumentError.new('no block given') if block.nil? @mutex.synchronize do break false unless @state == :running if @idle.empty? if @idle.length + @busy.length < @max_threads worker = create_worker_thread else worker = @busy.shift end else worker = @idle.pop end @busy.push(worker) worker.signal(*args, &block) prune_stale_workers true end end
def prune_stale_workers
def prune_stale_workers @idle.reject! do |worker| if worker.idletime > @idletime worker.stop true else worker.dead? end end end
def running?
def running? return @state == :running end
def shutdown
def shutdown @mutex.synchronize do break unless @state == :running if @idle.empty? && @busy.empty? @state = :shutdown @terminator.set else @state = :shuttingdown @idle.each{|worker| worker.stop } @busy.each{|worker| worker.stop } end end end
def wait_for_termination(timeout = nil)
def wait_for_termination(timeout = nil) return @terminator.wait(timeout) end