require 'concurrent/synchronization/object'
require 'concurrent/atomic/atomic_boolean'
require 'concurrent/atomic/atomic_fixnum'
require 'concurrent/collection/lock_free_stack'
require 'concurrent/configuration'
require 'concurrent/errors'
require 'concurrent/re_include'
require 'concurrent/utility/monotonic_time'
module Concurrent
# {include:file:docs-source/promises-main.md}
module Promises
# @!macro promises.param.default_executor
# @param [Executor, :io, :fast] default_executor Instance of an executor or a name of the
# global executor. Default executor propagates to chained futures unless overridden with
# executor parameter or changed with {AbstractEventFuture#with_default_executor}.
#
# @!macro promises.param.executor
# @param [Executor, :io, :fast] executor Instance of an executor or a name of the
# global executor. The task is executed on it, default executor remains unchanged.
#
# @!macro promises.param.args
# @param [Object] args arguments which are passed to the task when it's executed.
# (It might be prepended with other arguments, see the @yield section).
#
# @!macro promises.shortcut.on
# Shortcut of {#$0_on} with default `:io` executor supplied.
# @see #$0_on
#
# @!macro promises.shortcut.using
# Shortcut of {#$0_using} with default `:io` executor supplied.
# @see #$0_using
#
# @!macro promise.param.task-future
# @yieldreturn will become result of the returned Future.
# Its returned value becomes {Future#value} fulfilling it,
# raised exception becomes {Future#reason} rejecting it.
#
# @!macro promise.param.callback
# @yieldreturn is forgotten.
# Container of all {Future}, {Event} factory methods. They are never constructed directly with
# new.
module FactoryMethods
  extend ReInclude
  extend self

  module Configuration
    # @return [Executor, :io, :fast] the executor which is used when none is supplied
    # to a factory method. The method can be overridden in the receivers of
    # `include FactoryMethods`
    def default_executor
      :io
    end
  end

  include Configuration

  # @!macro promises.shortcut.on
  # @return [ResolvableEvent]
  def resolvable_event
    resolvable_event_on(default_executor)
  end

  # Creates a resolvable event. The user is responsible for resolving the event once
  # by calling {Promises::ResolvableEvent#resolve}.
  #
  # @!macro promises.param.default_executor
  # @return [ResolvableEvent]
  def resolvable_event_on(default_executor = self.default_executor)
    promise = ResolvableEventPromise.new(default_executor)
    promise.future
  end

  # @!macro promises.shortcut.on
  # @return [ResolvableFuture]
  def resolvable_future
    resolvable_future_on(default_executor)
  end

  # Creates a resolvable future. The user is responsible for resolving the future once by
  # {Promises::ResolvableFuture#resolve}, {Promises::ResolvableFuture#fulfill},
  # or {Promises::ResolvableFuture#reject}
  #
  # @!macro promises.param.default_executor
  # @return [ResolvableFuture]
  def resolvable_future_on(default_executor = self.default_executor)
    promise = ResolvableFuturePromise.new(default_executor)
    promise.future
  end

  # @!macro promises.shortcut.on
  # @return [Future]
  def future(*args, &task)
    future_on default_executor, *args, &task
  end

  # Constructs a new Future which will be resolved after block is evaluated on default executor.
  # Evaluation begins immediately.
  #
  # @!macro promises.param.default_executor
  # @!macro promises.param.args
  # @yield [*args] to the task.
  # @!macro promise.param.task-future
  # @return [Future]
  def future_on(default_executor, *args, &task)
    # The task is chained onto an already-resolved starting point.
    start = ImmediateEventPromise.new(default_executor).future
    start.then(*args, &task)
  end

  # Creates a resolved future which will be either fulfilled with the given value or rejected with
  # the given reason.
  #
  # @param [true, false] fulfilled
  # @param [Object] value
  # @param [Object] reason
  # @!macro promises.param.default_executor
  # @return [Future]
  def resolved_future(fulfilled, value, reason, default_executor = self.default_executor)
    promise = ImmediateFuturePromise.new(default_executor, fulfilled, value, reason)
    promise.future
  end

  # Creates a resolved future which will be fulfilled with the given value.
  #
  # @!macro promises.param.default_executor
  # @param [Object] value
  # @return [Future]
  def fulfilled_future(value, default_executor = self.default_executor)
    resolved_future(true, value, nil, default_executor)
  end

  # Creates a resolved future which will be rejected with the given reason.
  #
  # @!macro promises.param.default_executor
  # @param [Object] reason
  # @return [Future]
  def rejected_future(reason, default_executor = self.default_executor)
    resolved_future(false, nil, reason, default_executor)
  end

  # Creates resolved event.
  #
  # @!macro promises.param.default_executor
  # @return [Event]
  def resolved_event(default_executor = self.default_executor)
    promise = ImmediateEventPromise.new(default_executor)
    promise.event
  end

  # General constructor. Behaves differently based on the argument's type. It's provided for convenience
  # but it's better to be explicit.
  #
  # @see rejected_future, resolved_event, fulfilled_future
  # @!macro promises.param.default_executor
  # @return [Event, Future]
  #
  # @overload make_future(nil, default_executor = self.default_executor)
  # @param [nil] nil
  # @return [Event] resolved event.
  #
  # @overload make_future(a_future, default_executor = self.default_executor)
  # @param [Future] a_future
  # @return [Future] a future which will be resolved when a_future is.
  #
  # @overload make_future(an_event, default_executor = self.default_executor)
  # @param [Event] an_event
  # @return [Event] an event which will be resolved when an_event is.
  #
  # @overload make_future(exception, default_executor = self.default_executor)
  # @param [Exception] exception
  # @return [Future] a rejected future with the exception as its reason.
  #
  # @overload make_future(value, default_executor = self.default_executor)
  # @param [Object] value when none of the above overloads fits
  # @return [Future] a fulfilled future with the value.
  def make_future(argument = nil, default_executor = self.default_executor)
    case argument
    when AbstractEventFuture then argument # returning wrapper would change nothing
    when Exception then rejected_future(argument, default_executor)
    when nil then resolved_event(default_executor)
    else fulfilled_future(argument, default_executor)
    end
  end

  # @!macro promises.shortcut.on
  # @return [Future, Event]
  def delay(*args, &task)
    delay_on(default_executor, *args, &task)
  end

  # Creates a new event or future which is resolved only after it is touched,
  # see {Concurrent::AbstractEventFuture#touch}.
  #
  # @!macro promises.param.default_executor
  # @overload delay_on(default_executor, *args, &task)
  # If task is provided it returns a {Future} representing the result of the task.
  # @!macro promises.param.args
  # @yield [*args] to the task.
  # @!macro promise.param.task-future
  # @return [Future]
  # @overload delay_on(default_executor)
  # If no task is provided, it returns an {Event}
  # @return [Event]
  def delay_on(default_executor, *args, &task)
    delayed = DelayPromise.new(default_executor).event
    return delayed unless task

    delayed.chain(*args, &task)
  end

  # @!macro promises.shortcut.on
  # @return [Future, Event]
  def schedule(intended_time, *args, &task)
    schedule_on(default_executor, intended_time, *args, &task)
  end

  # Creates a new event or future which is resolved in intended_time.
  #
  # @!macro promises.param.default_executor
  # @!macro promises.param.intended_time
  # @param [Numeric, Time] intended_time `Numeric` means to run in `intended_time` seconds.
  # `Time` means to run on `intended_time`.
  # @overload schedule_on(default_executor, intended_time, *args, &task)
  # If task is provided it returns a {Future} representing the result of the task.
  # @!macro promises.param.args
  # @yield [*args] to the task.
  # @!macro promise.param.task-future
  # @return [Future]
  # @overload schedule_on(default_executor, intended_time)
  # If no task is provided, it returns an {Event}
  # @return [Event]
  def schedule_on(default_executor, intended_time, *args, &task)
    scheduled = ScheduledPromise.new(default_executor, intended_time).event
    return scheduled unless task

    scheduled.chain(*args, &task)
  end

  # @!macro promises.shortcut.on
  # @return [Future]
  def zip_futures(*futures_and_or_events)
    zip_futures_on(default_executor, *futures_and_or_events)
  end

  # Creates a new future which is resolved after all futures_and_or_events are resolved.
  # Its value is an array of zipped future values. Its reason is an array of reasons for rejection.
  # If there is an error it rejects.
  # @!macro promises.event-conversion
  # If event is supplied, which does not have value and can be only resolved, it's
  # represented as `:fulfilled` with value `nil`.
  #
  # @!macro promises.param.default_executor
  # @param [AbstractEventFuture] futures_and_or_events
  # @return [Future]
  def zip_futures_on(default_executor, *futures_and_or_events)
    promise = ZipFuturesPromise.new_blocked_by(futures_and_or_events, default_executor)
    promise.future
  end

  alias_method :zip, :zip_futures

  # @!macro promises.shortcut.on
  # @return [Event]
  def zip_events(*futures_and_or_events)
    zip_events_on(default_executor, *futures_and_or_events)
  end

  # Creates a new event which is resolved after all futures_and_or_events are resolved.
  # (Future is resolved when fulfilled or rejected.)
  #
  # @!macro promises.param.default_executor
  # @param [AbstractEventFuture] futures_and_or_events
  # @return [Event]
  def zip_events_on(default_executor, *futures_and_or_events)
    promise = ZipEventsPromise.new_blocked_by(futures_and_or_events, default_executor)
    promise.event
  end

  # @!macro promises.shortcut.on
  # @return [Future]
  def any_resolved_future(*futures_and_or_events)
    any_resolved_future_on(default_executor, *futures_and_or_events)
  end

  alias_method :any, :any_resolved_future

  # Creates a new future which is resolved after the first futures_and_or_events is resolved.
  # Its result equals the result of the first resolved future.
  # @!macro promises.any-touch
  # If resolved it does not propagate {Concurrent::AbstractEventFuture#touch}, leaving delayed
  # futures un-executed if they are not required any more.
  # @!macro promises.event-conversion
  #
  # @!macro promises.param.default_executor
  # @param [AbstractEventFuture] futures_and_or_events
  # @return [Future]
  def any_resolved_future_on(default_executor, *futures_and_or_events)
    promise = AnyResolvedFuturePromise.new_blocked_by(futures_and_or_events, default_executor)
    promise.future
  end

  # @!macro promises.shortcut.on
  # @return [Future]
  def any_fulfilled_future(*futures_and_or_events)
    any_fulfilled_future_on(default_executor, *futures_and_or_events)
  end

  # Creates a new future which is resolved after the first futures_and_or_events is fulfilled.
  # Its result equals the result of the first resolved future or if all futures_and_or_events reject,
  # it has reason of the last rejected future.
  # @!macro promises.any-touch
  # @!macro promises.event-conversion
  #
  # @!macro promises.param.default_executor
  # @param [AbstractEventFuture] futures_and_or_events
  # @return [Future]
  def any_fulfilled_future_on(default_executor, *futures_and_or_events)
    promise = AnyFulfilledFuturePromise.new_blocked_by(futures_and_or_events, default_executor)
    promise.future
  end

  # @!macro promises.shortcut.on
  # @return [Event]
  def any_event(*futures_and_or_events)
    any_event_on(default_executor, *futures_and_or_events)
  end

  # Creates a new event which becomes resolved after the first futures_and_or_events resolves.
  # @!macro promises.any-touch
  #
  # @!macro promises.param.default_executor
  # @param [AbstractEventFuture] futures_and_or_events
  # @return [Event]
  def any_event_on(default_executor, *futures_and_or_events)
    promise = AnyResolvedEventPromise.new_blocked_by(futures_and_or_events, default_executor)
    promise.event
  end

  # TODO consider adding first(count, *futures)
  # TODO consider adding zip_by(slice, *futures) processing futures in slices
  # TODO or rather a generic aggregator taking a function
