lib/servolux/threaded.rb
# == Synopsis # The Threaded module is used to peform some activity at a specified # interval. # # == Details # Sometimes it is useful for an object to have its own thread of execution # to perform a task at a recurring interval. The Threaded module # encapsulates this functionality so you don't have to write it yourself. It # can be used with any object that responds to the +run+ method. # # The threaded object is run by calling the +start+ method. This will create # a new thread that will invoke the +run+ method at the desired interval. # Just before the thread is created the +before_starting+ method will be # called (if it is defined by the threaded object). Likewise, after the # thread is created the +after_starting+ method will be called (if it is # defeined by the threaded object). # # The threaded object is stopped by calling the +stop+ method. This sets an # internal flag and then wakes up the thread. The thread gracefully exits # after checking the flag. Like the start method, before and after methods # are defined for stopping as well. Just before the thread is stopped the # +before_stopping+ method will be called (if it is defined by the threaded # object). Likewise, after the thread has died the +after_stopping+ method # will be called (if it is defeined by the threaded object). # # Calling the +join+ method on a threaded object will cause the calling # thread to wait until the threaded object has stopped. An optional timeout # parameter can be given. # # == Examples # Take a look at the Servolux::Server class for an example of a threaded # object. # module Servolux::Threaded # This method will be called by the activity thread at the desired # interval. Implementing classes are exptect to provide this # functionality. # def run raise NotImplementedError, 'The run method must be defined by the threaded object.' end # Start the activity thread. If already started this method will return # without taking any action. # # If the including class defines a 'before_starting' method, it will be # called before the thread is created and run. Likewise, if the # including class defines an 'after_starting' method, it will be called # after the thread is created. # def start return self if running? logger.debug "Starting" before_starting if self.respond_to?(:before_starting) @activity_thread_running = true @activity_thread_iterations = 0 @activity_thread = Thread.new { begin loop { begin sleep interval if running? break unless running? run @activity_thread_iterations += 1 break if finished_iterations? rescue SystemExit; raise rescue Exception => err if continue_on_error? logger.error err else logger.fatal err raise err end end } ensure @activity_thread_running = false end } after_starting if self.respond_to?(:after_starting) self end # Stop the activity thread. If already stopped this method will return # without taking any action. Otherwise, this method does not return until # the activity thread has died or until _limit_ seconds have passed. # # If the including class defines a 'before_stopping' method, it will be # called before the thread is stopped. Likewise, if the including class # defines an 'after_stopping' method, it will be called after the thread # has stopped. # def stop( limit = nil ) return self unless running? logger.debug "Stopping" @activity_thread_running = false before_stopping if self.respond_to?(:before_stopping) @activity_thread.wakeup join limit @activity_thread = nil after_stopping if self.respond_to?(:after_stopping) self end # Wait on the activity thread. If the thread is already stopped, this # method will return without taking any action. Otherwise, this method # does not return until the activity thread has stopped, or a specific # number of iterations has passed since this method was called. # def wait( limit = nil ) return self unless running? start_waiting_iterations = self.iterations loop { break unless running? break if limit and self.iterations > ( start_waiting_iterations + limit ) } end # If the activity thread is running, the calling thread will suspend # execution and run the activity thread. This method does not return until # the activity thread is stopped or until _limit_ seconds have passed. # # If the activity thread is not running, this method returns immediately # with +nil+. # def join( limit = nil ) return if @activity_thread.nil? @activity_thread.join(limit) ? self : nil end # Returns +true+ if the activity thread is running. Returns +false+ # otherwise. # def running? @activity_thread_running end # Returns +true+ if the activity thread has finished its maximum # number of iterations or the thread is no longer running. # Returns +false+ otherwise. # def finished_iterations? return true unless running? return true if maximum_iterations and (iterations >= maximum_iterations) return false end # Returns the status of threaded object. # # 'sleep' : sleeping or waiting on I/O # 'run' : executing # 'aborting' : aborting # false : not running or terminated normally # nil : terminated with an exception # # If this method returns +nil+, then calling join on the threaded object # will cause the exception to be raised in the calling thread. # def status return false if @activity_thread.nil? @activity_thread.status end # Sets the number of seconds to sleep between invocations of the # threaded object's 'run' method. # def interval=( value ) @activity_thread_interval = value end # Returns the number of seconds to sleep between invocations of the # threaded object's 'run' method. # def interval @activity_thread_interval ||= 60 end # Sets the maximum number of invocations of the threaded object's # 'run' method # def maximum_iterations=( value ) raise ArgumentError, "maximum iterations must be >= 1" unless value.to_i >= 1 @activity_thread_maximum_iterations = value.to_i end # Returns the maximum number of invocations of the threaded # object's 'run' method # def maximum_iterations return unless defined? @activity_thread_maximum_iterations @activity_thread_maximum_iterations end # Returns the number of iterations of the threaded object's 'run' method # completed thus far. # def iterations @activity_thread_iterations ||= 0 end # Set to +true+ to continue running the threaded object even if an error # is raised by the +run+ method. The default behavior is to stop the # activity thread when an error is raised by the run method. # # A SystemExit will never be caught; it will always cause the Ruby # interpreter to exit. # def continue_on_error=( value ) @activity_thread_continue_on_error = (value ? true : false) end # Returns +true+ if the threded object should continue running even if an # error is raised by the run method. The default is to return +false+. The # threaded object will stop running when an error is raised. # def continue_on_error? return @activity_thread_continue_on_error if defined? @activity_thread_continue_on_error @activity_thread_continue_on_error = false end # :stopdoc: # # The JRuby platform has an implementation error in it's Thread#join # method. In the Matz Ruby Interpreter, Thread#join with a +nil+ argument # will sleep forever; in the JRuby implementation, join will return # immediately. # if 'java' == RUBY_PLATFORM undef :join def join( limit = nil ) return if @activity_thread.nil? if limit.nil? @activity_thread.join ? self : nil else @activity_thread.join(limit) ? self : nil end end end # :startdoc: end # module Servolux::Threaded # EOF