lib/concurrent-ruby/concurrent/timer_task.rb



require 'concurrent/collection/copy_on_notify_observer_set'
require 'concurrent/concern/dereferenceable'
require 'concurrent/concern/observable'
require 'concurrent/atomic/atomic_boolean'
require 'concurrent/executor/executor_service'
require 'concurrent/executor/ruby_executor_service'
require 'concurrent/executor/safe_task_executor'
require 'concurrent/scheduled_task'

module Concurrent

  # A very common concurrency pattern is to run a thread that performs a task at
  # regular intervals. The thread that performs the task sleeps for the given
  # interval then wakes up and performs the task. Lather, rinse, repeat... This
  # pattern causes two problems. First, it is difficult to test the business
  # logic of the task because the task itself is tightly coupled with the
  # concurrency logic. Second, an exception raised while performing the task can
  # cause the entire thread to abend. In a long-running application where the
  # task thread is intended to run for days/weeks/years a crashed task thread
  # can pose a significant problem. `TimerTask` alleviates both problems.
  #
  # When a `TimerTask` is launched it starts a thread for monitoring the
  # execution interval. The `TimerTask` thread does not perform the task,
  # however. Instead, the TimerTask launches the task on a separate thread.
  # Should the task experience an unrecoverable crash only the task thread will
  # crash. This makes the `TimerTask` very fault tolerant. Additionally, the
  # `TimerTask` thread can respond to the success or failure of the task,
  # performing logging or ancillary operations.
  #
  # One other advantage of `TimerTask` is that it forces the business logic to
  # be completely decoupled from the concurrency logic. The business logic can
  # be tested separately then passed to the `TimerTask` for scheduling and
  # running.
  #
  # A `TimerTask` supports two different types of interval calculations.
  # A fixed delay will always wait the same amount of time between the
  # completion of one task and the start of the next. A fixed rate will
  # attempt to maintain a constant rate of execution regardless of the
  # duration of the task. For example, if a fixed rate task is scheduled
  # to run every 60 seconds but the task itself takes 10 seconds to
  # complete, the next task will be scheduled to run 50 seconds after
  # the previous task completes (that is, 60 seconds after it started).
  # If the task takes 70 seconds to complete, the next task will start
  # immediately after the previous task completes. Tasks will not be
  # executed concurrently.
  #
  # In some cases it may be necessary for a `TimerTask` to affect its own
  # execution cycle. To facilitate this, a reference to the TimerTask instance
  # is passed as an argument to the provided block every time the task is
  # executed.
  #
  # The `TimerTask` class includes the `Dereferenceable` mixin module so the
  # result of the last execution is always available via the `#value` method.
  # Dereferencing options can be passed to the `TimerTask` during construction or
  # at any later time using the `#set_deref_options` method.
  #
  # `TimerTask` supports notification through the Ruby standard library
  # {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
  # Observable} module. On execution the `TimerTask` will notify the observers
  # with three arguments: time of execution, the result of the block (or nil on
  # failure), and any raised exceptions (or nil on success).
  #
  # @!macro copy_options
  #
  # @example Basic usage
  #   task = Concurrent::TimerTask.new{ puts 'Boom!' }
  #   task.execute
  #
  #   task.execution_interval #=> 60.0 (default; intervals are stored as Floats)
  #
  #   # wait 60 seconds...
  #   #=> 'Boom!'
  #
  #   task.shutdown #=> true
  #
  # @example Configuring `:execution_interval`
  #   task = Concurrent::TimerTask.new(execution_interval: 5) do
  #          puts 'Boom!'
  #        end
  #
  #   task.execution_interval #=> 5.0
  #
  # @example Immediate execution with `:run_now`
  #   task = Concurrent::TimerTask.new(run_now: true){ puts 'Boom!' }
  #   task.execute
  #
  #   #=> 'Boom!'
  #
  # @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay
  #   task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do
  #          puts 'Boom!'
  #        end
  #   task.interval_type #=> :fixed_rate
  #
  # @example Last `#value` and `Dereferenceable` mixin
  #   task = Concurrent::TimerTask.new(
  #     dup_on_deref: true,
  #     execution_interval: 5
  #   ){ Time.now }
  #
  #   task.execute
  #   Time.now   #=> 2013-11-07 18:06:50 -0500
  #   sleep(10)
  #   task.value #=> 2013-11-07 18:06:55 -0500
  #
  # @example Controlling execution from within the block
  #   timer_task = Concurrent::TimerTask.new(execution_interval: 1) do |task|
  #     task.execution_interval.to_i.times{ print 'Boom! ' }
  #     print "\n"
  #     task.execution_interval += 1
  #     if task.execution_interval > 5
  #       puts 'Stopping...'
  #       task.shutdown
  #     end
  #   end
  #
  #   timer_task.execute
  #   #=> Boom!
  #   #=> Boom! Boom!
  #   #=> Boom! Boom! Boom!
  #   #=> Boom! Boom! Boom! Boom!
  #   #=> Boom! Boom! Boom! Boom! Boom!
  #   #=> Stopping...
  #
  # @example Observation
  #   class TaskObserver
  #     def update(time, result, ex)
  #       if result
  #         print "(#{time}) Execution successfully returned #{result}\n"
  #       else
  #         print "(#{time}) Execution failed with error #{ex}\n"
  #       end
  #     end
  #   end
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1){ 42 }
  #   task.add_observer(TaskObserver.new)
  #   task.execute
  #   sleep 4
  #
  #   #=> (2013-10-13 19:08:58 -0400) Execution successfully returned 42
  #   #=> (2013-10-13 19:08:59 -0400) Execution successfully returned 42
  #   #=> (2013-10-13 19:09:00 -0400) Execution successfully returned 42
  #   task.shutdown
  #
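  #   # NOTE: task timeouts are now ignored (see `#timeout_interval`), so the
  #   # "timed out" output below documents historical behavior only; a task
  #   # that blocks forever no longer produces these notifications.
  #   task = Concurrent::TimerTask.new(execution_interval: 1){ sleep }
  #   task.add_observer(TaskObserver.new)
  #   task.execute
  #
  #   #=> (2013-10-13 19:07:25 -0400) Execution timed out
  #   #=> (2013-10-13 19:07:27 -0400) Execution timed out
  #   #=> (2013-10-13 19:07:29 -0400) Execution timed out
  #   task.shutdown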
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1){ raise StandardError }
  #   task.add_observer(TaskObserver.new)
  #   task.execute
  #
  #   #=> (2013-10-13 19:09:37 -0400) Execution failed with error StandardError
  #   #=> (2013-10-13 19:09:38 -0400) Execution failed with error StandardError
  #   #=> (2013-10-13 19:09:39 -0400) Execution failed with error StandardError
  #   task.shutdown
  #
  # @see http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
  # @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html
  class TimerTask < RubyExecutorService
    include Concern::Dereferenceable
    include Concern::Observable

    # Default `:execution_interval` in seconds.
    EXECUTION_INTERVAL = 60

    # Maintain the interval between the end of one execution and the start of the next execution.
    FIXED_DELAY = :fixed_delay

    # Maintain the interval between the start of one execution and the start of the next.
    # If execution time exceeds the interval, the next execution will start immediately
    # after the previous execution finishes. Executions will not run concurrently.
    FIXED_RATE = :fixed_rate

    # Default `:interval_type`
    DEFAULT_INTERVAL_TYPE = FIXED_DELAY

    # Create a new TimerTask with the given task and configuration.
    #
    # @!macro timer_task_initialize
    #   @param [Hash] opts the options defining task execution.
    #   @option opts [Float] :execution_interval number of seconds between
    #     task executions (default: EXECUTION_INTERVAL). `:execution` is
    #     accepted as an alias.
    #   @option opts [Boolean] :run_now Whether to run the task as soon as
    #     `#execute` is called or to wait until the first
    #     `:execution_interval` has passed (default: false). `:now` is
    #     accepted as an alias.
    #   @option opts [Symbol] :interval_type method to calculate the interval
    #     between executions, can be either :fixed_rate or :fixed_delay.
    #     (default: :fixed_delay)
    #   @option opts [Executor] :executor, default is `global_io_executor`
    #
    #   @!macro deref_options
    #
    #   @raise ArgumentError when no block is given.
    #
    #   @yield to the block after :execution_interval seconds have passed since
    #     the last yield
    #   @yieldparam task a reference to the `TimerTask` instance so that the
    #     block can control its own lifecycle. Necessary since `self` will
    #     refer to the execution context of the block rather than the running
    #     `TimerTask`.
    #
    #   @return [TimerTask] the new `TimerTask`
    def initialize(opts = {}, &task)
      raise ArgumentError, 'no block given' unless block_given?
      super
      set_deref_options opts
    end

    # Is the executor running?
    #
    # Reflects the internal running flag, which `#execute` sets and the
    # shutdown/kill hooks of this class clear.
    #
    # @return [Boolean] `true` when running, `false` when shutting down or shutdown
    def running?
      @running.true?
    end

    # Execute a previously created `TimerTask`.
    #
    # Calling `#execute` on a task that is already running is a no-op. Each
    # effective call starts a new scheduling generation, so a run that was
    # scheduled before an earlier shutdown is discarded if it fires after the
    # task has been restarted (otherwise two scheduling chains would run in
    # parallel).
    #
    # @return [TimerTask] a reference to `self`
    #
    # @example Instance and execute in separate steps
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }
    #   task.running? #=> false
    #   task.execute
    #   task.running? #=> true
    #
    # @example Instance and execute in one line
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }.execute
    #   task.running? #=> true
    def execute
      synchronize do
        if @running.false?
          @running.make_true
          # Invalidate any run still pending from a previous start/shutdown cycle.
          @generation += 1
          schedule_next_task(@run_now ? 0 : @execution_interval, @generation)
        end
      end
      self
    end

    # Create and execute a new `TimerTask`.
    #
    # @!macro timer_task_initialize
    #
    # @example
    #   task = Concurrent::TimerTask.execute(execution_interval: 10){ print "Hello World\n" }
    #   task.running? #=> true
    def self.execute(opts = {}, &task)
      TimerTask.new(opts, &task).execute
    end

    # @!attribute [rw] execution_interval
    # @return [Float] Number of seconds used as the interval between
    #   executions (interpreted according to `#interval_type`).
    def execution_interval
      synchronize { @execution_interval }
    end

    # Set the interval between executions. The value is converted with `#to_f`
    # before validation and storage.
    #
    # @param [Numeric, #to_f] value the new interval in seconds
    # @raise [ArgumentError] when the converted value is not greater than zero
    #   (this includes NaN)
    def execution_interval=(value)
      value = value.to_f
      # `value > 0.0` is false for NaN too, so NaN is rejected along with
      # zero and negative numbers.
      raise ArgumentError, 'must be greater than zero' unless value > 0.0

      synchronize { @execution_interval = value }
    end

    # @!attribute [r] interval_type
    # @return [Symbol] method to calculate the interval between executions
    attr_reader :interval_type

    # Timeouts are no longer supported. Calling this method only emits a
    # warning; no timeout value is stored or returned.
    #
    # @return [nil]
    def timeout_interval
      warn 'TimerTask timeouts are now ignored as these were not able to be implemented correctly'
    end

    # Timeouts are no longer supported. The given value is ignored and a
    # warning is emitted.
    #
    # @param value ignored
    def timeout_interval=(value)
      warn 'TimerTask timeouts are now ignored as these were not able to be implemented correctly'
    end

    # Hide the generic executor submission API on this class.
    private :post, :<<

    private

    # Validates the options and sets up the instance state.
    #
    # NOTE(review): presumably invoked by the superclass constructor while
    # holding the instance lock -- the caller is not visible in this file.
    #
    # @param [Hash] opts see the constructor options
    # @raise [ArgumentError] when the execution interval is not greater than
    #   zero or when `:interval_type` is neither :fixed_delay nor :fixed_rate
    def ns_initialize(opts, &task)
      set_deref_options(opts)

      self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
      if opts[:interval_type] && ![FIXED_DELAY, FIXED_RATE].include?(opts[:interval_type])
        raise ArgumentError, 'interval_type must be either :fixed_delay or :fixed_rate'
      end
      if opts[:timeout] || opts[:timeout_interval]
        warn 'TimerTask timeouts are now ignored as these were not able to be implemented correctly'
      end

      @run_now = opts[:now] || opts[:run_now]
      @interval_type = opts[:interval_type] || DEFAULT_INTERVAL_TYPE
      @task = Concurrent::SafeTaskExecutor.new(task)
      @executor = opts[:executor] || Concurrent.global_io_executor
      @running = Concurrent::AtomicBoolean.new(false)
      # Incremented by every effective #execute; read and written only while
      # holding the instance lock.
      @generation = 0
      @value = nil

      self.observers = Collection::CopyOnNotifyObserverSet.new
    end

    # @!visibility private
    def ns_shutdown_execution
      @running.make_false
      super
    end

    # @!visibility private
    def ns_kill_execution
      @running.make_false
      super
    end

    # The generation started by the most recent effective `#execute`.
    #
    # @!visibility private
    def current_generation
      synchronize { @generation }
    end

    # Schedules one run of the task on the configured executor after
    # `interval` seconds. The run is tagged with `generation` so that it can
    # be recognised as stale if the task has been restarted in the meantime.
    #
    # @!visibility private
    def schedule_next_task(interval = execution_interval, generation = current_generation)
      ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new, generation], &method(:execute_task))
      nil
    end

    # Performs one run: executes the user task, stores its value, schedules
    # the following run and notifies the observers with
    # `[time, value, reason]`.
    #
    # The run is skipped when the timer is not running or when it belongs to
    # an older generation than the current one (i.e. it was scheduled before
    # a shutdown that has since been followed by another `#execute`).
    #
    # @!visibility private
    def execute_task(completion, generation = current_generation)
      return nil unless @running.true?
      return nil unless current_generation == generation

      start_time = Concurrent.monotonic_time
      _success, value, reason = @task.execute(self)
      # NOTE(review): `completion.try?` presumably succeeds only for the first
      # caller, guarding against handling the same run twice -- confirm
      # against Concurrent::Event.
      if completion.try?
        self.value = value
        # Keep the follow-up run in the same generation so that a chain made
        # stale during this run does not adopt the newer generation.
        schedule_next_task(calculate_next_interval(start_time), generation)
        time = Time.now
        observers.notify_observers do
          [time, self.value, reason]
        end
      end
      nil
    end

    # Computes the delay before the next run. For :fixed_rate the time the
    # run just took is subtracted from the execution interval (never going
    # below zero); for :fixed_delay the full execution interval is used.
    #
    # @!visibility private
    def calculate_next_interval(start_time)
      if @interval_type == FIXED_RATE
        run_time = Concurrent.monotonic_time - start_time
        [execution_interval - run_time, 0].max
      else # FIXED_DELAY
        execution_interval
      end
    end
  end
end