lib/celluloid/future.rb
module Celluloid # Celluloid::Future objects allow methods and blocks to run in the # background, their values requested later class Future def self.new(*args, &block) return super unless block future = new # task = Thread.current[:celluloid_task] # actor = Thread.current[:celluloid_actor] Internals::ThreadHandle.new(Celluloid.actor_system, :future) do begin # Thread.current[:celluloid_task] = task # Thread.current[:celluloid_actor] = actor call = Call::Sync.new(future, :call, args) call.dispatch(block) rescue # Exceptions in blocks will get raised when the value is retrieved end end future end attr_reader :address def initialize(&block) @address = Celluloid.uuid @mutex = Mutex.new @ready = false @result = nil @forwards = nil @cancelled = false if block @call = Call::Sync.new(self, :call, args) Celluloid.internal_pool.get do begin @call.dispatch(block) rescue # Exceptions in blocks will get raised when the value is retrieved end end else @call = nil end end # Execute the given method in future context def execute(receiver, method, args, block) @mutex.synchronize do raise "already calling" if @call @call = Call::Sync.new(self, method, args, block) end receiver << @call end # Check if this future has a value yet def ready? @ready end # Obtain the value for this Future def value(timeout = nil) ready = result = nil begin @mutex.lock if @ready ready = true result = @result else case @forwards when Array @forwards << Celluloid.mailbox when NilClass @forwards = Celluloid.mailbox else @forwards = [@forwards, Celluloid.mailbox] end end ensure @mutex.unlock end unless ready result = Celluloid.receive(timeout) do |msg| msg.is_a?(Future::Result) && msg.future == self end end if result result.respond_to?(:value) ? result.value : result else raise TimedOut, "Timed out" end end alias call value # Signal this future with the given result value def signal(value) return if @cancelled result = Result.new(value, self) @mutex.synchronize do raise "the future has already happened!" if @ready if @forwards @forwards.is_a?(Array) ? @forwards.each { |f| f << result } : @forwards << result end @result = result @ready = true end end alias << signal def cancel(error) response = Internals::Response::Error.new(@call, error) signal response @mutex.synchronize do @cancelled = true end end # Inspect this Celluloid::Future alias inspect to_s # Wrapper for result values to distinguish them in mailboxes class Result attr_reader :future def initialize(result, future) @result = result @future = future end def value @result.value end end end end