lib/concurrent-ruby/concurrent/concern/obligation.rb



require 'thread'
require 'timeout'

require 'concurrent/atomic/event'
require 'concurrent/concern/dereferenceable'

module Concurrent
  module Concern

    module Obligation
      include Concern::Dereferenceable
      # NOTE: The Dereferenceable module is going away in 2.0. In the mean time
      # we need it to place nicely with the synchronization layer. This means
      # that the including class SHOULD be synchronized and it MUST implement a
      # `#synchronize` method. Not doing so will lead to runtime errors.

      # Has the obligation been fulfilled?
      #
      # @return [Boolean]
      def fulfilled?
        state == :fulfilled
      end
      alias_method :realized?, :fulfilled?

      # Has the obligation been rejected?
      #
      # @return [Boolean]
      def rejected?
        state == :rejected
      end

      # Is obligation completion still pending?
      #
      # @return [Boolean]
      def pending?
        state == :pending
      end

      # Is the obligation still unscheduled?
      #
      # @return [Boolean]
      def unscheduled?
        state == :unscheduled
      end

      # Has the obligation completed processing?
      #
      # @return [Boolean]
      def complete?
        [:fulfilled, :rejected].include? state
      end

      # Is the obligation still awaiting completion of processing?
      #
      # @return [Boolean]
      def incomplete?
        ! complete?
      end

      # The current value of the obligation. Will be `nil` while the state is
      # pending or the operation has been rejected.
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Object] see Dereferenceable#deref
      def value(timeout = nil)
        wait timeout
        deref
      end

      # Wait until obligation is complete or the timeout has been reached.
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Obligation] self
      def wait(timeout = nil)
        event.wait(timeout) if timeout != 0 && incomplete?
        self
      end

      # Wait until obligation is complete or the timeout is reached. Will re-raise
      # any exceptions raised during processing (but will not raise an exception
      # on timeout).
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Obligation] self
      # @raise [Exception] raises the reason when rejected
      def wait!(timeout = nil)
        wait(timeout).tap { raise self if rejected? }
      end
      alias_method :no_error!, :wait!

      # The current value of the obligation. Will be `nil` while the state is
      # pending or the operation has been rejected. Will re-raise any exceptions
      # raised during processing (but will not raise an exception on timeout).
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Object] see Dereferenceable#deref
      # @raise [Exception] raises the reason when rejected
      def value!(timeout = nil)
        wait(timeout)
        if rejected?
          raise self
        else
          deref
        end
      end

      # The current state of the obligation.
      #
      # @return [Symbol] the current state
      def state
        synchronize { @state }
      end

      # If an exception was raised during processing this will return the
      # exception object. Will return `nil` when the state is pending or if
      # the obligation has been successfully fulfilled.
      #
      # @return [Exception] the exception raised during processing or `nil`
      def reason
        synchronize { @reason }
      end

      # @example allows Obligation to be risen
      #   rejected_ivar = Ivar.new.fail
      #   raise rejected_ivar
      def exception(*args)
        raise 'obligation is not rejected' unless rejected?
        reason.exception(*args)
      end

      protected

      # @!visibility private
      def get_arguments_from(opts = {})
        [*opts.fetch(:args, [])]
      end

      # @!visibility private
      def init_obligation
        @event = Event.new
        @value = @reason = nil
      end

      # @!visibility private
      def event
        @event
      end

      # @!visibility private
      def set_state(success, value, reason)
        if success
          @value = value
          @state = :fulfilled
        else
          @reason = reason
          @state  = :rejected
        end
      end

      # @!visibility private
      def state=(value)
        synchronize { ns_set_state(value) }
      end

      # Atomic compare and set operation
      # State is set to `next_state` only if `current state == expected_current`.
      #
      # @param [Symbol] next_state
      # @param [Symbol] expected_current
      #
      # @return [Boolean] true is state is changed, false otherwise
      #
      # @!visibility private
      def compare_and_set_state(next_state, *expected_current)
        synchronize do
          if expected_current.include? @state
            @state = next_state
            true
          else
            false
          end
        end
      end

      # Executes the block within mutex if current state is included in expected_states
      #
      # @return block value if executed, false otherwise
      #
      # @!visibility private
      def if_state(*expected_states)
        synchronize do
          raise ArgumentError.new('no block given') unless block_given?

          if expected_states.include? @state
            yield
          else
            false
          end
        end
      end

      protected

      # Am I in the current state?
      #
      # @param [Symbol] expected The state to check against
      # @return [Boolean] true if in the expected state else false
      #
      # @!visibility private
      def ns_check_state?(expected)
        @state == expected
      end

      # @!visibility private
      def ns_set_state(value)
        @state = value
      end
    end
  end
end