class Concurrent::Promises::AbstractEventFuture

Common ancestor of {Event} and {Future} classes, many shared methods are defined here.

def add_callback(method, *args)

def add_callback(method, *args)
  state = internal_state
  if state.resolved?
    call_callback method, state, args
  else
    @Callbacks.push [method, args]
    state = internal_state
    # take back if it was resolved in the meanwhile
    call_callbacks state if state.resolved?
  end
  self
end

def add_callback_clear_delayed_node(node)

@!visibility private
def add_callback_clear_delayed_node(node)
  add_callback(:callback_clear_delayed_node, node)
end

def add_callback_notify_blocked(promise, index)

@!visibility private
def add_callback_notify_blocked(promise, index)
  add_callback :callback_notify_blocked, promise, index
end

def async_callback_on_resolution(state, executor, args, callback)

def async_callback_on_resolution(state, executor, args, callback)
  with_async(executor, state, args, callback) do |st, ar, cb|
    callback_on_resolution st, ar, cb
  end
end

def blocks

Returns:
  • (Array) -
def blocks
  @Callbacks.each_with_object([]) do |(method, args), promises|
    promises.push(args[0]) if method == :callback_notify_blocked
  end
end

def call_callback(method, state, args)

def call_callback(method, state, args)
  self.send method, state, *args
end

def call_callbacks(state)

def call_callbacks(state)
  method, args = @Callbacks.pop
  while method
    call_callback method, state, args
    method, args = @Callbacks.pop
  end
end

def callback_clear_delayed_node(state, node)

def callback_clear_delayed_node(state, node)
  node.value = nil
end

def callback_notify_blocked(state, promise, index)

def callback_notify_blocked(state, promise, index)
  promise.on_blocker_resolution self, index
end

def callbacks

@!visibility private
For inspection.
def callbacks
  @Callbacks.each.to_a
end

def chain(*args, &task)

Returns:
  • (Future) -
def chain(*args, &task)
  chain_on @DefaultExecutor, *args, &task
end

def chain_on(executor, *args, &task)

Other tags:
    Yieldparam: reason -
    Yieldparam: value -
    Yieldparam: fulfilled -

Other tags:
    Yield: - to the task.
    Yield: - to the task.

Overloads:
  • a_future.chain_on(executor, *args, &task)
  • an_event.chain_on(executor, *args, &task)

Returns:
  • (Future) -
def chain_on(executor, *args, &task)
  ChainPromise.new_blocked_by1(self, executor, executor, args, &task).future
end

def chain_resolvable(resolvable)

Returns:
  • (self) -

Parameters:
  • resolvable (Resolvable) --
def chain_resolvable(resolvable)
  on_resolution! { resolvable.resolve_with internal_state }
end

def default_executor

Other tags:
    See: similar -
    See: FactoryMethods#any_fulfilled_future_on -
    See: FactoryMethods#resolvable_future -
    See: FactoryMethods#future_on -
    See: #with_default_executor -

Returns:
  • (Executor) - default executor
def default_executor
  @DefaultExecutor
end

def initialize(promise, default_executor)

def initialize(promise, default_executor)
  super()
  @Lock               = Mutex.new
  @Condition          = ConditionVariable.new
  @Promise            = promise
  @DefaultExecutor    = default_executor
  @Callbacks          = LockFreeStack.new
  @Waiters            = AtomicFixnum.new 0
  self.internal_state = PENDING
end

def on_resolution(*args, &callback)

Returns:
  • (self) -
def on_resolution(*args, &callback)
  on_resolution_using @DefaultExecutor, *args, &callback
end

def on_resolution!(*args, &callback)

Other tags:
    Yieldparam: reason -
    Yieldparam: value -
    Yieldparam: fulfilled -

Other tags:
    Yield: - to the callback.
    Yield: - to the callback.

Overloads:
  • a_future.on_resolution!(*args, &callback)
  • an_event.on_resolution!(*args, &callback)

Returns:
  • (self) -
def on_resolution!(*args, &callback)
  add_callback :callback_on_resolution, args, callback
end

def on_resolution_using(executor, *args, &callback)

Other tags:
    Yieldparam: reason -
    Yieldparam: value -
    Yieldparam: fulfilled -

Other tags:
    Yield: - to the callback.
    Yield: - to the callback.

Overloads:
  • a_future.on_resolution_using(executor, *args, &callback)
  • an_event.on_resolution_using(executor, *args, &callback)

Returns:
  • (self) -
def on_resolution_using(executor, *args, &callback)
  add_callback :async_callback_on_resolution, executor, args, callback
end

def pending?

Returns:
  • (Boolean) -
def pending?
  !internal_state.resolved?
end

def promise

@!visibility private
For inspection.
def promise
  @Promise
end

def resolve_with(state, raise_on_reassign = true, reserved = false)

@!visibility private
def resolve_with(state, raise_on_reassign = true, reserved = false)
  if compare_and_set_internal_state(reserved ? RESERVED : PENDING, state)
    # go to synchronized block only if there were waiting threads
    @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
    call_callbacks state
  else
    return rejected_resolution(raise_on_reassign, state)
  end
  self
end

def resolved?

Returns:
  • (Boolean) -
def resolved?
  internal_state.resolved?
end

def state

Returns:
  • (:pending, :fulfilled, :rejected) -
  • (:pending, :resolved) -
  • (Symbol) -

Overloads:
  • a_future.state
  • an_event.state
def state
  internal_state.to_sym
end

def to_s

Returns:
  • (String) - Short string representation.
def to_s
  format '%s %s>', super[0..-2], state
end

def touch

Returns:
  • (self) -
def touch
  @Promise.touch
  self
end

def touched?

@!visibility private
For inspection.
def touched?
  promise.touched?
end

def wait(timeout = nil)

Returns:
  • (self, true, false) - self implies timeout was not used, true implies timeout was used
def wait(timeout = nil)
  result = wait_until_resolved(timeout)
  timeout ? result : self
end

def wait_until_resolved(timeout)

Returns:
  • (Boolean) -
def wait_until_resolved(timeout)
  return true if resolved?
  touch
  @Lock.synchronize do
    @Waiters.increment
    begin
      if timeout
        start = Concurrent.monotonic_time
        until resolved?
          break if @Condition.wait(@Lock, timeout) == nil # nil means timeout
          timeout -= (Concurrent.monotonic_time - start)
          break if timeout <= 0
        end
      else
        until resolved?
          @Condition.wait(@Lock, timeout)
        end
      end
    ensure
      # JRuby may raise ConcurrencyError
      @Waiters.decrement
    end
  end
  resolved?
end

def waiting_threads

@!visibility private
For inspection.
def waiting_threads
  @Waiters.each.to_a
end

def with_async(executor, *args, &block)

def with_async(executor, *args, &block)
  Concurrent.executor(executor).post(*args, &block)
end

def with_default_executor(executor)

Returns:
  • (AbstractEventFuture) -

Other tags:
    Abstract: -
def with_default_executor(executor)
  raise NotImplementedError
end

def with_hidden_resolvable

@!visibility private
def with_hidden_resolvable
  # TODO (pitr-ch 10-Dec-2018): documentation, better name if in edge
  self
end