class Concurrent::Agent
@see clojure.org/state Values and Change - Clojure’s approach to Identity and State
@see clojure.org/Agents Clojure Agents
@!macro thread_safe_variable_comparison
hopelessly deadlock the Agent with no possibility of recovery.
Calling either {#await} or {#wait} (with a timeout of ‘nil`) will
block/proc/lambda. The call will block the Agent and will always fail.
({#await}, {#await_for}, {#await_for!}, and {#wait}) from within an action
NOTE Never, *under any circumstances*, call any of the “await” methods
@!macro agent_await_warning
“`
end
3.14
agent.send {|v| v + 42 }
agent.send do |value|
agent = Concurrent::Agent.new(0)
“`
Over this:
“`
agent.value #=> 45.14
end
3.14
this.send {|v| v + 42 }
agent.send(agent) do |value, this|
agent = Concurrent::Agent.new(0)
“`
Prefer this:
Ruby to more effectively manage the closing scope.
dispatching nested actions is to pass the Agent as an argument. This allows
in this context will not reference the Agent. The preferred method for
cannot be post using `self` from within the action block/proc/lambda; `self`
scope or by passing the reference in as a “send” argument. Nested actions
Nested actions can be called using the Agent reference from the enclosing
outer action.
with one another and a failure in a nested action will never affect the
with action dispatches from other threads. Nested actions never deadlock
outer action completes, in the order they were sent, possibly interleaved
The nested actions will be enqueued normally then processed after the
It is possible for an Agent action to post further actions back to itself.
## Nested Actions
value.
This is not an error. It simply means that the action returned the same
action completed. Note that `old_value` and `new_value` may be the same.
processing. The `new_value` is the value to which the Agent was set when the
occurred. The `old_value` is the value of the Agent when the action began
and `new_value`. The `time` argument is the time at which the value change
When notified the observer will receive three arguments: `time`, `old_value`,
action raises an exception, if validation fails, or when a {#restart} occurs.
the new value is successfully validated. Observation will not occur if the
Notification of observers occurs every time an action dispatch returns and
Agents support observers through the {Concurrent::Observable} mixin module.
## Observation
“`
agent.value #=> [0, 1, 1, 2, 3, 5, 8]
# get the current value
agent.await
# wait for them to complete
end
agent.send{|set| next_fibonacci(set) }
5.times do
# send a few update requests
agent = Concurrent::Agent.new(next_fibonacci)
# create an agent with an initial value
end
set + [set.reduce{|sum,x| sum + x }]
return [0, 1] if set.nil?
def next_fibonacci(set = nil)
“`
## Example
Unlike in Clojure, `Agent` cannot participate in `Concurrent::TVar` transactions.
appropriate for actions that may block on IO.
be used for actions that are CPU limited, while the `#send_off` method is
dispatched to the same agent from other sources. The `#send` method should
occur in the order they were sent, potentially interleaved with actions
Actions dispatched to an agent from another single agent or thread will
At any point in time, at most one action for each Agent is being executed.
The actions of all Agents get interleaved amongst threads in a thread pool.
examined with `#error` and the agent restarted with `#restart`.
an exception, until the agent’s errors are cleared. Agent errors can be
Agent has errors cached, any subsequent interactions will immediately throw
will occur, and the exception will be cached in the Agent itself. When an
If any exceptions are thrown by an action function, no nested dispatches
has been changed.
or indirectly), they will be held until after the ‘#value` of the Agent
5. If during the `action` execution any other dispatches are made (directly
`#add_observer` for details.
4. If any observers were added to the Agent, they will be notified. See
`#initialize` for details.
of the given `action` will become the new `#value` of the Agent. See
3. If the validator succeeds or if no validator was given, the return value
if one has been set on the Agent.
2. The return value of `action` will be passed to the validator lambda,
`args`, if any were supplied.
1. The given `action` will be applied to the state of the Agent and the
the following will happen:
methods always return immediately. At some point later, in another thread,
Agent action dispatches are made using the various `#send` methods. These
cooperation or coordination.
any thread without any messages, i.e. observation does not require
and the `#value` of an Agent is always immediately available for reading by
and no blocking receive. The state of an Agent should be itself immutable
Agents are reactive, not autonomous - there is no imperative message loop
prevents parity.
functionally equivalent to Clojure’s agent, except where the runtime
of an update does not need to be known immediately. ‘Agent` is (mostly)
the value will undergo frequent, complex updates. Suitable when the result
uncoordinated, asynchronous change of individual values. Best used when
function. An agent is a shared, mutable variable providing independent,
`Agent` is inspired by Clojure’s [agent](clojure.org/agents)
def <<(action)
- See: #send_off -
Returns:
-
(Concurrent::Agent)
- self
Parameters:
-
action
(Proc
) -- the action dispatch to be enqueued
def <<(action) send_off(&action) self end
def await
-
(Boolean)
- self
def await wait(nil) self end
def await(*agents)
-
(Boolean)
- true
Parameters:
-
agents
(Array
) -- the Agents on which to wait
def await(*agents) agents.each { |agent| agent.await } true end
def await_for(timeout)
-
(Boolean)
- true if all actions complete before timeout else false
Parameters:
-
timeout
(Float
) -- the maximum number of seconds to wait
def await_for(timeout) wait(timeout.to_f) end
def await_for(timeout, *agents)
-
(Boolean)
- true if all actions complete before timeout else false
Parameters:
-
agents
(Array
) -- the Agents on which to wait -
timeout
(Float
) -- the maximum number of seconds to wait
def await_for(timeout, *agents) end_at = Concurrent.monotonic_time + timeout.to_f ok = agents.length.times do |i| break false if (delay = end_at - Concurrent.monotonic_time) < 0 break false unless agents[i].await_for(delay) end !!ok end
def await_for!(timeout)
-
(Concurrent::TimeoutError)
- when timeout is reached
Returns:
-
(Boolean)
- true if all actions complete before timeout
Parameters:
-
timeout
(Float
) -- the maximum number of seconds to wait
def await_for!(timeout) raise Concurrent::TimeoutError unless wait(timeout.to_f) true end
def await_for!(timeout, *agents)
-
(Concurrent::TimeoutError)
- when timeout is reached
Returns:
-
(Boolean)
- true if all actions complete before timeout
Parameters:
-
agents
(Array
) -- the Agents on which to wait -
timeout
(Float
) -- the maximum number of seconds to wait
def await_for!(timeout, *agents) raise Concurrent::TimeoutError unless await_for(timeout, *agents) true end
def enqueue_action_job(action, args, executor)
def enqueue_action_job(action, args, executor) raise ArgumentError.new('no action given') unless action job = Job.new(action, args, executor, @caller.value || Thread.current.object_id) synchronize { ns_enqueue_job(job) } end
def enqueue_await_job(latch)
def enqueue_await_job(latch) synchronize do if (index = ns_find_last_job_for_thread) job = Job.new(AWAIT_ACTION, [latch], Concurrent.global_immediate_executor, Thread.current.object_id) ns_enqueue_job(job, index+1) else latch.count_down true end end end
def error
-
(nil, Error)
- the error which caused the failure when {#failed?}
def error @error.value end
def execute_next_job
def execute_next_job job = synchronize { @queue.first } old_value = @current.value @caller.value = job.caller # for nested actions new_value = job.action.call(old_value, *job.args) @caller.value = nil return if new_value == AWAIT_FLAG if ns_validate(new_value) @current.value = new_value observers.notify_observers(Time.now, old_value, new_value) else handle_error(ValidationError.new) end rescue => error handle_error(error) ensure synchronize do @queue.shift unless failed? || @queue.empty? ns_post_next_job end end end
def failed?
- See: #restart -
def failed? !@error.value.nil? end
def handle_error(error)
def handle_error(error) # stop new jobs from posting @error.value = error if @error_mode == :fail @error_handler.call(self, error) rescue # do nothing end
def initialize(initial, opts = {})
(**opts)
-
:validator
(nil, Proc
) -- the (optional) validation procedure -
:error_handler
(nil, Proc
) -- the (optional) error handler -
:error_mode
(Symbol
) -- either `:continue` or `:fail`
Parameters:
-
opts
(Hash
) -- the configuration options -
initial
(Object
) -- the initial value
def initialize(initial, opts = {}) super() synchronize { ns_initialize(initial, opts) } end
def ns_enqueue_job(job, index = nil)
def ns_enqueue_job(job, index = nil) # a non-nil index means this is an await job return false if index.nil? && failed? index ||= @queue.length @queue.insert(index, job) # if this is the only job, post to executor ns_post_next_job if @queue.length == 1 true end
def ns_find_last_job_for_thread
def ns_find_last_job_for_thread @queue.rindex { |job| job.caller == Thread.current.object_id } end
def ns_initialize(initial, opts)
def ns_initialize(initial, opts) @error_mode = opts[:error_mode] @error_handler = opts[:error_handler] if @error_mode && !ERROR_MODES.include?(@error_mode) raise ArgumentError.new('unrecognized error mode') elsif @error_mode.nil? @error_mode = @error_handler ? :continue : :fail end @error_handler ||= DEFAULT_ERROR_HANDLER @validator = opts.fetch(:validator, DEFAULT_VALIDATOR) @current = Concurrent::AtomicReference.new(initial) @error = Concurrent::AtomicReference.new(nil) @caller = Concurrent::ThreadLocalVar.new(nil) @queue = [] self.observers = Collection::CopyOnNotifyObserverSet.new end
def ns_post_next_job
def ns_post_next_job @queue.first.executor.post { execute_next_job } end
def ns_validate(value)
def ns_validate(value) @validator.call(value) rescue false end
def restart(new_value, opts = {})
-
(Concurrent:AgentError)
- when not failed
Returns:
-
(Boolean)
- true
Options Hash:
(**opts)
-
:clear_actions
(Symbol
) -- true if all enqueued but unprocessed
Parameters:
-
opts
(Hash
) -- the configuration options -
new_value
(Object
) -- the new value for the Agent once restarted
def restart(new_value, opts = {}) clear_actions = opts.fetch(:clear_actions, false) synchronize do raise Error.new('agent is not failed') unless failed? raise ValidationError unless ns_validate(new_value) @current.value = new_value @error.value = nil @queue.clear if clear_actions ns_post_next_job unless @queue.empty? end true end
def send(*args, &action)
-
(Boolean)
- true if the action is successfully enqueued, false if
Other tags:
- Yieldreturn: - the new value of the Agent
Other tags:
- Yieldparam: args - zero or more arguments to pass to the
Yieldparam: value - the current {#value} of the Agent
Other tags:
- Yield: - process the old value and return the new
Parameters:
-
action
(Proc
) -- the action dispatch to be enqueued -
args
(Array
) -- zero or more arguments to be passed to
def send(*args, &action) enqueue_action_job(action, args, Concurrent.global_fast_executor) end
def send!(*args, &action)
-
(Concurrent::Agent::Error)
- if the Agent is {#failed?}
Returns:
-
(Boolean)
- true if the action is successfully enqueued
def send!(*args, &action) raise Error.new unless send(*args, &action) true end
def send_off(*args, &action)
@!macro agent_send
def send_off(*args, &action) enqueue_action_job(action, args, Concurrent.global_io_executor) end
def send_off!(*args, &action)
@!macro agent_send
def send_off!(*args, &action) raise Error.new unless send_off(*args, &action) true end
def send_via(executor, *args, &action)
-
executor
(Concurrent::ExecutorService
) -- the executor on which the
def send_via(executor, *args, &action) enqueue_action_job(action, args, executor) end
def send_via!(executor, *args, &action)
-
executor
(Concurrent::ExecutorService
) -- the executor on which the
def send_via!(executor, *args, &action) raise Error.new unless send_via(executor, *args, &action) true end
def value
-
(Object)
- the current value
def value @current.value # TODO (pitr 12-Sep-2015): broken unsafe read? end
def wait(timeout = nil)
-
(Boolean)
- true if all actions complete before timeout else false
Parameters:
-
timeout
(Float
) -- the maximum number of seconds to wait
def wait(timeout = nil) latch = Concurrent::CountDownLatch.new(1) enqueue_await_job(latch) latch.wait(timeout) end