class Concurrent::Promise

  • ‘rescue` is aliased by `catch` and `on_error`
    - `rescue { |reason| … }` is the same as `then(Proc.new { |reason| … } )`
    - `on_success { |result| … }` is the same as `then {|result| … }`
    the parent reason.
    default callable is `{ |reason| raise reason }` that rejects the child with
    |result| result }` that fulfills the child with the parent value. The
    rejection. At least one of them should be passed. The default block is `{
    executed upon parent fulfillment and a callable to be executed upon parent
    The `then` method is the most generic alias: it accepts a block to be
    #### Aliases
    receive immediately rejection (they will be executed asynchronously).
    Once a promise is rejected it will continue to accept children that will
    “`
    c2.reason #=> #<RuntimeError: Boom!>
    c2.wait.state #=> :rejected
    c1.value #=> 45
    c1.wait.state #=> :fulfilled
    c2 = p.then(-> reason { raise ’Boom!‘ })
    c1 = p.then(-> reason { 42 })
    p = Concurrent::Promise.execute { Thread.pass; raise StandardError }
    “`ruby
    receive the rejection `reason` as the rejection callable parameter:
    When a promise is rejected all its children will be rejected and will
    #### Rejection
    “`
    p.reason #=> “#<StandardError: Here comes the Boom!>”
    p.rejected? #=> true
    p.state #=> :rejected
    sleep(0.1)
    p = Concurrent::Promise.execute{ raise StandardError.new(“Here comes the Boom!”) }
    “`ruby
    a reason for the rejection:
    If an exception occurs, the promise will be rejected and will provide
    “`
    p.value #=> “Hello, world!”
    p.fulfilled? #=> true
    p.state #=> :fulfilled
    sleep(0.1)
    p = Concurrent::Promise.execute{ “Hello, world!” }
    “`ruby
    Wait a little bit, and the promise will resolve and provide a value:
    “`
    p.pending? #=> true
    p.state #=> :pending
    p = Concurrent::Promise.execute{ “Hello, world!” }
    “`ruby
    Once the `execute` method is called a `Promise` becomes `pending`:
    “`
    p3 = Concurrent::Promise.execute{ “Hello World!” }
    # execute during creation
    p2 = Concurrent::Promise.new{ “Hello World!” }.execute
    # create and immediately execute
    p1.execute
    p1.state #=> :unscheduled
    p1 = Concurrent::Promise.new{ “Hello World!” }
    # create, operate, then execute
    “`ruby
    provide identical behavior:
    There are multiple ways to create and execute a new `Promise`. Both ways
    of execution of Promise objects in a chain (or tree) is strictly defined.
    execution and changed state). Despite being asynchronous, however, the order
    parent (by the time a child is created its parent may have completed
    child Promise finishes initialization it may be in a different state than its
    Promises are executed asynchronously from the main thread. By the time a
    - if parent is rejected the child will be pending (but will ultimately be rejected)
    - if parent is fulfilled the child will be pending
    - if parent is pending the child will be pending
    - if parent is unscheduled the child will be unscheduled
    The initial state of a newly created Promise depends on the state of its parent:
    “`
    then(executor: different_executor){|result| result % 5 }.execute
    then{|result| result * 3 }.
    then{|result| result - 10 }.
    p = Concurrent::Promise.fulfill(20).
    “`ruby
    And so on, and so on, and so on…
    “`
    p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute
    “`ruby
    to chained promises.
    on rejection. The result of the each promise is passed as the block argument
    block and an executor, to be executed on fulfillment, and a callable argument to be executed
    Promises can be chained using the `then` method. The `then` method accepts a
    “`
    end
    42
    # do something
    p = Concurrent::Promise.execute do
    “`ruby
    Then create one
    “`
    require ’concurrent/promise’
    “‘ruby
    Start by requiring promises
    ### Examples
    @!macro copy_options
    Promises run on the global thread pool.
    seconds to block.
    block. Any other integer or float value will indicate the maximum number of
    block. If `nil` the call will block indefinitely. If `0` the call will not
    timeout value can be passed to `value` to limit how long the call will
    `value` will block until the promise is either rejected or fulfilled. A
    immediately return the current value. When a promise is pending a call to
    immediately. When a promise is fulfilled a call to `value` will
    operation. When a promise is rejected a call to `value` will return `nil`
    `deref`) method. Obtaining the value of a promise is a potentially blocking
    Retrieving the value of a promise is done through the `value` (alias:
    as can the `#state` method, which returns a symbol.
    `#fulfilled?` can be called at any time to obtain the state of the Promise,
    predicate methods `#unscheduled?`, `#pending?`, `#rejected?`, and
    `reason` will be updated with a reference to the thrown exception. The
    will be updated to reflect the result of the operation. If :rejected the
    :fulfilled, indicating success. If a Promise is :fulfilled its `#value`
    :rejected, indicating that an exception was thrown during processing, or
    :processing is considered `#incomplete?`. A `#complete?` Promise is either
    is complete. A future that is in the :unscheduled, :pending, or
    becomes :processing. The future will remain in this state until processing
    to a thread for processing (often immediately upon `#post`) the state
    :pending. Once a job is pulled from the thread pool’s queue and is given
    :unscheduled. Once the ‘#execute` method is called the state becomes
    `#incomplete?` and `#complete?`. When a Promise is created it is set to
    :processing, :rejected, or :fulfilled. These are also aggregated as
    Promises have several possible states: :unscheduled, :pending,
    will receive the reason.
    When a promise is rejected all its children will be summarily rejected and
    result of each promise is passed to each of its children upon resolution.
    resolution and an optional callable to be executed upon parent failure. The
    method takes two parameters: an optional block to be executed upon parent
    before their children, children before their younger siblings. The `then`
    main thread) but in a strict order: parents are guaranteed to be resolved
    another promise. Promises are resolved asynchronously (with respect to the
    chained using the `then` method. The result of a call to `then` is always
    structure where each promise may have zero or more children. Promises are
    Promises are far more robust, however. Promises can be chained in a tree
    Promises are similar to futures and share many of the same behaviours.
    > completion of an operation.
    > A promise represents the eventual value returned from the single
    and [Promises/A+](promises-aplus.github.io/promises-spec/) specifications.
    Promises are inspired by the JavaScript [Promises/A](wiki.commonjs.org/wiki/Promises/A)

def self.aggregate(method, *promises)

@!macro promise_self_aggregate

returns false.
`true` or executing the composite's `#rescue` handlers if the predicate
then executing the composite's `#then` handlers if the predicate returns
or `one?`) on the collection checking for the success or failure of each,
call the given Ruby `Ennnumerable` predicate (such as `any?`, `all?`, `none?`,
execute the aggregated promises and collect them into a standard Ruby array,
Aggregate a collection of zero or more promises under a composite promise,
def self.aggregate(method, *promises)
  composite = Promise.new do
    completed = promises.collect do |promise|
      promise.execute if promise.unscheduled?
      promise.wait
      promise
    end
    unless completed.empty? || completed.send(method){|promise| promise.fulfilled? }
      raise PromiseExecutionError
    end
  end
  composite
end

def self.all?(*promises)

Returns:
  • (Promise) - an unscheduled (not executed) promise that aggregates

Parameters:
  • promises (Array) -- Zero or more promises to aggregate
def self.all?(*promises)
  aggregate(:all?, *promises)
end

def self.any?(*promises)

@!macro promise_self_aggregate

were not already executed.
fail. Upon execution will execute any of the aggregate promises that
a `Concurrent::PromiseExecutionError` if any of the aggregated promises
if any aggregated promises succeed. Executes the `rescue` handler with
Aggregates a collection of promises and executes the `then` condition
def self.any?(*promises)
  aggregate(:any?, *promises)
end

def self.execute(opts = {}, &block)

Raises:
  • (ArgumentError) - if no block is given

Returns:
  • (Promise) - the newly created `Promise` in the `:pending` state
def self.execute(opts = {}, &block)
  new(opts, &block).execute
end

def self.fulfill(value, opts = {})

Returns:
  • (Promise) - the newly created `Promise`

Raises:
  • (ArgumentError) - if no block is given
def self.fulfill(value, opts = {})
  Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, true, value, nil) }
