# lib/concurrent/actor/simple_actor_ref.rb
require 'concurrent/actor/actor_ref'
require 'concurrent/atomic/event'
require 'concurrent/executor/single_thread_executor'
require 'concurrent/ivar'

module Concurrent

  # An actor reference wrapping a single actor object. Every posted message is
  # handed to `OneByOne#post` together with the configured executor, and the
  # outcome is reported through an `IVar` (and an optional callback).
  #
  # NOTE(review): presumably `OneByOne` guarantees that messages are processed
  # one at a time on the executor -- confirm against its implementation.
  class SimpleActorRef
    include ActorRef

    # Wraps +actor+, installs a singleton `shutdown` method on it that sets
    # this reference's stop event, and then calls the actor's `on_start`.
    #
    # @param actor [Object] the actor to wrap; this class calls `on_start`,
    #   `receive`, `on_error` and `on_shutdown` on it
    # @param opts [Hash] options; also passed to
    #   `OptionsParser.get_executor_from` to pick the executor
    # @option opts [Boolean] :reset_on_error (true) replace the actor with a
    #   new instance of its class after `receive` raises
    # @option opts [Boolean] :rescue_exception (false) rescue `Exception`
    #   instead of `StandardError` while processing messages
    # @option opts [Array] :args ([]) constructor arguments for replacement
    #   actors; only read when :reset_on_error is truthy
    def initialize(actor, opts = {})
      @actor = actor
      @mutex = Mutex.new
      @one_by_one = OneByOne.new
      @executor = OptionsParser.get_executor_from(opts)
      @stop_event = Event.new
      @reset_on_error = opts.fetch(:reset_on_error, true)
      @exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
      @args = opts.fetch(:args, []) if @reset_on_error

      attach_shutdown_hook(@actor)
      @actor.on_start
    end

    # @return [Boolean] true while the stop event has not been set
    def running?
      !@stop_event.set?
    end

    # @return [Boolean] true once the stop event has been set
    def shutdown?
      @stop_event.set?
    end

    # Queues a message for the actor.
    #
    # @param msg [Array] the message parts, splatted into the actor's `receive`
    # @yield [time, result, error] optional callback invoked after the message
    #   has been processed
    # @return [IVar] completed with the result of `receive`, or with the
    #   rescued error
    # @raise [ArgumentError] when no message parts are given
    def post(*msg, &block)
      raise ArgumentError, 'message cannot be empty' if msg.empty?

      ivar = IVar.new
      @one_by_one.post(@executor, Message.new(msg, ivar, block), &method(:process_message))
      ivar
    end

    # Posts a message and waits for its result.
    #
    # @param timeout [Numeric, nil] value passed to `IVar#value`; nil is
    #   accepted, negative values are rejected
    # @param msg [Array] the message parts
    # @return [Object] the value of the completed IVar
    # @raise [Concurrent::TimeoutError] when +timeout+ is negative or the IVar
    #   is still incomplete after waiting
    # @raise [Object] the IVar's reason, when it has one
    def post!(timeout, *msg)
      raise Concurrent::TimeoutError unless timeout.nil? || timeout >= 0

      ivar = post(*msg)
      ivar.value(timeout)
      if ivar.incomplete?
        raise Concurrent::TimeoutError
      elsif ivar.reason
        raise ivar.reason
      end
      ivar.value
    end

    # Calls the actor's `on_shutdown` and sets the stop event, both while
    # holding the mutex. Does nothing when the stop event is already set.
    def shutdown
      @mutex.synchronize do
        return if shutdown?

        @actor.on_shutdown
        @stop_event.set
      end
    end

    # Waits on the stop event.
    #
    # @param limit [Numeric, nil] forwarded to `Event#wait`
    #   (presumably a timeout in seconds -- confirm against Event)
    # @return whatever `Event#wait` returns
    def join(limit = nil)
      @stop_event.wait(limit)
    end

    private

    # Envelope for a queued message: the message parts, the IVar to complete
    # and the optional callback. (`private` does not apply to constants, so
    # this stays reachable as SimpleActorRef::Message.)
    Message = Struct.new(:payload, :ivar, :callback)

    # Target of the singleton `shutdown` method installed on the actor.
    def set_stop_event
      @stop_event.set
    end

    # Defines a singleton `shutdown` on +actor+ that sets this reference's
    # stop event.
    def attach_shutdown_hook(actor)
      actor.define_singleton_method(:shutdown, &method(:set_stop_event))
    end

    # Swaps in a new instance of the actor's class, built from the :args
    # option. The replacement receives the same `shutdown` hook that
    # #initialize installs, so a reset actor can still stop this reference.
    # NOTE(review): `on_start` is not called on the replacement, matching the
    # previous behaviour -- confirm whether that is intended.
    def reset_actor
      @mutex.synchronize do
        replacement = @actor.class.new(*@args)
        attach_shutdown_hook(replacement)
        @actor = replacement
      end
    end

    # Delivers one message to the actor, completes its IVar and then runs the
    # callback, if any.
    #
    # @param message [Message] the queued envelope
    def process_message(message)
      result = ex = nil

      begin
        result = @actor.receive(*message.payload)
      rescue @exception_class => ex
        @actor.on_error(Time.now, message.payload, ex)
        reset_actor if @reset_on_error
      ensure
        now = Time.now
        message.ivar.complete(ex.nil?, result, ex)

        begin
          message.callback.call(now, result, ex) if message.callback
        rescue @exception_class
          # Errors raised by the callback are deliberately suppressed; the
          # IVar has already been completed at this point.
        end
      end
    end
  end
end