class Rake::ThreadPool
def future(*args,&block)
current thread until the future is finished and will return the
pool. Sending #value to the object will sleep the
a future which has been created and added to the queue in the
Thread#new) The return value is an object representing
The args are passed to the block when executing (similarly to
Creates a future executed by the +ThreadPool+.
def future(*args,&block) # capture the local args for the block (like Thread#start) local_args = args.collect { |a| begin; a.dup; rescue; a; end } promise_mutex = Mutex.new promise_result = promise_error = NOT_SET # (promise code builds on Ben Lavender's public-domain 'promise' gem) promise = lambda do # return immediately if the future has been executed unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET) return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error) end # try to get the lock and execute the promise, otherwise, sleep. if promise_mutex.try_lock if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET) #execute the promise begin promise_result = block.call(*local_args) rescue Exception => e promise_error = e end block = local_args = nil # GC can now clean these up end promise_mutex.unlock else # Even if we didn't get the lock, we need to sleep until the # promise has finished executing. If, however, the current # thread is part of the thread pool, we need to free up a # new thread in the pool so there will always be a thread # doing work. wait_for_promise = lambda { stat :waiting, item_id: promise.object_id promise_mutex.synchronize {} stat :continue, item_id: promise.object_id } unless @threads_mon.synchronize { @threads.include? Thread.current } wait_for_promise.call else @threads_mon.synchronize { @max_active_threads += 1 } start_thread wait_for_promise.call @threads_mon.synchronize { @max_active_threads -= 1 } end end promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error) end def promise.value call end @queue.enq promise stat :item_queued, item_id: promise.object_id start_thread promise end