lib/concurrent-ruby/concurrent/delay.rb



require 'thread'
require 'concurrent/concern/obligation'
require 'concurrent/executor/immediate_executor'
require 'concurrent/synchronization/lockable_object'

module Concurrent

  # This file has circular require issues. It must be autoloaded here.
  autoload :Options, 'concurrent/options'

  # Lazy evaluation of a block yielding an immutable result. Useful for
  # expensive operations that may never be needed. It may be non-blocking,
  # supports the `Concern::Obligation` interface, and accepts the injection of
  # custom executor upon which to execute the block. Processing of
  # block will be deferred until the first time `#value` is called.
  # At that time the caller can choose to return immediately and let
  # the block execute asynchronously, block indefinitely, or block
  # with a timeout.
  #
  # When a `Delay` is created its state is set to `pending`. The value and
  # reason are both `nil`. The first time the `#value` method is called the
  # enclosed opration will be run and the calling thread will block. Other
  # threads attempting to call `#value` will block as well. Once the operation
  # is complete the *value* will be set to the result of the operation or the
  # *reason* will be set to the raised exception, as appropriate. All threads
  # blocked on `#value` will return. Subsequent calls to `#value` will immediately
  # return the cached value. The operation will only be run once. This means that
  # any side effects created by the operation will only happen once as well.
  #
  # `Delay` includes the `Concurrent::Concern::Dereferenceable` mixin to support thread
  # safety of the reference returned by `#value`.
  #
  # @!macro copy_options
  #
  # @!macro delay_note_regarding_blocking
  #   @note The default behavior of `Delay` is to block indefinitely when
  #     calling either `value` or `wait`, executing the delayed operation on
  #     the current thread. This makes the `timeout` value completely
  #     irrelevant. To enable non-blocking behavior, use the `executor`
  #     constructor option. This will cause the delayed operation to be
  #     execute on the given executor, allowing the call to timeout.
  #
  # @see Concurrent::Concern::Dereferenceable
  class Delay < Synchronization::LockableObject
    include Concern::Obligation

    # NOTE: Because the global thread pools are lazy-loaded with these objects
    # there is a performance hit every time we post a new task to one of these
    # thread pools. Subsequently it is critical that `Delay` perform as fast
    # as possible post-completion. This class has been highly optimized using
    # the benchmark script `examples/lazy_and_delay.rb`. Do NOT attempt to
    # DRY-up this class or perform other refactoring with running the
    # benchmarks and ensuring that performance is not negatively impacted.

    # Create a new `Delay` in the `:pending` state.
    #
    # @!macro executor_and_deref_options
    #
    # @yield the delayed operation to perform
    #
    # @raise [ArgumentError] if no block is given
    def initialize(opts = {}, &block)
      raise ArgumentError.new('no block given') unless block_given?
      super(&nil)
      synchronize { ns_initialize(opts, &block) }
    end

    # Return the value this object represents after applying the options
    # specified by the `#set_deref_options` method. If the delayed operation
    # raised an exception this method will return nil. The exception object
    # can be accessed via the `#reason` method.
    #
    # @param [Numeric] timeout the maximum number of seconds to wait
    # @return [Object] the current value of the object
    #
    # @!macro delay_note_regarding_blocking
    def value(timeout = nil)
      if @executor # TODO (pitr 12-Sep-2015): broken unsafe read?
        super
      else
        # this function has been optimized for performance and
        # should not be modified without running new benchmarks
        synchronize do
          execute = @evaluation_started = true unless @evaluation_started
          if execute
            begin
              set_state(true, @task.call, nil)
            rescue => ex
              set_state(false, nil, ex)
            end
          elsif incomplete?
            raise IllegalOperationError, 'Recursive call to #value during evaluation of the Delay'
          end
        end
        if @do_nothing_on_deref
          @value
        else
          apply_deref_options(@value)
        end
      end
    end

    # Return the value this object represents after applying the options
    # specified by the `#set_deref_options` method. If the delayed operation
    # raised an exception, this method will raise that exception (even when)
    # the operation has already been executed).
    #
    # @param [Numeric] timeout the maximum number of seconds to wait
    # @return [Object] the current value of the object
    # @raise [Exception] when `#rejected?` raises `#reason`
    #
    # @!macro delay_note_regarding_blocking
    def value!(timeout = nil)
      if @executor
        super
      else
        result = value
        raise @reason if @reason
        result
      end
    end

    # Return the value this object represents after applying the options
    # specified by the `#set_deref_options` method.
    #
    # @param [Integer] timeout (nil) the maximum number of seconds to wait for
    #   the value to be computed. When `nil` the caller will block indefinitely.
    #
    # @return [Object] self
    #
    # @!macro delay_note_regarding_blocking
    def wait(timeout = nil)
      if @executor
        execute_task_once
        super(timeout)
      else
        value
      end
      self
    end

    # Reconfigures the block returning the value if still `#incomplete?`
    #
    # @yield the delayed operation to perform
    # @return [true, false] if success
    def reconfigure(&block)
      synchronize do
        raise ArgumentError.new('no block given') unless block_given?
        unless @evaluation_started
          @task = block
          true
        else
          false
        end
      end
    end

    protected

    def ns_initialize(opts, &block)
      init_obligation
      set_deref_options(opts)
      @executor = opts[:executor]

      @task               = block
      @state              = :pending
      @evaluation_started = false
    end

    private

    # @!visibility private
    def execute_task_once # :nodoc:
      # this function has been optimized for performance and
      # should not be modified without running new benchmarks
      execute = task = nil
      synchronize do
        execute = @evaluation_started = true unless @evaluation_started
        task    = @task
      end

      if execute
        executor = Options.executor_from_options(executor: @executor)
        executor.post do
          begin
            result  = task.call
            success = true
          rescue => ex
            reason = ex
          end
          synchronize do
            set_state(success, result, reason)
            event.set
          end
        end
      end
    end
  end
end