class Concurrent::Promises::AbstractEventFuture
Common ancestor of the {Event} and {Future} classes; many of their shared methods are defined here.
def add_callback(method, *args)
# Registers a callback identified by the name of the method that implements it.
#
# When the receiver is already resolved the callback is invoked right away
# through #call_callback. Otherwise the [method, args] pair is pushed onto
# @Callbacks and the state is read a second time, so that a resolution which
# happened in between still gets its callbacks run.
#
# @param method [Symbol] name of the callback method to invoke
# @param args [Array] extra arguments stored with the callback
# @return [self]
def add_callback(method, *args)
  state = internal_state
  if state.resolved?
    call_callback method, state, args
  else
    @Callbacks.push [method, args]
    state = internal_state
    # take back if it was resolved in the meanwhile
    call_callbacks state if state.resolved?
  end
  self
end
def add_callback_clear_delayed_node(node)
# Registers :callback_clear_delayed_node as a callback with +node+ as its
# stored argument.
#
# @param node the object later handed to #callback_clear_delayed_node
# @return the result of #add_callback
def add_callback_clear_delayed_node(node)
  add_callback :callback_clear_delayed_node, node
end
def add_callback_notify_blocked(promise, index)
# Registers :callback_notify_blocked as a callback, storing +promise+ and
# +index+ as its arguments.
#
# @param promise the object later handed to #callback_notify_blocked
# @param index the index later handed to #callback_notify_blocked
# @return the result of #add_callback
def add_callback_notify_blocked(promise, index)
  add_callback(:callback_notify_blocked, promise, index)
end
def async_callback_on_resolution(state, executor, args, callback)
# Runs #callback_on_resolution through #with_async on the given executor.
#
# +state+, +args+ and +callback+ are passed to #with_async and come back as
# the block parameters, so the block does not close over them.
#
# @param state the resolution state handed to the callback
# @param executor the executor given to #with_async
# @param args [Array] arguments stored with the callback
# @param callback the callback to run
# @return the result of #with_async
def async_callback_on_resolution(state, executor, args, callback)
  with_async(executor, state, args, callback) do |resolved_state, callback_args, block|
    callback_on_resolution(resolved_state, callback_args, block)
  end
end
def blocks
-
(Array
-)
# Collects the first stored argument of every registered
# :callback_notify_blocked callback.
#
# @return [Array] the collected arguments, in the iteration order of @Callbacks
def blocks
  @Callbacks.each_with_object([]) do |(method, args), promises|
    promises << args.first if method == :callback_notify_blocked
  end
end
def call_callback(method, state, args)
# Invokes one callback method on the receiver.
#
# @param method [Symbol] name of the callback method
# @param state the resolution state, passed as the first argument
# @param args [Array] stored arguments, splatted after +state+
# @return the result of the callback method
def call_callback(method, state, args)
  send(method, state, *args)
end
def call_callbacks(state)
# Drains @Callbacks, invoking every stored callback with +state+.
#
# Entries are popped one at a time, and the loop stops as soon as a pop
# yields no method name.
#
# @param state the resolution state handed to each callback
# @return [nil]
def call_callbacks(state)
  method, args = @Callbacks.pop
  while method
    call_callback method, state, args
    method, args = @Callbacks.pop
  end
end
def callback_clear_delayed_node(state, node)
# Callback that resets +node.value+ to nil. The +state+ argument is accepted
# but not used.
#
# @param state the resolution state (unused)
# @param node an object responding to #value=
# @return [nil]
def callback_clear_delayed_node(state, node)
  node.value = nil
end
def callback_notify_blocked(state, promise, index)
# Callback that tells +promise+ that the receiver, identified by +index+, has
# been resolved. The +state+ argument is accepted but not used.
#
# @param state the resolution state (unused)
# @param promise an object responding to #on_blocker_resolution
# @param index the index forwarded to +promise+
# @return the result of +promise.on_blocker_resolution+
def callback_notify_blocked(state, promise, index)
  promise.on_blocker_resolution(self, index)
end
def callbacks
For inspection.
# For inspection: returns the currently registered callbacks as an Array.
#
# @return [Array] the entries of @Callbacks in iteration order
def callbacks
  @Callbacks.each.to_a
end
def chain(*args, &task)
-
(Future)
-
# Shortcut for #chain_on using the receiver's default executor.
#
# @param args [Array] arguments forwarded to #chain_on
# @yield the task, forwarded to #chain_on
# @return the result of #chain_on
def chain(*args, &task)
  chain_on(@DefaultExecutor, *args, &task)
