class Concurrent::SimpleActorRef
# Wraps +actor+ in a reference that serializes message delivery.
#
# @param actor [Object] the wrapped actor; it receives a singleton
#   +shutdown+ method bound to this reference's +set_stop_event+, and its
#   +on_start+ hook is invoked before the constructor returns
# @param opts [Hash] configuration; also handed to
#   +OptionsParser.get_executor_from+ to pick the executor
# @option opts [Boolean] :reset_on_error (true) replace the actor with a
#   fresh instance after a rescued error
# @option opts [Boolean] :rescue_exception (false) rescue +Exception+
#   instead of +StandardError+ while processing messages
# @option opts [Array] :args ([]) constructor arguments for the replacement
#   actor; only read when +:reset_on_error+ is truthy
def initialize(actor, opts = {})
  @actor = actor
  @mutex = Mutex.new
  @one_by_one = OneByOne.new
  @executor = OptionsParser.get_executor_from(opts)
  @stop_event = Event.new

  @reset_on_error = opts.fetch(:reset_on_error, true)
  rescue_everything = opts.fetch(:rescue_exception, false)
  @exception_class = rescue_everything ? Exception : StandardError

  # Constructor arguments are only needed when the actor may be rebuilt.
  if @reset_on_error
    @args = opts.fetch(:args, [])
  end

  # Let the actor stop its own reference by calling `shutdown` on itself.
  stop_hook = method(:set_stop_event)
  @actor.define_singleton_method(:shutdown, &stop_hook)
  @actor.on_start
end
# Waits on the stop event.
#
# @param limit [Object, nil] forwarded unchanged to the stop event's +wait+
# @return [Object] whatever the stop event's +wait+ returns
def join(limit = nil)
  stop_event = @stop_event
  stop_event.wait(limit)
end
# Queues a message for the actor and returns immediately.
#
# The message is wrapped together with a new IVar and the optional callback
# block, then handed to the one-by-one dispatcher, which runs
# +process_message+ on the configured executor.
#
# @param msg [Array] the message parts; must not be empty
# @param block [Proc, nil] optional callback stored with the message
# @return [IVar] the IVar created for this message
# @raise [ArgumentError] when no message parts are given
def post(*msg, &block)
  raise ArgumentError, 'message cannot be empty' if msg.empty?

  IVar.new.tap do |ivar|
    envelope = Message.new(msg, ivar, block)
    @one_by_one.post(@executor, envelope, &method(:process_message))
  end
end
# Posts a message and blocks for its outcome.
#
# @param timeout [Numeric, nil] passed to the IVar's +value+; +nil+ is
#   accepted, any other value must be >= 0
# @param msg [Array] the message parts, forwarded to +post+
# @return [Object] the IVar's value once it is complete
# @raise [Concurrent::TimeoutError] when +timeout+ is neither nil nor >= 0,
#   or when the IVar is still incomplete after waiting
# @raise [Exception] the IVar's +reason+, when it has one
def post!(timeout, *msg)
  raise Concurrent::TimeoutError unless timeout.nil? || timeout >= 0

  ivar = post(*msg)
  ivar.value(timeout) # block here; the outcome is inspected below

  raise Concurrent::TimeoutError if ivar.incomplete?
  raise ivar.reason if ivar.reason

  ivar.value
end
# Delivers one message to the actor and reports the outcome.
#
# The payload is splatted into the actor's +receive+. An error matching the
# configured exception class is passed to the actor's +on_error+ and, when
# reset-on-error is enabled, the actor is replaced by a new instance of the
# same class. In every case the message's IVar is completed and the optional
# callback is invoked with (time, result, error); errors raised by the
# callback that match the configured exception class are swallowed.
#
# NOTE(review): the replacement actor is only constructed here; this method
# does not call +on_start+ on it or attach a singleton +shutdown+ to it.
# Confirm that this is intended.
#
# @param message [#payload, #ivar, #callback] the queued message
def process_message(message)
  outcome = error = nil
  outcome = @actor.receive(*message.payload)
rescue @exception_class => error
  @actor.on_error(Time.now, message.payload, error)
  if @reset_on_error
    @mutex.synchronize do
      @actor = @actor.class.new(*@args)
    end
  end
ensure
  # Timestamp is taken before the IVar is completed, as callers may rely on
  # that ordering.
  completed_at = Time.now
  message.ivar.complete(error.nil?, outcome, error)

  begin
    if message.callback
      message.callback.call(completed_at, outcome, error)
    end
  rescue @exception_class => _callback_error
    # suppress
  end
end
# @return [Boolean] true while the stop event has not been set
def running?
  !@stop_event.set?
end
# Sets the stop event. Installed on the wrapped actor as its singleton
# +shutdown+ method by the constructor.
#
# @return [Object] whatever the stop event's +set+ returns
def set_stop_event
  stop_event = @stop_event
  stop_event.set
end
# Stops the reference: under the mutex, runs the actor's +on_shutdown+ hook
# and sets the stop event. Does nothing when the stop event is already set.
#
# @return [Object, nil] nil when already shut down, otherwise the result of
#   setting the stop event
def shutdown
  @mutex.synchronize do
    unless shutdown?
      @actor.on_shutdown
      @stop_event.set
    end
  end
end
# @return [Boolean] whether the stop event has been set
def shutdown?
  stop_event = @stop_event
  stop_event.set?
end