end
# Immutable state objects held by an event or future. A state answers whether it is
# resolved and, for resolved states with a result, exposes the fulfilled?/value/reason
# triplet and knows how to apply itself to a callback block.
module InternalStates
  # Abstract base of all states.
  # @!visibility private
  class State
    # @return [Boolean]
    def resolved?
      raise NotImplementedError
    end

    # @return [Symbol]
    def to_sym
      raise NotImplementedError
    end
  end

  # Not yet resolved.
  # @!visibility private
  class Pending < State
    def resolved?
      false
    end

    def to_sym
      :pending
    end
  end

  # A pending state under a distinct class, so it can be told apart from plain Pending.
  # @!visibility private
  class Reserved < Pending
  end

  # Abstract base of resolved states carrying a result.
  # @!visibility private
  class ResolvedWithResult < State
    def resolved?
      true
    end

    def to_sym
      :resolved
    end

    # @return [Array(Boolean, Object, Object)] triplet of fulfilled?, value, reason
    def result
      [fulfilled?, value, reason]
    end

    def fulfilled?
      raise NotImplementedError
    end

    def value
      raise NotImplementedError
    end

    def reason
      raise NotImplementedError
    end

    # Calls the block with this state's payload followed by args.
    # The signature mirrors the concrete overrides (args, block) so that invoking the
    # abstract contract reports NotImplementedError rather than an arity error; the
    # defaults keep a bare `apply` call behaving as before.
    #
    # @param [Array] _args extra arguments passed to the block
    # @param [#call] _block the callback
    def apply(_args = nil, _block = nil)
      raise NotImplementedError
    end
  end

  # Resolved successfully with a value.
  # @!visibility private
  class Fulfilled < ResolvedWithResult
    def initialize(value)
      @Value = value
    end

    def fulfilled?
      true
    end

    # Calls the block with the value first, then args.
    def apply(args, block)
      block.call value, *args
    end

    def value
      @Value
    end

    def reason
      nil
    end

    def to_sym
      :fulfilled
    end
  end

  # Fulfilled with an array value which is splatted into the block's arguments.
  # @!visibility private
  class FulfilledArray < Fulfilled
    def apply(args, block)
      block.call(*value, *args)
    end
  end

  # Resolved unsuccessfully with a reason.
  # @!visibility private
  class Rejected < ResolvedWithResult
    def initialize(reason)
      @Reason = reason
    end

    def fulfilled?
      false
    end

    def value
      nil
    end

    def reason
      @Reason
    end

    def to_sym
      :rejected
    end

    # Calls the block with the reason first, then args.
    def apply(args, block)
      block.call reason, *args
    end
  end

  # Rejected state which keeps both a value and a reason; the reason is splatted
  # into the block's arguments when applied.
  # @!visibility private
  class PartiallyRejected < ResolvedWithResult
    def initialize(value, reason)
      super()
      @Value  = value
      @Reason = reason
    end

    def fulfilled?
      false
    end

    def to_sym
      :rejected
    end

    def value
      @Value
    end

    def reason
      @Reason
    end

    def apply(args, block)
      block.call(*reason, *args)
    end
  end

  # @!visibility private
  PENDING = Pending.new
  # @!visibility private
  RESERVED = Reserved.new
  # Shared fulfilled-with-nil state which reports itself as :resolved.
  # @!visibility private
  RESOLVED = Fulfilled.new(nil)

  def RESOLVED.to_sym
    :resolved
  end
