class Concurrent::Agent

`Agent` is inspired by Clojure’s [agent](clojure.org/agents)
function. An agent is a shared, mutable variable providing independent,
uncoordinated, asynchronous change of individual values. Best used when
the value will undergo frequent, complex updates. Suitable when the result
of an update does not need to be known immediately. `Agent` is (mostly)
functionally equivalent to Clojure’s agent, except where the runtime
prevents parity.

Agents are reactive, not autonomous - there is no imperative message loop
and no blocking receive. The state of an Agent should be itself immutable
and the `#value` of an Agent is always immediately available for reading by
any thread without any messages, i.e. observation does not require
cooperation or coordination.

Agent action dispatches are made using the various `#send` methods. These
methods always return immediately. At some point later, in another thread,
the following will happen:

1. The given `action` will be applied to the state of the Agent and the
   `args`, if any were supplied.
2. The return value of `action` will be passed to the validator lambda,
   if one has been set on the Agent.
3. If the validator succeeds or if no validator was given, the return value
   of the given `action` will become the new `#value` of the Agent. See
   `#initialize` for details.
4. If any observers were added to the Agent, they will be notified. See
   `#add_observer` for details.
5. If during the `action` execution any other dispatches are made (directly
   or indirectly), they will be held until after the `#value` of the Agent
   has been changed.

If any exceptions are thrown by an action function, no nested dispatches
will occur, and the exception will be cached in the Agent itself. When an
Agent has errors cached, any subsequent interactions will immediately throw
an exception, until the agent’s errors are cleared. Agent errors can be
examined with `#error` and the agent restarted with `#restart`.

The actions of all Agents get interleaved amongst threads in a thread pool.
At any point in time, at most one action for each Agent is being executed.
Actions dispatched to an agent from another single agent or thread will
occur in the order they were sent, potentially interleaved with actions
dispatched to the same agent from other sources. The `#send` method should
be used for actions that are CPU limited, while the `#send_off` method is
appropriate for actions that may block on IO.

Unlike in Clojure, `Agent` cannot participate in `Concurrent::TVar` transactions.

## Example

```
def next_fibonacci(set = nil)
  return [0, 1] if set.nil?
  # append the sum of the last two elements only
  set + [set[-2..-1].reduce{|sum,x| sum + x }]
end

# create an agent with an initial value
agent = Concurrent::Agent.new(next_fibonacci)

# send a few update requests
5.times do
  agent.send{|set| next_fibonacci(set) }
end

# wait for them to complete
agent.await

# get the current value
agent.value #=> [0, 1, 1, 2, 3, 5, 8]
```

## Observation

