# Declaration of the Rufus::Scheduler class. The body is empty at this
# point of the file.
class Rufus::Scheduler


end
# schedule something every two days, start in 5 hours…
scheduler.schedule_every “2d”, :first_at => “5h” do
end
# schedule something every two days, start in 5 hours…
scheduler.schedule_every “2d”, :first_in => “5h” do
optional parameters, :first_at and :first_in
Since rufus-scheduler 1.0.2, the schedule_every method recognizes two
== ‘Every jobs’, :first_at and :first_in

end
params[:dont_reschedule] = true if $mail
$mail = $inbox.fetch_last_mail
# polls every 10 seconds until a mail arrives
#
schedule.schedule_every “10s” do |job_id, at, params|
Unschedule example :
end
end
“5h” # normal schedule, every 5 hours
else
“1h” # lots of spam, check every hour
params[:every] = if mails.size > 100
mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
mails = $inbox.fetch_mails
schedule.schedule_every “5h” do |job_id, at, params|
Every jobs can reschedule/unschedule themselves. A reschedule example :
== ‘Every jobs’ and rescheduling
end
end
puts “something wrong happened : ”+e.to_s
def log_exception (e)
class << scheduler
# 2 - overriding the [protected] method log_exception(e) :
end
end
puts block.call
puts “oops, something wrong happened : ”
def lwarn (&block)
class << scheduler
# 1 - providing a lwarn method to the scheduler instance :
case of exception. There are two ways to change that behaviour.
The rufus scheduler will output a stacktrace to the STDOUT in
== Exceptions
value (“7”), a list of values (“7,8,9,27”) or a range (“7-12”).
scheduling. This column can, like for the other columns, specify a
The rufus scheduler recognizes an optional first column for second
end
puts “it’s now the seventh second of the minute”
scheduler.schedule “7 * * * * *” do
A cron schedule can be set at the second level :
== Cron up to the second
by itself, so no serialization issue.
prevents you from using anything else. The scheduler has no persistence
The vanilla case for tags assumes they are String instances, but nothing
end
init_backup_sequence()
scheduler.schedule_in “2h”, :tags => [ “backup”, “important” ] do
Multiple tags may be attached to a single job :
jobs.each { |job| job.unschedule }
jobs = find_jobs ‘backup’
end
do_this_or_that()
scheduler.schedule “0 24 * * *”, :tags => “new_day” do
end
init_backup_sequence()
scheduler.schedule_in “2h”, :tags => “backup” do
Tags can be attached to jobs scheduled :
== Tags
# (the default is 4 times per second (0.250))
# instantiates a scheduler that checks its jobs twice per second
#
scheduler.start
scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
The :scheduler_precision can be set when instantiating the scheduler.
join_until_no_more_jobs() wraps it.
This attribute is best used indirectly : the method
to true and start the scheduler, the scheduler will immediately exit.
Use with care though, if you create a scheduler, set this attribute
run.
‘true’, the scheduler will exit as soon as there are no more jobs to
The scheduler has an “exit_when_no_more_jobs” attribute. When set to
end
regenerate_latest_report()
scheduler.schedule_every(“1h20m”) do
There is also schedule_every() :
# will regenerate the monthly report in 5 days
#
{ :schedulable => regenerator, :scope => :month })
“5d”,
scheduler.schedule_in(
# will regenerate the report in four days
#
scheduler.schedule_in(“4d”, regenerator)
regenerator = Regenerator.new
end
end
# …
def yearly
end
# …
def monthly
end
self.send(frequency)
def trigger (frequency)
class Regenerator < Schedulable
an example that uses a Schedulable class :
end
init_self_destruction_sequence()
job_id = scheduler.schedule_at “Sun Oct 07 14:24:01 +0900 2009” do
end
activate_security_system()
log.info “activating security system…”
scheduler.schedule “0 22 * * 1-5” do
# in 3 days from now
# will call the regenerate_monthly_report method
#
end
regenerate_monthly_report()
scheduler.schedule_in(“3d”) do
require ‘rufus/scheduler’
require ‘rubygems’
== Examples
will still run OK with “rufus-scheduler”.
s = OpenWFE::Scheduler.new
require ‘openwfe/util/scheduler’
require ‘rubygems’
names.
“rufus-scheduler”, this new gem has ‘pointers’ for the old class
To ensure that code tapping the previous gem still runs fine with
This scheduler was previously known as the “openwferu-scheduler” gem.
== The gem “openwferu-scheduler”
Ruby way.
params (usually an array or nil), or a block, which is more in the
schedule_at() and schedule() await either a Schedulable instance and
The two main methods are thus schedule_at() and schedule().
execute at specified intervals.
‘at’ jobs to execute once at a given point in time. ‘cron’ jobs
The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs.

def at_job_count


Returns the current count of 'at' jobs scheduled (not 'every').
# Counts the pending jobs whose class is exactly AtJob.
#
# instance_of? (rather than is_a?) is used, so instances of subclasses
# of AtJob are not counted; presumably this is what keeps 'every' jobs
# out of the total -- confirm against the job class hierarchy.
def at_job_count
    @pending_jobs.count { |job| job.instance_of?(AtJob) }
end

def at_to_f (at)


(to be compared with the float coming from time.to_f)
Ensures an 'at' instance is translated to a float
# Normalizes an 'at' value so that it can be compared with Time#to_f.
#
# The conversions are applied as a chain, each step feeding the next :
#   String   -> Rufus::to_ruby_time
#   DateTime -> Rufus::to_gm_time
#   Time     -> Float (seconds, via to_f)
# Any other value (a Float for instance) is returned untouched.
def at_to_f (at)
    value = at
    value = Rufus::to_ruby_time(value) if value.is_a?(String)
    value = Rufus::to_gm_time(value) if value.is_a?(DateTime)
    value.is_a?(Time) ? value.to_f : value
end

def cron_job_count


Returns the number of cron jobs currently active in this scheduler.
# Returns how many entries the @cron_jobs table currently holds.
def cron_job_count
    @cron_jobs.length
end

def do_schedule_at (at, params={}, &block)


schedule_every). It's protected, don't use it directly.
The core method behind schedule_at and schedule_in (and also
# Shared implementation behind schedule_at, schedule_in and
# schedule_every. Not meant to be called directly.
#
# at     :: the trigger time, anything at_to_f() accepts
# params :: job options; :every selects an EveryJob instead of an AtJob,
#           :job_id is handed to the job, :discard_past prevents a job
#           that is already due from being triggered
# block  :: the code to run (see to_block() for the :schedulable case)
#
# Returns the job id when the job got queued, nil when the trigger time
# was already reached (the job is then triggered on the spot, unless
# :discard_past is set, and is never queued).
def do_schedule_at (at, params={}, &block)
    trigger_time = at_to_f(at)
    job_class = params[:every] ? EveryJob : AtJob
    requested_id = params[:job_id]
    callable = to_block(params, &block)
    job = job_class.new(self, trigger_time, requested_id, params, &callable)

    # anything due within one scheduler tick is considered "now"
    if trigger_time < (Time.new.to_f + @precision)
        job.trigger unless params[:discard_past]
        return nil
    end

    @schedule_queue << job
    job.job_id
end

def do_unschedule (job_id)

# Removes the first pending job carrying the given job_id and returns
# true. When no pending job matches, the request is forwarded to
# do_unschedule_cron_job and its result is returned.
#
# Only the first match is removed and the scan stops there (delete_if
# would walk the whole list).
def do_unschedule (job_id)
    position = @pending_jobs.index { |pending| pending.job_id == job_id }
    if position
        @pending_jobs.delete_at(position)
        return true
    end
    do_unschedule_cron_job(job_id)
end

def do_unschedule_cron_job (job_id)

# Deletes the entry stored under job_id in @cron_jobs.
# Returns true when an entry was removed, false otherwise.
def do_unschedule_cron_job (job_id)
    removed = @cron_jobs.delete(job_id)
    !removed.nil?
end

def duration_to_f (s)


will yield 10.0

duration_to_f("10s")

Ensures that a duration is expressed as a Float instance.
# Turns a duration into a Float.
#
# Float  :: returned as is
# String :: parsed with Rufus::parse_time_string ("10s" for example)
# other  :: converted with Float(s.to_s), which raises on text that is
#           not numeric
def duration_to_f (s)
    case s
    when Float then s
    when String then Rufus::parse_time_string(s)
    else Float(s.to_s)
    end
end

def every_job_count


Returns the current count of 'every' jobs scheduled.
# Counts the pending jobs that are EveryJob instances (is_a? is used,
# so subclasses of EveryJob are counted as well).
def every_job_count
    @pending_jobs.count { |job| job.is_a?(EveryJob) }
end

def find_jobs (tag)


Returns an array of jobs that have the given tag.
# Collects every job for which has_tag?(tag) is true.
#
# The resulting array lists the matching cron jobs first, followed by
# the matching pending jobs. It is empty when nothing matches.
def find_jobs (tag)
    tagged_cron = @cron_jobs.values.select { |job| job.has_tag?(tag) }
    tagged_pending = @pending_jobs.select { |job| job.has_tag?(tag) }
    tagged_cron + tagged_pending
end

def find_schedulables (tag)


result.
Jobs that haven't a wrapped Schedulable won't be included in the
the wrapped Schedulable objects.
Finds the jobs with the given tag and then returns an array of
# Looks up the jobs carrying the given tag (via find_jobs) and returns
# the objects returned by their #schedulable method, in the same order.
#
# Jobs that do not respond to #schedulable are skipped. The result is an
# empty array when no job matches.
#
# (The previous version passed an undefined local named 'tags' to
# find_jobs and thus raised a NameError on every call.)
def find_schedulables (tag)
    find_jobs(tag).
        select { |job| job.respond_to?(:schedulable) }.
        map { |job| job.schedulable }
end

def get_job (job_id)


or CronJob will be returned.
Returns the job corresponding to job_id, an instance of AtJob
# Returns the job registered under job_id.
#
# The cron job table is consulted first; when it has no (truthy) entry
# for the id, the pending job list is scanned. Returns nil when no job
# carries that id.
def get_job (job_id)
    @cron_jobs[job_id] ||
        @pending_jobs.find { |pending| pending.job_id == job_id }
end

def get_schedulable (job_id)


schedulable if any.
Finds a job (via get_job()) and then returns the wrapped
# Fetches the job registered under job_id (via get_job) and returns the
# result of its #schedulable method.
#
# Returns nil when the job does not respond to #schedulable, which is
# also the case when no job was found (get_job returned nil).
def get_schedulable (job_id)
    job = get_job(job_id)
    job.respond_to?(:schedulable) ? job.schedulable : nil
end

def initialize (params={})

# Sets up an idle (stopped) scheduler.
#
# params :: option hash; :scheduler_precision is the number of seconds
#           (anything Kernel#Float accepts) the scheduler thread sleeps
#           between two steps. When the option is missing or cannot be
#           converted, the default of 0.250 second is kept.
def initialize (params={})
    super()

    @pending_jobs = []
    @cron_jobs = {}

    # the [un]schedule methods and the step() method talk to each
    # other via these queues instead of a mutex
    @schedule_queue = Queue.new
    @unschedule_queue = Queue.new

    @scheduler_thread = nil

    # every 250ms, the scheduler wakes up (default value)
    @precision = 0.250
    begin
        @precision = Float(params[:scheduler_precision])
    rescue StandardError
        # best effort : missing or unparsable value, keep the default.
        # (StandardError, not Exception : interrupts and exit requests
        # must not be swallowed here)
    end

    @exit_when_no_more_jobs = false
    @dont_reschedule_every = false
    @last_cron_second = -1
    @stopped = true
end

def join


Joins on the scheduler thread
# Blocks the caller until the scheduler thread terminates (delegates to
# the join method of @scheduler_thread).
#
# @scheduler_thread is only assigned by start(); calling join before
# that means calling join on nil.
def join
    @scheduler_thread.join
end

def join_until_no_more_jobs


Currently used only in unit tests.

there are no more 'at' (or 'every') jobs in the scheduler.
Thus the scheduler will exit (and the join terminates) as soon as
attribute of this scheduler to true before joining.
Like join() but takes care of setting the 'exit_when_no_more_jobs'
# Flags the scheduler so that it exits once it has run out of jobs
# (@exit_when_no_more_jobs), then joins the scheduler thread.
#
# Returns whatever join() returns.
def join_until_no_more_jobs
    @exit_when_no_more_jobs = true
    join()
end

def log_exception (e)


Of course, one can override this method.

be used instead.
If this scheduler provides a lwarn(message) method, it will
message will be displayed to STDOUT.
If an error occurs in the job, it will get caught and an error
# Reports an exception caught while triggering a job.
#
# The report is made of a fixed header line, the exception message and
# the backtrace (one frame per line). When this scheduler responds to
# lwarn, the report is handed to it as a block, otherwise it is printed
# with puts.
#
# e :: the exception to report. Its backtrace may be nil (an exception
#      that was instantiated but never raised); it is then rendered as
#      an empty backtrace instead of failing inside the logger.
def log_exception (e)
    frames = e.backtrace || []
    message =
        "trigger() caught exception\n" +
        e.to_s + "\n" +
        frames.join("\n")
    if self.respond_to?(:lwarn)
        lwarn { message }
    else
        puts message
    end
end

def pending_job_count


('at' jobs and 'every' jobs).
Returns the number of currently pending jobs in this scheduler
# Returns the length of the pending job list (@pending_jobs).
def pending_job_count
    @pending_jobs.length
end

def prepare_params (params)


Making sure that params is a Hash.
# Normalizes the 'params' argument of the schedule methods.
#
# A Schedulable instance gets wrapped into { :schedulable => instance }.
# Anything else is returned as is.
def prepare_params (params)
    return { :schedulable => params } if params.is_a?(Schedulable)
    params
end

def push_pending_job (job)


Pushes an 'at' job into the pending job list
# Inserts a job into @pending_jobs, keeping the list sorted by 'at'.
#
# A job already registered under the same job_id is removed first (the
# new job overrides it). The job is appended when the list is empty or
# when it is not due before the last job; otherwise it is placed right
# before the first job that is due at the same time or later.
#
# Returns nil.
def push_pending_job (job)
    previous = @pending_jobs.find { |pending| pending.job_id == job.job_id }
    @pending_jobs.delete(previous) if previous

    if @pending_jobs.empty? || job.at >= @pending_jobs.last.at
        @pending_jobs.push(job)
    else
        slot = @pending_jobs.index { |pending| job.at <= pending.at }
        @pending_jobs.insert(slot, job) if slot
    end
    nil
end

def schedule (cron_line, params={}, &block)


the job.
This method returns a job identifier which can be used to unschedule()

be used to unschedule the job.
Returns the job id attributed to this 'cron job', this id can

# outputs a message every weekday at 10pm
end
puts "it's break time..."
scheduler.schedule("0 22 * * 1-5") do

# will trigger s at 14:15 on the first of every month
scheduler.schedule("15 14 1 * *", s)

# five minutes after midnight
# will trigger the schedulable s every day
scheduler.schedule("5 0 * * *", s)

For example :

line, or http://www.google.com/search?q=man%205%20crontab).
following the Unix cron standard (see "man 5 crontab" in your command
Schedules a cron job, the 'cron_line' is a string
# Schedules a cron job.
#
# cron_line :: the cron expression, handed as is to CronJob
# params    :: options or a Schedulable (see prepare_params); the job id
#              is taken from :cron_id or, failing that, from :job_id
# block     :: the code to run (see to_block() for the :schedulable case)
#
# An unschedule request for that id is queued first, then the new job is
# queued for scheduling; both requests are processed by step().
#
# Returns the id of the new job.
def schedule (cron_line, params={}, &block)
    params = prepare_params(params)

    # any cron job already registered under this id has to go
    cron_id = params[:cron_id] || params[:job_id]
    @unschedule_queue << [ :cron, cron_id ]

    callable = to_block(params, &block)
    cron_job = CronJob.new(self, cron_id, cron_line, params, &callable)
    @schedule_queue << cron_job

    cron_job.job_id
end

def schedule_at (at, params={}, &block)



And 'jobid' will hold a nil (not scheduled).

end
puts "you'll never read this message"
jobid = scheduler.schedule_at(yesterday, :discard_past => true) do

true :
To avoid the triggering, the parameter :discard_past may be set to
but not scheduled.
If the job is specified in the past, it will be triggered immediately

the job.
This method returns a job identifier which can be used to unschedule()

Returns a job_id that can be used to unschedule the job.
Schedules a job by specifying at which time it should trigger.
# Schedules a job for a given point in time.
#
# at     :: the trigger time, in any form do_schedule_at() accepts
# params :: options or a Schedulable (normalized by prepare_params)
# block  :: the code to run
#
# Returns what do_schedule_at() returns.
def schedule_at (at, params={}, &block)
    options = prepare_params(params)
    do_schedule_at(at, options, &block)
end

def schedule_every (freq, params={}, &block)


end
# schedule something every two days, start in 5 hours...
scheduler.schedule_every "2d", :first_in => "5h" do

accepted.
Since rufus-scheduler 1.0.2, the params :first_at and :first_in are

end
# won't get rescheduled in case of exception
do_some_prone_to_error_stuff()
scheduler.schedule_every "500", :try_again => false do

want the job to be rescheduled, set the parameter :try_again to false.
In case of exception in the job, it will be rescheduled. If you don't

the job.
This method returns a job identifier which can be used to unschedule()

before the time specified in 'freq'.
Schedules a job in a loop. After an execution, it will not execute
# Schedules a job that keeps rescheduling itself after each run.
#
# freq   :: the interval between two runs, anything duration_to_f()
#           accepts; the raw value is stored under params[:every]
# params :: options or a Schedulable (normalized by prepare_params).
#           :first_at / :first_in set the time of the first run and are
#           removed from params, :previous_at is the time of the
#           previous run, :dont_reschedule and :try_again are checked
#           after each run (see below)
# block  :: the code to run when no :schedulable is given; it receives
#           job_id, at and params
#
# Note that the params hash is modified in place.
#
# Returns what do_schedule_at() returns.
def schedule_every (freq, params={}, &block)
    f = duration_to_f freq
    params = prepare_params params
    schedulable = params[:schedulable]
    params[:every] = freq
    # deleted, hence not seen again by the rescheduling call below
    first_at = params.delete :first_at
    first_in = params.delete :first_in
    previous_at = params[:previous_at]
    # time of the next run, by priority : explicit :first_at, then
    # now + :first_in, then previous run + freq, then now + freq
    next_at = if first_at
        first_at
    elsif first_in
        Time.now.to_f + duration_to_f(first_in)
    elsif previous_at
        previous_at + f
    else
        Time.now.to_f + f
    end
    do_schedule_at(next_at, params) do |job_id, at|
        #
        # trigger ...
        hit_exception = false
        begin
            if schedulable
                schedulable.trigger params
            else
                block.call job_id, at, params
            end
        rescue Exception => e
            # an exception is logged and remembered, it does not
            # escape the job
            log_exception e
            hit_exception = true
        end
        # cannot use a return here !!! (block)
        # no rescheduling when the scheduler itself forbids it, when
        # params[:dont_reschedule] is true, or when the run failed and
        # params[:try_again] is false
        unless \
            @dont_reschedule_every or
            (params[:dont_reschedule] == true) or
            (hit_exception and params[:try_again] == false)
            #
            # ok, reschedule ...
            # same job id, and this run becomes the 'previous' one
            params[:job_id] = job_id
            params[:previous_at] = at
            
            schedule_every params[:every], params, &block
                #
                # yes, this is a kind of recursion
                # note that params[:every] might have been changed
                # by the block/schedulable code
        end
        job_id
    end
end

def schedule_in (duration, params={}, &block)


the job.
This method returns a job identifier which can be used to unschedule()

Returns a job_id that can be used to unschedule the job.
Schedules a job by stating in how much time it should trigger.
# Schedules a job relatively to the current time.
#
# duration :: how long to wait before triggering, anything
#             duration_to_f() accepts
# params   :: options or a Schedulable (normalized by prepare_params)
# block    :: the code to run
#
# Returns what do_schedule_at() returns.
def schedule_in (duration, params={}, &block)
    trigger_time = Time.new.to_f + duration_to_f(duration)
    do_schedule_at(trigger_time, prepare_params(params), &block)
end

def start


Starts this scheduler (or restart it if it was previously stopped)
# Starts the scheduler, or restarts it after a stop().
#
# Clears the @stopped flag and spawns the scheduler thread, which calls
# step() then sleeps for @precision seconds, over and over, until
# @stopped becomes true.
#
# Returns the new scheduler thread.
def start
    @stopped = false
    @scheduler_thread = Thread.new do
        if defined?(JRUBY_VERSION)
            # on JRuby, give the underlying Java thread a name
            require 'java'
            java.lang.Thread.current_thread.name = "openwferu scheduler (Ruby Thread)"
        end
        loop do
            break if @stopped
            step
            sleep(@precision)
                # TODO : adjust precision
        end
    end
end

def step


'cron' jobs get executed if necessary then 'at' jobs.
determine if there are jobs to trigger else to get back to sleep.
(by default 4 times per second). It's meant to quickly
This is the method called each time the scheduler wakes up
# One scheduler tick, called each time the scheduler thread wakes up.
#
# The three phases run in a fixed order :
#   1. pending unschedule requests are honoured, before the targeted
#      jobs get a chance to be triggered
#   2. eligible jobs are triggered
#   3. newly scheduled jobs are registered
def step
    step_unschedule()
    step_trigger()
    step_schedule()
end

def step_schedule


either @pending_jobs or @cron_jobs.
adds every job waiting in the @schedule_queue to
# Empties @schedule_queue : CronJob instances are stored in @cron_jobs
# under their job_id, every other job is handed to push_pending_job.
def step_schedule
    until @schedule_queue.empty?
        job = @schedule_queue.pop
        if job.is_a?(CronJob)
            @cron_jobs[job.job_id] = job
        else
            # an 'at' (or 'every') job
            push_pending_job(job)
        end
    end
end

def step_trigger


cron job.
triggers every eligible pending job, then every eligible
# Triggers the jobs that are due.
#
# When @exit_when_no_more_jobs is set : an empty pending list stops the
# scheduler (and nothing gets triggered), and 'every' jobs are no longer
# rescheduled once at_job_count drops to zero.
#
# Cron jobs are examined at most once per second value (the second of
# the last examination is kept in @last_cron_second). Pending jobs are
# then triggered from the head of the list, as long as their 'at' time
# is not in the future, and removed from it.
#
# Returns nil.
def step_trigger
    wakeup = Time.new

    if @exit_when_no_more_jobs
        if @pending_jobs.empty?
            @stopped = true
            return
        end
        @dont_reschedule_every = true if at_job_count < 1
    end

    # TODO : eventually consider running cron / pending
    # job triggering in two different threads
    #
    # but well... there's the synchronization issue...

    # cron jobs
    unless wakeup.sec == @last_cron_second
        @last_cron_second = wakeup.sec
        @cron_jobs.each_value do |cron_job|
            trigger(cron_job) if cron_job.matches?(wakeup)
        end
    end

    # pending jobs ('at' times are floats)
    deadline = wakeup.to_f
    until @pending_jobs.empty?
        due = @pending_jobs.first
        break if due.at > deadline
        trigger(due)
        @pending_jobs.shift
    end
    nil
end

def step_unschedule


unschedules jobs in the unschedule_queue
# Empties @unschedule_queue. Each entry is a [ type, job_id ] pair :
# :cron requests go to do_unschedule_cron_job, any other type goes to
# do_unschedule.
def step_unschedule
    until @unschedule_queue.empty?
        kind, target_id = @unschedule_queue.pop
        if kind == :cron
            do_unschedule_cron_job(target_id)
        else
            do_unschedule(target_id)
        end
    end
end

def stop


The scheduler is stoppable via stop()
# Requests the scheduler to stop by raising the @stopped flag. The loop
# run by the scheduler thread (see start()) checks that flag before
# each step and exits when it is set.
def stop
    @stopped = true
end

def to_block (params, &block)


wrapping a call to it.
if a :schedulable is set in the params, will return a block
Returns a block. If a block is passed, will return it, else,
# Determines the block a job will run.
#
# * a block was given : it is returned as is
# * params[:schedulable] is set : the entry is removed from params and
#   a lambda calling schedulable.trigger(params) is returned; the lambda
#   exposes the wrapped object through a 'schedulable' accessor
# * otherwise : nil
def to_block (params, &block)
    return block if block

    target = params[:schedulable]
    return nil unless target

    params.delete(:schedulable)

    wrapper = lambda { target.trigger(params) }
    class << wrapper
        attr_accessor :schedulable
    end
    wrapper.schedulable = target
    wrapper
end

def trigger (job)


Triggers the job (in a dedicated thread).
# Runs job.trigger in a thread of its own and returns that thread.
#
# Whatever the job raises is caught inside the thread and handed to
# log_exception.
def trigger (job)
    Thread.new do
        begin
            job.trigger
        rescue Exception => error
            log_exception(error)
        end
    end
end

def unschedule (job_id)


it was given at schedule time.
Unschedules an 'at' or a 'cron' job identified by the id
# Requests the unscheduling of the job registered under job_id.
#
# The request is only queued here, as an [ :at, job_id ] pair; it is
# step_unschedule that carries it out (through do_unschedule, which
# looks at pending jobs first, then at cron jobs).
#
# Returns the unschedule queue.
def unschedule (job_id)
    @unschedule_queue.push([ :at, job_id ])
end

def unschedule_cron_job (job_id)


Unschedules a cron job
# Requests the unscheduling of the cron job registered under job_id.
#
# The request is only queued here, as a [ :cron, job_id ] pair; it is
# step_unschedule that carries it out.
#
# Returns the unschedule queue.
def unschedule_cron_job (job_id)
    @unschedule_queue.push([ :cron, job_id ])
end