end

def self.reject(reason, opts = {})

Returns:
  • (Promise) - the newly created `Promise`

Raises:
  • (ArgumentError) - if no block is given
def self.reject(reason, opts = {})
  Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, false, nil, reason) }
end

def self.zip(*promises)

Returns:
  • (Promise) -

Options Hash: (**opts)
  • :execute (Boolean) -- execute promise before returning
  • :executor (Executor) -- when set use the given `Executor` instance.

Parameters:
  • opts (Hash) -- the configuration options
  • promises (Array) --
  • promises (Array) --

Overloads:
  • zip(*promises, opts)
  • zip(*promises)
def self.zip(*promises)
  opts = promises.last.is_a?(::Hash) ? promises.pop.dup : {}
  opts[:executor] ||= ImmediateExecutor.new
  zero = if !opts.key?(:execute) || opts.delete(:execute)
    fulfill([], opts)
  else
    Promise.new(opts) { [] }
  end
  promises.reduce(zero) do |p1, p2|
    p1.flat_map do |results|
      p2.then do |next_result|
        results << next_result
      end
    end
  end
end

def complete(success, value, reason)

@!visibility private
def complete(success, value, reason)
  children_to_notify = synchronize do
    set_state!(success, value, reason)
    @children.dup
  end
  children_to_notify.each { |child| notify_child(child) }
  observers.notify_and_delete_observers{ [Time.now, self.value, reason] }
