class Concurrent::SimpleActorRef

def initialize(actor, opts = {})

# Builds a reference around +actor+ and starts it.
#
# Besides storing its collaborators, the constructor installs a +shutdown+
# singleton method on the actor that forwards to #set_stop_event, and then
# calls the actor's +on_start+ hook.
#
# @param actor [Object] the wrapped actor; it must respond to +on_start+
# @param opts [Hash] options; the whole hash is also handed to
#   +OptionsParser.get_executor_from+ to pick the executor
# @option opts [Boolean] :reset_on_error (true) stored as +@reset_on_error+
# @option opts [Boolean] :rescue_exception (false) when truthy, +Exception+
#   is rescued while processing messages instead of +StandardError+
# @option opts [Array] :args ([]) only read when +:reset_on_error+ is truthy
def initialize(actor, opts = {})
  @actor      = actor
  @mutex      = Mutex.new
  @one_by_one = OneByOne.new
  @executor   = OptionsParser.get_executor_from(opts)
  @stop_event = Event.new

  @reset_on_error = opts.fetch(:reset_on_error, true)
  @exception_class =
    if opts.fetch(:rescue_exception, false)
      Exception
    else
      StandardError
    end
  # The constructor arguments are only kept when a reset can happen.
  @args = opts.fetch(:args, []) if @reset_on_error

  stop_hook = method(:set_stop_event)
  @actor.define_singleton_method(:shutdown, &stop_hook)
  @actor.on_start
end

def join(limit = nil)

# Waits on the stop event.
#
# @param limit [Object, nil] forwarded unchanged to the stop event's +wait+
#   (presumably a timeout in seconds, nil meaning no limit -- confirm
#   against the Event implementation)
# @return [Object] whatever the stop event's +wait+ returns
def join(limit = nil)
  @stop_event.wait(limit)
end

def post(*msg, &block)

# Queues a message for the actor and returns immediately.
#
# The payload, a fresh IVar and the optional callback block are bundled
# into a Message and posted to +@one_by_one+ together with the executor;
# #process_message is passed along as the handler.
#
# @param msg [Array] the message payload; must not be empty
# @param block [Proc, nil] optional callback stored on the Message
# @return [IVar] the IVar that was bundled into the Message
# @raise [ArgumentError] when no message parts are given
def post(*msg, &block)
  raise ArgumentError, 'message cannot be empty' if msg.empty?

  IVar.new.tap do |pending|
    envelope = Message.new(msg, pending, block)
    @one_by_one.post(@executor, envelope, &method(:process_message))
  end
end

def post!(timeout, *msg)

# Posts a message and waits for its outcome.
#
# @param timeout [Numeric, nil] handed to the IVar's +value+; nil is
#   accepted, any other value must satisfy +>= 0+
# @param msg [Array] the message payload, forwarded to #post
# @return [Object] the IVar's value once it is no longer incomplete
# @raise [Concurrent::TimeoutError] when +timeout+ fails the check above, or
#   when the IVar is still incomplete after waiting
# @raise [Object] the IVar's +reason+, when it has one
def post!(timeout, *msg)
  acceptable_timeout = timeout.nil? || timeout >= 0
  raise Concurrent::TimeoutError unless acceptable_timeout

  pending = post(*msg)
  pending.value(timeout) # wait; the result is read again below
  raise Concurrent::TimeoutError if pending.incomplete?
  raise pending.reason if pending.reason

  pending.value
end

def process_message(message)

# Delivers one queued message to the actor and reports the outcome.
#
# The payload is splatted into the actor's +receive+. If that raises an
# exception matching +@exception_class+, the actor's +on_error+ hook is
# called and, when +@reset_on_error+ is set, the actor is replaced by a new
# instance of the same class built from +@args+. The replacement gets the
# same +shutdown+ singleton hook the constructor installs on the original
# actor, so an actor can still stop the reference after a reset.
#
# In every case the message's ivar is completed and the optional callback
# is invoked with (time, result, exception).
#
# NOTE(review): an exception that does not match +@exception_class+ skips
# the rescue, so +ex+ stays nil and the ivar is completed as a success with
# a nil value before the exception propagates -- confirm this is intended.
#
# @param message [#payload, #ivar, #callback] the queued message
# @return [Object] not meaningful to callers; the outcome is delivered
#   through the ivar and the callback
def process_message(message)
  result = ex = nil
  begin
    result = @actor.receive(*message.payload)
  rescue @exception_class => ex
    @actor.on_error(Time.now, message.payload, ex)
    if @reset_on_error
      @mutex.synchronize do
        replacement = @actor.class.new(*@args)
        # Without this the fresh actor would lack the +shutdown+ hook that
        # #initialize defines on the original instance.
        replacement.define_singleton_method(:shutdown, &method(:set_stop_event))
        @actor = replacement
      end
    end
  ensure
    now = Time.now
    message.ivar.complete(ex.nil?, result, ex)
    begin
      message.callback.call(now, result, ex) if message.callback
    rescue @exception_class
      # A failing callback must not disturb message processing, and must not
      # overwrite the exception already reported through the ivar.
    end
  end
end

def running?

# Tells whether the stop event is still unset.
#
# @return [Boolean] the negation of the stop event's +set?+
def running?
  !@stop_event.set?
end

def set_stop_event

# Sets the stop event without running any actor hook.
#
# The constructor binds this method as the actor's own +shutdown+ singleton
# method, which is why it takes no arguments.
#
# @return [Object] whatever the stop event's +set+ returns
def set_stop_event
  @stop_event.set
end

def shutdown

# Stops the reference: runs the actor's +on_shutdown+ hook and then sets the
# stop event, all while holding +@mutex+.
#
# Calling it again once the stop event is set does nothing.
#
# @return [Object, nil] the result of setting the stop event, or nil when
#   the reference was already shut down
def shutdown
  @mutex.synchronize do
    unless shutdown?
      @actor.on_shutdown
      @stop_event.set
    end
  end
end

def shutdown?

# Tells whether the stop event has been set.
#
# @return [Object] the result of the stop event's +set?+
def shutdown?
  @stop_event.set?
end