#--
# Copyright (c) 2006-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Made in Japan.
#++
module Rufus
module Scheduler
#
# The base class for all types of jobs.
#
class Job
# A reference to the scheduler owning this job
#
attr_accessor :scheduler
# The initial, raw, scheduling info (at / in / every / cron)
#
attr_reader :t
# Returns the thread instance of the last triggered job.
# May be null (especially before the first trigger).
#
attr_reader :last_job_thread
# The job parameters (passed via the schedule method)
#
attr_reader :params
# The block to call when triggering
#
attr_reader :block
# Last time the job executed
# (for an {At|In}Job, it will mean 'not executed' if nil or when
# it got executed if set)
#
# (
# Last time job got triggered (most useful with EveryJob, but can be
# useful with remaining instances of At/InJob (are they done ?))
# )
#
attr_reader :last
# The identifier for this job.
#
attr_reader :job_id
# Instantiating the job.
#
def initialize(scheduler, t, params, &block)
@scheduler = scheduler
@t = t
@params = params
@block = block || params[:schedulable]
@running = false
@paused = false
raise ArgumentError.new(
'no block or :schedulable passed, nothing to schedule'
) unless @block
@params[:tags] = Array(@params[:tags])
@job_id = params[:job_id] || "#{self.class.name}_#{self.object_id.to_s}"
determine_at
end
# Returns true if this job is currently running (in the middle of #trigger)
#
# Note : paused? is not related to running?
#
def running
@running
end
alias running? running
# Returns true if this job is paused, false else.
#
# A paused job is still scheduled, but does not trigger.
#
# Note : paused? is not related to running?
#
def paused?
@paused
end
# Pauses this job (sets the paused flag to true).
#
# Note that it will not pause the execution of a block currently 'running'.
# Future triggering of the job will not occur until #resume is called.
#
# Note too that, during the pause time, the schedule kept the same. Calling
# #resume will not force old triggers in.
#
def pause
@paused = true
end
# Resumes this job (sets the paused flag to false).
#
# This job will trigger again.
#
def resume
@paused = false
end
# Returns the list of tags attached to the job.
#
def tags
@params[:tags]
end
# Sets the list of tags attached to the job (Usually they are set
# via the schedule every/at/in/cron method).
#
def tags=(tags)
@params[:tags] = Array(tags)
end
# Generally returns the string/float/integer used to schedule the job
# (seconds, time string, date string)
#
def schedule_info
@t
end
# Triggers the job.
#
def trigger(t=Time.now)
return if @paused
@last = t
job_thread = nil
to_job = nil
return if @running and (params[:allow_overlapping] == false)
@running = true
@scheduler.send(:trigger_job, @params) do
#
# Note that #trigger_job is protected, hence the #send
# (Only jobs know about this method of the scheduler)
job_thread = Thread.current
job_thread[
"rufus_scheduler__trigger_thread__#{@scheduler.object_id}"
] = self
@last_job_thread = job_thread
begin
trigger_block
job_thread[
"rufus_scheduler__trigger_thread__#{@scheduler.object_id}"
] = nil
job_thread = nil
to_job.unschedule if to_job
rescue Exception => e
@scheduler.do_handle_exception(self, e)
end
@running = false
end
# note that add_job and add_cron_job ensured that :blocking is
# not used along :timeout
if to = @params[:timeout]
to_job = @scheduler.in(to, :parent => self, :tags => 'timeout') do
# at this point, @job_thread might be set
if job_thread && job_thread.alive?
job_thread.raise(Rufus::Scheduler::TimeOutError)
end
end
end
end
# Simply encapsulating the block#call/trigger operation, for easy
# override.
#
def trigger_block
@block.respond_to?(:call) ?
@block.call(self) : @block.trigger(@params.merge(:job => self))
end
# Unschedules this job.
#
def unschedule
@scheduler.unschedule(self.job_id)
end
end
#
# The base class of at/in/every jobs.
#
class SimpleJob < Job
# When the job is supposed to trigger
#
attr_reader :at
attr_reader :last
def determine_at
@at = Rufus.at_to_f(@t)
end
# Returns the next time (or the unique time) this job is meant to trigger
#
def next_time
Time.at(@at)
end
end
#
# Job that occurs once, in a certain amount of time.
#
class InJob < SimpleJob
# If this InJob is a timeout job, parent points to the job that
# is subject to the timeout.
#
attr_reader :parent
def initialize(scheduler, t, params)
@parent = params[:parent]
super
end
protected
def determine_at
iin = @t.is_a?(Fixnum) || @t.is_a?(Float) ?
@t : Rufus.parse_duration_string(@t)
@at = (Time.now + iin).to_f
end
end
#
# Job that occurs once, at a certain point in time.
#
class AtJob < SimpleJob
end
#
# Recurring job with a certain frequency.
#
class EveryJob < SimpleJob
# The frequency, in seconds, of this EveryJob
#
attr_reader :frequency
def initialize(scheduler, t, params, &block)
super
determine_frequency
determine_at
end
# Triggers the job (and reschedules it).
#
def trigger
schedule_next
super
end
protected
def determine_frequency
@frequency = @t.is_a?(Fixnum) || @t.is_a?(Float) ?
@t : Rufus.parse_duration_string(@t)
end
def determine_at
return unless @frequency
@last = @at
# the first time, @last will be nil
now = Time.now.to_f
@at = if @last
@last + @frequency
else
if fi = @params[:first_in]
now + Rufus.duration_to_f(fi)
elsif fa = @params[:first_at]
Rufus.at_to_f(fa)
else
now + @frequency
end
end
while @at < now do
@at += @frequency
end if @params[:discard_past]
end
# It's an every job, have to schedule next time it occurs...
#
def schedule_next
determine_at
@scheduler.send(:add_job, self)
end
end
#
# Recurring job, cron style.
#
class CronJob < Job
# The CronLine instance, it holds all the info about the cron schedule
#
attr_reader :cron_line
# The job parameters (passed via the schedule method)
#
attr_reader :params
# The block to call when triggering
#
attr_reader :block
# Creates a new CronJob instance.
#
def initialize(scheduler, cron_string, params, &block)
super
@cron_line = case @t
when String then CronLine.new(@t)
when CronLine then @t
else raise ArgumentError.new(
"cannot initialize a CronJob out of #{@t.inspect}")
end
end
def trigger_if_matches(time)
return if @paused
trigger(time) if @cron_line.matches?(time)
end
# Returns the next time this job is meant to trigger
#
def next_time(from=Time.now)
@cron_line.next_time(from)
end
protected
def determine_at
# empty
end
end
end
end