end
def chain_on(executor, *args, &task)
- Yieldparam: reason -
Yieldparam: value -
Yieldparam: fulfilled -
Other tags:
- Yield: - to the task.
Yield: - to the task.
Overloads:
-
a_future.chain_on(executor, *args, &task)
-
an_event.chain_on(executor, *args, &task)
Returns:
-
(Future)
-
# Builds a ChainPromise blocked by the receiver and returns its future.
#
# +executor+ is passed to ChainPromise.new_blocked_by1 twice, as the second
# and as the third argument.
#
# @param executor the executor handed to the new promise
# @param args [Array] arguments handed to the new promise
# @yield the task handed to the new promise
# @return the +future+ of the created ChainPromise
def chain_on(executor, *args, &task)
  chain_promise = ChainPromise.new_blocked_by1(self, executor, executor, args, &task)
  chain_promise.future
end
def chain_resolvable(resolvable)
-
(self)
-
Parameters:
-
resolvable
(Resolvable
) --
# Registers a callback through #on_resolution! that resolves +resolvable+
# with the receiver's internal state.
#
# @param resolvable an object responding to #resolve_with
# @return the result of #on_resolution!
def chain_resolvable(resolvable)
  on_resolution! do
    resolvable.resolve_with(internal_state)
  end
end
def default_executor
- See: similar -
See: FactoryMethods#any_fulfilled_future_on -
See: FactoryMethods#resolvable_future -
See: FactoryMethods#future_on -
See: #with_default_executor -
Returns:
-
(Executor)
- default executor
# @return the default executor given to #initialize
def default_executor
  @DefaultExecutor
end
def initialize(promise, default_executor)
# Sets up the synchronization primitives, the callback stack and the waiter
# counter, then puts the receiver into the PENDING state.
#
# @param promise the promise stored in @Promise
# @param default_executor the executor stored in @DefaultExecutor
def initialize(promise, default_executor)
  super()
  @Lock            = Mutex.new
  @Condition       = ConditionVariable.new
  @Promise         = promise
  @DefaultExecutor = default_executor
  @Callbacks       = LockFreeStack.new
  @Waiters         = AtomicFixnum.new 0 # counter adjusted by #wait_until_resolved
  self.internal_state = PENDING
end
def on_resolution(*args, &callback)
-
(self)
-
# Shortcut for #on_resolution_using using the receiver's default executor.
#
# @param args [Array] arguments forwarded to #on_resolution_using
# @yield the callback, forwarded to #on_resolution_using
# @return the result of #on_resolution_using
def on_resolution(*args, &callback)
  on_resolution_using(@DefaultExecutor, *args, &callback)
end
def on_resolution!(*args, &callback)
- Yieldparam: reason -
Yieldparam: value -
Yieldparam: fulfilled -
Other tags:
- Yield: - to the callback.
Yield: - to the callback.
Overloads:
-
a_future.on_resolution!(*args, &callback)
-
an_event.on_resolution!(*args, &callback)
Returns:
-
(self)
-
# Registers +callback+ under :callback_on_resolution via #add_callback. No
# executor is involved in the registration.
#
# @param args [Array] arguments stored with the callback
# @yield the callback to store
# @return the result of #add_callback
def on_resolution!(*args, &callback)
  add_callback(:callback_on_resolution, args, callback)
end
def on_resolution_using(executor, *args, &callback)
- Yieldparam: reason -
Yieldparam: value -
Yieldparam: fulfilled -
Other tags:
- Yield: - to the callback.
Yield: - to the callback.
Overloads:
-
a_future.on_resolution_using(executor, *args, &callback)
-
an_event.on_resolution_using(executor, *args, &callback)
Returns:
-
(self)
-
# Registers +callback+ under :async_callback_on_resolution via #add_callback,
# storing +executor+ alongside it.
#
# @param executor the executor stored with the callback
# @param args [Array] arguments stored with the callback
# @yield the callback to store
# @return the result of #add_callback
def on_resolution_using(executor, *args, &callback)
  add_callback(:async_callback_on_resolution, executor, args, callback)
end
def pending?
-
(Boolean)
-
# @return [Boolean] true while the internal state does not report itself as
#   resolved
def pending?
  !internal_state.resolved?
end
def promise
For inspection.
# For inspection.
#
# @return the promise given to #initialize
def promise
  @Promise