Agents support observers through the {Concurrent::Observable} mixin module.
Notification of observers occurs every time an action dispatch returns and
the new value is successfully validated. Observation will not occur if the
action raises an exception, if validation fails, or when a {#restart} occurs.

When notified the observer will receive three arguments: `time`, `old_value`,
and `new_value`. The `time` argument is the time at which the value change
occurred. The `old_value` is the value of the Agent when the action began
processing. The `new_value` is the value to which the Agent was set when the
action completed. Note that `old_value` and `new_value` may be the same.
This is not an error. It simply means that the action returned the same
value.

## Nested Actions

It is possible for an Agent action to post further actions back to itself.
The nested actions will be enqueued normally then processed after the
outer action completes, in the order they were sent, possibly interleaved
with action dispatches from other threads. Nested actions never deadlock
with one another and a failure in a nested action will never affect the
outer action.

Nested actions can be called using the Agent reference from the enclosing
scope or by passing the reference in as a “send” argument. Nested actions
cannot be posted using `self` from within the action block/proc/lambda; `self`
in this context will not reference the Agent. The preferred method for
dispatching nested actions is to pass the Agent as an argument. This allows
Ruby to more effectively manage the closing scope.

Prefer this:

```
agent = Concurrent::Agent.new(0)
agent.send(agent) do |value, this|
  this.send {|v| v + 42 }
  3.14
end
agent.value #=> 45.14
```

Over this:

```
agent = Concurrent::Agent.new(0)
agent.send do |value|
  agent.send {|v| v + 42 }
  3.14
end
```

@!macro agent_await_warning

NOTE Never, *under any circumstances*, call any of the “await” methods
({#await}, {#await_for}, {#await_for!}, and {#wait}) from within an action
block/proc/lambda. The call will block the Agent and will always fail.
Calling either {#await} or {#wait} (with a timeout of `nil`) will
hopelessly deadlock the Agent with no possibility of recovery.

@!macro thread_safe_variable_comparison

@see clojure.org/Agents Clojure Agents
@see clojure.org/state Values and Change - Clojure’s approach to Identity and State

def <<(action)

Other tags:
    See: #send_off -

Returns:
  • (Concurrent::Agent) - self

Parameters:
  • action (Proc) -- the action dispatch to be enqueued
def <<(action)
  send_off(&action)
  self
end

def await

Returns:
  • (Concurrent::Agent) - self
def await
  wait(nil)
  self
end

def await(*agents)

Returns:
  • (Boolean) - true

Parameters:
  • agents (Array) -- the Agents on which to wait
def await(*agents)
  agents.each { |agent| agent.await }
  true
end

def await_for(timeout)

Returns:
  • (Boolean) - true if all actions complete before timeout else false

Parameters:
  • timeout (Float) -- the maximum number of seconds to wait
def await_for(timeout)
  wait(timeout.to_f)
end

def await_for(timeout, *agents)

Returns:
  • (Boolean) - true if all actions complete before timeout else false

Parameters:
  • agents (Array) -- the Agents on which to wait
  • timeout (Float) -- the maximum number of seconds to wait
def await_for(timeout, *agents)
  end_at = Concurrent.monotonic_time + timeout.to_f
  ok     = agents.length.times do |i|
    break false if (delay = end_at - Concurrent.monotonic_time) < 0
    break false unless agents[i].await_for(delay)
  end
  !!ok
end

def await_for!(timeout)

Raises:
  • (Concurrent::TimeoutError) - when timeout is reached

Returns:
  • (Boolean) - true if all actions complete before timeout

Parameters:
  • timeout (Float) -- the maximum number of seconds to wait
def await_for!(timeout)
  raise Concurrent::TimeoutError unless wait(timeout.to_f)
  true
end

def await_for!(timeout, *agents)

Raises:
  • (Concurrent::TimeoutError) - when timeout is reached

Returns:
  • (Boolean) - true if all actions complete before timeout

Parameters:
  • agents (Array) -- the Agents on which to wait
  • timeout (Float) -- the maximum number of seconds to wait
def await_for!(timeout, *agents)
  raise Concurrent::TimeoutError unless await_for(timeout, *agents)
  true
end

def enqueue_action_job(action, args, executor)

def enqueue_action_job(action, args, executor)
  raise ArgumentError.new('no action given') unless action
  job = Job.new(action, args, executor, @caller.value || Thread.current.object_id)
  synchronize { ns_enqueue_job(job) }
end

def enqueue_await_job(latch)

def enqueue_await_job(latch)
  synchronize do
    if (index = ns_find_last_job_for_thread)
      job = Job.new(AWAIT_ACTION, [latch], Concurrent.global_immediate_executor,
                    Thread.current.object_id)
      ns_enqueue_job(job, index+1)
    else
      latch.count_down
      true
    end
  end
end

def error

Returns:
  • (nil, Error) - the error which caused the failure when {#failed?}
def error
  @error.value
end

def execute_next_job

def execute_next_job
  job       = synchronize { @queue.first }
  old_value = @current.value
  @caller.value = job.caller # for nested actions
  new_value     = job.action.call(old_value, *job.args)
  @caller.value = nil
  return if new_value == AWAIT_FLAG
  if ns_validate(new_value)
    @current.value = new_value
    observers.notify_observers(Time.now, old_value, new_value)
  else
    handle_error(ValidationError.new)
  end
rescue => error
  handle_error(error)
ensure
  synchronize do
    @queue.shift
    unless failed? || @queue.empty?
      ns_post_next_job
    end
  end
end

def failed?

Other tags:
    See: #restart -
def failed?
  !@error.value.nil?
end

def handle_error(error)

def handle_error(error)
  # stop new jobs from posting
  @error.value = error if @error_mode == :fail
  @error_handler.call(self, error)
rescue
  # do nothing
end

def initialize(initial, opts = {})

Options Hash: (**opts)
  • :validator (nil, Proc) -- the (optional) validation procedure
  • :error_handler (nil, Proc) -- the (optional) error handler
  • :error_mode (Symbol) -- either `:continue` or `:fail`

Parameters:
  • opts (Hash) -- the configuration options
  • initial (Object) -- the initial value
def initialize(initial, opts = {})
  super()
  synchronize { ns_initialize(initial, opts) }
end

def ns_enqueue_job(job, index = nil)

def ns_enqueue_job(job, index = nil)
  # a non-nil index means this is an await job
  return false if index.nil? && failed?
  index ||= @queue.length
  @queue.insert(index, job)
  # if this is the only job, post to executor
  ns_post_next_job if @queue.length == 1
  true
end

def ns_find_last_job_for_thread

def ns_find_last_job_for_thread
  @queue.rindex { |job| job.caller == Thread.current.object_id }
end

def ns_initialize(initial, opts)

def ns_initialize(initial, opts)
  @error_mode    = opts[:error_mode]
  @error_handler = opts[:error_handler]
  if @error_mode && !ERROR_MODES.include?(@error_mode)
    raise ArgumentError.new('unrecognized error mode')
  elsif @error_mode.nil?
    @error_mode = @error_handler ? :continue : :fail
  end
  @error_handler ||= DEFAULT_ERROR_HANDLER
  @validator     = opts.fetch(:validator, DEFAULT_VALIDATOR)
  @current       = Concurrent::AtomicReference.new(initial)
  @error         = Concurrent::AtomicReference.new(nil)
  @caller        = Concurrent::ThreadLocalVar.new(nil)
  @queue         = []
  self.observers = Collection::CopyOnNotifyObserverSet.new
end

def ns_post_next_job

def ns_post_next_job
  @queue.first.executor.post { execute_next_job }
end

def ns_validate(value)

def ns_validate(value)
  @validator.call(value)
rescue
  false
end

def restart(new_value, opts = {})

Raises:
  • (Concurrent::Agent::Error) - when not failed

Returns:
  • (Boolean) - true

Options Hash: (**opts)
  • :clear_actions (Boolean) -- true if all enqueued but unprocessed
    actions should be discarded on restart, else false (default: false)

Parameters:
  • opts (Hash) -- the configuration options
  • new_value (Object) -- the new value for the Agent once restarted
def restart(new_value, opts = {})
  clear_actions = opts.fetch(:clear_actions, false)
  synchronize do
    raise Error.new('agent is not failed') unless failed?
    raise ValidationError unless ns_validate(new_value)
    @current.value = new_value
    @error.value   = nil
    @queue.clear if clear_actions
    ns_post_next_job unless @queue.empty?
  end
  true
end

def send(*args, &action)

Returns:
  • (Boolean) - true if the action is successfully enqueued, false if
    the Agent is {#failed?}

Other tags:
    Yield: - process the old value and return the new
    Yieldparam: value - the current {#value} of the Agent
    Yieldparam: args - zero or more arguments to pass to the action
    Yieldreturn: - the new value of the Agent

Parameters:
  • action (Proc) -- the action dispatch to be enqueued
  • args (Array) -- zero or more arguments to be passed to the action
    def send(*args, &action)
      enqueue_action_job(action, args, Concurrent.global_fast_executor)
    end

    def send!(*args, &action)

    Raises:
    • (Concurrent::Agent::Error) - if the Agent is {#failed?}

    Returns:
    • (Boolean) - true if the action is successfully enqueued
    def send!(*args, &action)
      raise Error.new unless send(*args, &action)
      true
    end

    def send_off(*args, &action)

    @!macro send_return
    @!macro agent_send
    def send_off(*args, &action)
      enqueue_action_job(action, args, Concurrent.global_io_executor)
    end

    def send_off!(*args, &action)

    @!macro send_bang_return_and_raise
    @!macro agent_send
    def send_off!(*args, &action)
      raise Error.new unless send_off(*args, &action)
      true
    end

    def send_via(executor, *args, &action)

    Parameters:
    • executor (Concurrent::ExecutorService) -- the executor on which the
      action is to be dispatched
    def send_via(executor, *args, &action)
      enqueue_action_job(action, args, executor)
    end

    def send_via!(executor, *args, &action)

    Parameters:
    • executor (Concurrent::ExecutorService) -- the executor on which the
      action is to be dispatched
    def send_via!(executor, *args, &action)
      raise Error.new unless send_via(executor, *args, &action)
      true
    end

    def value

    Returns:
    • (Object) - the current value
    def value
      @current.value # TODO (pitr 12-Sep-2015): broken unsafe read?
    end

    def wait(timeout = nil)

    Returns:
    • (Boolean) - true if all actions complete before timeout else false

    Parameters:
    • timeout (Float) -- the maximum number of seconds to wait
    def wait(timeout = nil)
      latch = Concurrent::CountDownLatch.new(1)
      enqueue_await_job(latch)
      latch.wait(timeout)
    end