end
private_constant :InternalStates
# @!macro promises.shortcut.event-future
# @see Event#$0
# @see Future#$0
# @!macro promises.param.timeout
# @param [Numeric] timeout the maximum time in seconds to wait.
# @!macro promises.warn.blocks
# @note This function potentially blocks current thread until the Future is resolved.
# Be careful it can deadlock. Try to chain instead.
# Common ancestor of {Event} and {Future} classes, many shared methods are defined here.
class AbstractEventFuture < Synchronization::Object
  safe_initialization!
  attr_atomic(:internal_state)
  private :internal_state=, :swap_internal_state, :compare_and_set_internal_state, :update_internal_state
  # @!method internal_state
  # @!visibility private

  include InternalStates

  # @param [Object] promise the promise backing this event/future; it receives #touch
  # @param [Executor, :io, :fast] default_executor
  def initialize(promise, default_executor)
    super()
    @Lock            = Mutex.new
    @Condition       = ConditionVariable.new
    @Promise         = promise
    @DefaultExecutor = default_executor
    @Callbacks       = LockFreeStack.new
    # counter of threads currently blocked in #wait_until_resolved
    @Waiters         = AtomicFixnum.new 0
    self.internal_state = PENDING
  end

  private :initialize

  # Returns its state.
  # @return [Symbol]
  #
  # @overload an_event.state
  # @return [:pending, :resolved]
  # @overload a_future.state
  # Both :fulfilled, :rejected implies :resolved.
  # @return [:pending, :fulfilled, :rejected]
  def state
    internal_state.to_sym
  end

  # Is it in pending state?
  # @return [Boolean]
  def pending?
    !internal_state.resolved?
  end

  # Is it in resolved state?
  # @return [Boolean]
  def resolved?
    internal_state.resolved?
  end

  # Propagates touch. Requests all the delayed futures, which it depends on, to be
  # executed. This method is called by any other method requiring resolved state, like {#wait}.
  # @return [self]
  def touch
    @Promise.touch
    self
  end

  # @!macro promises.touches
  # Calls {Concurrent::AbstractEventFuture#touch}.
  # @!macro promises.method.wait
  # Wait (block the Thread) until receiver is {#resolved?}.
  # @!macro promises.touches
  #
  # @!macro promises.warn.blocks
  # @!macro promises.param.timeout
  # @return [self, true, false] self implies timeout was not used, true implies timeout was used
  # and it was resolved, false implies it was not resolved within timeout.
  def wait(timeout = nil)
    result = wait_until_resolved(timeout)
    timeout ? result : self
  end

  # Returns default executor.
  # @return [Executor] default executor
  # @see #with_default_executor
  # @see FactoryMethods#future_on
  # @see FactoryMethods#resolvable_future
  # @see FactoryMethods#any_fulfilled_future_on
  # @see similar
  def default_executor
    @DefaultExecutor
  end

  # @!macro promises.shortcut.on
  # @return [Future]
  def chain(*args, &task)
    chain_on @DefaultExecutor, *args, &task
  end

  # Chains the task to be executed asynchronously on executor after it is resolved.
  #
  # @!macro promises.param.executor
  # @!macro promises.param.args
  # @return [Future]
  # @!macro promise.param.task-future
  #
  # @overload an_event.chain_on(executor, *args, &task)
  # @yield [*args] to the task.
  # @overload a_future.chain_on(executor, *args, &task)
  # @yield [fulfilled, value, reason, *args] to the task.
  # @yieldparam [true, false] fulfilled
  # @yieldparam [Object] value
  # @yieldparam [Object] reason
  def chain_on(executor, *args, &task)
    ChainPromise.new_blocked_by1(self, executor, executor, args, &task).future
  end

  # @return [String] Short string representation.
  def to_s
    # drop the closing '>' of the inherited representation and append the state
    format '%s %s>', super[0..-2], state
  end

  alias_method :inspect, :to_s

  # Resolves the resolvable when receiver is resolved.
  #
  # @param [Resolvable] resolvable
  # @return [self]
  def chain_resolvable(resolvable)
    on_resolution! { resolvable.resolve_with internal_state }
  end

  alias_method :tangle, :chain_resolvable

  # @!macro promises.shortcut.using
  # @return [self]
  def on_resolution(*args, &callback)
    on_resolution_using @DefaultExecutor, *args, &callback
  end

  # Stores the callback to be executed synchronously on resolving thread after it is
  # resolved.
  #
  # @!macro promises.param.args
  # @!macro promise.param.callback
  # @return [self]
  #
  # @overload an_event.on_resolution!(*args, &callback)
  # @yield [*args] to the callback.
  # @overload a_future.on_resolution!(*args, &callback)
  # @yield [fulfilled, value, reason, *args] to the callback.
  # @yieldparam [true, false] fulfilled
  # @yieldparam [Object] value
  # @yieldparam [Object] reason
  def on_resolution!(*args, &callback)
    add_callback :callback_on_resolution, args, callback
  end

  # Stores the callback to be executed asynchronously on executor after it is resolved.
  #
  # @!macro promises.param.executor
  # @!macro promises.param.args
  # @!macro promise.param.callback
  # @return [self]
  #
  # @overload an_event.on_resolution_using(executor, *args, &callback)
  # @yield [*args] to the callback.
  # @overload a_future.on_resolution_using(executor, *args, &callback)
  # @yield [fulfilled, value, reason, *args] to the callback.
  # @yieldparam [true, false] fulfilled
  # @yieldparam [Object] value
  # @yieldparam [Object] reason
  def on_resolution_using(executor, *args, &callback)
    add_callback :async_callback_on_resolution, executor, args, callback
  end

  # @!macro promises.method.with_default_executor
  # Creates new object with same class with the executor set as its new default executor.
  # Any futures depending on it will use the new default executor.
  # @!macro promises.shortcut.event-future
  # @abstract
  # @return [AbstractEventFuture]
  def with_default_executor(executor)
    raise NotImplementedError
  end

  # Atomically moves from PENDING (or RESERVED when `reserved` is true) to the given state,
  # wakes waiting threads and runs the stored callbacks. When the transition fails the
  # outcome is delegated to #rejected_resolution.
  # @!visibility private
  def resolve_with(state, raise_on_reassign = true, reserved = false)
    if compare_and_set_internal_state(reserved ? RESERVED : PENDING, state)
      # go to synchronized block only if there were waiting threads
      @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
      call_callbacks state
    else
      return rejected_resolution(raise_on_reassign, state)
    end
    self
  end

  # For inspection.
  # @!visibility private
  # @return [Array<AbstractPromise>]
  def blocks
    @Callbacks.each_with_object([]) do |(method, args), promises|
      promises.push(args[0]) if method == :callback_notify_blocked
    end
  end

  # For inspection.
  # @!visibility private
  def callbacks
    @Callbacks.each.to_a
  end

  # For inspection.
  # @!visibility private
  def promise
    @Promise
  end

  # For inspection.
  # @!visibility private
  def touched?
    promise.touched?
  end

  # For inspection.
  # NOTE(review): @Waiters is created as an AtomicFixnum in #initialize, which does not
  # appear to provide #each, so this call presumably raises NoMethodError. Confirm and
  # either expose the counter value or remove the method.
  # @!visibility private
  def waiting_threads
    @Waiters.each.to_a
  end

  # @!visibility private
  def add_callback_notify_blocked(promise, index)
    add_callback :callback_notify_blocked, promise, index
  end

  # @!visibility private
  def add_callback_clear_delayed_node(node)
    add_callback(:callback_clear_delayed_node, node)
  end

  # @!visibility private
  def with_hidden_resolvable
    # TODO (pitr-ch 10-Dec-2018): documentation, better name if in edge
    self
  end

  private

  # Runs the callback immediately when already resolved, otherwise stores it.
  # @return [self]
  def add_callback(method, *args)
    state = internal_state
    if state.resolved?
      call_callback method, state, args
    else
      @Callbacks.push [method, args]
      state = internal_state
      # take back if it was resolved in the meanwhile
      call_callbacks state if state.resolved?
    end
    self
  end

  def callback_clear_delayed_node(state, node)
    node.value = nil
  end

  # Blocks the calling thread until resolved, or until the timeout elapses when given.
  #
  # @param [Numeric, nil] timeout maximum number of seconds to wait, nil waits indefinitely
  # @return [Boolean] whether the receiver is resolved when the wait ends
  def wait_until_resolved(timeout)
    return true if resolved?

    touch

    @Lock.synchronize do
      @Waiters.increment
      begin
        if timeout
          # Measure against a fixed deadline so that repeated wake-ups without
          # resolution never shorten the overall wait.
          deadline = Concurrent.monotonic_time + timeout
          until resolved?
            remaining = deadline - Concurrent.monotonic_time
            break if remaining <= 0
            break if @Condition.wait(@Lock, remaining).nil? # nil means timeout
          end
        else
          until resolved?
            @Condition.wait(@Lock, timeout)
          end
        end
      ensure
        # JRuby may raise ConcurrencyError
        @Waiters.decrement
      end
    end
    resolved?
  end

  def call_callback(method, state, args)
    self.send method, state, *args
  end

  # Pops and runs stored callbacks until none is left.
  def call_callbacks(state)
    method, args = @Callbacks.pop
    while method
      call_callback method, state, args
      method, args = @Callbacks.pop
    end
  end

  def with_async(executor, *args, &block)
    Concurrent.executor(executor).post(*args, &block)
  end

  def async_callback_on_resolution(state, executor, args, callback)
    with_async(executor, state, args, callback) do |st, ar, cb|
      callback_on_resolution st, ar, cb
    end
  end

  def callback_notify_blocked(state, promise, index)
    promise.on_blocker_resolution self, index
  end
end
# Represents an event which will happen in future (will be resolved). The event is either
# pending or resolved. It should be always resolved. Use {Future} to communicate rejections and
# cancellation.
class Event < AbstractEventFuture
  alias_method :then, :chain

  # @!macro promises.method.zip
  # Creates a new event or a future which will be resolved when receiver and other are.
  # Returns an event if receiver and other are events, otherwise returns a future.
  # If just one of the parties is Future then the result
  # of the returned future is equal to the result of the supplied future. If both are futures
  # then the result is as described in {FactoryMethods#zip_futures_on}.
  #
  # @return [Future, Event]
  def zip(other)
    if other.is_a?(Future)
      # the future goes first, the receiving event second
      ZipFutureEventPromise.new_blocked_by2(other, self, @DefaultExecutor).future
    else
      ZipEventEventPromise.new_blocked_by2(self, other, @DefaultExecutor).event
    end
  end

  alias_method :&, :zip

  # Creates a new event which will be resolved when the first of receiver, `event_or_future`
  # resolves.
  #
  # @return [Event]
  def any(event_or_future)
    AnyResolvedEventPromise.new_blocked_by2(self, event_or_future, @DefaultExecutor).event
  end

  alias_method :|, :any

  # Creates new event dependent on receiver which will not evaluate until touched, see {#touch}.
  # In other words, it inserts delay into the chain of Futures making rest of it lazy evaluated.
  #
  # @return [Event]
  def delay
    event = DelayPromise.new(@DefaultExecutor).event
    ZipEventEventPromise.new_blocked_by2(self, event, @DefaultExecutor).event
  end

  # @!macro promise.method.schedule
  # Creates new event dependent on receiver scheduled to execute on/in intended_time.
  # In time is interpreted from the moment the receiver is resolved, therefore it inserts
  # delay into the chain.
  #
  # @!macro promises.param.intended_time
  # @return [Event]
  def schedule(intended_time)
    chain do
      event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
      ZipEventEventPromise.new_blocked_by2(self, event, @DefaultExecutor).event
    end.flat_event
  end

  # Converts event to a future. The future is fulfilled when the event is resolved, the future may never fail.
  #
  # @return [Future]
  def to_future
    future = Promises.resolvable_future
    # Link only after the future was created; if creation raises there is nothing to chain.
    chain_resolvable(future)
    future
  end

  # Returns self, since this is event
  # @return [Event]
  def to_event
    self
  end

  # @!macro promises.method.with_default_executor
  # @return [Event]
  def with_default_executor(executor)
    EventWrapperPromise.new_blocked_by1(self, executor).event
  end

  private

  # Handles an attempt to resolve an event which could not be moved to the new state.
  # @return [false] when raise_on_reassign is falsy
  # @raise [Concurrent::MultipleAssignmentError] when raise_on_reassign is truthy
  def rejected_resolution(raise_on_reassign, state)
    raise Concurrent::MultipleAssignmentError.new('Event can be resolved only once') if raise_on_reassign
    false
  end

  # Events carry no result, so the callback receives only the stored args.
  def callback_on_resolution(state, args, callback)
    callback.call(*args)
  end
