lib/concurrent-ruby/concurrent/ivar.rb



require 'concurrent/constants'
require 'concurrent/errors'
require 'concurrent/collection/copy_on_write_observer_set'
require 'concurrent/concern/obligation'
require 'concurrent/concern/observable'
require 'concurrent/executor/safe_task_executor'
require 'concurrent/synchronization/lockable_object'

module Concurrent

  # An `IVar` is like a future that you can assign. As a future is a value that
  # is being computed that you can wait on, an `IVar` is a value that is waiting
  # to be assigned, that you can wait on. `IVars` are single assignment and
  # deterministic.
  #
  # Then, express futures as an asynchronous computation that assigns an `IVar`.
  # The `IVar` becomes the primitive on which [futures](Future) and
  # [dataflow](Dataflow) are built.
  #
  # An `IVar` is a single-element container that is normally created empty, and
  # can only be set once. The I in `IVar` stands for immutable. Reading an
  # `IVar` normally blocks until it is set. It is safe to set and read an `IVar`
  # from different threads.
  #
  # If you want to have some parallel task set the value in an `IVar`, you want
  # a `Future`. If you want to create a graph of parallel tasks all executed
  # when the values they depend on are ready you want `dataflow`. `IVar` is
  # generally a low-level primitive.
  #
  # ## Examples
  #
  # Create, set and get an `IVar`
  #
  # ```ruby
  # ivar = Concurrent::IVar.new
  # ivar.set 14
  # ivar.value #=> 14
  # ivar.set 2 # would now be an error
  # ```
  #
  # ## See Also
  #
  # 1. For the theory: Arvind, R. Nikhil, and K. Pingali.
  #    [I-Structures: Data structures for parallel computing](http://dl.acm.org/citation.cfm?id=69562).
  #    In Proceedings of Workshop on Graph Reduction, 1986.
  # 2. For recent application:
  #    [DataDrivenFuture in Habanero Java from Rice](http://www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html).
  class IVar < Synchronization::LockableObject
    include Concern::Obligation
    include Concern::Observable

    # Create a new `IVar` in the `:pending` state with the (optional) initial value.
    #
    # @param [Object] value the initial value
    # @param [Hash] opts the options to create a message with
    # @option opts [String] :dup_on_deref (false) call `#dup` before returning
    #   the data
    # @option opts [String] :freeze_on_deref (false) call `#freeze` before
    #   returning the data
    # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing
    #   the internal value and returning the value returned from the proc
    def initialize(value = NULL, opts = {}, &block)
      if value != NULL && block_given?
        raise ArgumentError.new('provide only a value or a block')
      end
      super(&nil)
      synchronize { ns_initialize(value, opts, &block) }
    end

    # Add an observer on this object that will receive notification on update.
    #
    # Upon completion the `IVar` will notify all observers in a thread-safe way.
    # The `func` method of the observer will be called with three arguments: the
    # `Time` at which the `Future` completed the asynchronous operation, the
    # final `value` (or `nil` on rejection), and the final `reason` (or `nil` on
    # fulfillment).
    #
    # @param [Object] observer the object that will be notified of changes
    # @param [Symbol] func symbol naming the method to call when this
    #   `Observable` has changes`
    def add_observer(observer = nil, func = :update, &block)
      raise ArgumentError.new('cannot provide both an observer and a block') if observer && block
      direct_notification = false

      if block
        observer = block
        func = :call
      end

      synchronize do
        if event.set?
          direct_notification = true
        else
          observers.add_observer(observer, func)
        end
      end

      observer.send(func, Time.now, self.value, reason) if direct_notification
      observer
    end

    # @!macro ivar_set_method
    #   Set the `IVar` to a value and wake or notify all threads waiting on it.
    #
    #   @!macro ivar_set_parameters_and_exceptions
    #     @param [Object] value the value to store in the `IVar`
    #     @yield A block operation to use for setting the value
    #     @raise [ArgumentError] if both a value and a block are given
    #     @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already
    #       been set or otherwise completed
    #
    #   @return [IVar] self
    def set(value = NULL)
      check_for_block_or_value!(block_given?, value)
      raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending)

      begin
        value = yield if block_given?
        complete_without_notification(true, value, nil)
      rescue => ex
        complete_without_notification(false, nil, ex)
      end

      notify_observers(self.value, reason)
      self
    end

    # @!macro ivar_fail_method
    #   Set the `IVar` to failed due to some error and wake or notify all threads waiting on it.
    #
    #   @param [Object] reason for the failure
    #   @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already
    #     been set or otherwise completed
    #   @return [IVar] self
    def fail(reason = StandardError.new)
      complete(false, nil, reason)
    end

    # Attempt to set the `IVar` with the given value or block. Return a
    # boolean indicating the success or failure of the set operation.
    #
    # @!macro ivar_set_parameters_and_exceptions
    #
    # @return [Boolean] true if the value was set else false
    def try_set(value = NULL, &block)
      set(value, &block)
      true
    rescue MultipleAssignmentError
      false
    end

    protected

    # @!visibility private
    def ns_initialize(value, opts)
      value = yield if block_given?
      init_obligation
      self.observers = Collection::CopyOnWriteObserverSet.new
      set_deref_options(opts)

      @state = :pending
      if value != NULL
        ns_complete_without_notification(true, value, nil)
      end
    end

    # @!visibility private
    def safe_execute(task, args = [])
      if compare_and_set_state(:processing, :pending)
        success, val, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args)
        complete(success, val, reason)
        yield(success, val, reason) if block_given?
      end
    end

    # @!visibility private
    def complete(success, value, reason)
      complete_without_notification(success, value, reason)
      notify_observers(self.value, reason)
      self
    end

    # @!visibility private
    def complete_without_notification(success, value, reason)
      synchronize { ns_complete_without_notification(success, value, reason) }
      self
    end

    # @!visibility private
    def notify_observers(value, reason)
      observers.notify_and_delete_observers{ [Time.now, value, reason] }
    end

    # @!visibility private
    def ns_complete_without_notification(success, value, reason)
      raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state
      set_state(success, value, reason)
      event.set
    end

    # @!visibility private
    def check_for_block_or_value!(block_given, value) # :nodoc:
      if (block_given && value != NULL) || (! block_given && value == NULL)
        raise ArgumentError.new('must set with either a value or a block')
      end
    end
  end
end