lib/concurrent/ivar.rb
require 'thread' require 'concurrent/obligation' require 'concurrent/observable' module Concurrent MultipleAssignmentError = Class.new(StandardError) class IVar include Obligation include Concurrent::Observable NO_VALUE = Object.new # Create a new `Ivar` in the `:pending` state with the (optional) initial value. # # @param [Object] value the initial value # @param [Hash] opts the options to create a message with # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and # returning the value returned from the proc def initialize(value = NO_VALUE, opts = {}) init_obligation self.observers = CopyOnWriteObserverSet.new set_deref_options(opts) if value == NO_VALUE @state = :pending else set(value) end end # Add an observer on this object that will receive notification on update. # # Upon completion the `IVar` will notify all observers in a thread-say way. The `func` # method of the observer will be called with three arguments: the `Time` at which the # `Future` completed the asynchronous operation, the final `value` (or `nil` on rejection), # and the final `reason` (or `nil` on fulfillment). # # @param [Object] observer the object that will be notified of changes # @param [Symbol] func symbol naming the method to call when this `Observable` has changes` def add_observer(observer = nil, func = :update, &block) raise ArgumentError.new('cannot provide both an observer and a block') if observer && block direct_notification = false if block observer = block func = :call end mutex.synchronize do if event.set? direct_notification = true else observers.add_observer(observer, func) end end observer.send(func, Time.now, self.value, reason) if direct_notification observer end def set(value) complete(true, value, nil) end def fail(reason = StandardError.new) complete(false, nil, reason) end def complete(success, value, reason) mutex.synchronize do raise MultipleAssignmentError.new('multiple assignment') if [:fulfilled, :rejected].include? @state set_state(success, value, reason) event.set end time = Time.now observers.notify_and_delete_observers{ [time, self.value, reason] } self end end end