end
def resolve_with(state, raise_on_reassign = true, reserved = false)
# Attempts to move the receiver from PENDING (or RESERVED when +reserved+ is
# truthy) to +state+ with a compare-and-set.
#
# On success, waiting threads are woken and the registered callbacks are run.
# On failure the outcome is delegated to #rejected_resolution.
#
# @param state the new internal state
# @param raise_on_reassign [Boolean] forwarded to #rejected_resolution
# @param reserved [Boolean] whether the expected current state is RESERVED
# @return [self] on success, otherwise the result of #rejected_resolution
def resolve_with(state, raise_on_reassign = true, reserved = false)
  expected = reserved ? RESERVED : PENDING
  unless compare_and_set_internal_state(expected, state)
    return rejected_resolution(raise_on_reassign, state)
  end

  # go to synchronized block only if there were waiting threads
  @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
  call_callbacks state
  self
end
def resolved?
-
(Boolean)
-
# @return [Boolean] whatever the internal state reports from its own
#   #resolved?
def resolved?
  internal_state.resolved?
end
def state
-
(:pending, :fulfilled, :rejected)
- -
(:pending, :resolved)
- -
(Symbol)
-
Overloads:
-
a_future.state
-
an_event.state
# @return the internal state converted with #to_sym
def state
  internal_state.to_sym
end
def to_s
-
(String)
- Short string representation.
# Short string representation: the inherited #to_s with its last character
# dropped, followed by a space, the current #state and a closing ">".
#
# @return [String]
def to_s
  format '%s %s>', super[0..-2], state
end
def touch
-
(self)
-
# Forwards #touch to the underlying promise.
#
# @return [self]
def touch
  @Promise.touch
  self
end
def touched?
For inspection.
# For inspection.
#
# @return whatever the promise reports from its own #touched?
def touched?
  promise.touched?
end
def wait(timeout = nil)
-
(self, true, false)
- self when no timeout was given; otherwise true if the receiver was resolved within the timeout and false if it was not
# Waits for the receiver to be resolved, delegating to #wait_until_resolved.
#
# @param timeout [Numeric, nil] passed to #wait_until_resolved
# @return [self, Boolean] self when +timeout+ is nil or false, otherwise the
#   result of #wait_until_resolved
def wait(timeout = nil)
  result = wait_until_resolved(timeout)
  timeout ? result : self
end
def wait_until_resolved(timeout)
-
(Boolean)
-
# Blocks the calling thread until the receiver is resolved or, when a timeout
# is given, until that many seconds have passed.
#
# The receiver is touched before waiting. While waiting, @Waiters is
# incremented so that #resolve_with knows it has to broadcast on @Condition.
#
# The time limit is tracked as a fixed deadline on the monotonic clock and the
# remaining time is recomputed before every wait. The previous code subtracted
# the total elapsed time from an already shortened timeout on each wakeup, so
# several wakeups made the wait end earlier than requested.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait; nil waits
#   without a limit
# @return [Boolean] whether the receiver is resolved when the method returns
def wait_until_resolved(timeout)
  return true if resolved?

  touch

  @Lock.synchronize do
    @Waiters.increment
    begin
      if timeout
        deadline = Concurrent.monotonic_time + timeout
        until resolved?
          remaining = deadline - Concurrent.monotonic_time
          break if remaining <= 0
          break if @Condition.wait(@Lock, remaining).nil? # nil means timeout
        end
      else
        @Condition.wait(@Lock, timeout) until resolved?
      end
    ensure
      # JRuby may raise ConcurrencyError
      @Waiters.decrement
    end
  end
  resolved?
end
def waiting_threads
For inspection.
# For inspection.
#
# NOTE(review): #initialize assigns @Waiters = AtomicFixnum.new 0 and the rest
# of the class only calls #value, #increment and #decrement on it. It looks
# like a plain counter, which would not respond to #each, so this call may
# raise NoMethodError. Confirm against AtomicFixnum before relying on it.
#
# @return [Array] the elements enumerated from @Waiters
def waiting_threads
  @Waiters.each.to_a
end
def with_async(executor, *args, &block)
# Posts +block+ with +args+ to the executor that Concurrent.executor returns
# for +executor+.
#
# @param executor the executor, or executor designator, to look up
# @param args [Array] arguments posted along with the block
# @yield the work to post
# @return the result of the executor's #post
def with_async(executor, *args, &block)
  target = Concurrent.executor(executor)
  target.post(*args, &block)
end
def with_default_executor(executor)
-
(AbstractEventFuture)
-
Other tags:
- Abstract: -
# Abstract method: this implementation always raises.
#
# @param executor unused here
# @raise [NotImplementedError] always
def with_default_executor(executor)
  raise NotImplementedError
end
def with_hidden_resolvable
# Returns the receiver unchanged.
#
# @return [self]
def with_hidden_resolvable
  # TODO (pitr-ch 10-Dec-2018): documentation, better name if in edge
  self
end