class Rufus::Scheduler


The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
'at' jobs execute once at a given point in time. 'cron' jobs
execute at specified intervals.

The two main methods are thus schedule_at() and schedule().

schedule_at() and schedule() await either a Schedulable instance and
params (usually an array or nil), or a block, which is more in the
Ruby way.

== The gem "rufus-scheduler"

This scheduler was previously known as the "openwferu-scheduler" gem.

To ensure that code tapping the previous gem still runs fine with
"rufus-scheduler", this new gem has 'pointers' for the old class
names.

  require 'rubygems'
  require 'openwfe/util/scheduler'
  s = OpenWFE::Scheduler.new

will still run OK with "rufus-scheduler".

== Examples

  require 'rubygems'
  require 'rufus/scheduler'

  scheduler = Rufus::Scheduler.start_new

  scheduler.schedule_in("3d") do
    regenerate_monthly_report()
  end
    #
    # will call the regenerate_monthly_report method
    # in 3 days from now

  scheduler.schedule "0 22 * * 1-5" do
    log.info "activating security system..."
    activate_security_system()
  end

  job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
    init_self_destruction_sequence()
  end

  scheduler.join # join the scheduler (prevents exiting)

An example that uses a Schedulable class :

  class Regenerator < Schedulable
    def trigger (frequency)
      self.send(frequency)
    end
    def monthly
      # ...
    end
    def yearly
      # ...
    end
  end

  regenerator = Regenerator.new

  scheduler.schedule_in("4d", regenerator)
    #
    # will regenerate the report in four days

  scheduler.schedule_in(
    "5d",
    { :schedulable => regenerator, :scope => :month })
    #
    # will regenerate the monthly report in 5 days

There is also schedule_every() :

  scheduler.schedule_every("1h20m") do
    regenerate_latest_report()
  end

(note : a schedule every isn't triggered immediately, thus this example
will first trigger 1 hour and 20 minutes after being scheduled)

The scheduler has an "exit_when_no_more_jobs" attribute. When set to
'true', the scheduler will exit as soon as there are no more jobs to
run.
Use with care though : if you create a scheduler, set this attribute
to true and start the scheduler, the scheduler will immediately exit.
This attribute is best used indirectly : the method
join_until_no_more_jobs() wraps it.

The :scheduler_precision can be set when instantiating the scheduler.

  scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
  scheduler.start
    #
    # instantiates a scheduler that checks its jobs twice per second
    # (the default is 4 times per second (0.250))

Note that rufus-scheduler places a constraint on the values for the
precision : 0.0 < p <= 1.0
Thus

  scheduler.precision = 4.0

will raise an exception. Judging by the initialize() source listed
further down, the constructor rescues that exception, so

  scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0

leaves the precision at its default value instead of raising.

== Tags

Tags can be attached to scheduled jobs :

  scheduler.schedule_in "2h", :tags => "backup" do
    init_backup_sequence()
  end

  scheduler.schedule "0 24 * * *", :tags => "new_day" do
    do_this_or_that()
  end

  jobs = scheduler.find_jobs 'backup'
  jobs.each { |job| job.unschedule }

Multiple tags may be attached to a single job :

  scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
    init_backup_sequence()
  end

The vanilla case for tags assumes they are String instances, but nothing
prevents you from using anything else. The scheduler has no persistence
by itself, so no serialization issue.

== Cron up to the second

A cron schedule can be set at the second level :

  scheduler.schedule "7 * * * * *" do
    puts "it's now the seventh second of the minute"
  end

The rufus scheduler recognizes an optional first column for second
scheduling. This column can, like the other columns, specify a
value ("7"), a list of values ("7,8,9,27") or a range ("7-12").

== information passed to schedule blocks

When calling schedule_every(), schedule_in() or schedule_at(), the block
expects zero or 3 parameters like in

  scheduler.schedule_every("1h20m") do |job_id, at, params|
    puts "my job_id is #{job_id}"
  end

For schedule(), zero or 3 parameters can get passed

  scheduler.schedule "7 * * * * *" do |job_id, cron_line, params|
    puts "my job_id is #{job_id}"
  end

In both cases, params corresponds to the params passed to the schedule
method (:tags, :first_at, :first_in, :dont_reschedule, ...)

== Exceptions

The rufus scheduler will output a stacktrace to the STDOUT in
case of exception. There are two ways to change that behaviour.

  # 1 - providing a lwarn method to the scheduler instance :

  class << scheduler
    def lwarn (&block)
      puts "oops, something wrong happened : "
      puts block.call
    end
  end

  # or

  def scheduler.lwarn (&block)
    puts "oops, something wrong happened : "
    puts block.call
  end

  # 2 - overriding the [protected] method log_exception(e) :

  class << scheduler
    def log_exception (e)
      puts "something wrong happened : "+e.to_s
    end
  end

  # or

  def scheduler.log_exception (e)
    puts "something wrong happened : "+e.to_s
  end

== 'Every jobs' and rescheduling

Every jobs can reschedule/unschedule themselves. A reschedule example :

  scheduler.schedule_every "5h" do |job_id, at, params|

    mails = $inbox.fetch_mails
    mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }

    params[:every] = if mails.size > 100
      "1h" # lots of spam, check every hour
    else
      "5h" # normal schedule, every 5 hours
    end
  end

Unschedule example :

  scheduler.schedule_every "10s" do |job_id, at, params|
    #
    # polls every 10 seconds until a mail arrives

    $mail = $inbox.fetch_last_mail

    params[:dont_reschedule] = true if $mail
  end

== 'Every jobs', :first_at and :first_in

Since rufus-scheduler 1.0.2, the schedule_every method recognizes two
optional parameters, :first_at and :first_in

  scheduler.schedule_every "2d", :first_in => "5h" do
    # schedule something every two days, start in 5 hours...
  end

  scheduler.schedule_every "2d", :first_at => Time.now + 5 * 3600 do
    # schedule something every two days, start in 5 hours...
  end

== job.next_time()

Jobs, be they at, every or cron, have a next_time() method, which tells
when the job will be fired next time (for at and in jobs, this is also the
last time).

For cron jobs, the current implementation is quite brutal. It takes three
seconds on my 2006 macbook to reach a cron schedule 1 year away.

When is the next friday 13th ?

  require 'rubygems'
  require 'rufus/scheduler'

  puts Rufus::CronLine.new("* * 13 * fri").next_time

== :thread_name option

You can specify the name of the scheduler's thread. Should make
it easier in some debugging situations.

  Rufus::Scheduler.new :thread_name => "the crazy scheduler"

== job.trigger_thread

Since rufus-scheduler 1.0.8, you can have access to the thread of
a job currently being triggered.

  job = scheduler.get_job(job_id)
  thread = job.trigger_thread

This new method will return nil if the job is not currently being
triggered. Note that in case of an every or cron job, this method
will return the thread of the last triggered instance, thus, in case
of overlapping executions, you only get the most recent thread.

== specifying a :timeout for a job

rufus-scheduler 1.0.12 introduces a :timeout parameter for jobs.

  scheduler.every "3h", :timeout => '2h30m' do
    do_that_long_job()
  end

After two and a half hours, the 'long job' will get interrupted by a
Rufus::TimeOutError (so that you know what to catch).

:timeout is applicable to all types of jobs : at, in, every, cron. It
accepts a String value following the "Mdhms" scheme the rufus-scheduler
uses.

def self.is_cron_string (s)


Returns true if the given string seems to be a cron string.
def self.is_cron_string (s)
  s.match ".+ .+ .+ .+ .+" # well...
end
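
A minimal sketch of how loose this heuristic is, written as a standalone method (cron_like? is a hypothetical name, not part of rufus-scheduler). It only checks for five or more space-separated chunks, so a date string such as the one used with schedule_at() passes as well, which is presumably why the method only claims the string "seems" to be a cron string.

```ruby
# Standalone copy of the heuristic; cron_like? is a hypothetical name.
def cron_like?(s)
  # String#match converts the String pattern into a Regexp
  !s.match(".+ .+ .+ .+ .+").nil?
end

puts cron_like?("5 0 * * *")                       # a cron line
puts cron_like?("3d")                              # a duration string
puts cron_like?("Sun Oct 07 14:24:01 +0900 2009")  # a date, yet it matches too
```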

def self.start_new (params = {})


Instantiates a new Rufus::Scheduler instance, starts it and returns it
def self.start_new (params = {})
  s = self.new(params)
  s.start
  s
end

def all_jobs


Returns all the jobs in the scheduler.
def all_jobs
  find_jobs()
end

def at_job_count


Returns the current count of 'at' jobs scheduled (not 'every').
def at_job_count
  @non_cron_jobs.values.select { |j| j.class == AtJob }.size
end

def at_to_f (at)


Ensures an 'at' instance is translated to a float
(to be compared with the float coming from time.to_f)
def at_to_f (at)
  at = Rufus::to_ruby_time(at) if at.kind_of?(String)
  at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
  at = at.to_f if at.kind_of?(Time)
  raise "cannot schedule at : #{at.inspect}" unless at.is_a?(Float)
  at
end
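
The normalization above can be tried outside the scheduler with a rough stand-in. In this sketch at_to_float is a hypothetical name, Time.parse takes the place of Rufus::to_ruby_time and DateTime#to_time takes the place of Rufus::to_gm_time; both substitutions are assumptions and may not parse exactly the same inputs as the library helpers.

```ruby
require 'time'
require 'date'

# Hypothetical stand-in for at_to_f, using only the standard library.
def at_to_float(at)
  at = Time.parse(at) if at.is_a?(String)   # assumption: replaces Rufus::to_ruby_time
  at = at.to_time if at.is_a?(DateTime)     # assumption: replaces Rufus::to_gm_time
  at = at.to_f if at.is_a?(Time)
  raise ArgumentError, "cannot schedule at : #{at.inspect}" unless at.is_a?(Float)
  at
end

p at_to_float(Time.at(10))
p at_to_float("2009-10-07 14:24:01 +0900")
p at_to_float(DateTime.new(2009, 10, 7, 14, 24, 1))
```

Anything that does not end up as a Float (an Integer, for instance) is rejected, as in the listing above.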

def cron_job_count


Returns the number of cron jobs currently active in this scheduler.
def cron_job_count
  @cron_jobs.size
end

def do_schedule_at (at, params={}, &block)


The core method behind schedule_at and schedule_in (and also
schedule_every). It's protected, don't use it directly.
def do_schedule_at (at, params={}, &block)
  job = params.delete(:job)
  unless job
    jobClass = params[:every] ? EveryJob : AtJob
    b = to_block(params, &block)
    job = jobClass.new(self, at_to_f(at), params[:job_id], params, &b)
  end
  if jobClass == AtJob && job.at < (Time.new.to_f + @precision)
    job.trigger() unless params[:discard_past]
    @non_cron_jobs.delete job.job_id # just to be sure
    return nil
  end
  @non_cron_jobs[job.job_id] = job
  @schedule_queue << job
  job.job_id
end

def do_unschedule (job_id)


Does the unschedule work itself.
def do_unschedule (job_id)
  job = get_job job_id
  return (@cron_jobs.delete(job_id) != nil) if job.is_a?(CronJob)
  return false unless job # not found
  if job.is_a?(AtJob) # catches AtJob and EveryJob instances
    @non_cron_jobs.delete(job_id)
    job.params[:dont_reschedule] = true # for AtJob as well, no worries
  end
  for i in 0...@pending_jobs.length
    if @pending_jobs[i].job_id == job_id
      @pending_jobs.delete_at i
      return true # asap
    end
  end
  true
end

def duration_to_f (s)


Ensures that a duration is expressed as a Float instance.

  duration_to_f("10s")

will yield 10.0
def duration_to_f (s)
  Rufus.duration_to_f(s)
end
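
As an illustration of what such a conversion involves, here is a simplified parser. It is not Rufus.duration_to_f: sketch_duration_to_f and UNIT_SECONDS are hypothetical names, and only the s, m, h and d units are handled; any other unit the "Mdhms" scheme may accept is left out of this sketch.

```ruby
# Seconds per unit; a deliberately partial table (assumption of this sketch).
UNIT_SECONDS = { 's' => 1.0, 'm' => 60.0, 'h' => 3600.0, 'd' => 86_400.0 }

# Hypothetical simplified duration parser.
def sketch_duration_to_f(s)
  return s.to_f if s.is_a?(Numeric)
  total = 0.0
  rest = s.to_s.strip
  until rest.empty?
    m = rest.match(/\A(\d+)([smhd])/)
    raise ArgumentError, "cannot parse duration: #{s.inspect}" unless m
    total += m[1].to_i * UNIT_SECONDS[m[2]]
    rest = m.post_match
  end
  total
end

p sketch_duration_to_f("10s")
p sketch_duration_to_f("1h20m")
p sketch_duration_to_f("2h30m")
```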

def every_job_count


Returns the current count of 'every' jobs scheduled.
def every_job_count
  @non_cron_jobs.values.select { |j| j.class == EveryJob }.size
end

def find_jobs (tag=nil)


Returns an array of jobs that have the given tag (all the jobs when
no tag is given).
def find_jobs (tag=nil)
  jobs = @cron_jobs.values + @non_cron_jobs.values
  jobs = jobs.select { |job| job.has_tag?(tag) } if tag
  jobs
end
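
The tag filtering can be sketched without a running scheduler. TaggedJob and select_jobs below are hypothetical stand-ins for the job classes and for find_jobs; the only thing assumed about a job is that it answers has_tag?.

```ruby
# TaggedJob is a hypothetical stand-in for the scheduler's job classes.
TaggedJob = Struct.new(:job_id, :tags) do
  def has_tag?(tag)
    tags.include?(tag)
  end
end

# Returns every job when no tag is given, else only the tagged ones.
def select_jobs(jobs, tag = nil)
  tag ? jobs.select { |job| job.has_tag?(tag) } : jobs
end

jobs = [
  TaggedJob.new('j0', ['backup']),
  TaggedJob.new('j1', ['backup', 'important']),
  TaggedJob.new('j2', ['new_day'])
]

p select_jobs(jobs, 'backup').map(&:job_id)
p select_jobs(jobs).size
```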

def find_schedulables (tag)


Finds the jobs with the given tag and then returns an array of
the wrapped Schedulable objects.
Jobs that don't wrap a Schedulable won't be included in the
result.
def find_schedulables (tag)
  find_jobs(tag).find_all { |job| job.respond_to?(:schedulable) }
end

def get_job (job_id)


Returns the job corresponding to job_id, an instance of AtJob
or CronJob will be returned.
def get_job (job_id)
  @cron_jobs[job_id] || @non_cron_jobs[job_id]
end

def get_schedulable (job_id)


Finds a job (via get_job()) and then returns the wrapped
schedulable if any.
def get_schedulable (job_id)
  j = get_job(job_id)
  j.respond_to?(:schedulable) ? j.schedulable : nil
end

def initialize (params={})

def initialize (params={})
  super()
  @pending_jobs = []
  @cron_jobs = {}
  @non_cron_jobs = {}
  @schedule_queue = Queue.new
  @unschedule_queue = Queue.new
    #
    # sync between the step() method and the [un]schedule
    # methods is done via these queues, no more mutex
  @scheduler_thread = nil
  @precision = 0.250
    # every 250ms, the scheduler wakes up (default value)
  begin
    self.precision = Float(params[:scheduler_precision])
  rescue Exception => e
    # let precision at its default value
  end
  @thread_name = params[:thread_name] || "rufus scheduler"
  #@correction = 0.00045
  @exit_when_no_more_jobs = false
  #@dont_reschedule_every = false
  @last_cron_second = -1
  @stopped = true
end

def join


Joins on the scheduler thread
def join
  @scheduler_thread.join
end

def join_until_no_more_jobs


Like join() but takes care of setting the 'exit_when_no_more_jobs'
attribute of this scheduler to true before joining.
Thus the scheduler will exit (and the join terminates) as soon as
there are no more 'at' (or 'every') jobs in the scheduler.

Currently used only in unit tests.
def join_until_no_more_jobs
  @exit_when_no_more_jobs = true
  join
end

def log_exception (e)


If an error occurs in the job, it will get caught and an error
message will be displayed to STDOUT.
If this scheduler provides a lwarn(message) method, it will
be used instead.

Of course, one can override this method.
def log_exception (e)
  message =
    "trigger() caught exception\n" +
    e.to_s + "\n" +
    e.backtrace.join("\n")
  if self.respond_to?(:lwarn)
    lwarn { message }
  else
    puts message
  end
end

def pending_job_count


Returns the number of currently pending jobs in this scheduler
('at' jobs and 'every' jobs).
def pending_job_count
  @pending_jobs.size
end

def precision= (f)


Setting the precision ( 0.0 < p <= 1.0 )
def precision= (f)
  raise 'precision must be 0.0 < p <= 1.0' \
    if f <= 0.0 or f > 1.0
  @precision = f
end
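
A small sketch of the same guard on a throwaway class, to show what a caller sees when the constraint is violated. PrecisionHolder is a hypothetical name, and the sketch raises ArgumentError where the listing above raises a plain RuntimeError.

```ruby
# Hypothetical holder class mirroring the 0.0 < p <= 1.0 constraint.
class PrecisionHolder
  attr_reader :precision

  def initialize
    @precision = 0.250 # default: wake up 4 times per second
  end

  def precision=(f)
    raise ArgumentError, 'precision must be 0.0 < p <= 1.0' if f <= 0.0 || f > 1.0
    @precision = f
  end
end

holder = PrecisionHolder.new
holder.precision = 0.5
begin
  holder.precision = 4.0
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
puts holder.precision # the rejected value did not replace the previous one
```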

def prepare_params (params)


Making sure that params is a Hash.
def prepare_params (params)
  params.is_a?(Schedulable) ? { :schedulable => params } : params
end

def push_pending_job (job)


Pushes an 'at' job into the pending job list
def push_pending_job (job)
  old = @pending_jobs.find { |j| j.job_id == job.job_id }
  @pending_jobs.delete(old) if old
    #
    # override previous job with same id
  if @pending_jobs.length < 1 or job.at >= @pending_jobs.last.at
    @pending_jobs << job
    return
  end
  for i in 0...@pending_jobs.length
    if job.at <= @pending_jobs[i].at
      @pending_jobs[i, 0] = job
      return # right place found
    end
  end
end
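
The idea behind this method is to keep the pending list sorted by trigger time, so that the head of the list is always the next job due. A condensed sketch with hypothetical names (SketchJob, push_sorted); it differs from the listing above only in where it places jobs whose 'at' values are equal.

```ruby
SketchJob = Struct.new(:job_id, :at)

# Inserts job into pending (an Array kept sorted by ascending at),
# replacing any previous job that carries the same job_id.
def push_sorted(pending, job)
  pending.reject! { |j| j.job_id == job.job_id }
  index = pending.index { |j| job.at <= j.at } || pending.length
  pending.insert(index, job)
  pending
end

pending = []
push_sorted(pending, SketchJob.new('a', 3.0))
push_sorted(pending, SketchJob.new('b', 1.0))
push_sorted(pending, SketchJob.new('c', 2.0))
p pending.map(&:job_id)

push_sorted(pending, SketchJob.new('b', 5.0)) # same id, new time
p pending.map(&:job_id)
```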

def schedule (cron_line, params={}, &block)


Schedules a cron job, the 'cron_line' is a string
following the Unix cron standard (see "man 5 crontab" in your command
line, or http://www.google.com/search?q=man%205%20crontab).

For example :

  scheduler.schedule("5 0 * * *", s)
    # will trigger the schedulable s every day
    # five minutes after midnight

  scheduler.schedule("15 14 1 * *", s)
    # will trigger s at 14:15 on the first of every month

  scheduler.schedule("0 22 * * 1-5") do
    puts "it's break time..."
  end
    # outputs a message every weekday at 10pm

Returns the job id attributed to this 'cron job', this id can
be used to unschedule() the job.
def schedule (cron_line, params={}, &block)
  params = prepare_params(params)
  #
  # is a job with the same id already scheduled ?
  cron_id = params[:cron_id] || params[:job_id]
  #@unschedule_queue << cron_id
  #
  # schedule
  b = to_block(params, &block)
  job = CronJob.new(self, cron_id, cron_line, params, &b)
  @schedule_queue << job
  job.job_id
end
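
To make the column syntax concrete, here is a sketch of how one cron column could be matched against a value. It is not the Rufus::CronLine implementation: field_matches? is a hypothetical helper, and it only understands "*", single values, comma lists and ranges (no names such as "fri").

```ruby
# Hypothetical helper: does one cron column match the given integer value?
def field_matches?(field, value)
  return true if field == '*'
  field.split(',').any? do |item|
    if item.include?('-')
      # base 10 is forced so that "08" is not read as invalid octal
      from, to = item.split('-').map { |v| Integer(v, 10) }
      (from..to).include?(value)
    else
      Integer(item, 10) == value
    end
  end
end

puts field_matches?('7', 7)
puts field_matches?('7,8,9,27', 27)
puts field_matches?('7-12', 13)
puts field_matches?('*', 59)
```

A whole cron line would then match when each of its columns matches the corresponding part of the current time.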

def schedule_at (at, params={}, &block)



Schedules a job by specifying at which time it should trigger.
Returns a job_id that can be used to unschedule() the job.

If the job is specified in the past, it will be triggered immediately
but not scheduled.
To avoid the triggering, the parameter :discard_past may be set to
true :

  jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
    puts "you'll never read this message"
  end

And 'jobid' will hold a nil (not scheduled).
def schedule_at (at, params={}, &block)
  do_schedule_at(
    at,
    prepare_params(params),
    &block)
end

def schedule_every (freq, params={}, &block)


Schedules a job in a loop. After an execution, it will not execute
before the time specified in 'freq'.

This method returns a job identifier which can be used to unschedule()
the job.

In case of exception in the job, it will be rescheduled. If you don't
want the job to be rescheduled, set the parameter :try_again to false.

  scheduler.schedule_every "500", :try_again => false do
    do_some_prone_to_error_stuff()
      # won't get rescheduled in case of exception
  end

Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
accepted.

  scheduler.schedule_every "2d", :first_in => "5h" do
    # schedule something every two days, start in 5 hours...
  end

(without setting a :first_in (or :first_at), our example schedule would
have been triggered for the first time after two days).
def schedule_every (freq, params={}, &block)
  params = prepare_params(params)
  params[:every] = freq
  first_at = params[:first_at]
  first_in = params[:first_in]
  #params[:delayed] = true if first_at or first_in
  first_at = if first_at
    at_to_f(first_at)
  elsif first_in
    Time.now.to_f + Rufus.duration_to_f(first_in)
  else
    Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately
  end
  do_schedule_at(first_at, params, &block)
end
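
The choice of the first trigger time can be isolated in a few lines. In this sketch first_trigger_at is a hypothetical helper and every value is already a float number of seconds; the real method additionally parses strings through at_to_f and Rufus.duration_to_f.

```ruby
# Hypothetical helper mirroring the three-way branch in schedule_every.
def first_trigger_at(now, freq, first_at: nil, first_in: nil)
  if first_at
    first_at          # an absolute point in time wins
  elsif first_in
    now + first_in    # a delay relative to now
  else
    now + freq        # default: not triggering immediately
  end
end

now = 1_000.0
two_days = 2 * 24 * 3600.0

p first_trigger_at(now, two_days)
p first_trigger_at(now, two_days, first_in: 5 * 3600.0)
p first_trigger_at(now, two_days, first_at: 1_500.0)
```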

def schedule_in (duration, params={}, &block)


Schedules a job by stating in how much time it should trigger.
Returns a job_id that can be used to unschedule() the job.
def schedule_in (duration, params={}, &block)
  do_schedule_at(
    Time.new.to_f + Rufus.duration_to_f(duration),
    prepare_params(params),
    &block)
end

def start


Starts this scheduler (or restarts it if it was previously stopped)
def start
  @stopped = false
  @scheduler_thread = Thread.new do
    Thread.current[:name] = @thread_name
    if defined?(JRUBY_VERSION)
      require 'java'
      java.lang.Thread.current_thread.name = @thread_name
    end
    loop do
      break if @stopped
      t0 = Time.now.to_f
      step
      d = Time.now.to_f - t0 # + @correction
      next if d > @precision
      sleep(@precision - d)
    end
  end
end
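
The pacing logic of the loop (do the work, measure how long it took, sleep for the remainder of the precision interval) can be sketched in isolation. run_ticks is a hypothetical helper with a fixed number of iterations instead of the @stopped flag.

```ruby
# Hypothetical helper: runs the block count times, starting one run per
# precision seconds unless the block itself takes longer than that.
def run_ticks(precision, count)
  count.times do |i|
    t0 = Time.now.to_f
    yield i
    d = Time.now.to_f - t0
    sleep(precision - d) if d < precision # skip the sleep when running late
  end
end

run_ticks(0.01, 3) { |i| puts "tick #{i}" }
```

When a step takes longer than the precision, the next step starts right away, so a slow job delays later checks rather than stacking them up.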

def step


This is the method called each time the scheduler wakes up
(by default 4 times per second). It's meant to quickly
determine if there are jobs to trigger else to get back to sleep.
'cron' jobs get executed if necessary then 'at' jobs.
def step
  step_unschedule
    # unschedules any job in the unschedule queue before
    # they have a chance to get triggered.
  step_trigger
    # triggers eligible jobs
  step_schedule
    # schedule new jobs
  # done.
end

def step_schedule


Adds every job waiting in the @schedule_queue to
either @pending_jobs or @cron_jobs.
def step_schedule
  loop do
    break if @schedule_queue.empty?
    j = @schedule_queue.pop
    if j.is_a?(CronJob)
      @cron_jobs[j.job_id] = j
    else # it's an 'at' job
      push_pending_job j
    end
  end
end

def step_trigger


Triggers every eligible cron job (at most once per second), then every
eligible pending (at or every) job.
def step_trigger
  now = Time.now
  if @exit_when_no_more_jobs && @pending_jobs.size < 1
    @stopped = true
    return
  end
  # TODO : eventually consider running cron / pending
  # job triggering in two different threads
  #
  # but well... there's the synchronization issue...
  #
  # cron jobs
  if now.sec != @last_cron_second
    @last_cron_second = now.sec
    @cron_jobs.each do |cron_id, cron_job|
      #trigger(cron_job) if cron_job.matches?(now, @precision)
      cron_job.trigger if cron_job.matches?(now)
    end
  end
  #
  # pending jobs
  now = now.to_f
    #
    # that's what at jobs do understand
  loop do
    break if @pending_jobs.length < 1
    job = @pending_jobs[0]
    break if job.at > now
    #if job.at <= now
      #
      # obviously
    job.trigger
    @pending_jobs.delete_at 0
  end
end
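
Because the pending list is sorted by trigger time, the pending-job part of this method only ever looks at the head of the list. A sketch of that drain loop, with hypothetical names (DueJob, pop_due) and with the due jobs returned instead of triggered:

```ruby
DueJob = Struct.new(:job_id, :at)

# Removes and returns every job whose at is not after now, relying on
# pending being sorted by ascending at.
def pop_due(pending, now)
  due = []
  due << pending.shift while pending.any? && pending.first.at <= now
  due
end

pending = [DueJob.new('a', 1.0), DueJob.new('b', 2.0), DueJob.new('c', 5.0)]
due = pop_due(pending, 2.0)
p due.map(&:job_id)
p pending.map(&:job_id)
```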

def step_unschedule


unschedules jobs in the unschedule_queue
def step_unschedule
  loop do
    break if @unschedule_queue.empty?
    do_unschedule(@unschedule_queue.pop)
  end
end

def stop


The scheduler is stoppable via stop()
def stop
  @stopped = true
end

def to_block (params, &block)


Returns a block. If a block is passed, will return it, else,
if a :schedulable is set in the params, will return a block
wrapping a call to it.
def to_block (params, &block)
  return block if block
  schedulable = params.delete(:schedulable)
  return nil unless schedulable
  l = lambda do
    schedulable.trigger(params)
  end
  class << l
    attr_accessor :schedulable
  end
  l.schedulable = schedulable
  l
end
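
The wrapping can be exercised on its own. In this sketch SketchSchedulable and wrap are hypothetical names; define_singleton_method is used to expose the schedulable on the lambda, where the listing above opens the singleton class and declares an attr_accessor.

```ruby
# Hypothetical schedulable: anything that responds to trigger(params).
class SketchSchedulable
  attr_reader :received

  def trigger(params)
    @received = params
  end
end

# Hypothetical stand-in for to_block.
def wrap(params, &block)
  return block if block
  schedulable = params.delete(:schedulable)
  return nil unless schedulable
  l = lambda { schedulable.trigger(params) }
  l.define_singleton_method(:schedulable) { schedulable }
  l
end

target = SketchSchedulable.new
b = wrap({ :schedulable => target, :scope => :month })
b.call
p target.received
p b.schedulable.equal?(target)
```

Note that the :schedulable entry is removed from params before the call, so the schedulable only receives the remaining entries.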

def unschedule (job_id)


Unschedules an 'at' or a 'cron' job identified by the id
it was given at schedule time.
def unschedule (job_id)
  @unschedule_queue << job_id
end

def unschedule_cron_job (job_id)


Unschedules a cron job

(deprecated : use unschedule(job_id) for all the jobs !)
def unschedule_cron_job (job_id)
  unschedule(job_id)
end