class Concurrent::Promise
Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A)
and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications.

> A promise represents the eventual value returned from the single
> completion of an operation.

Promises are similar to futures and share many of the same behaviours.
Promises are far more robust, however. Promises can be chained in a tree
structure where each promise may have zero or more children. Promises are
chained using the `then` method. The result of a call to `then` is always
another promise. Promises are resolved asynchronously (with respect to the
main thread) but in a strict order: parents are guaranteed to be resolved
before their children, children before their younger siblings. The `then`
method takes two parameters: an optional block to be executed upon parent
resolution and an optional callable to be executed upon parent failure. The
result of each promise is passed to each of its children upon resolution.
When a promise is rejected all its children will be summarily rejected and
will receive the reason.

Promises have several possible states: :unscheduled, :pending,
:processing, :rejected, or :fulfilled. These are also aggregated as
`#incomplete?` and `#complete?`. When a Promise is created it is set to
:unscheduled. Once the `#execute` method is called the state becomes
:pending. Once a job is pulled from the thread pool's queue and is given
to a thread for processing (often immediately upon `#post`) the state
becomes :processing. The promise will remain in this state until processing
is complete. A promise that is in the :unscheduled, :pending, or
:processing state is considered `#incomplete?`. A `#complete?` Promise is either
:rejected, indicating that an exception was raised during processing, or
:fulfilled, indicating success. If a Promise is :fulfilled its `#value`
will be updated to reflect the result of the operation. If :rejected the
`reason` will be updated with a reference to the raised exception. The
predicate methods `#unscheduled?`, `#pending?`, `#rejected?`, and
`#fulfilled?` can be called at any time to obtain the state of the Promise,
as can the `#state` method, which returns a symbol.

Retrieving the value of a promise is done through the `value` (alias:
`deref`) method. Obtaining the value of a promise is a potentially blocking
operation. When a promise is rejected a call to `value` will return `nil`
immediately. When a promise is fulfilled a call to `value` will
immediately return the current value. When a promise is pending a call to
`value` will block until the promise is either rejected or fulfilled. A
timeout value can be passed to `value` to limit how long the call will
block. If `nil` the call will block indefinitely. If `0` the call will not
block. Any other integer or float value will indicate the maximum number of
seconds to block.

Promises run on the global thread pool.

### Examples

Start by requiring promises

```ruby
require 'concurrent/promise'
```

Then create one

```ruby
p = Concurrent::Promise.execute do
  # do something
  42
end
```

Promises can be chained using the `then` method. The `then` method accepts a
block and an executor, to be executed on fulfillment, and a callable argument to be executed
on rejection. The result of each promise is passed as the block argument
to chained promises.

```ruby
p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute
```

And so on, and so on, and so on…

```ruby
p = Concurrent::Promise.fulfill(20).
    then{|result| result - 10 }.
    then{|result| result * 3 }.
    then(executor: different_executor){|result| result % 5 }.execute
```

The initial state of a newly created Promise depends on the state of its parent:
- if parent is unscheduled the child will be unscheduled
- if parent is pending the child will be pending
- if parent is fulfilled the child will be pending
- if parent is rejected the child will be pending (but will ultimately be rejected)

Promises are executed asynchronously from the main thread. By the time a
child Promise finishes initialization it may be in a different state than its
parent (by the time a child is created its parent may have completed
execution and changed state). Despite being asynchronous, however, the order
of execution of Promise objects in a chain (or tree) is strictly defined.

There are multiple ways to create and execute a new `Promise`. All of them
provide identical behavior:

```ruby
# create, operate, then execute
p1 = Concurrent::Promise.new{ "Hello World!" }
p1.state #=> :unscheduled
p1.execute

# create and immediately execute
p2 = Concurrent::Promise.new{ "Hello World!" }.execute

# execute during creation
p3 = Concurrent::Promise.execute{ "Hello World!" }
```

Once the `execute` method is called a `Promise` becomes `pending`:

```ruby
p = Concurrent::Promise.execute{ "Hello, world!" }
p.state    #=> :pending
p.pending? #=> true
```

Wait a little bit, and the promise will resolve and provide a value:

```ruby
p = Concurrent::Promise.execute{ "Hello, world!" }
sleep(0.1)

p.state      #=> :fulfilled
p.fulfilled? #=> true
p.value      #=> "Hello, world!"
```

If an exception occurs, the promise will be rejected and will provide
a reason for the rejection:

```ruby
p = Concurrent::Promise.execute{ raise StandardError.new("Here comes the Boom!") }
sleep(0.1)

p.state     #=> :rejected
p.rejected? #=> true
p.reason    #=> "#<StandardError: Here comes the Boom!>"
```

#### Rejection

When a promise is rejected all its children will be rejected and will
receive the rejection `reason` as the rejection callable parameter:

```ruby
p = Concurrent::Promise.execute { Thread.pass; raise StandardError }

c1 = p.then(-> reason { 42 })
c2 = p.then(-> reason { raise 'Boom!' })

c1.wait.state  #=> :fulfilled
c1.value       #=> 42
c2.wait.state  #=> :rejected
c2.reason      #=> #<RuntimeError: Boom!>
```

Once a promise is rejected it will continue to accept children, which will
be rejected immediately (they will be executed asynchronously).

#### Aliases

The `then` method is the most generic alias: it accepts a block to be
executed upon parent fulfillment and a callable to be executed upon parent
rejection. At least one of them should be passed. The default block is
`{ |result| result }`, which fulfills the child with the parent value. The
default callable is `{ |reason| raise reason }`, which rejects the child with
the parent reason.

- `on_success { |result| … }` is the same as `then {|result| … }`
- `rescue { |reason| … }` is the same as `then(Proc.new { |reason| … } )`
- `rescue` is aliased by `catch` and `on_error`
def self.aggregate(method, *promises)
Aggregate a collection of zero or more promises under a composite promise,
execute the aggregated promises and collect them into a standard Ruby array,
call the given Ruby `Enumerable` predicate (such as `any?`, `all?`, `none?`,
or `one?`) on the collection checking for the success or failure of each,
then execute the composite's `#then` handlers if the predicate returns
`true`, or the composite's `#rescue` handlers if the predicate
returns `false`.
def self.aggregate(method, *promises)
  composite = Promise.new do
    completed = promises.collect do |promise|
      promise.execute if promise.unscheduled?
      promise.wait
      promise
    end
    unless completed.empty? || completed.send(method){|promise| promise.fulfilled? }
      raise PromiseExecutionError
    end
  end
  composite
end
def self.all?(*promises)
Returns:
- (Promise) -- an unscheduled (not executed) promise that aggregates the given promises
Parameters:
- promises (Array<Promise>) -- Zero or more promises to aggregate
def self.all?(*promises)
  aggregate(:all?, *promises)
end
def self.any?(*promises)
Aggregates a collection of promises and executes the `then` condition
if any aggregated promises succeed. Executes the `rescue` handler with
a `Concurrent::PromiseExecutionError` if none of the aggregated promises
succeed. Upon execution will execute any of the aggregate promises that
were not already executed.
def self.any?(*promises)
  aggregate(:any?, *promises)
end
def self.execute(opts = {}, &block)
Raises:
- (ArgumentError) -- if no block is given
Returns:
- (Promise) -- the newly created `Promise` in the `:pending` state
def self.execute(opts = {}, &block)
  new(opts, &block).execute
end
def self.fulfill(value, opts = {})
Returns:
- (Promise) -- the newly created `Promise`
Raises:
- (ArgumentError) -- if no block is given
def self.fulfill(value, opts = {})
  Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, true, value, nil) }
end
def self.reject(reason, opts = {})
Returns:
- (Promise) -- the newly created `Promise`
Raises:
- (ArgumentError) -- if no block is given
def self.reject(reason, opts = {})
  Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, false, nil, reason) }
end
def self.zip(*promises)
Overloads:
- zip(*promises)
- zip(*promises, opts)
Parameters:
- promises (Array<Promise>)
- opts (Hash) -- the configuration options
Options Hash (**opts):
- :executor (Executor) -- when set use the given `Executor` instance.
- :execute (Boolean) -- execute promise before returning
Returns:
- (Promise<Array>)
def self.zip(*promises)
  opts = promises.last.is_a?(::Hash) ? promises.pop.dup : {}
  opts[:executor] ||= ImmediateExecutor.new
  zero = if !opts.key?(:execute) || opts.delete(:execute)
    fulfill([], opts)
  else
    Promise.new(opts) { [] }
  end

  promises.reduce(zero) do |p1, p2|
    p1.flat_map do |results|
      p2.then do |next_result|
        results << next_result
      end
    end
  end
end
def complete(success, value, reason)
def complete(success, value, reason)
  children_to_notify = synchronize do
    set_state!(success, value, reason)
    @children.dup
  end

  children_to_notify.each { |child| notify_child(child) }
  observers.notify_and_delete_observers{ [Time.now, self.value, reason] }
end
def execute
Returns:
- (Promise) -- a reference to `self`
def execute
  if root?
    if compare_and_set_state(:pending, :unscheduled)
      set_pending
      realize(@promise_body)
    end
  else
    compare_and_set_state(:pending, :unscheduled)
    @parent.execute
  end
  self
end
def fail(reason = StandardError.new)
Raises:
- (Concurrent::PromiseExecutionError) -- if not the root promise
def fail(reason = StandardError.new)
  set { raise reason }
