class Concurrent::Agent
An agent is a single atomic value that represents an identity. The current value
of the agent can be requested at any time (#deref). Each agent has a work queue and operates on
the global thread pool. Consumers can #post code blocks to the agent. The code block (function)
will receive the current value of the agent as its sole parameter. The return value of the block
will become the new value of the agent. Agents support two error handling modes: fail and continue.
A good example of an agent is a shared incrementing counter, such as the score in a video game.
@example Basic usage
score = Concurrent::Agent.new(10)
score.value #=> 10
score << proc{|current| current + 100 }
sleep(0.1)
score.value #=> 110
score << proc{|current| current * 2 }
sleep(0.1)
score.value #=> 220
score << proc{|current| current - 50 }
sleep(0.1)
score.value #=> 170
@!attribute [r] timeout
@return [Fixnum] the maximum number of seconds before an update is cancelled
def <<(block)
Other tags:
- Yield: - the operation to be performed with the current value in order to calculate the new value
- Yieldparam: value - the current value
- Yieldreturn: - the new value
def <<(block)
  self.post(&block)
  self
end
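Because `<<` posts the block and then returns `self`, calls can be chained. The following is a minimal sketch of that shape only. `SketchAgent` is a hypothetical stand-in, not part of the library, and it applies updates synchronously so the result is deterministic; the real agent hands each block to the thread pool, so a value read immediately after chaining may not yet reflect the updates.

```ruby
# Hypothetical stand-in for illustration only. It applies updates
# synchronously, whereas the real agent posts them to a thread pool.
class SketchAgent
  attr_reader :value

  def initialize(initial)
    @value = initial
  end

  # Apply the block to the current value and store the result.
  def post(&block)
    @value = block.call(@value) unless block.nil?
  end

  # Same shape as the source: post the block, then return self.
  def <<(block)
    post(&block)
    self
  end
end

score = SketchAgent.new(10)
# << is left-associative, so the second proc is posted to the same agent.
score << proc { |current| current + 100 } << proc { |current| current * 2 }
puts score.value # prints 220
```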
def add_observer(observer, func=:update)
def add_observer(observer, func=:update)
  @observers.add_observer(observer, func)
end
def delete_observer(observer)
def delete_observer(observer)
  @observers.delete_observer(observer)
end
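The source's `work` method notifies observers with `[time, self.value]`, so an observer's callback (`update` by default) should accept a timestamp and the new value. A rough sketch of that contract follows. `ScoreLogger` and `ObservedSketch` are hypothetical names, and the stand-in notifies synchronously with a plain Hash rather than the library's `CopyOnWriteObserverSet`.

```ruby
# Hypothetical observer. The method name matches the default func (:update).
class ScoreLogger
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Called with the notification time and the agent's new value.
  def update(time, value)
    @seen << value
  end
end

# Hypothetical synchronous stand-in for the agent's observer handling.
class ObservedSketch
  attr_reader :value

  def initialize(initial)
    @value = initial
    @observers = {} # observer => callback method name
  end

  def add_observer(observer, func = :update)
    @observers[observer] = func
  end

  def delete_observer(observer)
    @observers.delete(observer)
  end

  # Update the value, then notify each observer with (time, value).
  def post
    @value = yield(@value)
    time = Time.now
    @observers.each { |observer, func| observer.send(func, time, @value) }
  end
end

logger = ScoreLogger.new
agent = ObservedSketch.new(10)
agent.add_observer(logger)
agent.post { |v| v + 5 }      # logger is notified with 15
agent.delete_observer(logger)
agent.post { |v| v * 2 }      # no observers left to notify
p logger.seen # prints [15]
```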
def initialize(initial, opts = {})
Parameters:
initial(Object) -- the initial value
opts(Hash) -- the options used to define the behavior at update and deref
(**opts):
:timeout(Fixnum) -- maximum number of seconds before an update is cancelled
:dup_on_deref(Boolean) -- call +#dup+ before returning the data
:freeze_on_deref(Boolean) -- call +#freeze+ before returning the data
:copy_on_deref(Proc) -- call the given +Proc+ passing the internal value and returning the value returned from the proc
def initialize(initial, opts = {})
  @value = initial
  @rescuers = []
  @validator = Proc.new { |result| true }
  @timeout = opts.fetch(:timeout, TIMEOUT).freeze
  @observers = CopyOnWriteObserverSet.new
  init_mutex
  set_deref_options(opts)
end
def post(&block)
Other tags:
- Yield: - the operation to be performed with the current value in order to calculate the new value
- Yieldparam: value - the current value
- Yieldreturn: - the new value
def post(&block)
  Agent.thread_pool.post{ work(&block) } unless block.nil?
end
def rescue(clazz = StandardError, &block)
Parameters:
clazz(Exception) -- the class of exception to catch
Other tags:
- Yield: - the block to be called when a matching exception is caught
- Yieldparam: ex - the caught exception
def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end
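Handlers are stored in registration order, and `try_rescue` picks the first one whose class matches via `is_a?`, so more specific classes should be registered before general ones. The sketch below illustrates that lookup. `SketchRescuer` and `RescueSketch` are hypothetical names, and the mutex and the thread pool are omitted for brevity.

```ruby
# Hypothetical pairing of an exception class with its handler block.
SketchRescuer = Struct.new(:clazz, :block)

# Hypothetical stand-in showing only the rescuer bookkeeping.
class RescueSketch
  def initialize
    @rescuers = []
  end

  # Register a handler; returns self so registrations can be chained.
  def rescue(clazz = StandardError, &block)
    @rescuers << SketchRescuer.new(clazz, block) unless block.nil?
    self
  end

  # Call the first handler whose class matches the exception, if any.
  def try_rescue(ex)
    rescuer = @rescuers.find { |r| ex.is_a?(r.clazz) }
    rescuer.block.call(ex) if rescuer
  end
end

handled = []
sketch = RescueSketch.new
sketch.rescue(ZeroDivisionError) { |ex| handled << :zero }
sketch.rescue { |ex| handled << :standard } # defaults to StandardError

sketch.try_rescue(ZeroDivisionError.new("divided by 0")) # specific handler
sketch.try_rescue(ArgumentError.new("bad argument"))     # general handler
p handled # prints [:zero, :standard]
```

If the `StandardError` handler were registered first, it would also match `ZeroDivisionError`, and the specific handler would never run.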
def try_rescue(ex) # :nodoc:
@!visibility private
def try_rescue(ex) # :nodoc:
  rescuer = mutex.synchronize do
    @rescuers.find{|r| ex.is_a?(r.clazz) }
  end
  rescuer.block.call(ex) if rescuer
rescue Exception => ex
  # suppress
end
def validate(&block)
Other tags:
- Yield: - the block to be called after every update operation to determine if the result is valid
- Yieldparam: value - the result of the last update operation
- Yieldreturn: - true if the value is valid else false
def validate(&block)
  @validator = block unless block.nil?
  self
end
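In the source's `work` method, a computed result replaces the current value only when the validator returns a truthy value; otherwise the old value is kept. A minimal sketch of that gating follows. `ValidatedSketch` is a hypothetical synchronous stand-in, and the timeout, mutex, and observers are left out.

```ruby
# Hypothetical stand-in showing only the validation step.
class ValidatedSketch
  attr_reader :value

  def initialize(initial)
    @value = initial
    @validator = proc { |result| true } # accept everything by default
  end

  # Replace the validator; returns self so it can follow the constructor.
  def validate(&block)
    @validator = block unless block.nil?
    self
  end

  # Compute a candidate value and keep it only if the validator accepts it.
  def post
    result = yield(@value)
    @value = result if @validator.call(result)
  end
end

score = ValidatedSketch.new(10).validate { |v| v >= 0 }
score.post { |v| v - 50 } # candidate -40 is rejected, value stays 10
score.post { |v| v + 5 }  # candidate 15 is accepted
puts score.value # prints 15
```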
def work(&handler) # :nodoc:
@!visibility private
def work(&handler) # :nodoc:
  begin
    should_notify = false

    mutex.synchronize do
      result = Concurrent::timeout(@timeout) do
        handler.call(@value)
      end
      if @validator.call(result)
        @value = result
        should_notify = true
      end
    end
    time = Time.now
    @observers.notify_observers{ [time, self.value] } if should_notify
  rescue Exception => ex
    try_rescue(ex)
  end
end