class Rake::ThreadPool

:nodoc: all

def __queue__ # :nodoc:

:nodoc:
def __queue__ # :nodoc:
  @queue
end

def __threads__ # :nodoc:

:nodoc:
def __threads__ # :nodoc:
  @threads.dup
end

def future(*args, &block)

result (or raise an exception thrown from the future)
current thread until the future is finished and will return the
pool. Sending #value to the object will sleep the
a future which has been created and added to the queue in the
Thread#new) The return value is an object representing
The args are passed to the block when executing (similarly to

Creates a future executed by the +ThreadPool+.
def future(*args, &block)
  promise = Promise.new(args, &block)
  promise.recorder = lambda { |*stats| stat(*stats) }
  @queue.enq promise
  stat :queued, :item_id => promise.object_id
  start_thread
  promise
end

def gather_history #:nodoc:

:nodoc:
Enable the gathering of history events.
def gather_history          #:nodoc:
  @history_start_time = Time.now if @history_start_time.nil?
end

def history # :nodoc:

:nodoc:
complete (i.e. after ThreadPool#join is called).
(see #gather_history). Best to call this when the job is
History gathering must be enabled to be able to see the events

Return a array of history events for the thread pool.
def history                 # :nodoc:
  @history_mon.synchronize { @history.dup }.
    sort_by { |i| i[:time] }.
    each { |i| i[:time] -= @history_start_time }
end

def initialize(thread_count)

The parameter is the size of the pool.
Creates a ThreadPool object.
def initialize(thread_count)
  @max_active_threads = [thread_count, 0].max
  @threads = Set.new
  @threads_mon = Monitor.new
  @queue = Queue.new
  @join_cond = @threads_mon.new_cond
  @history_start_time = nil
  @history = []
  @history_mon = Monitor.new
  @total_threads_in_play = 0
end

def join

Waits until the queue of futures is empty and all threads have exited.
def join
  @threads_mon.synchronize do
    begin
      stat :joining
      @join_cond.wait unless @threads.empty?
      stat :joined
    rescue Exception => e
      stat :joined
      $stderr.puts e
      $stderr.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
      $stderr.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
      $stderr.puts e.backtrace.join("\n")
      @threads.each do |t|
        $stderr.print "Thread #{t} status = #{t.status}\n"
        # 1.8 doesn't support Thread#backtrace
        $stderr.puts t.backtrace.join("\n") if t.respond_to? :backtrace
      end
      raise e
    end
  end
end

def process_queue_item #:nodoc:

:nodoc:
item to process, false if there was no item
processes one item on the queue. Returns true if there was an
def process_queue_item      #:nodoc:
  return false if @queue.empty?
  # Even though we just asked if the queue was empty, it
  # still could have had an item which by this statement
  # is now gone. For this reason we pass true to Queue#deq
  # because we will sleep indefinitely if it is empty.
  promise = @queue.deq(true)
  stat :dequeued, :item_id => promise.object_id
  promise.work
  return true
  rescue ThreadError # this means the queue is empty
  false
end

def start_thread # :nodoc:

:nodoc:
def start_thread # :nodoc:
  @threads_mon.synchronize do
    next unless @threads.count < @max_active_threads
    t = Thread.new do
      begin
        while @threads.count <= @max_active_threads
          break unless process_queue_item
        end
      ensure
        @threads_mon.synchronize do
          @threads.delete Thread.current
          stat :ended, :thread_count => @threads.count
          @join_cond.broadcast if @threads.empty?
        end
      end
    end
    @threads << t
    stat :spawned, :new_thread => t.object_id, :thread_count => @threads.count
    @total_threads_in_play = @threads.count if @threads.count > @total_threads_in_play
  end
end

def stat(event, data=nil) # :nodoc:

:nodoc:
def stat(event, data=nil) # :nodoc:
  return if @history_start_time.nil?
  info = {
    :event  => event,
    :data   => data,
    :time   => Time.now,
    :thread => Thread.current.object_id,
  }
  @history_mon.synchronize { @history << info }
end

def statistics # :nodoc:

:nodoc:
Return a hash of always collected statistics for the thread pool.
def statistics              #  :nodoc:
  {
    :total_threads_in_play => @total_threads_in_play,
    :max_active_threads => @max_active_threads,
  }
end