lib/servolux/threaded.rb
# == Synopsis # The Threaded module is used to perform some activity at a specified # interval. # # == Details # Sometimes it is useful for an object to have its own thread of execution # to perform a task at a recurring interval. The Threaded module # encapsulates this functionality so you don't have to write it yourself. It # can be used with any object that responds to the +run+ method. # # The threaded object is run by calling the +start+ method. This will create # a new thread that will invoke the +run+ method at the desired interval. # Just before the thread is created the +before_starting+ method will be # called (if it is defined by the threaded object). Likewise, after the # thread is created the +after_starting+ method will be called (if it is # defined by the threaded object). # # The threaded object is stopped by calling the +stop+ method. This sets an # internal flag and then wakes up the thread. The thread gracefully exits # after checking the flag. Like the start method, before and after methods # are defined for stopping as well. Just before the thread is stopped the # +before_stopping+ method will be called (if it is defined by the threaded # object). Likewise, after the thread has died the +after_stopping+ method # will be called (if it is defined by the threaded object). # # Calling the +join+ method on a threaded object will cause the calling # thread to wait until the threaded object has stopped. An optional timeout # parameter can be given. # # == Examples # Take a look at the Servolux::Server class for an example of a threaded # object. # module Servolux::Threaded # This method will be called by the activity thread at the desired # interval. Implementing classes are expect to provide this # functionality. # def run raise NotImplementedError, 'The run method must be defined by the threaded object.' end # Start the activity thread. If already started this method will return # without taking any action. # # If the including class defines a 'before_starting' method, it will be # called before the thread is created and run. Likewise, if the # including class defines an 'after_starting' method, it will be called # after the thread is created. # def start return self if _activity_thread.running? logger.debug "Starting" before_starting if self.respond_to?(:before_starting) @_activity_thread.start self after_starting if self.respond_to?(:after_starting) self end # Stop the activity thread. If already stopped this method will return # without taking any action. # # If the including class defines a 'before_stopping' method, it will be # called before the thread is stopped. Likewise, if the including class # defines an 'after_stopping' method, it will be called after the thread # has stopped. # def stop return self unless _activity_thread.running? logger.debug "Stopping" before_stopping if self.respond_to?(:before_stopping) @_activity_thread.stop self end # Wait on the activity thread. If the thread is already stopped, this # method will return without taking any action. Otherwise, this method # does not return until the activity thread has stopped, or a specific # number of iterations has passed since this method was called. # def wait( limit = nil ) return self unless _activity_thread.running? initial_iterations = @_activity_thread.iterations loop { break unless @_activity_thread.running? break if limit and @_activity_thread.iterations > ( initial_iterations + limit ) Thread.pass } end # If the activity thread is running, the calling thread will suspend # execution and run the activity thread. This method does not return until # the activity thread is stopped or until _limit_ seconds have passed. # # If the activity thread is not running, this method returns immediately # with +nil+. # def join( limit = nil ) _activity_thread.join(limit) ? self : nil end # Returns +true+ if the activity thread is running. Returns +false+ # otherwise. # def running? _activity_thread.running? end # Returns +true+ if the activity thread has finished its maximum # number of iterations or the thread is no longer running. # Returns +false+ otherwise. # def finished_iterations? return true unless _activity_thread.running? @_activity_thread.finished_iterations? end # Returns the status of threaded object. # # 'sleep' : sleeping or waiting on I/O # 'run' : executing # 'aborting' : aborting # false : not running or terminated normally # nil : terminated with an exception # # If this method returns +nil+, then calling join on the threaded object # will cause the exception to be raised in the calling thread. # def status return false if _activity_thread.thread.nil? @_activity_thread.thread.status end # Sets the number of seconds to sleep between invocations of the # threaded object's 'run' method. # def interval=( value ) value = Float(value) raise ArgumentError, "Sleep interval must be >= 0" unless value >= 0 _activity_thread.interval = value end # Returns the number of seconds to sleep between invocations of the # threaded object's 'run' method. # def interval _activity_thread.interval end # Signals the activity thread to treat the sleep interval with strict # semantics. The time it takes for the 'run' method to execute will be # subtracted from the sleep interval. # # If the sleep interval is 60 seconds and the 'run' method takes 2.2 seconds # to execute, then the activity thread will sleep for 57.2 seconds. The # subsequent invocation of the 'run' will occur as close as possible to 60 # seconds after the previous invocation. # def use_strict_interval=( value ) _activity_thread.use_strict_interval = (value ? true : false) end # When true, the activity thread will treat the sleep interval with strict # semantics. See the setter method for an in depth explanation. # def use_strict_interval _activity_thread.use_strict_interval end alias :use_strict_interval? :use_strict_interval # Sets the maximum number of invocations of the threaded object's # 'run' method # def maximum_iterations=( value ) unless value.nil? value = Integer(value) raise ArgumentError, "maximum iterations must be >= 1" unless value >= 1 end _activity_thread.maximum_iterations = value end # Returns the maximum number of invocations of the threaded # object's 'run' method # def maximum_iterations _activity_thread.maximum_iterations end # Returns the number of iterations of the threaded object's 'run' method # completed thus far. # def iterations _activity_thread.iterations end # Set to +true+ to continue running the threaded object even if an error # is raised by the +run+ method. The default behavior is to stop the # activity thread when an error is raised by the run method. # # A SystemExit will never be caught; it will always cause the Ruby # interpreter to exit. # def continue_on_error=( value ) _activity_thread.continue_on_error = (value ? true : false) end # Returns +true+ if the threaded object should continue running even if an # error is raised by the run method. The default is to return +false+. The # threaded object will stop running when an error is raised. # def continue_on_error? _activity_thread.continue_on_error end # :stopdoc: def _activity_thread @_activity_thread ||= ::Servolux::Threaded::ThreadContainer.new(60, false, 0, nil, false); end # @private # @private ThreadContainer = Struct.new( :interval, :use_strict_interval, :iterations, :maximum_iterations, :continue_on_error, :thread, :running ) { def start( threaded ) self.running = true self.iterations = 0 self.thread = Thread.new { run threaded } Thread.pass end # @private def stop self.running = false thread.wakeup if thread.alive? end # @private def run( threaded ) loop do begin mark_time break unless running? threaded.run if maximum_iterations self.iterations += 1 if finished_iterations? self.running = false break end end sleep if running? rescue SystemExit; raise rescue Exception => err if continue_on_error threaded.logger.error err else threaded.logger.fatal err raise err end end end ensure if threaded.respond_to?(:after_stopping) and !self.running threaded.after_stopping end self.running = false end # @private def join( limit = nil ) return if thread.nil? limit ? thread.join(limit) : thread.join end # @private def finished_iterations? return true if maximum_iterations and (iterations >= maximum_iterations) return false end # @private alias :running? :running # Mark the start time of the run loop. # def mark_time @mark = Time.now if use_strict_interval end # @private # Sleep for "interval" seconds adjusting for the run time of the "run" # method if the "use_strict_interval" flag is set. If the run time of the # "run" method exceeds our sleep "interval", then log a warning and just # use the interval as normal for this sleep period. # def sleep time_to_sleep = interval if use_strict_interval and interval > 0 diff = Time.now - @mark time_to_sleep = interval - diff if time_to_sleep < 0 time_to_sleep = interval logger.warn "Run time [#{diff} s] exceeded strict interval [#{interval} s]" end end ::Kernel.sleep time_to_sleep end # @private } # :startdoc: end