end
# Represents a value which will become available in future. May reject with a reason instead,
# e.g. when the tasks raises an exception.
class Future < AbstractEventFuture
# Is it in fulfilled state?
# @return [Boolean]
def fulfilled?
  # read the state once so both checks see the same snapshot
  current = internal_state
  return false unless current.resolved?

  current.fulfilled?
end
# Is it in rejected state?
# @return [Boolean]
def rejected?
  # read the state once so both checks see the same snapshot
  current = internal_state
  return false unless current.resolved?

  !current.fulfilled?
end
# @!macro promises.warn.nil
# @note Make sure returned `nil` is not confused with timeout, no value when rejected,
# no reason when fulfilled, etc.
# Use more exact methods if needed, like {#wait}, {#value!}, {#result}, etc.
# @!macro promises.method.value
# Return value of the future.
# @!macro promises.touches
#
# @!macro promises.warn.blocks
# @!macro promises.warn.nil
# @!macro promises.param.timeout
# @!macro promises.param.timeout_value
# @param [Object] timeout_value a value returned by the method when it times out
# @return [Object, nil, timeout_value] the value of the Future when fulfilled,
# timeout_value on timeout,
# nil on rejection.
def value(timeout = nil, timeout_value = nil)
  # not resolved within the timeout: hand back the caller's fallback
  return timeout_value unless wait_until_resolved(timeout)

  internal_state.value
end
# Returns reason of future's rejection.
# @!macro promises.touches
#
# @!macro promises.warn.blocks
# @!macro promises.warn.nil
# @!macro promises.param.timeout
# @!macro promises.param.timeout_value
# @return [Object, timeout_value] the reason, or timeout_value on timeout, or nil on fulfillment.
def reason(timeout = nil, timeout_value = nil)
  # not resolved within the timeout: hand back the caller's fallback
  return timeout_value unless wait_until_resolved(timeout)

  internal_state.reason
end
# Returns triplet fulfilled?, value, reason.
# @!macro promises.touches
#
# @!macro promises.warn.blocks
# @!macro promises.param.timeout
# @return [Array(Boolean, Object, Object), nil] triplet of fulfilled?, value, reason, or nil
# on timeout.
def result(timeout = nil)
  # nil signals that the wait timed out
  return nil unless wait_until_resolved(timeout)

  internal_state.result
end
# @!macro promises.method.wait
# @raise [Exception] {#reason} on rejection
def wait!(timeout = nil)
  outcome = wait_until_resolved!(timeout)
  # without a timeout the receiver is returned to allow chaining
  return self unless timeout

  outcome
end
# @!macro promises.method.value
# @return [Object, nil, timeout_value] the value of the Future when fulfilled,
# or nil on rejection,
# or timeout_value on timeout.
# @raise [Exception] {#reason} on rejection
def value!(timeout = nil, timeout_value = nil)
  # not resolved within the timeout: hand back the caller's fallback
  return timeout_value unless wait_until_resolved!(timeout)

  internal_state.value
end
# Allows a rejected Future to be raised with the `raise` method.
# If the reason is not an exception, `RuntimeError.new(reason)` is returned.
#
# @example
# raise Promises.rejected_future(StandardError.new("boom"))
# raise Promises.rejected_future("or just boom")
# @raise [Concurrent::Error] when raising not rejected future
# @return [Exception]
def exception(*args)
  raise Concurrent::Error, 'it is not rejected' unless rejected?
  raise ArgumentError if args.size > 1

  # the reason may be a single object or a (nested) array with nils for fulfilled parts
  reasons = Array(internal_state.reason).flatten.compact

  if reasons.size > 1
    aggregate = Concurrent::MultipleErrors.new reasons
    aggregate.set_backtrace(caller)
    return aggregate
  end

  cause = reasons[0]
  error = if cause.respond_to? :exception
            cause.exception(*args)
          else
            RuntimeError.new(cause).exception(*args)
          end
  # keep the original backtrace (if any) and append where the future was raised
  error.set_backtrace Array(error.backtrace) + caller
  error
end
# @!macro promises.shortcut.on
# @return [Future]
def then(*args, &task)
  # delegate to the explicit-executor variant using this future's default executor
  then_on(@DefaultExecutor, *args, &task)
end
# Chains the task to be executed asynchronously on executor after it fulfills. Does not run
# the task if it rejects. It will resolve though, triggering any dependent futures.
#
# @!macro promises.param.executor
# @!macro promises.param.args
# @!macro promise.param.task-future
# @return [Future]
# @yield [value, *args] to the task.
def then_on(executor, *args, &task)
  # the executor is passed twice, exactly as in the original call
  promise = ThenPromise.new_blocked_by1(self, executor, executor, args, &task)
  promise.future
end
# @!macro promises.shortcut.on
# @return [Future]
def rescue(*args, &task)
  # delegate to the explicit-executor variant using this future's default executor
  rescue_on(@DefaultExecutor, *args, &task)
end
# Chains the task to be executed asynchronously on executor after it rejects. Does not run
# the task if it fulfills. It will resolve though, triggering any dependent futures.
#
# @!macro promises.param.executor
# @!macro promises.param.args
# @!macro promise.param.task-future
# @return [Future]
# @yield [reason, *args] to the task.
def rescue_on(executor, *args, &task)
  # the executor is passed twice, exactly as in the original call
  promise = RescuePromise.new_blocked_by1(self, executor, executor, args, &task)
  promise.future
end
# @!macro promises.method.zip
# @return [Future]
def zip(other)
  # two futures zip their values; a future zipped with an event keeps the future's result
  promise_class = other.is_a?(Future) ? ZipFuturesPromise : ZipFutureEventPromise
  promise_class.new_blocked_by2(self, other, @DefaultExecutor).future
end
alias_method :&, :zip
# Creates a new future which will be resolved when the first of receiver, `event_or_future`
# resolves. The returned future will have value nil if `event_or_future` is an event and
# resolves first.
#
# @return [Future]
def any(event_or_future)
  promise = AnyResolvedFuturePromise.new_blocked_by2(self, event_or_future, @DefaultExecutor)
  promise.future
end
alias_method :|, :any
# Creates new future dependent on receiver which will not evaluate until touched, see {#touch}.
# In other words, it inserts delay into the chain of Futures making rest of it lazy evaluated.
#
# @return [Future]
def delay
  # zip the receiver with a delay event created from DelayPromise
  gate = DelayPromise.new(@DefaultExecutor).event
  promise = ZipFutureEventPromise.new_blocked_by2(self, gate, @DefaultExecutor)
  promise.future
end
# @!macro promise.method.schedule
# @return [Future]
def schedule(intended_time)
  # the timer is created inside the chained task, i.e. only after the receiver resolves
  nested = chain do
    timer = ScheduledPromise.new(@DefaultExecutor, intended_time).event
    ZipFutureEventPromise.new_blocked_by2(self, timer, @DefaultExecutor).future
  end
  nested.flat
end
# @!macro promises.method.with_default_executor
# @return [Future]
def with_default_executor(executor)
  wrapper = FutureWrapperPromise.new_blocked_by1(self, executor)
  wrapper.future
end
# Creates new future which will have result of the future returned by receiver. If receiver
# rejects it will have its rejection.
#
# @param [Integer] level how many levels of futures should flatten
# @return [Future]
def flat_future(level = 1)
  promise = FlatFuturePromise.new_blocked_by1(self, level, @DefaultExecutor)
  promise.future
end
alias_method :flat, :flat_future
# Creates new event which will be resolved when the returned event by receiver is.
# Be careful if the receiver rejects it will just resolve since Event does not hold reason.
#
# @return [Event]
def flat_event
  promise = FlatEventPromise.new_blocked_by1(self, @DefaultExecutor)
  promise.event
end
# @!macro promises.shortcut.using
# @return [self]
def on_fulfillment(*args, &callback)
  # asynchronous variant run on this future's default executor
  on_fulfillment_using(@DefaultExecutor, *args, &callback)
