class Concurrent::Agent

@return [Fixnum] the maximum number of seconds before an update is cancelled
@!attribute [r] timeout
score.value #=> 170
sleep(0.1)
score << proc{|current| current - 50 }
score.value #=> 220
sleep(0.1)
score << proc{|current| current * 2 }
score.value #=> 110
sleep(0.1)
score << proc{|current| current + 100 }
score.value #=> 10
score = Concurrent::Agent.new(10)
@example Basic usage
A good example of an agent is a shared incrementing counter, such as the score in a video game.
will become the new value of the agent. Agents support two error handling modes: fail and continue.
will receive the current value of the agent as its sole parameter. The return value of the block
the global thread pool. Consumers can #post code blocks to the agent. The code block (function)
of the agent can be requested at any time (#deref). Each agent has a work queue and operates on
An agent is a single atomic value that represents an identity. The current value

def <<(block)

Other tags:
    Yieldreturn: - the new value

Other tags:
    Yieldparam: value - the current value

Other tags:
    Yield: - the operation to be performed with the current value in order to calculate
def <<(block)
  self.post(&block)
  self
end

def add_observer(observer, func=:update)

def add_observer(observer, func=:update)
  @observers.add_observer(observer, func)
end

def delete_observer(observer)

def delete_observer(observer)
  @observers.delete_observer(observer)
end

def initialize(initial, opts = {})

Options Hash: (**opts)
  • :copy_on_deref (String) -- call the given +Proc+ passing the internal value and
  • :freeze_on_deref (String) -- call +#freeze+ before returning the data
  • :dup_on_deref (String) -- call +#dup+ before returning the data
  • :timeout (Fixnum) -- maximum number of seconds before an update is cancelled

Parameters:
  • opts (Hash) -- the options used to define the behavior at update and deref
  • initial (Object) -- the initial value
def initialize(initial, opts = {})
  @value = initial
  @rescuers = []
  @validator = Proc.new { |result| true }
  @timeout = opts.fetch(:timeout, TIMEOUT).freeze
  @observers = CopyOnWriteObserverSet.new
  init_mutex
  set_deref_options(opts)
end

def post(&block)

Other tags:
    Yieldreturn: - the new value

Other tags:
    Yieldparam: value - the current value

Other tags:
    Yield: - the operation to be performed with the current value in order to calculate
def post(&block)
  Agent.thread_pool.post{ work(&block) } unless block.nil?
end

def rescue(clazz = StandardError, &block)

Other tags:
    Yieldparam: ex - the caught exception

Other tags:
    Yield: - the block to be called when a matching exception is caught

Parameters:
  • clazz (Exception) -- the class of exception to catch
def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end

def try_rescue(ex) # :nodoc:

:nodoc:
@!visibility private
def try_rescue(ex) # :nodoc:
  rescuer = mutex.synchronize do
    @rescuers.find{|r| ex.is_a?(r.clazz) }
  end
  rescuer.block.call(ex) if rescuer
rescue Exception => ex
  # supress
end

def validate(&block)

Other tags:
    Yieldreturn: - true if the value is valid else false

Other tags:
    Yieldparam: value - the result of the last update operation

Other tags:
    Yield: - the block to be called after every update operation to determine if
def validate(&block)
  @validator = block unless block.nil?
  self
end

def work(&handler) # :nodoc:

:nodoc:
@!visibility private
def work(&handler) # :nodoc:
  begin
    should_notify = false
    mutex.synchronize do
      result = Concurrent::timeout(@timeout) do
        handler.call(@value)
      end
      if @validator.call(result)
        @value = result
        should_notify = true
      end
    end
    time = Time.now
    @observers.notify_observers{ [time, self.value] } if should_notify
  rescue Exception => ex
    try_rescue(ex)
  end
end