class Rake::ThreadPool

:nodoc: all

def __queue__ # :nodoc:

:nodoc:
def __queue__ # :nodoc:
  @queue
end

def __threads__ # :nodoc:

:nodoc:
def __threads__ # :nodoc:
  @threads.dup
end

def future(*args,&block)

result (or raise an exception thrown from the future)
current thread until the future is finished and will return the
pool. Sending #value to the object will sleep the
a future which has been created and added to the queue in the
Thread#new) The return value is an object representing
The args are passed to the block when executing (similarly to

Creates a future executed by the +ThreadPool+.
def future(*args,&block)
  # capture the local args for the block (like Thread#start)
  local_args = args.collect { |a| begin; a.dup; rescue; a; end }
  promise_mutex = Mutex.new
  promise_result = promise_error = NOT_SET
  # (promise code builds on Ben Lavender's public-domain 'promise' gem)
  promise = lambda do
    # return immediately if the future has been executed
    unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
      return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
    end
    # try to get the lock and execute the promise, otherwise, sleep.
    if promise_mutex.try_lock
      if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
        #execute the promise
        begin
          promise_result = block.call(*local_args)
        rescue Exception => e
          promise_error = e
        end
        block = local_args = nil # GC can now clean these up
      end
      promise_mutex.unlock
    else
      # Even if we didn't get the lock, we need to sleep until the
      # promise has finished executing. If, however, the current
      # thread is part of the thread pool, we need to free up a
      # new thread in the pool so there will always be a thread
      # doing work.
      wait_for_promise = lambda {
        stat :waiting, item_id: promise.object_id
        promise_mutex.synchronize {}
        stat :continue, item_id: promise.object_id
      }
      unless @threads_mon.synchronize { @threads.include? Thread.current }
        wait_for_promise.call
      else
        @threads_mon.synchronize { @max_active_threads += 1 }
        start_thread
        wait_for_promise.call
        @threads_mon.synchronize { @max_active_threads -= 1 }
      end
    end
    promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
  end
  def promise.value
    call
  end
  @queue.enq promise
  stat :item_queued, item_id: promise.object_id
  start_thread
  promise
end

def gather_history #:nodoc:

:nodoc:
Enable the gathering of history events.
def gather_history          #:nodoc:
  @history_start_time = Time.now if @history_start_time.nil?
end

def history # :nodoc:

:nodoc:
complete (i.e. after ThreadPool#join is called).
(see #gather_history). Best to call this when the job is
History gathering must be enabled to be able to see the events

Return a array of history events for the thread pool.
def history                 # :nodoc:
  @history_mon.synchronize { @history.dup }
end

def initialize(thread_count)

The parameter is the size of the pool.
Creates a ThreadPool object.
def initialize(thread_count)
  @max_active_threads = [thread_count, 0].max
  @threads = Set.new
  @threads_mon = Monitor.new
  @queue = Queue.new
  @join_cond = @threads_mon.new_cond
  @history_start_time = nil
  @history = []
  @history_mon = Monitor.new
  @total_threads_in_play = 0
end

def join

Waits until the queue of futures is empty and all threads have exited.
def join
  @threads_mon.synchronize do
    begin
        @join_cond.wait unless @threads.empty?
    rescue Exception => e
      $stderr.puts e
      $stderr.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
      $stderr.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
      $stderr.puts e.backtrace.join("\n")
      @threads.each do |t|
        $stderr.print "Thread #{t} status = #{t.status}\n"
        $stderr.puts t.backtrace.join("\n") if t.respond_to? :backtrace
      end
      raise e
    end
  end
end

def start_thread # :nodoc:

:nodoc:
def start_thread # :nodoc:
  @threads_mon.synchronize do
    next unless @threads.count < @max_active_threads
    t = Thread.new do
      begin
        while @threads.count <= @max_active_threads && !@queue.empty? do
          # Even though we just asked if the queue was empty, it
          # still could have had an item which by this statement
          # is now gone. For this reason we pass true to Queue#deq
          # because we will sleep indefinitely if it is empty.
          block = @queue.deq(true)
          stat :item_dequeued, item_id: block.object_id
          block.call
        end
      rescue ThreadError # this means the queue is empty
      ensure
        @threads_mon.synchronize do
          @threads.delete Thread.current
          stat :thread_deleted, deleted_thread: Thread.current.object_id, thread_count: @threads.count
          @join_cond.broadcast if @threads.empty?
        end
      end
    end
    @threads << t
    stat :thread_created, new_thread: t.object_id, thread_count: @threads.count
    @total_threads_in_play = @threads.count if @threads.count > @total_threads_in_play
  end
end

def stat(event, data=nil) # :nodoc:

:nodoc:
def stat(event, data=nil) # :nodoc:
  return if @history_start_time.nil?
  info = {
    event: event,
    data: data,
    time: (Time.now-@history_start_time),
    thread: Thread.current.object_id,
  }
  @history_mon.synchronize { @history << info }
end

def statistics # :nodoc:

:nodoc:
Return a hash of always collected statistics for the thread pool.
def statistics              #  :nodoc:
  {
    total_threads_in_play: @total_threads_in_play,
    max_active_threads: @max_active_threads,
  }
end