end
# Stores the callback to be executed synchronously on resolving thread after it is
# fulfilled. Does nothing on rejection.
#
# @!macro promises.param.args
# @!macro promise.param.callback
# @return [self]
# @yield [value, *args] to the callback.
def on_fulfillment!(*args, &callback)
  # registered under the symbol naming the callback handler
  add_callback(:callback_on_fulfillment, args, callback)
end
# Stores the callback to be executed asynchronously on executor after it is
# fulfilled. Does nothing on rejection.
#
# @!macro promises.param.executor
# @!macro promises.param.args
# @!macro promise.param.callback
# @return [self]
# @yield [value, *args] to the callback.
def on_fulfillment_using(executor, *args, &callback)
  # the executor is stored along with the callback
  add_callback(:async_callback_on_fulfillment, executor, args, callback)
end
# @!macro promises.shortcut.using
# @return [self]
def on_rejection(*args, &callback)
  # asynchronous variant run on this future's default executor
  on_rejection_using(@DefaultExecutor, *args, &callback)
end
# Stores the callback to be executed synchronously on resolving thread after it is
# rejected. Does nothing on fulfillment.
#
# @!macro promises.param.args
# @!macro promise.param.callback
# @return [self]
# @yield [reason, *args] to the callback.
def on_rejection!(*args, &callback)
  # registered under the symbol naming the callback handler
  add_callback(:callback_on_rejection, args, callback)
end
# Stores the callback to be executed asynchronously on executor after it is
# rejected. Does nothing on fulfillment.
#
# @!macro promises.param.executor
# @!macro promises.param.args
# @!macro promise.param.callback
# @return [self]
# @yield [reason, *args] to the callback.
def on_rejection_using(executor, *args, &callback)
  # Registered under :async_callback_on_rejection together with the target executor.
  add_callback(:async_callback_on_rejection, executor, args, callback)
end
# Allows to use futures as green threads. The receiver has to evaluate to a future which
# represents what should be done next. It basically flattens indefinitely until a non-Future
# value is returned, which becomes the result of the returned future. Any encountered exception
# will become the reason of the returned future.
#
# @return [Future]
# @param [#call(value)] run_test
# an object which when called returns either Future to keep running with
# or nil, then the run completes with the value.
# The run_test can be used to extract the Future from deeper structure,
# or to distinguish a Future which is a resulting value from a future
# which is supposed to continue running.
# @example
# body = lambda do |v|
# v += 1
# v < 5 ? Promises.future(v, &body) : v
# end
# Promises.future(0, &body).run.value! # => 5
def run(run_test = method(:run_test))
  # The run promise is blocked by the receiver and keeps the test used to spot continuations.
  run_promise = RunFuturePromise.new_blocked_by1(self, @DefaultExecutor, run_test)
  run_promise.future
end
# @!visibility private
def apply(args, block)
  # The current internal state decides how the block is invoked.
  internal_state.apply(args, block)
end
# Converts future to event which is resolved when future is resolved by fulfillment or rejection.
#
# @return [Event]
def to_event
  # Create the event first. If creation raises, nothing is chained and the original
  # error propagates untouched (an `ensure` here would try to chain a nil event).
  event = Promises.resolvable_event
  # Make the event resolve once the receiver is resolved, whether fulfilled or rejected.
  chain_resolvable(event)
  event
end
# Returns self, since this is a future
# @return [Future]
def to_future
  # The receiver already is a future, so no conversion is needed.
  self
end
# @return [String] Short string representation.
def to_s
  # While pending, the inherited representation is used unchanged.
  return super unless resolved?

  # Drop the closing '>' of the inherited string and append the outcome.
  prefix = super[0..-2]
  outcome = fulfilled? ? value : reason
  format('%s with %s>', prefix, outcome.inspect)
end
alias_method :inspect, :to_s
private
def run_test(v)
  # Default continuation test for #run: only a Future keeps the run going.
  v.is_a?(Future) ? v : nil
end
def rejected_resolution(raise_on_reassign, state)
  # Silent mode: report the failed resolution with false.
  return false unless raise_on_reassign

  if internal_state == RESERVED
    raise Concurrent::MultipleAssignmentError.new(
      "Future can be resolved only once. It is already reserved.")
  end

  # Already resolved: include both the current and the attempted result in the error.
  raise Concurrent::MultipleAssignmentError.new(
    "Future can be resolved only once. It's #{result}, trying to set #{state.result}.",
    current_result: result,
    new_result: state.result)
end
def wait_until_resolved!(timeout = nil)
  resolved_in_time = wait_until_resolved(timeout)
  # A rejected future is raised itself after the wait.
  raise self if rejected?
  resolved_in_time
end
def async_callback_on_fulfillment(state, executor, args, callback)
  # Hand the synchronous fulfillment handler to with_async together with the executor.
  with_async(executor, state, args, callback) do |resolved_state, callback_args, handler|
    callback_on_fulfillment(resolved_state, callback_args, handler)
  end
end
def async_callback_on_rejection(state, executor, args, callback)
  # Hand the synchronous rejection handler to with_async together with the executor.
  with_async(executor, state, args, callback) do |resolved_state, callback_args, handler|
    callback_on_rejection(resolved_state, callback_args, handler)
  end
end
def callback_on_fulfillment(state, args, callback)
  # Nothing to do unless the state is fulfilled.
  return unless state.fulfilled?

  state.apply(args, callback)
end
def callback_on_rejection(state, args, callback)
  # Nothing to do when the state is fulfilled.
  return if state.fulfilled?

  state.apply(args, callback)
end
def callback_on_resolution(state, args, callback)
  # The callback receives the state's result elements first, then the extra args.
  invocation_args = [*state.result, *args]
  callback.call(*invocation_args)
end
end
# Marker module of Future, Event resolved manually.
module Resolvable
# Mixed into ResolvableEvent and ResolvableFuture; pulls in InternalStates for them.
include InternalStates
end
# An Event which can be resolved by the user.
class ResolvableEvent < Event
  include Resolvable

  # @!macro raise_on_reassign
  #   @raise [MultipleAssignmentError] when already resolved and raise_on_reassign is true.

  # @!macro promise.param.raise_on_reassign
  #   @param [Boolean] raise_on_reassign should method raise exception if already resolved
  #   @return [self, false] false is returned when raise_on_reassign is false and the receiver
  #     is already resolved.
  #

  # Makes the event resolved, which triggers all dependent futures.
  #
  # @!macro promise.param.raise_on_reassign
  # @!macro promise.param.reserved
  #   @param [true, false] reserved
  #     Set to true if the resolvable is {#reserve}d by you,
  #     marks resolution of reserved resolvable events and futures explicitly.
  #     Advanced feature, ignore unless you use {Resolvable#reserve} from edge.
  def resolve(raise_on_reassign = true, reserved = false)
    resolve_with(RESOLVED, raise_on_reassign, reserved)
  end

  # Creates new event wrapping receiver, effectively hiding the resolve method.
  # The wrapper is memoized in an instance variable.
  #
  # @return [Event]
  def with_hidden_resolvable
    @with_hidden_resolvable ||= begin
      wrapper = EventWrapperPromise.new_blocked_by1(self, @DefaultExecutor)
      wrapper.event
    end
  end

  # Behaves as {AbstractEventFuture#wait} but has one additional optional argument
  # resolve_on_timeout.
  #
  # @param [true, false] resolve_on_timeout
  #   If it times out and the argument is true it will also resolve the event.
  # @return [self, true, false]
  # @see AbstractEventFuture#wait
  def wait(timeout = nil, resolve_on_timeout = false)
    waited = super(timeout)
    return waited if waited
    return false unless resolve_on_timeout

    # if it fails to resolve it was resolved in the meantime
    # so return true as if there was no timeout
    !resolve(false)
  end
