lib/concurrent/actor.rb
require 'thread'
require 'observer'

require 'concurrent/event'
require 'concurrent/obligation'
require 'concurrent/postable'
require 'concurrent/runnable'

module Concurrent

  # Actor-based concurrency is all the rage in some circles. Originally described in
  # 1973, the actor model is a paradigm for creating asynchronous, concurrent objects
  # that is becoming increasingly popular. Much has changed since actors were first
  # written about four decades ago, which has led to a serious fragmentation within
  # the actor community. There is *no* universally accepted, strict definition of
  # "actor" and actor implementations differ widely between languages and libraries.
  #
  # A good definition of "actor" is:
  #
  #   An independent, concurrent, single-purpose, computational entity that communicates exclusively via message passing.
  #
  # The +Concurrent::Actor+ class in this library is based solely on the
  # {http://www.scala-lang.org/api/current/index.html#scala.actors.Actor Actor} trait
  # defined in the Scala standard library. It does not implement all the features of
  # Scala's +Actor+ but its behavior for what *has* been implemented is nearly identical.
  # The excluded features mostly deal with Scala's message semantics, strong typing,
  # and other characteristics of Scala that don't really apply to Ruby.
  #
  # Unlike many of the abstractions in this library, +Actor+ takes an *object-oriented*
  # approach to asynchronous concurrency, rather than a *functional programming*
  # approach.
  #
  # Because +Actor+ mixes in the +Concurrent::Runnable+ module, subclasses have access to
  # the +#on_error+ method and can override it to implement custom error handling. The
  # +Actor+ base class invokes +#on_error+ whenever +#act+ raises, but its own
  # implementation is a no-op and it does not rely on the hook for its error handling,
  # so as to avoid conflict with subclasses which override it. Generally speaking,
  # +#on_error+ should not be used. The +Actor+ base class provides consistent, reliable,
  # and robust error handling already, and error handling specifics are tied to the
  # message posting method. Incorrect behavior in an +#on_error+ override can lead to
  # inconsistent +Actor+ behavior that may lead to confusion and difficult debugging.
  #
  # The +Actor+ superclass mixes in the Ruby standard library
  # {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html Observable}
  # module to provide consistent callbacks upon message processing completion. The normal
  # +Observable+ methods, including +#add_observer+, behave normally. Once an observer
  # is added to an +Actor+ it will be notified of all messages processed *after*
  # addition. Notification will *not* occur for any messages that have already been
  # processed.
  #
  # Observers will be notified regardless of whether the message processing is successful
  # or not. The +#update+ method of the observer will receive four arguments. The
  # appropriate method signature is:
  #
  #   def update(time, message, result, reason)
  #
  # These four arguments represent:
  #
  # * The time that message processing was completed
  # * An array containing all elements of the original message, in order
  # * The result of the call to +#act+ (will be +nil+ if an exception was raised)
  # * Any exception raised by +#act+ (or +nil+ if message processing was successful)
  #
  # @example Actor Ping Pong
  #   class Ping < Concurrent::Actor
  #
  #     def initialize(count, pong)
  #       super()
  #       @pong = pong
  #       @remaining = count
  #     end
  #
  #     def act(msg)
  #
  #       if msg == :pong
  #         print "Ping: pong\n" if @remaining % 1000 == 0
  #         @pong.post(:ping)
  #
  #         if @remaining > 0
  #           @pong << :ping
  #           @remaining -= 1
  #         else
  #           print "Ping :stop\n"
  #           @pong << :stop
  #           self.stop
  #         end
  #       end
  #     end
  #   end
  #
  #   class Pong < Concurrent::Actor
  #
  #     attr_writer :ping
  #
  #     def initialize
  #       super()
  #       @count = 0
  #     end
  #
  #     def act(msg)
  #
  #       if msg == :ping
  #         print "Pong: ping\n" if @count % 1000 == 0
  #         @ping << :pong
  #         @count += 1
  #
  #       elsif msg == :stop
  #         print "Pong :stop\n"
  #         self.stop
  #       end
  #     end
  #   end
  #
  #   pong = Pong.new
  #   ping = Ping.new(10000, pong)
  #   pong.ping = ping
  #
  #   t1 = ping.run!
  #   t2 = pong.run!
  #
  #   ping << :pong
  #
  # @see http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
  class Actor
    include Observable
    include Postable
    include Runnable

    private

    # Mailbox object returned by {Actor.pool}. It wraps the queue shared by every
    # actor in the pool and mixes in +Postable+ so messages can be posted to it.
    # NOTE(review): presumably +Postable+ operates on the +@queue+ set here; confirm
    # against concurrent/postable.
    #
    # @!visibility private
    class Poolbox # :nodoc:
      include Postable

      # @param [Queue] queue the message queue shared by the pooled actors
      def initialize(queue)
        @queue = queue
      end
    end

    public

    # Create a pool of actors that share a common mailbox.
    #
    # Every +Actor+ instance operates on its own thread. When one thread isn't enough capacity
    # to manage all the messages being sent to an +Actor+ a *pool* can be used instead. A pool
    # is a collection of +Actor+ instances, all of the same type, that share a message queue.
    # Messages from other threads are all sent to a single queue against which all +Actor+s
    # load balance.
    #
    # @param [Integer] count the number of actors in the pool
    # @param [Array] args zero or more arguments to pass to each actor in the pool
    # @param [Proc] block optional block; each actor's constructor receives its own
    #   +dup+ of it
    #
    # @return [Array] two-element array with the shared mailbox as the first element
    #   and an array of actors as the second element
    #
    # @raise [ArgumentError] if +count+ is zero or less
    #
    # @example
    #   class EchoActor < Concurrent::Actor
    #     def act(*message)
    #       puts "#{message} handled by #{self}"
    #     end
    #   end
    #
    #   mailbox, pool = EchoActor.pool(5)
    #   pool.each{|echo| echo.run! }
    #
    #   10.times{|i| mailbox.post(i) }
    #   #=> [0] handled by #<EchoActor:0x007fc8014fb8b8>
    #   #=> [1] handled by #<EchoActor:0x007fc8014fb890>
    #   #=> [2] handled by #<EchoActor:0x007fc8014fb868>
    #   #=> [3] handled by #<EchoActor:0x007fc8014fb890>
    #   #=> [4] handled by #<EchoActor:0x007fc8014fb840>
    #   #=> [5] handled by #<EchoActor:0x007fc8014fb8b8>
    #   #=> [6] handled by #<EchoActor:0x007fc8014fb8b8>
    #   #=> [7] handled by #<EchoActor:0x007fc8014fb818>
    #   #=> [8] handled by #<EchoActor:0x007fc8014fb890>
    def self.pool(count, *args, &block)
      raise ArgumentError, 'count must be greater than zero' unless count > 0

      mailbox = Queue.new
      actors = count.times.map do
        actor = block_given? ? new(*args, &block.dup) : new(*args)
        # Point every actor at the one shared queue instead of a private mailbox.
        actor.instance_variable_set(:@queue, mailbox)
        actor
      end

      [Poolbox.new(mailbox), actors]
    end

    protected

    # Actors are defined by subclassing the +Concurrent::Actor+ class and overriding the
    # #act method. The #act method can have any signature/arity but +def act(*args)+
    # is the most flexible and least error-prone signature. The #act method is called in
    # response to a message being posted to the +Actor+ instance (see *Behavior* below).
    #
    # @param [Array] message one or more arguments representing the message sent to the
    #   actor via one of the Concurrent::Postable methods
    #
    # @return [Object] the result obtained when the message is successfully processed
    #
    # @raise [NotImplementedError] unless overridden in the +Actor+ subclass
    #
    # @!visibility public
    def act(*message)
      raise NotImplementedError, "#{self.class} does not implement #act"
    end

    # Run hook: discards everything currently in the mailbox.
    #
    # @!visibility private
    def on_run # :nodoc:
      queue.clear
    end

    # Stop hook: discards pending messages, then enqueues the +:stop+ sentinel that
    # makes the next +#on_task+ call return without processing a message.
    #
    # @!visibility private
    def on_stop # :nodoc:
      queue.clear
      queue.push(:stop)
    end

    # Takes one package off the mailbox, runs +#act+ on its message, delivers the
    # outcome to the package's handler and notifies observers.
    #
    # Delivery depends on what the package carries:
    #
    # * an +Event+ notifier that is not yet set: the outcome (the raised exception,
    #   otherwise the result) is pushed onto the handler and the event is set
    # * an +IVar+ handler: it is completed as a success unless +#act+ raised
    # * a handler responding to +#post+: the result is forwarded, on success only
    #
    # +#act+ is skipped when the notifier is an +Event+ that is already set.
    # Observers are notified in every case, including when +#act+ raised.
    #
    # @!visibility private
    def on_task # :nodoc:
      package = queue.pop
      return if package == :stop

      result = ex = nil
      notifier = package.notifier
      begin
        if notifier.nil? || (notifier.is_a?(Event) && !notifier.set?)
          result = act(*package.message)
        end
      rescue => ex
        on_error(Time.now, package.message, ex)
      ensure
        if notifier.is_a?(Event) && !notifier.set?
          # Push the exception only when one was raised; +result || ex+ would turn
          # a legitimate +false+ result into +nil+.
          package.handler.push(ex.nil? ? result : ex)
          package.notifier.set
        elsif package.handler.is_a?(IVar)
          # Success means "no exception", not "non-nil result": an #act that
          # returns +nil+ has still succeeded.
          package.handler.complete(ex.nil?, result, ex)
        elsif package.handler.respond_to?(:post) && ex.nil?
          package.handler.post(result)
        end

        changed
        notify_observers(Time.now, package.message, result, ex)
      end
    end

    # Error hook called by +#on_task+ when +#act+ raises. Deliberately a no-op in
    # the base class.
    #
    # @param [Time] time when the error was rescued
    # @param [Array] msg the message that was being processed
    # @param [StandardError] ex the exception raised by +#act+
    #
    # @!visibility private
    def on_error(time, msg, ex) # :nodoc:
    end
  end
end