lib/rufus/scheduler.rb



#
#--
# Copyright (c) 2006-2008, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#++
#

#
# "made in Japan"
#
# John Mettraux at openwfe.org
#

require 'thread'
require 'monitor'
require 'rufus/otime'
require 'rufus/cronline'

module Rufus

  #
  # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
  # 'at' jobs to execute once at a given point in time. 'cron' jobs
  # execute a specified intervals.
  # The two main methods are thus schedule_at() and schedule().
  #
  # schedule_at() and schedule() await either a Schedulable instance and
  # params (usually an array or nil), either a block, which is more in the
  # Ruby way.
  #
  # == The gem "rufus-scheduler"
  #
  # This scheduler was previously known as the "openwferu-scheduler" gem.
  #
  # To ensure that code tapping the previous gem still runs fine with
  # "rufus-scheduler", this new gem has 'pointers' for the old class
  # names.
  #
  #  require 'rubygems'
  #  require 'openwfe/util/scheduler'
  #  s = OpenWFE::Scheduler.new
  #
  # will still run OK with "rufus-scheduler".
  #
  # == Examples
  #
  #  require 'rubygems'
  #  require 'rufus/scheduler'
  #
  #  scheduler = Rufus::Scheduler.start_new
  #
  #  scheduler.schedule_in("3d") do
  #    regenerate_monthly_report()
  #  end
  #    #
  #    # will call the regenerate_monthly_report method
  #    # in 3 days from now
  #
  #   scheduler.schedule "0 22 * * 1-5" do
  #     log.info "activating security system..."
  #     activate_security_system()
  #   end
  #
  #   job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
  #     init_self_destruction_sequence()
  #   end
  #
  # an example that uses a Schedulable class :
  #
  #  class Regenerator < Schedulable
  #    def trigger (frequency)
  #      self.send(frequency)
  #    end
  #    def monthly
  #      # ...
  #    end
  #    def yearly
  #      # ...
  #    end
  #  end
  #
  #  regenerator = Regenerator.new
  #
  #  scheduler.schedule_in("4d", regenerator)
  #    #
  #    # will regenerate the report in four days
  #
  #  scheduler.schedule_in(
  #    "5d",
  #    { :schedulable => regenerator, :scope => :month })
  #      #
  #      # will regenerate the monthly report in 5 days
  #
  # There is also schedule_every() :
  #
  #   scheduler.schedule_every("1h20m") do
  #     regenerate_latest_report()
  #   end
  #
  # (note : a schedule every isn't triggered immediately, thus this example
  # will first trigger 1 hour and 20 minutes after being scheduled)
  #
  # The scheduler has a "exit_when_no_more_jobs" attribute. When set to
  # 'true', the scheduler will exit as soon as there are no more jobs to
  # run.
  # Use with care though, if you create a scheduler, set this attribute
  # to true and start the scheduler, the scheduler will immediately exit.
  # This attribute is best used indirectly : the method
  # join_until_no_more_jobs() wraps it.
  #
  # The :scheduler_precision can be set when instantiating the scheduler.
  #
  #   scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
  #   scheduler.start
  #     #
  #     # instatiates a scheduler that checks its jobs twice per second
  #     # (the default is 4 times per second (0.250))
  #
  # Note that rufus-scheduler places a constraint on the values for the
  # precision : 0.0 < p <= 1.0
  # Thus
  #
  #   scheduler.precision = 4.0
  #
  # or
  #
  #   scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0
  #
  # will raise an exception.
  #
  #
  # == Tags
  #
  # Tags can be attached to jobs scheduled :
  #
  #   scheduler.schedule_in "2h", :tags => "backup" do
  #     init_backup_sequence()
  #   end
  #
  #   scheduler.schedule "0 24 * * *", :tags => "new_day" do
  #     do_this_or_that()
  #   end
  #
  #   jobs = find_jobs 'backup'
  #   jobs.each { |job| job.unschedule }
  #
  # Multiple tags may be attached to a single job :
  #
  #   scheduler.schedule_in "2h", :tags => [ "backup", "important" ]  do
  #     init_backup_sequence()
  #   end
  #
  # The vanilla case for tags assume they are String instances, but nothing
  # prevents you from using anything else. The scheduler has no persistence
  # by itself, so no serialization issue.
  #
  #
  # == Cron up to the second
  #
  # A cron schedule can be set at the second level :
  #
  #   scheduler.schedule "7 * * * * *" do
  #     puts "it's now the seventh second of the minute"
  #   end
  #
  # The rufus scheduler recognizes an optional first column for second
  # scheduling. This column can, like for the other columns, specify a
  # value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
  #
  #
  # == information passed to schedule blocks
  #
  # When calling schedule_every(), schedule_in() or schedule_at(), the block
  # expects zero or 3 parameters like in
  #
  #   scheduler.schedule_every("1h20m") do |job_id, at, params|
  #       puts "my job_id is #{job_id}"
  #   end
  #
  # For schedule(), zero or two parameters can get passed
  #
  #   scheduler.schedule "7 * * * * *" do |job_id, cron_line, params|
  #     puts "my job_id is #{job_id}"
  #   end
  #
  # In both cases, params corresponds to the params passed to the schedule
  # method (:tags, :first_at, :first_in, :dont_reschedule, ...)
  #
  #
  # == Exceptions
  #
  # The rufus scheduler will output a stacktrace to the STDOUT in
  # case of exception. There are two ways to change that behaviour.
  #
  #   # 1 - providing a lwarn method to the scheduler instance :
  #
  #   class << scheduler
  #     def lwarn (&block)
  #       puts "oops, something wrong happened : "
  #       puts block.call
  #     end
  #   end
  #
  #   # 2 - overriding the [protected] method log_exception(e) :
  #
  #   class << scheduler
  #     def log_exception (e)
  #       puts "something wrong happened : "+e.to_s
  #     end
  #   end
  #
  # == 'Every jobs' and rescheduling
  #
  # Every jobs can reschedule/unschedule themselves. A reschedule example :
  #
  #   schedule.schedule_every "5h" do |job_id, at, params|
  #
  #     mails = $inbox.fetch_mails
  #     mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
  #
  #     params[:every] = if mails.size > 100
  #       "1h" # lots of spam, check every hour
  #     else
  #       "5h" # normal schedule, every 5 hours
  #     end
  #   end
  #
  # Unschedule example :
  #
  #   schedule.schedule_every "10s" do |job_id, at, params|
  #     #
  #     # polls every 10 seconds until a mail arrives
  #
  #     $mail = $inbox.fetch_last_mail
  #
  #     params[:dont_reschedule] = true if $mail
  #   end
  #
  # == 'Every jobs', :first_at and :first_in
  #
  # Since rufus-scheduler 1.0.2, the schedule_every methods recognizes two
  # optional parameters, :first_at and :first_in
  #
  #   scheduler.schedule_every "2d", :first_in => "5h" do
  #     # schedule something every two days, start in 5 hours...
  #   end
  #
  #   scheduler.schedule_every "2d", :first_at => "5h" do
  #     # schedule something every two days, start in 5 hours...
  #   end
  #
  # == job.next_time()
  #
  # Jobs, be they at, every or cron have a next_time() method, which tells
  # when the job will be fired next time (for at and in jobs, this is also the
  # last time).
  #
  # For cron jobs, the current implementation is quite brutal. It takes three
  # seconds on my 2006 macbook to reach a cron schedule 1 year away.
  #
  # When is the next friday 13th ?
  #
  #   require 'rubygems'
  #   require 'rufus/scheduler'
  #
  #   puts Rufus::CronLine.new("* * 13 * fri").next_time
  #
  #
  # == :thread_name option
  #
  # You can specify the name of the scheduler's thread. Should make
  # it easier in some debugging situations.
  #
  #   scheduler.new :thread_name => "the crazy scheduler"
  #
  #
  # == job.trigger_thread
  #
  # Since rufus-scheduler 1.0.8, you can have access to the thread of
  # a job currently being triggered.
  #
  #   job = scheduler.get_job(job_id)
  #   thread = job.trigger_thread
  #
  # This new method will return nil if the job is not currently being
  # triggered. Not that in case of an every or cron job, this method
  # will return the thread of the last triggered instance, thus, in case
  # of overlapping executions, you only get the most recent thread.
  #
  class Scheduler

    #
    # By default, the precision is 0.250, with means the scheduler
    # will check for jobs to execute 4 times per second.
    #
    attr_reader :precision

    #
    # Setting the precision ( 0.0 < p <= 1.0 )
    #
    def precision= (f)

      raise "precision must be 0.0 < p <= 1.0" \
        if f <= 0.0 or f > 1.0

      @precision = f
    end

    #--
    # Set by default at 0.00045, it's meant to minimize drift
    #
    #attr_accessor :correction
    #++

    #
    # As its name implies.
    #
    attr_accessor :stopped


    def initialize (params={})

      super()

      @pending_jobs = []
      @cron_jobs = {}
      @non_cron_jobs = {}

      @schedule_queue = Queue.new
      @unschedule_queue = Queue.new
        #
        # sync between the step() method and the [un]schedule
        # methods is done via these queues, no more mutex

      @scheduler_thread = nil

      @precision = 0.250
        # every 250ms, the scheduler wakes up (default value)
      begin
        self.precision = Float(params[:scheduler_precision])
      rescue Exception => e
        # let precision at its default value
      end

      @thread_name = params[:thread_name] || "rufus scheduler"

      #@correction = 0.00045

      @exit_when_no_more_jobs = false
      #@dont_reschedule_every = false

      @last_cron_second = -1

      @stopped = true
    end

    #
    # Starts this scheduler (or restart it if it was previously stopped)
    #
    def start

      @stopped = false

      @scheduler_thread = Thread.new do

        Thread.current[:name] = @thread_name

        if defined?(JRUBY_VERSION)
          require 'java'
          java.lang.Thread.current_thread.name = @thread_name
        end

        loop do

          break if @stopped

          t0 = Time.now.to_f

          step

          d = Time.now.to_f - t0 # + @correction

          next if d > @precision

          sleep (@precision - d)
        end
      end
    end

    #
    # Instantiates a new Rufus::Scheduler instance, starts it and returns it
    #
    def self.start_new

      s = self.new
      s.start
      s
    end

    #
    # The scheduler is stoppable via sstop()
    #
    def stop

      @stopped = true
    end

    # (for backward compatibility)
    #
    alias :sstart :start

    # (for backward compatibility)
    #
    alias :sstop :stop

    #
    # Joins on the scheduler thread
    #
    def join

      @scheduler_thread.join
    end

    #
    # Like join() but takes care of setting the 'exit_when_no_more_jobs'
    # attribute of this scheduler to true before joining.
    # Thus the scheduler will exit (and the join terminates) as soon as
    # there aren't no more 'at' (or 'every') jobs in the scheduler.
    #
    # Currently used only in unit tests.
    #
    def join_until_no_more_jobs

      @exit_when_no_more_jobs = true
      join
    end

    #--
    #
    # The scheduling methods
    #
    #++

    #
    # Schedules a job by specifying at which time it should trigger.
    # Returns the a job_id that can be used to unschedule the job.
    #
    # This method returns a job identifier which can be used to unschedule()
    # the job.
    #
    # If the job is specified in the past, it will be triggered immediately
    # but not scheduled.
    # To avoid the triggering, the parameter :discard_past may be set to
    # true :
    #
    #   jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
    #     puts "you'll never read this message"
    #   end
    #
    # And 'jobid' will hold a nil (not scheduled).
    #
    #
    def schedule_at (at, params={}, &block)

      do_schedule_at(
        at,
        prepare_params(params),
        &block)
    end

    #
    # a shortcut for schedule_at
    #
    alias :at :schedule_at


    #
    # Schedules a job by stating in how much time it should trigger.
    # Returns the a job_id that can be used to unschedule the job.
    #
    # This method returns a job identifier which can be used to unschedule()
    # the job.
    #
    def schedule_in (duration, params={}, &block)

      do_schedule_at(
        Time.new.to_f + Rufus::duration_to_f(duration),
        prepare_params(params),
        &block)
    end

    #
    # a shortcut for schedule_in
    #
    alias :in :schedule_in

    #
    # Schedules a job in a loop. After an execution, it will not execute
    # before the time specified in 'freq'.
    #
    # This method returns a job identifier which can be used to unschedule()
    # the job.
    #
    # In case of exception in the job, it will be rescheduled. If you don't
    # want the job to be rescheduled, set the parameter :try_again to false.
    #
    #   scheduler.schedule_every "500", :try_again => false do
    #     do_some_prone_to_error_stuff()
    #       # won't get rescheduled in case of exception
    #   end
    #
    # Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
    # accepted.
    #
    #   scheduler.schedule_every "2d", :first_in => "5h" do
    #     # schedule something every two days, start in 5 hours...
    #   end
    #
    # (without setting a :first_in (or :first_at), our example schedule would
    # have had been triggered after two days).
    #
    def schedule_every (freq, params={}, &block)

      params = prepare_params params
      params[:every] = freq

      first_at = params[:first_at]
      first_in = params[:first_in]

      first_at = if first_at
        at_to_f(first_at)
      elsif first_in
        Time.now.to_f + Rufus.duration_to_f(first_in)
      else
        Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately
      end

      do_schedule_at(first_at, params, &block)
    end

    #
    # a shortcut for schedule_every
    #
    alias :every :schedule_every

    #
    # Schedules a cron job, the 'cron_line' is a string
    # following the Unix cron standard (see "man 5 crontab" in your command
    # line, or http://www.google.com/search?q=man%205%20crontab).
    #
    # For example :
    #
    #  scheduler.schedule("5 0 * * *", s)
    #    # will trigger the schedulable s every day
    #    # five minutes after midnight
    #
    #  scheduler.schedule("15 14 1 * *", s)
    #    # will trigger s at 14:15 on the first of every month
    #
    #  scheduler.schedule("0 22 * * 1-5") do
    #    puts "it's break time..."
    #  end
    #    # outputs a message every weekday at 10pm
    #
    # Returns the job id attributed to this 'cron job', this id can
    # be used to unschedule the job.
    #
    # This method returns a job identifier which can be used to unschedule()
    # the job.
    #
    def schedule (cron_line, params={}, &block)

      params = prepare_params(params)

      #
      # is a job with the same id already scheduled ?

      cron_id = params[:cron_id] || params[:job_id]

      #@unschedule_queue << cron_id

      #
      # schedule

      b = to_block(params, &block)
      job = CronJob.new(self, cron_id, cron_line, params, &b)

      @schedule_queue << job

      job.job_id
    end

    #
    # an alias for schedule()
    #
    alias :cron :schedule

    #--
    #
    # The UNscheduling methods
    #
    #++

    #
    # Unschedules an 'at' or a 'cron' job identified by the id
    # it was given at schedule time.
    #
    def unschedule (job_id)

      @unschedule_queue << job_id
    end

    #
    # Unschedules a cron job
    #
    # (deprecated : use unschedule(job_id) for all the jobs !)
    #
    def unschedule_cron_job (job_id)

      unschedule(job_id)
    end

    #--
    #
    # 'query' methods
    #
    #++

    #
    # Returns the job corresponding to job_id, an instance of AtJob
    # or CronJob will be returned.
    #
    def get_job (job_id)

      @cron_jobs[job_id] || @non_cron_jobs[job_id]
    end

    #
    # Finds a job (via get_job()) and then returns the wrapped
    # schedulable if any.
    #
    def get_schedulable (job_id)

      j = get_job(job_id)
      j.respond_to?(:schedulable) ? j.schedulable : nil
    end

    #
    # Returns an array of jobs that have the given tag.
    #
    def find_jobs (tag)

      @cron_jobs.values.find_all { |job| job.has_tag?(tag) } +
      @non_cron_jobs.values.find_all { |job| job.has_tag?(tag) }
    end

    #
    # Finds the jobs with the given tag and then returns an array of
    # the wrapped Schedulable objects.
    # Jobs that haven't a wrapped Schedulable won't be included in the
    # result.
    #
    def find_schedulables (tag)

      find_jobs(tag).find_all { |job| job.respond_to?(:schedulable) }
    end

    #
    # Returns the number of currently pending jobs in this scheduler
    # ('at' jobs and 'every' jobs).
    #
    def pending_job_count

      @pending_jobs.size
    end

    #
    # Returns the number of cron jobs currently active in this scheduler.
    #
    def cron_job_count

      @cron_jobs.size
    end

    #
    # Returns the current count of 'every' jobs scheduled.
    #
    def every_job_count

      @non_cron_jobs.values.select { |j| j.class == EveryJob }.size
    end

    #
    # Returns the current count of 'at' jobs scheduled (not 'every').
    #
    def at_job_count

      @non_cron_jobs.values.select { |j| j.class == AtJob }.size
    end

    #
    # Returns true if the given string seems to be a cron string.
    #
    def self.is_cron_string (s)

      s.match ".+ .+ .+ .+ .+" # well...
    end

    private

      #
      # the unschedule work itself.
      #
      def do_unschedule (job_id)

        job = get_job job_id

        return (@cron_jobs.delete(job_id) != nil) if job.is_a?(CronJob)

        return false unless job # not found

        if job.is_a?(AtJob) # catches AtJob and EveryJob instances
          @non_cron_jobs.delete(job_id)
          job.params[:dont_reschedule] = true # for AtJob as well, no worries
        end

        for i in 0...@pending_jobs.length
          if @pending_jobs[i].job_id == job_id
            @pending_jobs.delete_at i
            return true # asap
          end
        end

        true
      end

      #
      # Making sure that params is a Hash.
      #
      def prepare_params (params)

        params.is_a?(Schedulable) ? { :schedulable => params } : params
      end

      #
      # The core method behind schedule_at and schedule_in (and also
      # schedule_every). It's protected, don't use it directly.
      #
      def do_schedule_at (at, params={}, &block)

        job = params.delete :job

        unless job

          jobClass = params[:every] ? EveryJob : AtJob

          b = to_block params, &block

          job = jobClass.new self, at_to_f(at), params[:job_id], params, &b
        end

        if jobClass == AtJob && job.at < (Time.new.to_f + @precision)

          job.trigger() unless params[:discard_past]

          @non_cron_jobs.delete job.job_id # just to be sure

          return nil
        end

        @non_cron_jobs[job.job_id] = job

        @schedule_queue << job

        job.job_id
      end

      #
      # Ensures an 'at' instance is translated to a float
      # (to be compared with the float coming from time.to_f)
      #
      def at_to_f (at)

        at = Rufus::to_ruby_time(at) if at.kind_of?(String)
        at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
        at = at.to_f if at.kind_of?(Time)

        raise "cannot schedule at : #{at.inspect}" unless at.is_a?(Float)

        at
      end

      #
      # Returns a block. If a block is passed, will return it, else,
      # if a :schedulable is set in the params, will return a block
      # wrapping a call to it.
      #
      def to_block (params, &block)

        return block if block

        schedulable = params.delete(:schedulable)

        return nil unless schedulable

        l = lambda do
          schedulable.trigger(params)
        end
        class << l
          attr_accessor :schedulable
        end
        l.schedulable = schedulable

        l
      end

      #
      # Pushes an 'at' job into the pending job list
      #
      def push_pending_job (job)

        old = @pending_jobs.find { |j| j.job_id == job.job_id }
        @pending_jobs.delete(old) if old
          #
          # override previous job with same id

        if @pending_jobs.length < 1 or job.at >= @pending_jobs.last.at
          @pending_jobs << job
          return
        end

        for i in 0...@pending_jobs.length
          if job.at <= @pending_jobs[i].at
            @pending_jobs[i, 0] = job
            return # right place found
          end
        end
      end

      #
      # This is the method called each time the scheduler wakes up
      # (by default 4 times per second). It's meant to quickly
      # determine if there are jobs to trigger else to get back to sleep.
      # 'cron' jobs get executed if necessary then 'at' jobs.
      #
      def step

        step_unschedule
          # unschedules any job in the unschedule queue before
          # they have a chance to get triggered.

        step_trigger
          # triggers eligible jobs

        step_schedule
          # schedule new jobs

        # done.
      end

      #
      # unschedules jobs in the unschedule_queue
      #
      def step_unschedule

        loop do

          break if @unschedule_queue.empty?

          do_unschedule(@unschedule_queue.pop)
        end
      end

      #
      # adds every job waiting in the @schedule_queue to
      # either @pending_jobs or @cron_jobs.
      #
      def step_schedule

        loop do

          break if @schedule_queue.empty?

          j = @schedule_queue.pop

          if j.is_a?(CronJob)

            @cron_jobs[j.job_id] = j

          else # it's an 'at' job

            push_pending_job j
          end
        end
      end

      #
      # triggers every eligible pending (at or every) jobs, then every eligible
      # cron jobs.
      #
      def step_trigger

        now = Time.now

        if @exit_when_no_more_jobs && @pending_jobs.size < 1

          @stopped = true
          return
        end

        # TODO : eventually consider running cron / pending
        # job triggering in two different threads
        #
        # but well... there's the synchronization issue...

        #
        # cron jobs

        if now.sec != @last_cron_second

          @last_cron_second = now.sec

          @cron_jobs.each do |cron_id, cron_job|
            #trigger(cron_job) if cron_job.matches?(now, @precision)
            cron_job.trigger if cron_job.matches?(now)
          end
        end

        #
        # pending jobs

        now = now.to_f
          #
          # that's what at jobs do understand

        loop do

          break if @pending_jobs.length < 1

          job = @pending_jobs[0]

          break if job.at > now

          #if job.at <= now
            #
            # obviously

          job.trigger

          @pending_jobs.delete_at 0
        end
      end

      #
      # If an error occurs in the job, it well get caught and an error
      # message will be displayed to STDOUT.
      # If this scheduler provides a lwarn(message) method, it will
      # be used insted.
      #
      # Of course, one can override this method.
      #
      def log_exception (e)

        message =
          "trigger() caught exception\n" +
          e.to_s + "\n" +
          e.backtrace.join("\n")

        if self.respond_to?(:lwarn)
          lwarn { message }
        else
          puts message
        end
      end
  end

  #
  # This module adds a trigger method to any class that includes it.
  # The default implementation feature here triggers an exception.
  #
  module Schedulable

    def trigger (params)
      raise "trigger() implementation is missing"
    end

    def reschedule (scheduler)
      raise "reschedule() implentation is missing"
    end
  end

  protected

    JOB_ID_LOCK = Monitor.new
      #
      # would it be better to use a Mutex instead of a full-blown
      # Monitor ?

    #
    # The parent class for scheduled jobs.
    #
    class Job

      @@last_given_id = 0
        #
        # as a scheduler is fully transient, no need to
        # have persistent ids, a simple counter is sufficient

      #
      # The identifier for the job
      #
      attr_accessor :job_id

      #
      # An array of tags
      #
      attr_accessor :tags

      #
      # The block to execute at trigger time
      #
      attr_accessor :block

      #
      # A reference to the scheduler
      #
      attr_reader :scheduler

      #
      # Keeping a copy of the initialization params of the job.
      #
      attr_reader :params

      #
      # if the job is currently executing, this field points to
      # the 'trigger thread'
      #
      attr_reader :trigger_thread


      def initialize (scheduler, job_id, params, &block)

        @scheduler = scheduler
        @block = block

        if job_id
          @job_id = job_id
        else
          JOB_ID_LOCK.synchronize do
            @job_id = @@last_given_id
            @@last_given_id = @job_id + 1
          end
        end

        @params = params

        #@tags = Array(tags).collect { |tag| tag.to_s }
          # making sure we have an array of String tags

        @tags = Array(params[:tags])
          # any tag is OK
      end

      #
      # Returns true if this job sports the given tag
      #
      def has_tag? (tag)

        @tags.include?(tag)
      end

      #
      # Removes (cancels) this job from its scheduler.
      #
      def unschedule

        @scheduler.unschedule(@job_id)
      end

      #
      # Triggers the job (in a dedicated thread).
      #
      def trigger

        Thread.new do

          @trigger_thread = Thread.current
            # keeping track of the thread

          begin

            do_trigger

          rescue Exception => e

            @scheduler.send(:log_exception, e)
          end

          #@trigger_thread = nil if @trigger_thread = Thread.current
          @trigger_thread = nil
            # overlapping executions, what to do ?
        end
      end
    end

    #
    # An 'at' job.
    #
    class AtJob < Job

      #
      # The float representation (Time.to_f) of the time at which
      # the job should be triggered.
      #
      attr_accessor :at


      def initialize (scheduler, at, at_id, params, &block)

        super(scheduler, at_id, params, &block)
        @at = at
      end

      #
      # Returns the Time instance at which this job is scheduled.
      #
      def schedule_info

        Time.at(@at)
      end

      #
      # next_time is last_time (except for EveryJob instances). Returns
      # a Time instance.
      #
      def next_time

        schedule_info
      end

      protected

        #
        # Triggers the job (calls the block)
        #
        def do_trigger

          @block.call @job_id, @at

          @scheduler.instance_variable_get(:@non_cron_jobs).delete @job_id
        end
    end

    #
    # An 'every' job is simply an extension of an 'at' job.
    #
    class EveryJob < AtJob

      #
      # Returns the frequency string used to schedule this EveryJob,
      # like for example "3d" or "1M10d3h".
      #
      def schedule_info

        @params[:every]
      end

      protected

        #
        # triggers the job, then reschedules it if necessary
        #
        def do_trigger

          hit_exception = false

          begin

            @block.call @job_id, @at, @params

          rescue Exception => e

            @scheduler.send(:log_exception, e)

            hit_exception = true
          end

          if \
            @scheduler.instance_variable_get(:@exit_when_no_more_jobs) or
            (@params[:dont_reschedule] == true) or
            (hit_exception and @params[:try_again] == false)

            @scheduler.instance_variable_get(:@non_cron_jobs).delete(job_id)
              # maybe it'd be better to wipe that reference from here anyway...

            return
          end

          #
          # ok, reschedule ...


          params[:job] = self

          @at = @at + Rufus.duration_to_f(params[:every])

          @scheduler.send(:do_schedule_at, @at, params)
        end
    end

    #
    # A cron job.
    #
    class CronJob < Job

      #
      # The CronLine instance representing the times at which
      # the cron job has to be triggered.
      #
      attr_accessor :cron_line

      def initialize (scheduler, cron_id, line, params, &block)

        super(scheduler, cron_id, params, &block)

        if line.is_a?(String)

          @cron_line = CronLine.new(line)

        elsif line.is_a?(CronLine)

          @cron_line = line

        else

          raise \
            "Cannot initialize a CronJob " +
            "with a param of class #{line.class}"
        end
      end

      #
      # This is the method called by the scheduler to determine if it
      # has to fire this CronJob instance.
      #
      def matches? (time)
      #def matches? (time, precision)

        #@cron_line.matches?(time, precision)
        @cron_line.matches?(time)
      end

      #
      # Returns the original cron tab string used to schedule this
      # Job. Like for example "60/3 * * * Sun".
      #
      def schedule_info

        @cron_line.original
      end

      #
      # Returns a Time instance : the next time this cron job is
      # supposed to "fire".
      #
      # 'from' is used to specify the starting point for determining
      # what will be the next time. Defaults to now.
      #
      def next_time (from=Time.now)

        @cron_line.next_time(from)
      end

      protected

        #
        # As the name implies.
        #
        def do_trigger

          @block.call @job_id, @cron_line, @params
        end
    end

end