end
# A Future which can be resolved by user.
class ResolvableFuture < Future
  include Resolvable

  # Makes the future resolved with result of triplet `fulfilled?`, `value`, `reason`,
  # which triggers all dependent futures.
  #
  # @param [true, false] fulfilled
  # @param [Object] value
  # @param [Object] reason
  # @!macro promise.param.raise_on_reassign
  # @!macro promise.param.reserved
  def resolve(fulfilled = true, value = nil, reason = nil, raise_on_reassign = true, reserved = false)
    resolve_with(fulfilled ? Fulfilled.new(value) : Rejected.new(reason), raise_on_reassign, reserved)
  end

  # Makes the future fulfilled with `value`,
  # which triggers all dependent futures.
  #
  # @param [Object] value
  # @!macro promise.param.raise_on_reassign
  # @!macro promise.param.reserved
  def fulfill(value, raise_on_reassign = true, reserved = false)
    resolve_with Fulfilled.new(value), raise_on_reassign, reserved
  end

  # Makes the future rejected with `reason`,
  # which triggers all dependent futures.
  #
  # @param [Object] reason
  # @!macro promise.param.raise_on_reassign
  # @!macro promise.param.reserved
  def reject(reason, raise_on_reassign = true, reserved = false)
    resolve_with Rejected.new(reason), raise_on_reassign, reserved
  end

  # Evaluates the block and sets its result as future's value fulfilling, if the block raises
  # an exception the future rejects with it.
  #
  # @yield [*args] to the block.
  # @yieldreturn [Object] value
  # @return [self]
  def evaluate_to(*args, &block)
    promise.evaluate_to(*args, block)
    # Return the receiver explicitly instead of relying on the promise helper's return
    # value, which is not guaranteed to be the future when the block raises.
    self
  end

  # Evaluates the block and sets its result as future's value fulfilling, if the block raises
  # an exception the future rejects with it.
  #
  # @yield [*args] to the block.
  # @yieldreturn [Object] value
  # @return [self]
  # @raise [Exception] also raise reason on rejection.
  def evaluate_to!(*args, &block)
    # Going through #evaluate_to guarantees a future receiver for wait!, so a rejection
    # is raised as the reason rather than failing on a nil receiver.
    evaluate_to(*args, &block).wait!
  end

  # @!macro promises.resolvable.resolve_on_timeout
  #   @param [::Array(true, Object, nil), ::Array(false, nil, Exception), nil] resolve_on_timeout
  #     If it times out and the argument is not nil it will also resolve the future
  #     to the provided resolution.

  # Behaves as {AbstractEventFuture#wait} but has one additional optional argument
  # resolve_on_timeout.
  #
  # @!macro promises.resolvable.resolve_on_timeout
  # @return [self, true, false]
  # @see AbstractEventFuture#wait
  def wait(timeout = nil, resolve_on_timeout = nil)
    waited = super(timeout)
    return waited if waited
    return false unless resolve_on_timeout

    # if it fails to resolve it was resolved in the meantime
    # so return true as if there was no timeout
    !resolve(*resolve_on_timeout, false)
  end

  # Behaves as {Future#wait!} but has one additional optional argument
  # resolve_on_timeout.
  #
  # @!macro promises.resolvable.resolve_on_timeout
  # @return [self, true, false]
  # @raise [Exception] {#reason} on rejection
  # @see Future#wait!
  def wait!(timeout = nil, resolve_on_timeout = nil)
    waited = super(timeout)
    return waited if waited
    return false unless resolve_on_timeout
    # resolved here by the timeout resolution, which still counts as a timeout
    return false if resolve(*resolve_on_timeout, false)

    # if it fails to resolve it was resolved in the meantime
    # so return true as if there was no timeout
    raise self if rejected?
    true
  end

  # Behaves as {Future#value} but has one additional optional argument
  # resolve_on_timeout.
  #
  # @!macro promises.resolvable.resolve_on_timeout
  # @return [Object, timeout_value, nil]
  # @see Future#value
  def value(timeout = nil, timeout_value = nil, resolve_on_timeout = nil)
    return internal_state.value if wait_until_resolved timeout

    if resolve_on_timeout && !resolve(*resolve_on_timeout, false)
      # if it fails to resolve it was resolved in the meantime
      # so return value as if there was no timeout
      return internal_state.value
    end
    timeout_value
  end

  # Behaves as {Future#value!} but has one additional optional argument
  # resolve_on_timeout.
  #
  # @!macro promises.resolvable.resolve_on_timeout
  # @return [Object, timeout_value, nil]
  # @raise [Exception] {#reason} on rejection
  # @see Future#value!
  def value!(timeout = nil, timeout_value = nil, resolve_on_timeout = nil)
    return internal_state.value if wait_until_resolved! timeout
    return timeout_value unless resolve_on_timeout
    return timeout_value if resolve(*resolve_on_timeout, false)

    # if it fails to resolve it was resolved in the meantime
    # so return value as if there was no timeout
    raise self if rejected?
    internal_state.value
  end

  # Behaves as {Future#reason} but has one additional optional argument
  # resolve_on_timeout.
  #
  # @!macro promises.resolvable.resolve_on_timeout
  # @return [Exception, timeout_value, nil]
  # @see Future#reason
  def reason(timeout = nil, timeout_value = nil, resolve_on_timeout = nil)
    return internal_state.reason if wait_until_resolved timeout

    if resolve_on_timeout && !resolve(*resolve_on_timeout, false)
      # if it fails to resolve it was resolved in the meantime
      # so return reason as if there was no timeout
      return internal_state.reason
    end
    timeout_value
  end

  # Behaves as {Future#result} but has one additional optional argument
  # resolve_on_timeout.
  #
  # @!macro promises.resolvable.resolve_on_timeout
  # @return [::Array(Boolean, Object, Exception), nil]
  # @see Future#result
  def result(timeout = nil, resolve_on_timeout = nil)
    return internal_state.result if wait_until_resolved timeout
    # timed out and no fallback resolution was given
    return nil unless resolve_on_timeout

    # if it fails to resolve it was resolved in the meantime
    # so return result as if there was no timeout, otherwise returns nil
    internal_state.result unless resolve(*resolve_on_timeout, false)
  end

  # Creates new future wrapping receiver, effectively hiding the resolve method and similar.
  # The wrapper is memoized in an instance variable.
  #
  # @return [Future]
  def with_hidden_resolvable
    @with_hidden_resolvable ||= FutureWrapperPromise.new_blocked_by1(self, @DefaultExecutor).future
  end
end
# @abstract
# @private
class AbstractPromise < Synchronization::Object
  safe_initialization!
  include InternalStates

  # @param [Event, Future] future the event or future this promise resolves
  def initialize(future)
    super()
    @Future = future
  end

  # @return [Event, Future] the object this promise resolves
  def future
    @Future
  end

  alias_method :event, :future

  def default_executor
    future.default_executor
  end

  def state
    future.state
  end

  # No-op here; subclasses override it to react to being touched.
  def touch
  end

  # @return [String] the inherited representation with the promised future appended
  def to_s
    format '%s %s>', super[0..-2], @Future
  end

  alias_method :inspect, :to_s

  # @return [nil] this base class reports no delay
  def delayed_because
    nil
  end

  private

  # Forwards the resolution to the promised future.
  def resolve_with(new_state, raise_on_reassign = true)
    @Future.resolve_with(new_state, raise_on_reassign)
  end

  # Calls the block with args and resolves the future: fulfilled with the returned value,
  # or rejected with whatever the block raised. Exceptions that are not StandardError
  # are re-raised after the future has been rejected with them.
  #
  # @return [Future]
  def evaluate_to(*args, block)
    resolve_with Fulfilled.new(block.call(*args))
  rescue Exception => error
    rejected = resolve_with Rejected.new(error)
    raise error unless error.is_a?(StandardError)
    # Return the resolution result here as well; a trailing `raise ... unless` would
    # otherwise make the rescue branch evaluate to nil.
    rejected
  end
end
class ResolvableEventPromise < AbstractPromise
  # Promise backing a ResolvableEvent, which it creates for itself.
  def initialize(default_executor)
    event = ResolvableEvent.new(self, default_executor)
    super(event)
  end
end
class ResolvableFuturePromise < AbstractPromise
  # Promise backing a ResolvableFuture, which it creates for itself.
  def initialize(default_executor)
    future = ResolvableFuture.new(self, default_executor)
    super(future)
  end

  # ResolvableFuture calls evaluate_to on its promise, so it has to be public here.
  public :evaluate_to
