class Rufus::Scheduler


The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
'at' jobs execute once at a given point in time. 'cron' jobs
execute at specified intervals.

The two main methods are thus schedule_at() and schedule().

schedule_at() and schedule() accept either a Schedulable instance and
params (usually an array or nil), or a block, which is more in the
Ruby way.

== The gem "openwferu-scheduler"

This scheduler was previously known as the "openwferu-scheduler" gem.

To ensure that code tapping the previous gem still runs fine with
"rufus-scheduler", this new gem has 'pointers' for the old class
names.

  require 'rubygems'
  require 'openwfe/util/scheduler'
  s = OpenWFE::Scheduler.new

will still run OK with "rufus-scheduler".

== Examples

  require 'rubygems'
  require 'rufus/scheduler'

  scheduler = Rufus::Scheduler.start_new

  scheduler.schedule_in("3d") do
    regenerate_monthly_report()
  end
    #
    # will call the regenerate_monthly_report method
    # in 3 days from now

  scheduler.schedule "0 22 * * 1-5" do
    log.info "activating security system..."
    activate_security_system()
  end

  job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
    init_self_destruction_sequence()
  end

an example that uses a Schedulable class :

  class Regenerator < Schedulable
    def trigger (frequency)
      self.send(frequency)
    end
    def monthly
      # ...
    end
    def yearly
      # ...
    end
  end

  regenerator = Regenerator.new

  scheduler.schedule_in("4d", regenerator)
    #
    # will regenerate the report in four days

  scheduler.schedule_in(
    "5d",
    { :schedulable => regenerator, :scope => :month })
      #
      # will regenerate the monthly report in 5 days

There is also schedule_every() :

  scheduler.schedule_every("1h20m") do
    regenerate_latest_report()
  end

The scheduler has an "exit_when_no_more_jobs" attribute. When set to
'true', the scheduler will exit as soon as there are no more jobs to
run.
Use with care though: if you create a scheduler, set this attribute
to true and start the scheduler, the scheduler will immediately exit.
This attribute is best used indirectly : the method
join_until_no_more_jobs() wraps it.

The :scheduler_precision can be set when instantiating the scheduler.

  scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
  scheduler.start
    #
    # instantiates a scheduler that checks its jobs twice per second
    # (the default is 4 times per second (0.250))

Note that rufus-scheduler places a constraint on the values for the
precision : 0.0 < p <= 1.0

Thus

  scheduler.precision = 4.0

will raise an exception. With the initialize() source listed further
down, an out-of-range value passed at instantiation, as in

  scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0

is rescued and the precision is left at its default value (0.250).

== Tags

Tags can be attached to scheduled jobs :

  scheduler.schedule_in "2h", :tags => "backup" do
    init_backup_sequence()
  end

  scheduler.schedule "0 24 * * *", :tags => "new_day" do
    do_this_or_that()
  end

  jobs = scheduler.find_jobs 'backup'
  jobs.each { |job| job.unschedule }

Multiple tags may be attached to a single job :

  scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
    init_backup_sequence()
  end

The vanilla case for tags assumes they are String instances, but nothing
prevents you from using anything else. The scheduler has no persistence
by itself, so there is no serialization issue.

== Cron up to the second

A cron schedule can be set at the second level :

  scheduler.schedule "7 * * * * *" do
    puts "it's now the seventh second of the minute"
  end

The rufus scheduler recognizes an optional first column for second
scheduling. This column can, like the other columns, specify a
value ("7"), a list of values ("7,8,9,27") or a range ("7-12").

== Information passed to schedule blocks

When calling schedule_every(), schedule_in() or schedule_at(), the block
expects zero or three parameters, as in

  scheduler.schedule_every("1h20m") do |job_id, at, params|
    puts "my job_id is #{job_id}"
  end

For schedule(), zero or three parameters can get passed

  scheduler.schedule "7 * * * * *" do |job_id, cron_line, params|
    puts "my job_id is #{job_id}"
  end

In both cases, params corresponds to the params passed to the schedule
method (:tags, :first_at, :first_in, :dont_reschedule, ...)

== Exceptions

The rufus scheduler will output a stacktrace to STDOUT in
case of exception. There are two ways to change that behaviour.

  # 1 - providing a lwarn method to the scheduler instance :

  class << scheduler
    def lwarn (&block)
      puts "oops, something wrong happened : "
      puts block.call
    end
  end

  # 2 - overriding the [protected] method log_exception(e) :

  class << scheduler
    def log_exception (e)
      puts "something wrong happened : "+e.to_s
    end
  end

== 'Every jobs' and rescheduling

Every jobs can reschedule/unschedule themselves. A reschedule example :

  scheduler.schedule_every "5h" do |job_id, at, params|

    mails = $inbox.fetch_mails
    mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }

    params[:every] = if mails.size > 100
      "1h" # lots of spam, check every hour
    else
      "5h" # normal schedule, every 5 hours
    end
  end

Unschedule example :

  scheduler.schedule_every "10s" do |job_id, at, params|
    #
    # polls every 10 seconds until a mail arrives

    $mail = $inbox.fetch_last_mail

    params[:dont_reschedule] = true if $mail
  end

== 'Every jobs', :first_at and :first_in

Since rufus-scheduler 1.0.2, the schedule_every method recognizes two
optional parameters, :first_at (a point in time) and :first_in
(a duration)

  scheduler.schedule_every "2d", :first_in => "5h" do
    # schedule something every two days, start in 5 hours...
  end

  scheduler.schedule_every "2d", :first_at => Time.now + 5 * 3600 do
    # schedule something every two days, start in 5 hours...
  end

== job.next_time()

Jobs, be they at, every or cron, have a next_time() method, which tells
when the job will be fired next time (for at and in jobs, this is also the
last time).

For cron jobs, the current implementation is quite brutal. It takes three
seconds on my 2006 macbook to reach a cron schedule 1 year away.

When is the next friday 13th ?

  require 'rubygems'
  require 'rufus/scheduler'

  puts Rufus::CronLine.new("* * 13 * fri").next_time

== :thread_name option

You can specify the name of the scheduler's thread. It should make
things easier in some debugging situations.

  Rufus::Scheduler.new :thread_name => "the crazy scheduler"

def self.start_new


Instantiates a new Rufus::Scheduler instance, starts it and returns it
def self.start_new
  s = self.new
  s.start
  s
end

def at_job_count


Returns the current count of 'at' jobs scheduled (not 'every').
def at_job_count
  @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
end

def at_to_f (at)


Ensures an 'at' instance is translated to a float
(to be compared with the float coming from time.to_f)
def at_to_f (at)
  at = Rufus::to_ruby_time(at) if at.kind_of?(String)
  at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
  at = at.to_f if at.kind_of?(Time)
  at
end
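
The conversion above can be tried out without the Rufus helpers. What follows is a minimal sketch only: it assumes that Time.parse from the standard library is an acceptable stand-in for Rufus::to_ruby_time and that DateTime#to_time can replace Rufus::to_gm_time. The name to_epoch_f is hypothetical.

```ruby
require 'time'
require 'date'

# Hypothetical stand-in for at_to_f, using only the standard library.
def to_epoch_f(at)
  at = Time.parse(at) if at.is_a?(String)  # assumption: replaces Rufus::to_ruby_time
  at = at.to_time if at.is_a?(DateTime)    # assumption: replaces Rufus::to_gm_time
  at = at.to_f if at.is_a?(Time)
  at                                       # anything else is returned untouched
end

t = Time.at(1_000_000)
puts to_epoch_f(t)                            # a Float, comparable with Time.now.to_f
puts to_epoch_f(t.to_datetime)
puts to_epoch_f("2009-10-07 14:24:01 +0900")
puts to_epoch_f(12.5)
```

Whatever the input type, the result is a plain Float, which is what the comparison against Time.new.to_f in do_schedule_at relies on.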

def cron_job_count


Returns the number of cron jobs currently active in this scheduler.
def cron_job_count
  @cron_jobs.size
end

def do_schedule_at (at, params={}, &block)


The core method behind schedule_at and schedule_in (and also
schedule_every). It's protected, don't use it directly.
def do_schedule_at (at, params={}, &block)
  #puts "0 at is '#{at.to_s}' (#{at.class})"
  at = at_to_f at
  #puts "1 at is '#{at.to_s}' (#{at.class})"
  jobClass = params[:every] ? EveryJob : AtJob
  job_id = params[:job_id]
  b = to_block params, &block
  job = jobClass.new self, at, job_id, params, &b
  #do_unschedule(job_id) if job_id
  if at < (Time.new.to_f + @precision)
    job.trigger() unless params[:discard_past]
    return nil
  end
  @schedule_queue << job
  job.job_id
end

def do_unschedule (job_id)

def do_unschedule (job_id)
  for i in 0...@pending_jobs.length
    if @pending_jobs[i].job_id == job_id
      @pending_jobs.delete_at i
      return true
    end
  end
    #
    # not using delete_if because it scans the whole list
  do_unschedule_cron_job job_id
end

def do_unschedule_cron_job (job_id)

def do_unschedule_cron_job (job_id)
  (@cron_jobs.delete(job_id) != nil)
end

def duration_to_f (s)


Ensures that a duration is expressed as a Float instance.

  duration_to_f("10s")

will yield 10.0
def duration_to_f (s)
  return s if s.kind_of?(Float)
  return Rufus::parse_time_string(s) if s.kind_of?(String)
  Float(s.to_s)
end
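
To make the string case more concrete, here is a rough sketch of how a duration string could be turned into seconds. It is not Rufus::parse_time_string: the unit table, the regular expression and the name sketch_duration_to_f are all assumptions made for illustration, and the real parser may accept more units or behave differently on bare numbers.

```ruby
# Hypothetical unit table; the real Rufus::parse_time_string may differ.
UNIT_SECONDS = { 'd' => 86_400.0, 'h' => 3_600.0, 'm' => 60.0, 's' => 1.0 }.freeze

# Turns strings such as "1h20m" into a number of seconds (Float).
def sketch_duration_to_f(s)
  return s if s.is_a?(Float)
  return Float(s.to_s) unless s.is_a?(String)
  pairs = s.scan(/(\d+(?:\.\d+)?)([dhms])/)
  raise ArgumentError, "cannot parse duration '#{s}'" if pairs.empty?
  # add up every <number><unit> pair found in the string
  pairs.sum(0.0) { |number, unit| number.to_f * UNIT_SECONDS[unit] }
end

puts sketch_duration_to_f("10s")    # => 10.0
puts sketch_duration_to_f("1h20m")  # => 4800.0
puts sketch_duration_to_f(3)        # => 3.0
```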

def every_job_count


Returns the current count of 'every' jobs scheduled.
def every_job_count
  @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
end

def find_jobs (tag)


Returns an array of jobs that have the given tag.
def find_jobs (tag)
  @cron_jobs.values.find_all { |job| job.has_tag?(tag) } +
  @pending_jobs.find_all { |job| job.has_tag?(tag) }
end

def find_schedulables (tag)


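Finds the jobs with the given tag and then returns an array of
the wrapped Schedulable objects.
Jobs that don't wrap a Schedulable won't be included in the
result.
Note that the source listed here returns the matching jobs themselves
(those responding to schedulable()), not the Schedulable objects.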
def find_schedulables (tag)
  find_jobs(tag).find_all { |job| job.respond_to?(:schedulable) }
end

def get_job (job_id)


Returns the job corresponding to job_id; an instance of AtJob
or CronJob will be returned.
def get_job (job_id)
  @cron_jobs[job_id] || @pending_jobs.find { |job| job.job_id == job_id }
end

def get_schedulable (job_id)


Finds a job (via get_job()) and then returns the wrapped
schedulable if any.
def get_schedulable (job_id)
  #return nil unless job_id
  j = get_job(job_id)
  return j.schedulable if j.respond_to?(:schedulable)
  nil
end

def initialize (params={})

def initialize (params={})
  super()
  @pending_jobs = []
  @cron_jobs = {}
  @schedule_queue = Queue.new
  @unschedule_queue = Queue.new
    #
    # sync between the step() method and the [un]schedule
    # methods is done via these queues, no more mutex
  @scheduler_thread = nil
  @precision = 0.250
    # every 250ms, the scheduler wakes up (default value)
  begin
    self.precision = Float(params[:scheduler_precision])
  rescue Exception => e
    # let precision at its default value
  end
  @thread_name = params[:thread_name] || "rufus scheduler"
  #@correction = 0.00045
  @exit_when_no_more_jobs = false
  @dont_reschedule_every = false
  @last_cron_second = -1
  @stopped = true
end

def join


Joins on the scheduler thread
def join
  @scheduler_thread.join
end

def join_until_no_more_jobs


Like join() but takes care of setting the 'exit_when_no_more_jobs'
attribute of this scheduler to true before joining.
Thus the scheduler will exit (and the join terminates) as soon as
there are no more 'at' (or 'every') jobs in the scheduler.

Currently used only in unit tests.
def join_until_no_more_jobs
  @exit_when_no_more_jobs = true
  join
end

def log_exception (e)


If an error occurs in the job, it will get caught and an error
message will be displayed to STDOUT.
If this scheduler provides a lwarn method (it is called with a block
that yields the message), it will be used instead.

Of course, one can override this method.
def log_exception (e)
  message =
    "trigger() caught exception\n" +
    e.to_s + "\n" +
    e.backtrace.join("\n")
  if self.respond_to?(:lwarn)
    lwarn { message }
  else
    puts message
  end
end
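
The lwarn hook can be exercised in isolation. The sketch below uses a hypothetical MiniLogger class that copies only the logging decision of log_exception, not the scheduler itself, and attaches an lwarn singleton method that collects messages in an array instead of printing them.

```ruby
# Hypothetical stand-in object reproducing only the logging decision.
class MiniLogger
  def log_exception(e)
    backtrace = (e.backtrace || []).join("\n")
    message = "trigger() caught exception\n" + e.to_s + "\n" + backtrace
    if respond_to?(:lwarn)
      lwarn { message }   # the message is handed over lazily, via a block
    else
      puts message
    end
  end
end

logger = MiniLogger.new
captured = []
# lwarn receives a block; calling it yields the message
logger.define_singleton_method(:lwarn) { |&block| captured << block.call }

begin
  raise ArgumentError, "boom"
rescue => e
  logger.log_exception(e)
end

puts captured.first.lines.first(2)
```

Because the message is passed as a block, an lwarn implementation that decides not to log never pays for building the string twice or for printing it.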

def pending_job_count


Returns the number of currently pending jobs in this scheduler
('at' jobs and 'every' jobs).
def pending_job_count
  @pending_jobs.size
end

def precision= (f)


Setting the precision ( 0.0 < p <= 1.0 )
def precision= (f)
  raise "precision must be 0.0 < p <= 1.0" \
    if f <= 0.0 or f > 1.0
  @precision = f
end
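
The interplay between this setter and the rescue in initialize() can be seen with a small stand-alone class. This is a sketch, not the Rufus class: PrecisionHolder is a hypothetical name, and it rescues StandardError where the listed initialize() rescues Exception.

```ruby
# Hypothetical miniature of the precision handling, not the Rufus class itself.
class PrecisionHolder
  attr_reader :precision

  def initialize(params = {})
    @precision = 0.250 # default: wake up four times per second
    begin
      self.precision = Float(params[:scheduler_precision])
    rescue StandardError
      # missing or out-of-range value: keep the default
    end
  end

  def precision=(f)
    raise ArgumentError, "precision must be 0.0 < p <= 1.0" if f <= 0.0 || f > 1.0
    @precision = f
  end
end

puts PrecisionHolder.new(:scheduler_precision => 0.5).precision  # => 0.5
puts PrecisionHolder.new(:scheduler_precision => 5.0).precision  # => 0.25

begin
  PrecisionHolder.new.precision = 4.0
rescue ArgumentError => e
  puts e.message  # => precision must be 0.0 < p <= 1.0
end
```

So an invalid value is loud when set through the accessor and silent when passed to the constructor.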

def prepare_params (params)


Making sure that params is a Hash.
def prepare_params (params)
  params = { :schedulable => params } \
    if params.is_a?(Schedulable)
  params
end

def push_pending_job (job)


Pushes an 'at' job into the pending job list
def push_pending_job (job)
  old = @pending_jobs.find { |j| j.job_id == job.job_id }
  @pending_jobs.delete(old) if old
    #
    # override previous job with same id
  if @pending_jobs.length < 1 or job.at >= @pending_jobs.last.at
    @pending_jobs << job
    return
  end
  for i in 0...@pending_jobs.length
    if job.at <= @pending_jobs[i].at
      @pending_jobs[i, 0] = job
      return # right place found
    end
  end
end
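
The list of pending jobs is kept sorted by trigger time, so that the scheduler only ever has to look at its head. Below is a minimal sketch of the same idea using Array#bsearch_index in place of the linear scan. Job (a Struct) and insert_sorted are hypothetical names, and jobs with equal 'at' values may end up in a slightly different order than with the method above.

```ruby
# Hypothetical job record: just an id and a trigger time (as a Float).
Job = Struct.new(:job_id, :at)

def insert_sorted(pending, job)
  pending.reject! { |j| j.job_id == job.job_id } # a job with the same id is overridden
  # first position whose job triggers at or after the new one
  index = pending.bsearch_index { |j| j.at >= job.at }
  index ? pending.insert(index, job) : pending.push(job)
  pending
end

pending = []
insert_sorted(pending, Job.new("a", 30.0))
insert_sorted(pending, Job.new("b", 10.0))
insert_sorted(pending, Job.new("c", 20.0))
insert_sorted(pending, Job.new("a", 5.0)) # replaces the first "a"

p pending.map(&:job_id) # => ["a", "b", "c"]
p pending.map(&:at)     # => [5.0, 10.0, 20.0]
```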

def schedule (cron_line, params={}, &block)


Schedules a cron job, the 'cron_line' is a string
following the Unix cron standard (see "man 5 crontab" in your command
line, or http://www.google.com/search?q=man%205%20crontab).

For example :

  scheduler.schedule("5 0 * * *", s)
    # will trigger the schedulable s every day
    # five minutes after midnight

  scheduler.schedule("15 14 1 * *", s)
    # will trigger s at 14:15 on the first of every month

  scheduler.schedule("0 22 * * 1-5") do
    puts "it's break time..."
  end
    # outputs a message every weekday at 10pm

Returns the job id attributed to this 'cron job', this id can
be used to unschedule() the job.
def schedule (cron_line, params={}, &block)
  params = prepare_params(params)
  #
  # is a job with the same id already scheduled ?
  cron_id = params[:cron_id]
  cron_id = params[:job_id] unless cron_id
  #unschedule(cron_id) if cron_id
  @unschedule_queue << [ :cron, cron_id ]
  #
  # schedule
  b = to_block(params, &block)
  job = CronJob.new(self, cron_id, cron_line, params, &b)
  #@cron_jobs[job.job_id] = job
  @schedule_queue << job
  job.job_id
end

def schedule_at (at, params={}, &block)



Schedules a job by specifying at which time it should trigger.
Returns a job_id that can be used to unschedule() the job.

If the job is specified in the past, it will be triggered immediately
but not scheduled.
To avoid the triggering, the parameter :discard_past may be set to
true :

  jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
    puts "you'll never read this message"
  end

And 'jobid' will hold a nil (not scheduled).
def schedule_at (at, params={}, &block)
  do_schedule_at(
    at,
    prepare_params(params),
    &block)
end

def schedule_every (freq, params={}, &block)


Schedules a job in a loop. After an execution, it will not execute
before the time specified in 'freq'.

This method returns a job identifier which can be used to unschedule()
the job.

In case of exception in the job, it will be rescheduled. If you don't
want the job to be rescheduled, set the parameter :try_again to false.

  scheduler.schedule_every "500", :try_again => false do
    do_some_prone_to_error_stuff()
      # won't get rescheduled in case of exception
  end

Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
accepted.

  scheduler.schedule_every "2d", :first_in => "5h" do
    # schedule something every two days, start in 5 hours...
  end
def schedule_every (freq, params={}, &block)
  f = duration_to_f freq
  params = prepare_params params
  schedulable = params[:schedulable]
  params[:every] = freq
  first_at = params.delete :first_at
  first_in = params.delete :first_in
  previous_at = params[:previous_at]
  next_at = if first_at
    first_at
  elsif first_in
    Time.now.to_f + duration_to_f(first_in)
  elsif previous_at
    previous_at + f
  else
    Time.now.to_f + f
  end
  do_schedule_at(next_at, params) do |job_id, at|
    #
    # trigger ...
    hit_exception = false
    begin
      if schedulable
        schedulable.trigger params
      else
        block.call job_id, at, params
      end
    rescue Exception => e
      log_exception e
      hit_exception = true
    end
    # cannot use a return here !!! (block)
    unless \
      @dont_reschedule_every or
      (params[:dont_reschedule] == true) or
      (hit_exception and params[:try_again] == false)
      #
      # ok, reschedule ...
      params[:job_id] = job_id
      params[:previous_at] = at
      schedule_every params[:every], params, &block
        #
        # yes, this is a kind of recursion
        # note that params[:every] might have been changed
        # by the block/schedulable code
    end
    job_id
  end
end
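
The choice of the next trigger time in schedule_every can be isolated as a small pure function. The sketch below is an illustration under simplifying assumptions: next_at is a hypothetical helper, and every value is already a Float number of seconds (no duration strings).

```ruby
# Hypothetical helper mirroring the if/elsif chain of schedule_every.
def next_at(now, freq, opts = {})
  if opts[:first_at]
    opts[:first_at]            # explicit first trigger time wins
  elsif opts[:first_in]
    now + opts[:first_in]      # first trigger after a delay
  elsif opts[:previous_at]
    opts[:previous_at] + freq  # rescheduling: count from the previous 'at'
  else
    now + freq                 # plain first scheduling
  end
end

puts next_at(100.0, 10.0)                          # => 110.0
puts next_at(100.0, 10.0, :first_in => 3.0)        # => 103.0
puts next_at(100.0, 10.0, :first_at => 250.0)      # => 250.0
puts next_at(107.0, 10.0, :previous_at => 100.0)   # => 110.0
```

The last call shows why :previous_at is recorded before rescheduling: the next trigger is computed from the previous 'at' value (110.0) rather than from the clock at the time the block finished (which would give 117.0).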

def schedule_in (duration, params={}, &block)


Schedules a job by stating in how much time it should trigger.
Returns a job_id that can be used to unschedule() the job.
def schedule_in (duration, params={}, &block)
  do_schedule_at(
    Time.new.to_f + duration_to_f(duration),
    prepare_params(params),
    &block)
end

def start


Starts this scheduler (or restarts it if it was previously stopped)
def start
  @stopped = false
  @scheduler_thread = Thread.new do
    Thread.current[:name] = @thread_name
    if defined?(JRUBY_VERSION)
      require 'java'
      java.lang.Thread.current_thread.name = @thread_name
    end
    loop do
      break if @stopped
      t0 = Time.now.to_f
      step
      d = Time.now.to_f - t0 # + @correction
      next if d > @precision
      sleep (@precision - d)
    end
  end
end

def step


This is the method called each time the scheduler wakes up
(by default 4 times per second). It's meant to quickly
determine if there are jobs to trigger else to get back to sleep.
'cron' jobs get executed if necessary then 'at' jobs.
def step
  #puts Time.now.to_f
  #puts @pending_jobs.collect { |j| [ j.job_id, j.at ] }.inspect
  step_unschedule
    # unschedules any job in the unschedule queue before
    # they have a chance to get triggered.
  step_trigger
    # triggers eligible jobs
  step_schedule
    # schedule new jobs
  # done.
end

def step_schedule


Adds every job waiting in the @schedule_queue to
either @pending_jobs or @cron_jobs.
def step_schedule
  loop do
    break if @schedule_queue.empty?
    j = @schedule_queue.pop
    if j.is_a?(CronJob)
      @cron_jobs[j.job_id] = j
    else # it's an 'at' job
      push_pending_job j
    end
  end
end
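
The queue hand-off used here (any thread may push, only the scheduler thread drains and touches the job lists) can be shown with the standard Queue class alone. This is a sketch of the pattern, not of the scheduler: the job tuples and variable names are made up for the example.

```ruby
require 'thread' # Queue is built in on recent Rubies, the require is harmless

queue = Queue.new
jobs = {} # only ever touched by the draining thread

# Several threads "schedule" concurrently by pushing onto the queue.
producers = 3.times.map do |i|
  Thread.new { queue << ["job-#{i}", i * 10] }
end
producers.each(&:join)

# Drain without blocking, as step_schedule does on each wake-up.
until queue.empty?
  job_id, at = queue.pop
  jobs[job_id] = at
end

p jobs.keys.sort # => ["job-0", "job-1", "job-2"]
```

Since a single thread empties the queue, checking empty? before pop cannot block, and the job collections need no mutex.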

def step_trigger


Triggers every eligible cron job, then every eligible
pending job.
def step_trigger
  now = Time.new
  if @exit_when_no_more_jobs
    if @pending_jobs.size < 1
      @stopped = true
      return
    end
    @dont_reschedule_every = true if at_job_count < 1
  end
  # TODO : eventually consider running cron / pending
  # job triggering in two different threads
  #
  # but well... there's the synchronization issue...
  #
  # cron jobs
  if now.sec != @last_cron_second
    @last_cron_second = now.sec
    #puts "step() @cron_jobs.size #{@cron_jobs.size}"
    @cron_jobs.each do |cron_id, cron_job|
      #puts "step() cron_id : #{cron_id}"
      #trigger(cron_job) if cron_job.matches?(now, @precision)
      trigger(cron_job) if cron_job.matches?(now)
    end
  end
  #
  # pending jobs
  now = now.to_f
    #
    # that's what at jobs do understand
  loop do
    break if @pending_jobs.length < 1
    job = @pending_jobs[0]
    break if job.at > now
    #if job.at <= now
      #
      # obviously
    trigger job
    @pending_jobs.delete_at 0
  end
end
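
The @last_cron_second guard means that cron jobs are matched at most once per wall-clock second, even though step_trigger runs several times per second. A small simulation of that guard, with made-up timestamps 0.25 seconds apart and no scheduler involved:

```ruby
last_cron_second = -1
checks = 0

# eight simulated wake-ups, 0.25s apart, covering two wall-clock seconds
8.times do |i|
  now = Time.at(1_000_000 + i * 0.25)
  next if now.sec == last_cron_second # already checked during this second
  last_cron_second = now.sec
  checks += 1 # this is where cron jobs would be matched against 'now'
end

puts checks # => 2
```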

def step_unschedule


unschedules jobs in the unschedule_queue
def step_unschedule
  loop do
    break if @unschedule_queue.empty?
    type, job_id = @unschedule_queue.pop
    if type == :cron
      do_unschedule_cron_job job_id
    else
      do_unschedule job_id
    end
  end
end

def stop


The scheduler is stoppable via stop()
def stop
  @stopped = true
end

def to_block (params, &block)


Returns a block. If a block is passed, will return it, else,
if a :schedulable is set in the params, will return a block
wrapping a call to it.
def to_block (params, &block)
  return block if block
  schedulable = params[:schedulable]
  return nil unless schedulable
  params.delete :schedulable
  l = lambda do
    schedulable.trigger(params)
  end
  class << l
    attr_accessor :schedulable
  end
  l.schedulable = schedulable
  l
end
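
The wrapping can be tried on its own. In the sketch below, Greeter and wrap are hypothetical names; unlike to_block, wrap works on a copy of params instead of deleting :schedulable from the caller's hash, and it exposes the schedulable through define_singleton_method rather than an attr_accessor.

```ruby
# Hypothetical schedulable: any object that responds to trigger(params).
class Greeter
  def trigger(params)
    "hello #{params[:name]}"
  end
end

def wrap(params, &block)
  return block if block
  schedulable = params[:schedulable]
  return nil unless schedulable
  params = params.reject { |k, _| k == :schedulable } # leave the caller's hash intact
  l = lambda { schedulable.trigger(params) }
  # let callers get back to the wrapped object
  l.define_singleton_method(:schedulable) { schedulable }
  l
end

b = wrap(:schedulable => Greeter.new, :name => "rufus")
puts b.call               # => hello rufus
puts b.schedulable.class  # => Greeter
puts wrap({}).inspect     # => nil
```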

def trigger (job)


Triggers the job (in a dedicated thread).
def trigger (job)
  Thread.new do
    begin
      job.trigger
    rescue Exception => e
      log_exception e
    end
  end
end

def unschedule (job_id)


Unschedules an 'at' or a 'cron' job identified by the id
it was given at schedule time.
def unschedule (job_id)
  @unschedule_queue << [ :at, job_id ]
end

def unschedule_cron_job (job_id)


Unschedules a cron job
def unschedule_cron_job (job_id)
  @unschedule_queue << [ :cron, job_id ]
end