#
#--
# Copyright (c) 2006-2008, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
#
require 'thread'
require 'rufus/otime'
require 'rufus/cronline'
module Rufus
#
# The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
# 'at' jobs to execute once at a given point in time. 'cron' jobs
# execute a specified intervals.
# The two main methods are thus schedule_at() and schedule().
#
# schedule_at() and schedule() await either a Schedulable instance and
# params (usually an array or nil), either a block, which is more in the
# Ruby way.
#
# == The gem "rufus-scheduler"
#
# This scheduler was previously known as the "openwferu-scheduler" gem.
#
# To ensure that code tapping the previous gem still runs fine with
# "rufus-scheduler", this new gem has 'pointers' for the old class
# names.
#
# require 'rubygems'
# require 'openwfe/util/scheduler'
# s = OpenWFE::Scheduler.new
#
# will still run OK with "rufus-scheduler".
#
# == Examples
#
# require 'rubygems'
# require 'rufus/scheduler'
#
# scheduler = Rufus::Scheduler.start_new
#
# scheduler.schedule_in("3d") do
# regenerate_monthly_report()
# end
# #
# # will call the regenerate_monthly_report method
# # in 3 days from now
#
# scheduler.schedule "0 22 * * 1-5" do
# log.info "activating security system..."
# activate_security_system()
# end
#
# job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
# init_self_destruction_sequence()
# end
#
# scheduler.join # join the scheduler (prevents exiting)
#
#
# an example that uses a Schedulable class :
#
# class Regenerator < Schedulable
# def trigger (frequency)
# self.send(frequency)
# end
# def monthly
# # ...
# end
# def yearly
# # ...
# end
# end
#
# regenerator = Regenerator.new
#
# scheduler.schedule_in("4d", regenerator)
# #
# # will regenerate the report in four days
#
# scheduler.schedule_in(
# "5d",
# { :schedulable => regenerator, :scope => :month })
# #
# # will regenerate the monthly report in 5 days
#
# There is also schedule_every() :
#
# scheduler.schedule_every("1h20m") do
# regenerate_latest_report()
# end
#
# (note : a schedule every isn't triggered immediately, thus this example
# will first trigger 1 hour and 20 minutes after being scheduled)
#
# The scheduler has a "exit_when_no_more_jobs" attribute. When set to
# 'true', the scheduler will exit as soon as there are no more jobs to
# run.
# Use with care though, if you create a scheduler, set this attribute
# to true and start the scheduler, the scheduler will immediately exit.
# This attribute is best used indirectly : the method
# join_until_no_more_jobs() wraps it.
#
# The :scheduler_precision can be set when instantiating the scheduler.
#
# scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
# scheduler.start
# #
# # instatiates a scheduler that checks its jobs twice per second
# # (the default is 4 times per second (0.250))
#
# Note that rufus-scheduler places a constraint on the values for the
# precision : 0.0 < p <= 1.0
# Thus
#
# scheduler.precision = 4.0
#
# or
#
# scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0
#
# will raise an exception.
#
#
# == Tags
#
# Tags can be attached to jobs scheduled :
#
# scheduler.schedule_in "2h", :tags => "backup" do
# init_backup_sequence()
# end
#
# scheduler.schedule "0 24 * * *", :tags => "new_day" do
# do_this_or_that()
# end
#
# jobs = find_jobs 'backup'
# jobs.each { |job| job.unschedule }
#
# Multiple tags may be attached to a single job :
#
# scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
# init_backup_sequence()
# end
#
# The vanilla case for tags assume they are String instances, but nothing
# prevents you from using anything else. The scheduler has no persistence
# by itself, so no serialization issue.
#
#
# == Cron up to the second
#
# A cron schedule can be set at the second level :
#
# scheduler.schedule "7 * * * * *" do
# puts "it's now the seventh second of the minute"
# end
#
# The rufus scheduler recognizes an optional first column for second
# scheduling. This column can, like for the other columns, specify a
# value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
#
#
# == information passed to schedule blocks
#
# When calling schedule_every(), schedule_in() or schedule_at(), the block
# expects zero or 3 parameters like in
#
# scheduler.schedule_every("1h20m") do |job_id, at, params|
# puts "my job_id is #{job_id}"
# end
#
# For schedule(), zero or two parameters can get passed
#
# scheduler.schedule "7 * * * * *" do |job_id, cron_line, params|
# puts "my job_id is #{job_id}"
# end
#
# In both cases, params corresponds to the params passed to the schedule
# method (:tags, :first_at, :first_in, :dont_reschedule, ...)
#
#
# == Exceptions
#
# The rufus scheduler will output a stacktrace to the STDOUT in
# case of exception. There are two ways to change that behaviour.
#
# # 1 - providing a lwarn method to the scheduler instance :
#
# class << scheduler
# def lwarn (&block)
# puts "oops, something wrong happened : "
# puts block.call
# end
# end
#
# # or
#
# def scheduler.lwarn (&block)
# puts "oops, something wrong happened : "
# puts block.call
# end
#
# # 2 - overriding the [protected] method log_exception(e) :
#
# class << scheduler
# def log_exception (e)
# puts "something wrong happened : "+e.to_s
# end
# end
#
# # or
#
# def scheduler.log_exception (e)
# puts "something wrong happened : "+e.to_s
# end
#
# == 'Every jobs' and rescheduling
#
# Every jobs can reschedule/unschedule themselves. A reschedule example :
#
# schedule.schedule_every "5h" do |job_id, at, params|
#
# mails = $inbox.fetch_mails
# mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
#
# params[:every] = if mails.size > 100
# "1h" # lots of spam, check every hour
# else
# "5h" # normal schedule, every 5 hours
# end
# end
#
# Unschedule example :
#
# schedule.schedule_every "10s" do |job_id, at, params|
# #
# # polls every 10 seconds until a mail arrives
#
# $mail = $inbox.fetch_last_mail
#
# params[:dont_reschedule] = true if $mail
# end
#
# == 'Every jobs', :first_at and :first_in
#
# Since rufus-scheduler 1.0.2, the schedule_every methods recognizes two
# optional parameters, :first_at and :first_in
#
# scheduler.schedule_every "2d", :first_in => "5h" do
# # schedule something every two days, start in 5 hours...
# end
#
# scheduler.schedule_every "2d", :first_at => "5h" do
# # schedule something every two days, start in 5 hours...
# end
#
# == job.next_time()
#
# Jobs, be they at, every or cron have a next_time() method, which tells
# when the job will be fired next time (for at and in jobs, this is also the
# last time).
#
# For cron jobs, the current implementation is quite brutal. It takes three
# seconds on my 2006 macbook to reach a cron schedule 1 year away.
#
# When is the next friday 13th ?
#
# require 'rubygems'
# require 'rufus/scheduler'
#
# puts Rufus::CronLine.new("* * 13 * fri").next_time
#
#
# == :thread_name option
#
# You can specify the name of the scheduler's thread. Should make
# it easier in some debugging situations.
#
# scheduler.new :thread_name => "the crazy scheduler"
#
#
# == job.trigger_thread
#
# Since rufus-scheduler 1.0.8, you can have access to the thread of
# a job currently being triggered.
#
# job = scheduler.get_job(job_id)
# thread = job.trigger_thread
#
# This new method will return nil if the job is not currently being
# triggered. Not that in case of an every or cron job, this method
# will return the thread of the last triggered instance, thus, in case
# of overlapping executions, you only get the most recent thread.
#
#
# == specifying a :timeout for a job
#
# rufus-scheduler 1.0.12 introduces a :timeout parameter for jobs.
#
# scheduler.every "3h", :timeout => '2h30m' do
# do_that_long_job()
# end
#
# after 2 hours and half, the 'long job' will get interrupted by a
# Rufus::TimeOutError (so that you know what to catch).
#
# :timeout is applicable to all types of jobs : at, in, every, cron. It
# accepts a String value following the "Mdhms" scheme the rufus-scheduler
# uses.
#
class Scheduler
VERSION = '1.0.12'
#
# By default, the precision is 0.250, with means the scheduler
# will check for jobs to execute 4 times per second.
#
attr_reader :precision
#
# Setting the precision ( 0.0 < p <= 1.0 )
#
def precision= (f)
raise 'precision must be 0.0 < p <= 1.0' \
if f <= 0.0 or f > 1.0
@precision = f
end
#--
# Set by default at 0.00045, it's meant to minimize drift
#
#attr_accessor :correction
#++
#
# As its name implies.
#
attr_accessor :stopped
def initialize (params={})
super()
@pending_jobs = []
@cron_jobs = {}
@non_cron_jobs = {}
@schedule_queue = Queue.new
@unschedule_queue = Queue.new
#
# sync between the step() method and the [un]schedule
# methods is done via these queues, no more mutex
@scheduler_thread = nil
@precision = 0.250
# every 250ms, the scheduler wakes up (default value)
begin
self.precision = Float(params[:scheduler_precision])
rescue Exception => e
# let precision at its default value
end
@thread_name = params[:thread_name] || "rufus scheduler"
#@correction = 0.00045
@exit_when_no_more_jobs = false
#@dont_reschedule_every = false
@last_cron_second = -1
@stopped = true
end
#
# Starts this scheduler (or restart it if it was previously stopped)
#
def start
@stopped = false
@scheduler_thread = Thread.new do
Thread.current[:name] = @thread_name
if defined?(JRUBY_VERSION)
require 'java'
java.lang.Thread.current_thread.name = @thread_name
end
loop do
break if @stopped
t0 = Time.now.to_f
step
d = Time.now.to_f - t0 # + @correction
next if d > @precision
sleep(@precision - d)
end
end
end
#
# Instantiates a new Rufus::Scheduler instance, starts it and returns it
#
def self.start_new (params = {})
s = self.new(params)
s.start
s
end
#
# The scheduler is stoppable via sstop()
#
def stop
@stopped = true
end
# (for backward compatibility)
#
alias :sstart :start
# (for backward compatibility)
#
alias :sstop :stop
#
# Joins on the scheduler thread
#
def join
@scheduler_thread.join
end
#
# Like join() but takes care of setting the 'exit_when_no_more_jobs'
# attribute of this scheduler to true before joining.
# Thus the scheduler will exit (and the join terminates) as soon as
# there aren't no more 'at' (or 'every') jobs in the scheduler.
#
# Currently used only in unit tests.
#
def join_until_no_more_jobs
@exit_when_no_more_jobs = true
join
end
#
# Ensures that a duration is a expressed as a Float instance.
#
# duration_to_f("10s")
#
# will yield 10.0
#
def duration_to_f (s)
Rufus.duration_to_f(s)
end
#--
#
# The scheduling methods
#
#++
#
# Schedules a job by specifying at which time it should trigger.
# Returns the a job_id that can be used to unschedule the job.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
#
# If the job is specified in the past, it will be triggered immediately
# but not scheduled.
# To avoid the triggering, the parameter :discard_past may be set to
# true :
#
# jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
# puts "you'll never read this message"
# end
#
# And 'jobid' will hold a nil (not scheduled).
#
#
def schedule_at (at, params={}, &block)
do_schedule_at(
at,
prepare_params(params),
&block)
end
#
# a shortcut for schedule_at
#
alias :at :schedule_at
#
# Schedules a job by stating in how much time it should trigger.
# Returns the a job_id that can be used to unschedule the job.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
#
def schedule_in (duration, params={}, &block)
do_schedule_at(
Time.new.to_f + Rufus.duration_to_f(duration),
prepare_params(params),
&block)
end
#
# a shortcut for schedule_in
#
alias :in :schedule_in
#
# Schedules a job in a loop. After an execution, it will not execute
# before the time specified in 'freq'.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
#
# In case of exception in the job, it will be rescheduled. If you don't
# want the job to be rescheduled, set the parameter :try_again to false.
#
# scheduler.schedule_every "500", :try_again => false do
# do_some_prone_to_error_stuff()
# # won't get rescheduled in case of exception
# end
#
# Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
# accepted.
#
# scheduler.schedule_every "2d", :first_in => "5h" do
# # schedule something every two days, start in 5 hours...
# end
#
# (without setting a :first_in (or :first_at), our example schedule would
# have had been triggered after two days).
#
def schedule_every (freq, params={}, &block)
params = prepare_params(params)
params[:every] = freq
first_at = params[:first_at]
first_in = params[:first_in]
#params[:delayed] = true if first_at or first_in
first_at = if first_at
at_to_f(first_at)
elsif first_in
Time.now.to_f + Rufus.duration_to_f(first_in)
else
Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately
end
do_schedule_at(first_at, params, &block)
end
#
# a shortcut for schedule_every
#
alias :every :schedule_every
#
# Schedules a cron job, the 'cron_line' is a string
# following the Unix cron standard (see "man 5 crontab" in your command
# line, or http://www.google.com/search?q=man%205%20crontab).
#
# For example :
#
# scheduler.schedule("5 0 * * *", s)
# # will trigger the schedulable s every day
# # five minutes after midnight
#
# scheduler.schedule("15 14 1 * *", s)
# # will trigger s at 14:15 on the first of every month
#
# scheduler.schedule("0 22 * * 1-5") do
# puts "it's break time..."
# end
# # outputs a message every weekday at 10pm
#
# Returns the job id attributed to this 'cron job', this id can
# be used to unschedule the job.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
#
def schedule (cron_line, params={}, &block)
params = prepare_params(params)
#
# is a job with the same id already scheduled ?
cron_id = params[:cron_id] || params[:job_id]
#@unschedule_queue << cron_id
#
# schedule
b = to_block(params, &block)
job = CronJob.new(self, cron_id, cron_line, params, &b)
@schedule_queue << job
job.job_id
end
#
# an alias for schedule()
#
alias :cron :schedule
#--
#
# The UNscheduling methods
#
#++
#
# Unschedules an 'at' or a 'cron' job identified by the id
# it was given at schedule time.
#
def unschedule (job_id)
@unschedule_queue << job_id
end
#
# Unschedules a cron job
#
# (deprecated : use unschedule(job_id) for all the jobs !)
#
def unschedule_cron_job (job_id)
unschedule(job_id)
end
#--
#
# 'query' methods
#
#++
#
# Returns the job corresponding to job_id, an instance of AtJob
# or CronJob will be returned.
#
def get_job (job_id)
@cron_jobs[job_id] || @non_cron_jobs[job_id]
end
#
# Finds a job (via get_job()) and then returns the wrapped
# schedulable if any.
#
def get_schedulable (job_id)
j = get_job(job_id)
j.respond_to?(:schedulable) ? j.schedulable : nil
end
#
# Returns an array of jobs that have the given tag.
#
def find_jobs (tag=nil)
jobs = @cron_jobs.values + @non_cron_jobs.values
jobs = jobs.select { |job| job.has_tag?(tag) } if tag
jobs
end
#
# Returns all the jobs in the scheduler.
#
def all_jobs
find_jobs()
end
#
# Finds the jobs with the given tag and then returns an array of
# the wrapped Schedulable objects.
# Jobs that haven't a wrapped Schedulable won't be included in the
# result.
#
def find_schedulables (tag)
find_jobs(tag).find_all { |job| job.respond_to?(:schedulable) }
end
#
# Returns the number of currently pending jobs in this scheduler
# ('at' jobs and 'every' jobs).
#
def pending_job_count
@pending_jobs.size
end
#
# Returns the number of cron jobs currently active in this scheduler.
#
def cron_job_count
@cron_jobs.size
end
#
# Returns the current count of 'every' jobs scheduled.
#
def every_job_count
@non_cron_jobs.values.select { |j| j.class == EveryJob }.size
end
#
# Returns the current count of 'at' jobs scheduled (not 'every').
#
def at_job_count
@non_cron_jobs.values.select { |j| j.class == AtJob }.size
end
#
# Returns true if the given string seems to be a cron string.
#
def self.is_cron_string (s)
s.match ".+ .+ .+ .+ .+" # well...
end
private
#
# the unschedule work itself.
#
def do_unschedule (job_id)
job = get_job job_id
return (@cron_jobs.delete(job_id) != nil) if job.is_a?(CronJob)
return false unless job # not found
if job.is_a?(AtJob) # catches AtJob and EveryJob instances
@non_cron_jobs.delete(job_id)
job.params[:dont_reschedule] = true # for AtJob as well, no worries
end
for i in 0...@pending_jobs.length
if @pending_jobs[i].job_id == job_id
@pending_jobs.delete_at i
return true # asap
end
end
true
end
#
# Making sure that params is a Hash.
#
def prepare_params (params)
params.is_a?(Schedulable) ? { :schedulable => params } : params
end
#
# The core method behind schedule_at and schedule_in (and also
# schedule_every). It's protected, don't use it directly.
#
def do_schedule_at (at, params={}, &block)
job = params.delete(:job)
unless job
jobClass = params[:every] ? EveryJob : AtJob
b = to_block(params, &block)
job = jobClass.new(self, at_to_f(at), params[:job_id], params, &b)
end
if jobClass == AtJob && job.at < (Time.new.to_f + @precision)
job.trigger() unless params[:discard_past]
@non_cron_jobs.delete job.job_id # just to be sure
return nil
end
@non_cron_jobs[job.job_id] = job
@schedule_queue << job
job.job_id
end
#
# Ensures an 'at' instance is translated to a float
# (to be compared with the float coming from time.to_f)
#
def at_to_f (at)
at = Rufus::to_ruby_time(at) if at.kind_of?(String)
at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
at = at.to_f if at.kind_of?(Time)
raise "cannot schedule at : #{at.inspect}" unless at.is_a?(Float)
at
end
#
# Returns a block. If a block is passed, will return it, else,
# if a :schedulable is set in the params, will return a block
# wrapping a call to it.
#
def to_block (params, &block)
return block if block
schedulable = params.delete(:schedulable)
return nil unless schedulable
l = lambda do
schedulable.trigger(params)
end
class << l
attr_accessor :schedulable
end
l.schedulable = schedulable
l
end
#
# Pushes an 'at' job into the pending job list
#
def push_pending_job (job)
old = @pending_jobs.find { |j| j.job_id == job.job_id }
@pending_jobs.delete(old) if old
#
# override previous job with same id
if @pending_jobs.length < 1 or job.at >= @pending_jobs.last.at
@pending_jobs << job
return
end
for i in 0...@pending_jobs.length
if job.at <= @pending_jobs[i].at
@pending_jobs[i, 0] = job
return # right place found
end
end
end
#
# This is the method called each time the scheduler wakes up
# (by default 4 times per second). It's meant to quickly
# determine if there are jobs to trigger else to get back to sleep.
# 'cron' jobs get executed if necessary then 'at' jobs.
#
def step
step_unschedule
# unschedules any job in the unschedule queue before
# they have a chance to get triggered.
step_trigger
# triggers eligible jobs
step_schedule
# schedule new jobs
# done.
end
#
# unschedules jobs in the unschedule_queue
#
def step_unschedule
loop do
break if @unschedule_queue.empty?
do_unschedule(@unschedule_queue.pop)
end
end
#
# adds every job waiting in the @schedule_queue to
# either @pending_jobs or @cron_jobs.
#
def step_schedule
loop do
break if @schedule_queue.empty?
j = @schedule_queue.pop
if j.is_a?(CronJob)
@cron_jobs[j.job_id] = j
else # it's an 'at' job
push_pending_job j
end
end
end
#
# triggers every eligible pending (at or every) jobs, then every eligible
# cron jobs.
#
def step_trigger
now = Time.now
if @exit_when_no_more_jobs && @pending_jobs.size < 1
@stopped = true
return
end
# TODO : eventually consider running cron / pending
# job triggering in two different threads
#
# but well... there's the synchronization issue...
#
# cron jobs
if now.sec != @last_cron_second
@last_cron_second = now.sec
@cron_jobs.each do |cron_id, cron_job|
#trigger(cron_job) if cron_job.matches?(now, @precision)
cron_job.trigger if cron_job.matches?(now)
end
end
#
# pending jobs
now = now.to_f
#
# that's what at jobs do understand
loop do
break if @pending_jobs.length < 1
job = @pending_jobs[0]
break if job.at > now
#if job.at <= now
#
# obviously
job.trigger
@pending_jobs.delete_at 0
end
end
#
# If an error occurs in the job, it well get caught and an error
# message will be displayed to STDOUT.
# If this scheduler provides a lwarn(message) method, it will
# be used insted.
#
# Of course, one can override this method.
#
def log_exception (e)
message =
"trigger() caught exception\n" +
e.to_s + "\n" +
e.backtrace.join("\n")
if self.respond_to?(:lwarn)
lwarn { message }
else
puts message
end
end
end
#
# This module adds a trigger method to any class that includes it.
# The default implementation feature here triggers an exception.
#
module Schedulable
def trigger (params)
raise "trigger() implementation is missing"
end
def reschedule (scheduler)
raise "reschedule() implentation is missing"
end
end
#
# This error is thrown when the :timeout attribute triggers
#
class TimeOutError < RuntimeError
end
protected
JOB_ID_LOCK = Mutex.new
#
# would it be better to use a Mutex instead of a full-blown
# Monitor ?
#
# The parent class for scheduled jobs.
#
class Job
@@last_given_id = 0
#
# as a scheduler is fully transient, no need to
# have persistent ids, a simple counter is sufficient
#
# The identifier for the job
#
attr_accessor :job_id
#
# An array of tags
#
attr_accessor :tags
#
# The block to execute at trigger time
#
attr_accessor :block
#
# A reference to the scheduler
#
attr_reader :scheduler
#
# Keeping a copy of the initialization params of the job.
#
attr_reader :params
#
# if the job is currently executing, this field points to
# the 'trigger thread'
#
attr_reader :trigger_thread
def initialize (scheduler, job_id, params, &block)
@scheduler = scheduler
@block = block
if job_id
@job_id = job_id
else
JOB_ID_LOCK.synchronize do
@job_id = @@last_given_id
@@last_given_id = @job_id + 1
end
end
@params = params
#@tags = Array(tags).collect { |tag| tag.to_s }
# making sure we have an array of String tags
@tags = Array(params[:tags])
# any tag is OK
end
#
# Returns true if this job sports the given tag
#
def has_tag? (tag)
@tags.include?(tag)
end
#
# Removes (cancels) this job from its scheduler.
#
def unschedule
@scheduler.unschedule(@job_id)
end
#
# Triggers the job (in a dedicated thread).
#
def trigger
Thread.new do
@trigger_thread = Thread.current
# keeping track of the thread
begin
do_trigger
rescue Exception => e
@scheduler.send(:log_exception, e)
end
#@trigger_thread = nil if @trigger_thread = Thread.current
@trigger_thread = nil
# overlapping executions, what to do ?
end
if trigger_thread_alive? and (to = @params[:timeout])
@scheduler.in(to, :tags => 'timeout') do
@trigger_thread.raise(Rufus::TimeOutError) if trigger_thread_alive?
end
end
end
protected
def trigger_thread_alive?
(@trigger_thread && @trigger_thread.alive?)
end
end
#
# An 'at' job.
#
class AtJob < Job
#
# The float representation (Time.to_f) of the time at which
# the job should be triggered.
#
attr_accessor :at
def initialize (scheduler, at, at_id, params, &block)
super(scheduler, at_id, params, &block)
@at = at
end
#
# Returns the Time instance at which this job is scheduled.
#
def schedule_info
Time.at(@at)
end
#
# next_time is last_time (except for EveryJob instances). Returns
# a Time instance.
#
def next_time
schedule_info
end
protected
#
# Triggers the job (calls the block)
#
def do_trigger
@block.call @job_id, @at
@scheduler.instance_variable_get(:@non_cron_jobs).delete @job_id
end
end
#
# An 'every' job is simply an extension of an 'at' job.
#
class EveryJob < AtJob
#
# Returns the frequency string used to schedule this EveryJob,
# like for example "3d" or "1M10d3h".
#
def schedule_info
@params[:every]
end
protected
#
# triggers the job, then reschedules it if necessary
#
def do_trigger
hit_exception = false
begin
@block.call @job_id, @at, @params
rescue Exception => e
@scheduler.send(:log_exception, e)
hit_exception = true
end
if \
@scheduler.instance_variable_get(:@exit_when_no_more_jobs) or
(@params[:dont_reschedule] == true) or
(hit_exception and @params[:try_again] == false)
@scheduler.instance_variable_get(:@non_cron_jobs).delete(job_id)
# maybe it'd be better to wipe that reference from here anyway...
return
end
#
# ok, reschedule ...
params[:job] = self
@at = @at + Rufus.duration_to_f(params[:every])
@scheduler.send(:do_schedule_at, @at, params)
end
end
#
# A cron job.
#
class CronJob < Job
#
# The CronLine instance representing the times at which
# the cron job has to be triggered.
#
attr_accessor :cron_line
def initialize (scheduler, cron_id, line, params, &block)
super(scheduler, cron_id, params, &block)
if line.is_a?(String)
@cron_line = CronLine.new(line)
elsif line.is_a?(CronLine)
@cron_line = line
else
raise \
"Cannot initialize a CronJob " +
"with a param of class #{line.class}"
end
end
#
# This is the method called by the scheduler to determine if it
# has to fire this CronJob instance.
#
def matches? (time)
#def matches? (time, precision)
#@cron_line.matches?(time, precision)
@cron_line.matches?(time)
end
#
# Returns the original cron tab string used to schedule this
# Job. Like for example "60/3 * * * Sun".
#
def schedule_info
@cron_line.original
end
#
# Returns a Time instance : the next time this cron job is
# supposed to "fire".
#
# 'from' is used to specify the starting point for determining
# what will be the next time. Defaults to now.
#
def next_time (from=Time.now)
@cron_line.next_time(from)
end
protected
#
# As the name implies.
#
def do_trigger
@block.call @job_id, @cron_line, @params
end
end
end