end
# @abstract
# Adds no behaviour of its own; serves as the common superclass of BlockedPromise,
# the Immediate*Promise classes, DelayPromise and ScheduledPromise.
class InnerPromise < AbstractPromise
end
# @abstract
class BlockedPromise < InnerPromise
# Instances are created only through the new_blocked_by* factories below, which also
# register the new promise with its blockers.
private_class_method :new
# Builds a promise blocked by a single event/future.
# The promise is created with a countdown of 1 and registered with the blocker at index 0.
# @return [BlockedPromise] the new promise
def self.new_blocked_by1(blocker, *args, &block)
blocker_delayed = blocker.promise.delayed_because
promise = new(blocker_delayed, 1, *args, &block)
blocker.add_callback_notify_blocked promise, 0
promise
end
# Builds a promise blocked by two events/futures (countdown of 2, indices 0 and 1).
# When both blockers report a delay the two are combined into one LockFreeStack,
# otherwise whichever one is present (or nil) is used.
# @return [BlockedPromise] the new promise
def self.new_blocked_by2(blocker1, blocker2, *args, &block)
blocker_delayed1 = blocker1.promise.delayed_because
blocker_delayed2 = blocker2.promise.delayed_because
delayed = if blocker_delayed1 && blocker_delayed2
# TODO (pitr-ch 23-Dec-2016): use arrays when we know it will not grow (only flat adds delay)
LockFreeStack.of2(blocker_delayed1, blocker_delayed2)
else
blocker_delayed1 || blocker_delayed2
end
promise = new(delayed, 2, *args, &block)
blocker1.add_callback_notify_blocked promise, 0
blocker2.add_callback_notify_blocked promise, 1
promise
end
# Builds a promise blocked by a collection of events/futures.
# The delays of all blockers are folded together with add_delayed, the countdown is the
# number of blockers, and each blocker is registered with its position as index.
# @return [BlockedPromise] the new promise
def self.new_blocked_by(blockers, *args, &block)
delayed = blockers.reduce(nil) { |d, f| add_delayed d, f.promise.delayed_because }
promise = new(delayed, blockers.size, *args, &block)
blockers.each_with_index { |f, i| f.add_callback_notify_blocked promise, i }
promise
end
# Combines two delay values. When both are present delayed2 is pushed onto delayed1
# (mutating it) and delayed1 is returned; otherwise whichever is present is returned.
# NOTE(review): the push assumes delayed1 is stack-like whenever both are present; confirm with callers.
def self.add_delayed(delayed1, delayed2)
if delayed1 && delayed2
delayed1.push delayed2
delayed1
else
delayed1 || delayed2
end
end
# @param delayed the delay value later returned by #delayed_because
# @param blockers_count initial value of the countdown
# @param future the event or future resolved by this promise
def initialize(delayed, blockers_count, future)
super(future)
@Delayed = delayed
@Countdown = AtomicFixnum.new blockers_count
end
# Entry point called for a resolved blocker: the blocker is processed first, then
# resolvable? is asked with the returned countdown, and on_resolvable runs only if it agrees.
def on_blocker_resolution(future, index)
countdown = process_on_blocker_resolution(future, index)
resolvable = resolvable?(countdown, future, index)
on_resolvable(future, index) if resolvable
end
def delayed_because
@Delayed
end
# Propagates the touch to everything stored in @Delayed.
def touch
clear_and_propagate_touch
end
# for inspection only
# Scans every live AbstractEventFuture via ObjectSpace and collects those whose
# `blocks` include this promise.
def blocked_by
blocked_by = []
ObjectSpace.each_object(AbstractEventFuture) { |o| blocked_by.push o if o.blocks.include? self }
blocked_by
end
private
# Touches a single element, or walks a LockFreeStack with clear_each and recurses
# into each of its elements. Does nothing for nil.
def clear_and_propagate_touch(stack_or_element = @Delayed)
return if stack_or_element.nil?
if stack_or_element.is_a? LockFreeStack
stack_or_element.clear_each { |element| clear_and_propagate_touch element }
else
stack_or_element.touch unless stack_or_element.nil? # if still present
end
end
# Default rule: resolvable once the countdown has reached zero.
# @return [true,false] if resolvable
def resolvable?(countdown, future, index)
countdown.zero?
end
# Default processing: decrement the countdown and return what the decrement returns,
# which on_blocker_resolution passes on to resolvable?.
def process_on_blocker_resolution(future, index)
@Countdown.decrement
end
# Has to be implemented by subclasses.
def on_resolvable(resolved_future, index)
raise NotImplementedError
end
end
# @abstract
class BlockedTaskPromise < BlockedPromise
  # Stores the task, its arguments and the executor it should run on.
  # The countdown passed up is always 1, whatever blockers_count says.
  #
  # @raise [ArgumentError] when no task block is given
  def initialize(delayed, blockers_count, default_executor, executor, args, &task)
    raise ArgumentError, 'no block given' if task.nil?

    super(delayed, 1, Future.new(self, default_executor))
    @Executor = executor
    @Task = task
    @Args = args
  end

  # @return the executor the task is posted to
  def executor
    @Executor
  end
end
class ThenPromise < BlockedTaskPromise
  private

  def initialize(delayed, blockers_count, default_executor, executor, args, &task)
    super(delayed, blockers_count, default_executor, executor, args, &task)
  end

  # Runs the task on the executor when the blocker fulfilled;
  # any other outcome is passed through unchanged.
  def on_resolvable(resolved_future, index)
    return resolve_with(resolved_future.internal_state) unless resolved_future.fulfilled?

    target = Concurrent.executor(@Executor)
    target.post(resolved_future, @Args, @Task) do |fulfilled, task_args, task|
      evaluate_to(lambda { fulfilled.apply(task_args, task) })
    end
  end
end
class RescuePromise < BlockedTaskPromise
  private

  def initialize(delayed, blockers_count, default_executor, executor, args, &task)
    super(delayed, blockers_count, default_executor, executor, args, &task)
  end

  # Runs the task on the executor when the blocker rejected;
  # any other outcome is passed through unchanged.
  def on_resolvable(resolved_future, index)
    return resolve_with(resolved_future.internal_state) unless resolved_future.rejected?

    target = Concurrent.executor(@Executor)
    target.post(resolved_future, @Args, @Task) do |rejected, task_args, task|
      evaluate_to(lambda { rejected.apply(task_args, task) })
    end
  end
end
class ChainPromise < BlockedTaskPromise
  private

  # Always runs the task. A Future blocker contributes its result elements as
  # leading arguments; any other blocker contributes nothing.
  def on_resolvable(resolved_future, index)
    if Future === resolved_future
      Concurrent.executor(@Executor).post(resolved_future, @Args, @Task) do |blocker, task_args, task|
        evaluate_to(*blocker.result, *task_args, task)
      end
    else
      Concurrent.executor(@Executor).post(@Args, @Task) do |task_args, task|
        evaluate_to(*task_args, task)
      end
    end
  end
end
# will be immediately resolved
class ImmediateEventPromise < InnerPromise
  # The event is resolved before it is handed to the superclass initializer.
  def initialize(default_executor)
    event = Event.new(self, default_executor)
    super(event.resolve_with(RESOLVED))
  end
end
class ImmediateFuturePromise < InnerPromise
  # The future is resolved (fulfilled with value or rejected with reason) before it is
  # handed to the superclass initializer.
  def initialize(default_executor, fulfilled, value, reason)
    future = Future.new(self, default_executor)
    final_state = fulfilled ? Fulfilled.new(value) : Rejected.new(reason)
    super(future.resolve_with(final_state))
  end
end
class AbstractFlatPromise < BlockedPromise
# The promise registers itself (wrapped in a one-element stack) as its own delay, so
# dependents touch this promise; the blockers' delay is kept separately in @DelayedBecause.
# @param delayed_because delay of the blockers, or nil (an empty stack is used then)
# @param blockers_count initial countdown
# @param event_or_future the event or future resolved by this promise
def initialize(delayed_because, blockers_count, event_or_future)
delayed = LockFreeStack.of1(self)
super(delayed, blockers_count, event_or_future)
# noinspection RubyArgCount
@Touched = AtomicBoolean.new false
@DelayedBecause = delayed_because || LockFreeStack.new
# hands the stack's top node to the event/future; judging by the callback's name it is
# cleared once no longer needed (confirm in add_callback_clear_delayed_node)
event_or_future.add_callback_clear_delayed_node delayed.peek
end
# Propagates the touch to @DelayedBecause, guarded by @Touched.make_true.
# NOTE(review): presumably make_true is truthy only for the first caller; confirm against AtomicBoolean.
def touch
if @Touched.make_true
clear_and_propagate_touch @DelayedBecause
end
end
private
def touched?
@Touched.value
end
# Resolves with the state of the blocker that made the promise resolvable.
def on_resolvable(resolved_future, index)
resolve_with resolved_future.internal_state
end
# Resolvable only while the own future is still unresolved and the inherited rule agrees.
def resolvable?(countdown, future, index)
!@Future.internal_state.resolved? && super(countdown, future, index)
end
# Takes over the delay of a newly discovered inner future.
# If already touched, the touch is propagated to that delay right away. Otherwise the
# delay is added to @DelayedBecause and touched? is checked once more; presumably this
# covers a touch arriving between the first check and the add.
def add_delayed_of(future)
delayed = future.promise.delayed_because
if touched?
clear_and_propagate_touch delayed
else
BlockedPromise.add_delayed @DelayedBecause, delayed
clear_and_propagate_touch @DelayedBecause if touched?
end
end
end
class FlatEventPromise < AbstractFlatPromise
  private

  # The countdown is fixed at 2, whatever blockers_count says.
  def initialize(delayed, blockers_count, default_executor)
    super(delayed, 2, Event.new(self, default_executor))
  end

  # While the countdown is not exhausted, inspects the resolved blocker: an inner
  # event/future becomes an additional blocker, anything else resolves the event.
  # Always returns the countdown obtained from the superclass.
  def process_on_blocker_resolution(future, index)
    remaining = super(future, index)
    return remaining if remaining.zero?

    state = future.internal_state
    unless state.fulfilled?
      resolve_with RESOLVED
      return remaining
    end

    inner = state.value
    if AbstractEventFuture === inner
      add_delayed_of inner
      inner.add_callback_notify_blocked self, nil
    else
      resolve_with RESOLVED
    end
    remaining
  end