end

def execute

Returns:
  • (Promise) - a reference to `self`
def execute
  if root?
    if compare_and_set_state(:pending, :unscheduled)
      set_pending
      realize(@promise_body)
    end
  else
    compare_and_set_state(:pending, :unscheduled)
    @parent.execute
  end
  self
end

def fail(reason = StandardError.new)

Raises:
  • (Concurrent::PromiseExecutionError) - if not the root promise
def fail(reason = StandardError.new)
  set { raise reason }
end

def flat_map(&block)

Returns:
  • (Promise) -
def flat_map(&block)
  child = Promise.new(
    parent: self,
    executor: ImmediateExecutor.new,
  )
  on_error { |e| child.on_reject(e) }
  on_success do |result1|
    begin
      inner = block.call(result1)
      inner.execute
      inner.on_success { |result2| child.on_fulfill(result2) }
      inner.on_error { |e| child.on_reject(e) }
    rescue => e
      child.on_reject(e)
    end
  end
  child
end

def initialize(opts = {}, &block)

Other tags:
    See: http://promises-aplus.github.io/promises-spec/ -
    See: http://wiki.commonjs.org/wiki/Promises/A -

Raises:
  • (ArgumentError) - if no block is given

Other tags:
    Yield: - The block operation to be performed asynchronously.

Options Hash: (**opts)
  • :args (object, Array) -- zero or more arguments to be passed
  • :on_reject (Proc) -- rejection handler
  • :on_fulfill (Proc) -- fulfillment handler
  • :parent (Promise) -- the parent `Promise` when building a chain/tree
def initialize(opts = {}, &block)
  opts.delete_if { |k, v| v.nil? }
  super(NULL, opts.merge(__promise_body_from_block__: block), &nil)
end

def notify_child(child)

@!visibility private
def notify_child(child)
  if_state(:fulfilled) { child.on_fulfill(apply_deref_options(@value)) }
  if_state(:rejected) { child.on_reject(@reason) }
end

def ns_initialize(value, opts)