end
def flat_map(&block)
Returns:
- (Promise)
def flat_map(&block)
  child = Promise.new(
    parent: self,
    executor: ImmediateExecutor.new,
  )

  on_error { |e| child.on_reject(e) }
  on_success do |result1|
    begin
      inner = block.call(result1)
      inner.execute
      inner.on_success { |result2| child.on_fulfill(result2) }
      inner.on_error { |e| child.on_reject(e) }
    rescue => e
      child.on_reject(e)
    end
  end

  child
end
def initialize(opts = {}, &block)
Options Hash (**opts):
- :parent (Promise) -- the parent `Promise` when building a chain/tree
- :on_fulfill (Proc) -- fulfillment handler
- :on_reject (Proc) -- rejection handler
- :args (object, Array) -- zero or more arguments to be passed to the task block on execution
Other tags:
- Yield: The block operation to be performed asynchronously.
Raises:
- (ArgumentError) -- if no block is given
See: http://wiki.commonjs.org/wiki/Promises/A
See: http://promises-aplus.github.io/promises-spec/
def initialize(opts = {}, &block)
  opts.delete_if { |k, v| v.nil? }
  super(NULL, opts.merge(__promise_body_from_block__: block), &nil)
end
def notify_child(child)
def notify_child(child)
  if_state(:fulfilled) { child.on_fulfill(apply_deref_options(@value)) }
  if_state(:rejected) { child.on_reject(@reason) }
end
def ns_initialize(value, opts)
def ns_initialize(value, opts)
  super

  @executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
  @args = get_arguments_from(opts)

  @parent = opts.fetch(:parent) { nil }
  @on_fulfill = opts.fetch(:on_fulfill) { Proc.new { |result| result } }
  @on_reject = opts.fetch(:on_reject) { Proc.new { |reason| raise reason } }

  @promise_body = opts[:__promise_body_from_block__] || Proc.new { |result| result }
  @state = :unscheduled
  @children = []
end
def on_fulfill(result)
def on_fulfill(result)
  realize Proc.new { @on_fulfill.call(result) }
  nil
end
def on_reject(reason)
def on_reject(reason)
  realize Proc.new { @on_reject.call(reason) }
  nil
end
def on_success(&block)
Returns:
- (Promise) -- self
Other tags:
- Yield: The block to execute
def on_success(&block)
  raise ArgumentError.new('no block given') unless block_given?
  self.then(&block)
end
def realize(task)
def realize(task)
  @executor.post do
    success, value, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args)
    complete(success, value, reason)
  end
end
def rescue(&block)
Returns:
- (Promise) -- self
Other tags:
- Yield: The block to execute
def rescue(&block)
  self.then(block)
end
def root? # :nodoc:
def root? # :nodoc:
  @parent.nil?
end
def set(value = NULL, &block)
Raises:
- (Concurrent::PromiseExecutionError) -- if not the root promise
def set(value = NULL, &block)
  raise PromiseExecutionError.new('supported only on root promise') unless root?
  check_for_block_or_value!(block_given?, value)
  synchronize do
    if @state != :unscheduled
      raise MultipleAssignmentError
    else
      @promise_body = block || Proc.new { |result| value }
    end
  end
  execute
end
def set_pending
def set_pending
  synchronize do
    @state = :pending
    @children.each { |c| c.set_pending }
  end
end
def set_state!(success, value, reason)
def set_state!(success, value, reason)
  set_state(success, value, reason)
  event.set
end
def synchronized_set_state!(success, value, reason)
def synchronized_set_state!(success, value, reason)
  synchronize { set_state!(success, value, reason) }
end
def then(*args, &block)
Overloads:
- then(rescuer, executor, &block)
- then(rescuer, executor: executor, &block)
Parameters:
- rescuer (Proc) -- An optional rescue block to be executed if the promise is rejected
- executor (ThreadPool) -- An optional thread pool executor to be used by the new promise
Other tags:
- Yield: The block operation to be performed asynchronously.
Returns:
- (Promise) -- the new promise
def then(*args, &block)
  if args.last.is_a?(::Hash)
    executor = args.pop[:executor]
    rescuer = args.first
  else
    rescuer, executor = args
  end

  executor ||= @executor

  raise ArgumentError.new('rescuers and block are both missing') if rescuer.nil? && !block_given?
  block = Proc.new { |result| result } unless block_given?
  child = Promise.new(
    parent: self,
    executor: executor,
    on_fulfill: block,
    on_reject: rescuer
  )

  synchronize do
    child.state = :pending if @state == :pending
    child.on_fulfill(apply_deref_options(@value)) if @state == :fulfilled
    child.on_reject(@reason) if @state == :rejected
    @children << child
  end

  child
end
def zip(*others)
Overloads:
- zip(*promises)
- zip(*promises, opts)
Parameters:
- others (Array<Promise>)
- opts (Hash) -- the configuration options
Options Hash (**opts):
- :executor (Executor) -- when set use the given `Executor` instance.
- :execute (Boolean) -- execute promise before returning
Returns:
- (Promise<Array>)
def zip(*others)
  self.class.zip(self, *others)
end