class Concurrent::Agent

@return [Fixnum] the maximum number of seconds before an update is cancelled
@!attribute [r] timeout
score.value #=> 170
sleep(0.1)
score << proc{|current| current - 50 }
score.value #=> 220
sleep(0.1)
score << proc{|current| current * 2 }
score.value #=> 110
sleep(0.1)
score << proc{|current| current + 100 }
score.value #=> 10
score = Concurrent::Agent.new(10)
@example Basic usage
A good example of an agent is a shared incrementing counter, such as the score in a video game.
will become the new value of the agent. Agents support two error handling modes: fail and continue.
will receive the current value of the agent as its sole parameter. The return value of the block
the global thread pool. Consumers can ‘#post` code blocks to the agent. The code block (function)
of the agent can be requested at any time (`#deref`). Each agent has a work queue and operates on
An agent is a single atomic value that represents an identity. The current value

def <<(block)

Other tags:
    Yieldreturn: - the new value

Other tags:
    Yieldparam: value - the current value

Other tags:
    Yield: - the operation to be performed with the current value in order to calculate
def <<(block)
  post(&block)
  self
end

def await(timeout = nil)

Returns:
  • (Boolean) - false on timeout, true otherwise

Parameters:
  • timeout (Numeric) -- the maximum time in second to wait.
def await(timeout = nil)
  done = Event.new
  post { |val| done.set; val }
  done.wait timeout
end

def initialize(initial, opts = {})

Options Hash: (**opts)
  • :copy_on_deref (String) -- call the given `Proc` passing the internal value and
  • :freeze_on_deref (String) -- call `#freeze` before returning the data
  • :dup_on_deref (String) -- call `#dup` before returning the data
  • :executor (object) -- when provided will run all operations on
  • :operation (Boolean) -- when `true` will execute the future on the global
  • :timeout (Fixnum) -- maximum number of seconds before an update is cancelled

Parameters:
  • opts (Hash) -- the options used to define the behavior at update and deref
  • initial (Object) -- the initial value
def initialize(initial, opts = {})
  @value              = initial
  @rescuers           = []
  @validator          = Proc.new { |result| true }
  @timeout            = opts.fetch(:timeout, TIMEOUT).freeze
  self.observers      = CopyOnWriteObserverSet.new
  @one_by_one         = OneByOne.new
  @task_executor      = OptionsParser.get_task_executor_from(opts)
  @operation_executor = OptionsParser.get_operation_executor_from(opts)
  init_mutex
  set_deref_options(opts)
end

def post(&block)

Returns:
  • (true, nil) - nil when no block is given

Other tags:
    Yieldreturn: - the new value

Other tags:
    Yieldparam: value - the current value

Other tags:
    Yield: - the operation to be performed with the current value in order to calculate
def post(&block)
  post_on(@task_executor, &block)
end

def post_off(&block)

Returns:
  • (true, nil) - nil when no block is given

Other tags:
    Yieldreturn: - the new value

Other tags:
    Yieldparam: value - the current value

Other tags:
    Yield: - the operation to be performed with the current value in order to calculate
def post_off(&block)
  post_on(@operation_executor, &block)
end

def post_on(executor, &block)

def post_on(executor, &block)
  return nil if block.nil?
  @one_by_one.post(executor) { work(&block) }
  true
end

def rescue(clazz = StandardError, &block)

Other tags:
    Yieldparam: ex - the caught exception

Other tags:
    Yield: - the block to be called when a matching exception is caught

Parameters:
  • clazz (Exception) -- the class of exception to catch
def rescue(clazz = StandardError, &block)
  unless block.nil?
    mutex.synchronize do
      @rescuers << Rescuer.new(clazz, block)
    end
  end
  self
end

def try_rescue(ex) # :nodoc:

:nodoc:
@!visibility private
def try_rescue(ex) # :nodoc:
  rescuer = mutex.synchronize do
    @rescuers.find { |r| ex.is_a?(r.clazz) }
  end
  rescuer.block.call(ex) if rescuer
rescue Exception => ex
  # supress
end

def validate(&block)

Other tags:
    Yieldreturn: - true if the value is valid else false

Other tags:
    Yieldparam: value - the result of the last update operation

Other tags:
    Yield: - the block to be called after every update operation to determine if
def validate(&block)
  unless block.nil?
    mutex.lock
    @validator = block
    mutex.unlock
  end
  self
end

def work(&handler) # :nodoc:

:nodoc:
@!visibility private
def work(&handler) # :nodoc:
  validator, value = mutex.synchronize { [@validator, @value] }
  begin
    # FIXME creates second thread
    result, valid = Concurrent::timeout(@timeout) do
      result = handler.call(value)
      [result, validator.call(result)]
    end
  rescue Exception => ex
    exception = ex
  end
  mutex.lock
  should_notify = if !exception && valid
                    @value = result
                    true
                  end
  mutex.unlock
  if should_notify
    time = Time.now
    observers.notify_observers { [time, self.value] }
  end
  try_rescue(exception)
end