def ns_initialize(value, opts)
  super
  @executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
  @args = get_arguments_from(opts)
  @parent = opts.fetch(:parent) { nil }
  @on_fulfill = opts.fetch(:on_fulfill) { Proc.new { |result| result } }
  @on_reject = opts.fetch(:on_reject) { Proc.new { |reason| raise reason } }
  @promise_body = opts[:__promise_body_from_block__] || Proc.new { |result| result }
  @state = :unscheduled
  @children = []
end

def on_fulfill(result)

@!visibility private
def on_fulfill(result)
  realize Proc.new { @on_fulfill.call(result) }
  nil
end

def on_reject(reason)

@!visibility private
def on_reject(reason)
  realize Proc.new { @on_reject.call(reason) }
  nil
end

def on_success(&block)

Returns:
  • (Promise) - self

Other tags:
    Yield: - The block to execute
def on_success(&block)
  raise ArgumentError.new('no block given') unless block_given?
  self.then(&block)
end

def realize(task)

@!visibility private
def realize(task)
  @executor.post do
    success, value, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args)
    complete(success, value, reason)
  end
end

def rescue(&block)

Returns:
  • (Promise) - self

Other tags:
    Yield: - The block to execute
def rescue(&block)
  self.then(block)
end

def root? # :nodoc:

:nodoc:
@!visibility private
def root? # :nodoc:
  @parent.nil?
end

def set(value = NULL, &block)

Raises:
  • (Concurrent::PromiseExecutionError) - if not the root promise
def set(value = NULL, &block)
  raise PromiseExecutionError.new('supported only on root promise') unless root?
  check_for_block_or_value!(block_given?, value)
  synchronize do
    if @state != :unscheduled
      raise MultipleAssignmentError
    else
      @promise_body = block || Proc.new { |result| value }
    end
  end
  execute
end

def set_pending

@!visibility private
def set_pending
  synchronize do
    @state = :pending
    @children.each { |c| c.set_pending }
  end
end

def set_state!(success, value, reason)

@!visibility private
def set_state!(success, value, reason)
  set_state(success, value, reason)
  event.set
end

def synchronized_set_state!(success, value, reason)

@!visibility private
def synchronized_set_state!(success, value, reason)
  synchronize { set_state!(success, value, reason) }
end

def then(*args, &block)

Parameters:
  • executor (ThreadPool) -- An optional thread pool executor to be used
  • rescuer (Proc) -- An optional rescue block to be executed if the
  • executor (ThreadPool) -- An optional thread pool executor to be used
  • rescuer (Proc) -- An optional rescue block to be executed if the

Overloads:
  • then(rescuer, executor: executor, &block)
  • then(rescuer, executor, &block)

Other tags:
    Yield: - The block operation to be performed asynchronously.

Returns:
  • (Promise) - the new promise
def then(*args, &block)
  if args.last.is_a?(::Hash)
    executor = args.pop[:executor]
    rescuer = args.first
  else
    rescuer, executor = args
  end
  executor ||= @executor
  raise ArgumentError.new('rescuers and block are both missing') if rescuer.nil? && !block_given?
  block = Proc.new { |result| result } unless block_given?
  child = Promise.new(
    parent: self,
    executor: executor,
    on_fulfill: block,
    on_reject: rescuer
  )
  synchronize do
    child.state = :pending if @state == :pending
    child.on_fulfill(apply_deref_options(@value)) if @state == :fulfilled
    child.on_reject(@reason) if @state == :rejected
    @children << child
  end
  child
end

def zip(*others)

Returns:
  • (Promise) -

Options Hash: (**opts)
  • :execute (Boolean) -- execute promise before returning
  • :executor (Executor) -- when set use the given `Executor` instance.

Parameters:
  • opts (Hash) -- the configuration options
  • others (Array) --
  • others (Array) --

Overloads:
  • zip(*promises, opts)
  • zip(*promises)
def zip(*others)
  self.class.zip(self, *others)
end