class Concurrent::Agent
An agent is a single atomic value that represents an identity. The current value of the agent can be requested at any time (`#deref`). Each agent has a work queue and operates on the global thread pool. Consumers can `#post` code blocks to the agent. The code block (function) will receive the current value of the agent as its sole parameter. The return value of the block will become the new value of the agent. Agents support two error handling modes: fail and continue.

A good example of an agent is a shared incrementing counter, such as the score in a video game.

@example Basic usage
  score = Concurrent::Agent.new(10)
  score.value #=> 10
  score << proc{|current| current + 100 }
  sleep(0.1)
  score.value #=> 110
  score << proc{|current| current * 2 }
  sleep(0.1)
  score.value #=> 220
  score << proc{|current| current - 50 }
  sleep(0.1)
  score.value #=> 170

@!attribute [r] timeout
  @return [Fixnum] the maximum number of seconds before an update is cancelled
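The model described above (one value, a work queue, updates applied one at a time) can be illustrated with a small stdlib-only sketch. `TinyAgent` and its `shutdown` method are hypothetical names invented for this illustration; the sketch uses a dedicated worker thread rather than a thread pool and is not how `Concurrent::Agent` is implemented.

```ruby
# Hypothetical TinyAgent: a stdlib-only illustration, not the library's code.
class TinyAgent
  def initialize(initial)
    @value = initial
    @lock  = Mutex.new
    @queue = Queue.new
    # A single worker drains the queue, so updates run one at a time, in order.
    @worker = Thread.new do
      while (job = @queue.pop)
        current = @lock.synchronize { @value }
        result  = job.call(current)
        @lock.synchronize { @value = result }
      end
    end
  end

  # Read the current value at any time.
  def value
    @lock.synchronize { @value }
  end

  # Queue a block; its return value becomes the new value.
  def post(&block)
    return nil if block.nil?
    @queue << block
    true
  end

  # Stop the worker once every queued job has run (a nil job ends the loop).
  def shutdown
    @queue << nil
    @worker.join
  end
end

score = TinyAgent.new(10)
score.post { |current| current + 100 }
score.post { |current| current * 2 }
score.post { |current| current - 50 }
score.shutdown
puts score.value # prints 170
```

Because the worker is joined before the final read, this sketch needs no `sleep` calls to observe the result.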
def <<(block)
Other tags:
- Yield: - the operation to be performed with the current value in order to calculate the new value
- Yieldparam: value - the current value
- Yieldreturn: - the new value
def <<(block)
  post(&block)
  self
end
def await(timeout = nil)
Parameters:
- timeout (Numeric) -- the maximum time in seconds to wait
Returns:
- (Boolean) -- false on timeout, true otherwise
def await(timeout = nil)
  done = Event.new
  post { |val| done.set; val }
  done.wait timeout
end
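The `await` method relies on a marker job: it posts a block that sets an event and returns the value unchanged, then waits on that event. Since jobs run in order, the event is set only after every earlier job has finished. The sketch below shows the same idea with the standard library only. `TinyEvent` is a hypothetical stand-in for the `Event` class used above, and the plain `Queue` with one worker thread is an assumed stand-in for the agent's work queue.

```ruby
# Hypothetical TinyEvent: a one-shot flag that threads can wait on.
class TinyEvent
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @set  = false
  end

  def set
    @lock.synchronize { @set = true; @cond.broadcast }
  end

  # Returns true if set within `timeout` seconds (nil waits forever), else false.
  def wait(timeout = nil)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && (now.call + timeout)
    @lock.synchronize do
      until @set
        remaining = deadline && (deadline - now.call)
        return false if remaining && remaining <= 0
        @cond.wait(@lock, remaining)
      end
      true
    end
  end
end

# Assumed stand-in for the agent's queue: one worker, jobs run in FIFO order.
jobs   = Queue.new
worker = Thread.new { while (job = jobs.pop); job.call; end }

log = []
jobs << -> { sleep(0.05); log << :slow_update }

# The "await" step: enqueue a marker job, then block until it has run.
done = TinyEvent.new
jobs << -> { done.set }
finished = done.wait(2)      # true: the marker ran after :slow_update

# With a short timeout and a slow job ahead of the marker, wait gives up.
jobs << -> { sleep(0.3) }
late = TinyEvent.new
jobs << -> { late.set }
timed_out = late.wait(0.05)  # false: the marker had not run yet

jobs << nil                  # a nil job ends the worker loop
worker.join
```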
def initialize(initial, opts = {})
Parameters:
- initial (Object) -- the initial value
- opts (Hash) -- the options used to define the behavior at update and deref
Options Hash (**opts):
- :timeout (Fixnum) -- maximum number of seconds before an update is cancelled
- :operation (Boolean) -- when `true` will execute the future on the global operation pool
- :executor (object) -- when provided will run all operations on this executor
- :dup_on_deref (Boolean) -- call `#dup` before returning the data
- :freeze_on_deref (Boolean) -- call `#freeze` before returning the data
- :copy_on_deref (Proc) -- call the given `Proc` passing the internal value and returning the value returned from the proc
def initialize(initial, opts = {})
  @value = initial
  @rescuers = []
  @validator = Proc.new { |result| true }
  @timeout = opts.fetch(:timeout, TIMEOUT).freeze
  self.observers = CopyOnWriteObserverSet.new
  @one_by_one = OneByOne.new
  @task_executor = OptionsParser.get_task_executor_from(opts)
  @operation_executor = OptionsParser.get_operation_executor_from(opts)
  init_mutex
  set_deref_options(opts)
end
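The three deref options control what a reader receives when asking for the value. As a rough illustration only, the sketch below applies them to a plain value. `deref_with` is a hypothetical helper, and the order used here (copy, then dup, then freeze) is an assumption rather than something this section states.

```ruby
# Hypothetical helper; the copy -> dup -> freeze order is an assumption.
def deref_with(value, opts = {})
  return nil if value.nil?
  value = opts[:copy_on_deref].call(value) if opts[:copy_on_deref]
  value = value.dup if opts[:dup_on_deref]
  value = value.freeze if opts[:freeze_on_deref]
  value
end

internal = ['alice', 'bob']

# dup_on_deref: the caller gets a copy, so mutating it leaves the original alone.
copy = deref_with(internal, dup_on_deref: true)
copy << 'carol'

# dup + freeze: the caller gets a frozen copy; the internal array stays mutable.
frozen = deref_with(internal, dup_on_deref: true, freeze_on_deref: true)

# copy_on_deref: the caller gets whatever the given Proc returns.
upcased = deref_with(internal, copy_on_deref: ->(v) { v.map(&:upcase) })
```

Combining `dup_on_deref` with `freeze_on_deref` matters: freezing without duplicating would freeze the internal object itself.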
def post(&block)
Returns:
- (true, nil) -- nil when no block is given
Other tags:
- Yield: - the operation to be performed with the current value in order to calculate the new value
- Yieldparam: value - the current value
- Yieldreturn: - the new value
def post(&block)
  post_on(@task_executor, &block)
end
def post_off(&block)
Returns:
- (true, nil) -- nil when no block is given
Other tags:
- Yield: - the operation to be performed with the current value in order to calculate the new value
- Yieldparam: value - the current value
- Yieldreturn: - the new value
def post_off(&block)
  post_on(@operation_executor, &block)
end
def post_on(executor, &block)
def post_on(executor, &block)
  return nil if block.nil?
  @one_by_one.post(executor) { work(&block) }
  true
end
def rescue(clazz = StandardError, &block)
Parameters:
- clazz (Exception) -- the class of exception to catch
Other tags:
- Yield: - the block to be called when a matching exception is caught
- Yieldparam: ex - the caught exception
def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end
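Rescuers are kept in registration order, and `try_rescue` picks the first one whose class matches the exception via `is_a?`. The sketch below shows that lookup in isolation. The `Rescuer` struct here is an assumed stand-in (the source only shows it being built from a class and a block), and `handle` is a hypothetical helper name.

```ruby
# Assumed shape of a rescuer: an exception class paired with a handler block.
Rescuer = Struct.new(:clazz, :block)

# Hypothetical helper mirroring the lookup: first matching rescuer wins.
def handle(rescuers, ex)
  rescuer = rescuers.find { |r| ex.is_a?(r.clazz) }
  rescuer && rescuer.block.call(ex)
end

rescuers = []
# Register the more specific class first; a StandardError rescuer placed
# earlier would also match ZeroDivisionError and shadow this handler.
rescuers << Rescuer.new(ZeroDivisionError, ->(ex) { "math: #{ex.message}" })
rescuers << Rescuer.new(StandardError,     ->(ex) { "generic: #{ex.class}" })

math    = handle(rescuers, ZeroDivisionError.new('divided by 0'))
generic = handle(rescuers, ArgumentError.new('bad input'))
# NotImplementedError is a ScriptError, not a StandardError, so nothing matches.
nothing = handle(rescuers, NotImplementedError.new('todo'))
```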
def try_rescue(ex) # :nodoc:
@!visibility private
def try_rescue(ex) # :nodoc:
  rescuer = mutex.synchronize do
    @rescuers.find { |r| ex.is_a?(r.clazz) }
  end
  rescuer.block.call(ex) if rescuer
rescue Exception => ex
  # suppress
end
def validate(&block)
Other tags:
- Yield: - the block to be called after every update operation to determine if the result is valid
- Yieldparam: value - the result of the last update operation
- Yieldreturn: - true if the value is valid else false
def validate(&block)
  unless block.nil?
    mutex.lock
    @validator = block
    mutex.unlock
  end
  self
end
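The validator acts as a gate on each update: the block's result replaces the current value only when the validator returns true for it. A minimal sketch of that rule follows. The names `validator` and `apply` are hypothetical and the sketch ignores threading entirely.

```ruby
# Hypothetical validator: the value must never go negative.
validator = ->(result) { result >= 0 }

# Apply an update only if the validator accepts its result.
apply = lambda do |current, update|
  result = update.call(current)
  validator.call(result) ? result : current
end

value = 10
value = apply.call(value, ->(v) { v - 4 })    # accepted: value is now 6
after_valid = value
value = apply.call(value, ->(v) { v - 100 })  # rejected: value stays 6
after_invalid = value
```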
def work(&handler) # :nodoc:
@!visibility private
def work(&handler) # :nodoc:
  validator, value = mutex.synchronize { [@validator, @value] }

  begin
    # FIXME creates second thread
    result, valid = Concurrent::timeout(@timeout) do
      result = handler.call(value)
      [result, validator.call(result)]
    end
  rescue Exception => ex
    exception = ex
  end

  mutex.lock
  should_notify = if !exception && valid
    @value = result
    true
  end
  mutex.unlock

  if should_notify
    time = Time.now
    observers.notify_observers { [time, self.value] }
  end

  try_rescue(exception)
end
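The flow of `work` is: run the handler and the validator under a time limit, keep the old value if anything raised or the result was invalid, and otherwise store the result. The sketch below reproduces that decision without locks or observers. `run_update` is a hypothetical helper, and the stdlib `Timeout.timeout` stands in for `Concurrent::timeout`, so its behaviour may differ in detail. It also rescues `StandardError` rather than `Exception`.

```ruby
require 'timeout'

# Hypothetical helper: returns [new_value, error]. The old value is kept when
# the handler raises, times out, or produces a result the validator rejects.
def run_update(current, validator, limit, &handler)
  result = nil
  valid  = false
  error  = nil
  begin
    Timeout.timeout(limit) do
      result = handler.call(current)
      valid  = validator.call(result)
    end
  rescue StandardError => ex
    error = ex
  end
  new_value = (!error && valid) ? result : current
  [new_value, error]
end

integers_only = ->(r) { r.is_a?(Integer) }

ok_value,   ok_err   = run_update(10, integers_only, 1) { |v| v + 5 }
bad_value,  bad_err  = run_update(10, integers_only, 1) { |v| v.to_s }
err_value,  err_err  = run_update(10, integers_only, 1) { |v| v / 0 }
slow_value, slow_err = run_update(10, integers_only, 0.05) { |v| sleep(0.5); v + 1 }
```

In the real method the captured exception is then handed to `try_rescue`, and observers are notified only when the value actually changed hands.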