end
class FlatFuturePromise < AbstractFlatPromise
  private

  # @raise [ArgumentError] when levels is lower than 1
  def initialize(delayed, blockers_count, levels, default_executor)
    raise ArgumentError, 'levels has to be higher than 0' if levels < 1

    # flat promise may result to a future having delayed futures, therefore we have to have empty stack
    # to be able to add new delayed futures
    blockers_delay = delayed || LockFreeStack.new
    super(blockers_delay, 1 + levels, Future.new(self, default_executor))
  end

  # While the countdown is not exhausted, inspects the resolved blocker: a failed blocker
  # resolves the future with its state, an inner event/future becomes an additional
  # blocker, and any other value rejects the future with a TypeError.
  # Always returns the countdown obtained from the superclass.
  def process_on_blocker_resolution(future, index)
    remaining = super(future, index)
    return remaining if remaining.zero?

    state = future.internal_state
    unless state.fulfilled?
      resolve_with state
      return remaining
    end

    inner = state.value
    if AbstractEventFuture === inner
      add_delayed_of inner
      inner.add_callback_notify_blocked self, nil
    else
      evaluate_to(lambda { raise TypeError, "returned value #{inner.inspect} is not a Future" })
    end
    remaining
  end
end
class RunFuturePromise < AbstractFlatPromise
  private

  # The countdown is fixed at 1, whatever blockers_count says.
  def initialize(delayed, blockers_count, default_executor, run_test)
    super(delayed, 1, Future.new(self, default_executor))
    @RunTest = run_test
  end

  # Does not use the countdown. A failed blocker resolves the future and reports 0.
  # Otherwise the run test decides: a continuation becomes the next blocker, while no
  # continuation resolves the future with the blocker's state; both report 1.
  def process_on_blocker_resolution(future, index)
    state = future.internal_state
    unless state.fulfilled?
      resolve_with state
      return 0
    end

    continuation = @RunTest.call(state.value)
    if continuation
      add_delayed_of continuation
      continuation.add_callback_notify_blocked self, nil
    else
      resolve_with state
    end
    1
  end
end
class ZipEventEventPromise < BlockedPromise
  # The countdown is fixed at 2, whatever blockers_count says.
  def initialize(delayed, blockers_count, default_executor)
    super(delayed, 2, Event.new(self, default_executor))
  end

  private

  def on_resolvable(resolved_future, index)
    resolve_with(RESOLVED)
  end
end
class ZipFutureEventPromise < BlockedPromise
# The countdown is fixed at 2; @result will hold the state of the blocker at index 0.
def initialize(delayed, blockers_count, default_executor)
super delayed, 2, Future.new(self, default_executor)
@result = nil
end
private
# Remembers the state of the blocker at index 0 and then counts down.
def process_on_blocker_resolution(future, index)
# first blocking is future, take its result
@result = future.internal_state if index == 0
# super has to be called after above to piggyback on volatile @Countdown
super future, index
end
# Resolves with the state remembered from the blocker at index 0.
def on_resolvable(resolved_future, index)
resolve_with @result
end
end
class EventWrapperPromise < BlockedPromise
  # The countdown is fixed at 1, whatever blockers_count says.
  def initialize(delayed, blockers_count, default_executor)
    super(delayed, 1, Event.new(self, default_executor))
  end

  private

  def on_resolvable(resolved_future, index)
    resolve_with(RESOLVED)
  end
end
class FutureWrapperPromise < BlockedPromise
  # The countdown is fixed at 1, whatever blockers_count says.
  def initialize(delayed, blockers_count, default_executor)
    super(delayed, 1, Future.new(self, default_executor))
  end

  private

  # The wrapper takes over the state of the wrapped future.
  def on_resolvable(resolved_future, index)
    resolve_with(resolved_future.internal_state)
  end
end
class ZipFuturesPromise < BlockedPromise
  private

  def initialize(delayed, blockers_count, default_executor)
    super(delayed, blockers_count, Future.new(self, default_executor))
    @Resolutions = ::Array.new(blockers_count, nil)

    # with no blockers there is nothing to wait for
    on_resolvable(nil, nil) if blockers_count == 0
  end

  def process_on_blocker_resolution(future, index)
    # TODO (pitr-ch 18-Dec-2016): Can we assume that array will never break under parallel access when never re-sized?
    @Resolutions[index] = future.internal_state # has to be set before countdown in super
    super(future, index)
  end

  # Splits the collected states into values and reasons and resolves with a
  # FulfilledArray when every state is fulfilled, otherwise with a PartiallyRejected.
  def on_resolvable(resolved_future, index)
    all_fulfilled = true
    size = @Resolutions.size
    values = ::Array.new(size)
    reasons = ::Array.new(size)

    @Resolutions.each_with_index do |state, position|
      fulfilled, values[position], reasons[position] = state.result
      all_fulfilled &&= fulfilled
    end

    final_state = all_fulfilled ? FulfilledArray.new(values) : PartiallyRejected.new(values, reasons)
    resolve_with final_state
  end
end
class ZipEventsPromise < BlockedPromise
  private

  def initialize(delayed, blockers_count, default_executor)
    super(delayed, blockers_count, Event.new(self, default_executor))

    # with no blockers there is nothing to wait for
    on_resolvable(nil, nil) if blockers_count == 0
  end

  def on_resolvable(resolved_future, index)
    resolve_with(RESOLVED)
  end
end
# @abstract
# Adds no behaviour of its own; common superclass of the AnyResolved*Promise classes.
class AbstractAnyPromise < BlockedPromise
end
class AnyResolvedEventPromise < AbstractAnyPromise
  private

  def initialize(delayed, blockers_count, default_executor)
    super(delayed, blockers_count, Event.new(self, default_executor))
  end

  # Every resolved blocker qualifies, regardless of the countdown.
  def resolvable?(countdown, future, index)
    true
  end

  def on_resolvable(resolved_future, index)
    # raise_on_reassign is false: this runs for every blocker, so later calls find
    # the event already resolved
    resolve_with(RESOLVED, false)
  end
end
class AnyResolvedFuturePromise < AbstractAnyPromise
  private

  def initialize(delayed, blockers_count, default_executor)
    super(delayed, blockers_count, Future.new(self, default_executor))
  end

  # Every resolved blocker qualifies, regardless of the countdown.
  def resolvable?(countdown, future, index)
    true
  end

  def on_resolvable(resolved_future, index)
    # raise_on_reassign is false: this may run for several blockers, and only the
    # first resolution takes effect
    resolve_with(resolved_future.internal_state, false)
  end
end
class AnyFulfilledFuturePromise < AnyResolvedFuturePromise
  private

  # Resolvable when the blocker succeeded (an Event counts once resolved, a future once
  # fulfilled), or when the countdown has run out.
  def resolvable?(countdown, event_or_future, index)
    succeeded = if event_or_future.is_a?(Event)
                  event_or_future.resolved?
                else
                  event_or_future.fulfilled?
                end
    # inlined super from BlockedPromise
    succeeded || countdown.zero?
  end
end
class DelayPromise < InnerPromise
# Creates the event, registers the promise (wrapped in a one-element stack) as its own
# delay, and hands the stack's top node to the event via add_callback_clear_delayed_node.
def initialize(default_executor)
event = Event.new(self, default_executor)
@Delayed = LockFreeStack.of1(self)
super event
event.add_callback_clear_delayed_node @Delayed.peek
end
# Touching the promise resolves its event.
def touch
@Future.resolve_with RESOLVED
end
# @return [LockFreeStack] the stack containing this promise
def delayed_because
@Delayed
end
end
class ScheduledPromise < InnerPromise
  # @return [Time, Numeric] the value given at construction
  def intended_time
    @IntendedTime
  end

  def inspect
    format('%s intended_time: %s>', to_s[0..-2], @IntendedTime)
  end

  private

  # Schedules resolution of the event on the global timer set.
  # A Time is taken as the moment to resolve at; any other value is added to the
  # current time. Delays that would be negative are clamped to 0.
  def initialize(default_executor, intended_time)
    super Event.new(self, default_executor)

    @IntendedTime = intended_time

    now = Time.now
    schedule_time = @IntendedTime.is_a?(Time) ? @IntendedTime : now + @IntendedTime
    in_seconds = [0, schedule_time.to_f - now.to_f].max

    Concurrent.global_timer_set.post(in_seconds) { @Future.resolve_with RESOLVED }
  end
end
# Makes the factory methods callable directly on the enclosing module.
extend FactoryMethods
# The promise classes are implementation details, so their constants are hidden from
# code outside the enclosing module.
private_constant :AbstractPromise,
:ResolvableEventPromise,
:ResolvableFuturePromise,
:InnerPromise,
:BlockedPromise,
:BlockedTaskPromise,
:ThenPromise,
:RescuePromise,
:ChainPromise,
:ImmediateEventPromise,
:ImmediateFuturePromise,
:AbstractFlatPromise,
:FlatFuturePromise,
:FlatEventPromise,
:RunFuturePromise,
:ZipEventEventPromise,
:ZipFutureEventPromise,
:EventWrapperPromise,
:FutureWrapperPromise,
:ZipFuturesPromise,
:ZipEventsPromise,
:AbstractAnyPromise,
:AnyResolvedFuturePromise,
:AnyFulfilledFuturePromise,
:AnyResolvedEventPromise,
:DelayPromise,
:ScheduledPromise
end
end