# Rufus::Scheduler registers 'at', 'every' and 'cron' jobs and triggers
# them from a background thread (see the documentation and the method
# listing that follow).
class Rufus::Scheduler
end
# schedule something every two days, start in 5 hours…
scheduler.schedule_every “2d”, :first_at => “5h” do
end
# schedule something every two days, start in 5 hours…
scheduler.schedule_every “2d”, :first_in => “5h” do
optional parameters, :first_at and :first_in
Since rufus-scheduler 1.0.2, the schedule_every method recognizes two
== ‘Every jobs’, :first_at and :first_in

end
params[:dont_reschedule] = true if $mail
$mail = $inbox.fetch_last_mail
# polls every 10 seconds until a mail arrives
#
schedule.schedule_every “10s” do |job_id, at, params|
Unschedule example :
end
end
“5h” # normal schedule, every 5 hours
else
“1h” # lots of spam, check every hour
params[:every] = if mails.size > 100
mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
mails = $inbox.fetch_mails
schedule.schedule_every “5h” do |job_id, at, params|
Every jobs can reschedule/unschedule themselves. A reschedule example :
== ‘Every jobs’ and rescheduling
end
end
puts “something wrong happened : ”+e.to_s
def log_exception (e)
class << scheduler
# 2 - overriding the [protected] method log_exception(e) :
end
end
puts block.call
puts “oops, something wrong happened : ”
def lwarn (&block)
class << scheduler
# 1 - providing a lwarn method to the scheduler instance :
case of exception. There are two ways to change that behaviour.
The rufus scheduler will output a stacktrace to the STDOUT in
== Exceptions
value (“7”), a list of values (“7,8,9,27”) or a range (“7-12”).
scheduling. This column can, like for the other columns, specify a
The rufus scheduler recognizes an optional first column for second
end
puts “it’s now the seventh second of the minute”
scheduler.schedule “7 * * * * *” do
A cron schedule can be set at the second level :
== Cron up to the second
by itself, so no serialization issue.
prevents you from using anything else. The scheduler has no persistence
The vanilla case for tags assumes they are String instances, but nothing
end
init_backup_sequence()
scheduler.schedule_in “2h”, :tags => [ “backup”, “important” ] do
Multiple tags may be attached to a single job :
jobs.each { |job| job.unschedule }
jobs = find_jobs ‘backup’
end
do_this_or_that()
scheduler.schedule “0 24 * * *”, :tags => “new_day” do
end
init_backup_sequence()
scheduler.schedule_in “2h”, :tags => “backup” do
Tags can be attached to jobs scheduled :
== Tags
# (the default is 4 times per second (0.250))
# instantiates a scheduler that checks its jobs twice per second
#
scheduler.start
scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
The :scheduler_precision can be set when instantiating the scheduler.
join_until_no_more_jobs() wraps it.
This attribute is best used indirectly : the method
to true and start the scheduler, the scheduler will immediately exit.
Use with care though, if you create a scheduler, set this attribute
run.
‘true’, the scheduler will exit as soon as there are no more jobs to
The scheduler has an “exit_when_no_more_jobs” attribute. When set to
end
regenerate_latest_report()
scheduler.schedule_every(“1h20m”) do
There is also schedule_every() :
# will regenerate the monthly report in 5 days
#
{ :schedulable => regenerator, :scope => :month })
“5d”,
scheduler.schedule_in(
# will regenerate the report in four days
#
scheduler.schedule_in(“4d”, regenerator)
regenerator = Regenerator.new
end
end
# …
def yearly
end
# …
def monthly
end
self.send(frequency)
def trigger (frequency)
class Regenerator < Schedulable
an example that uses a Schedulable class :
end
init_self_destruction_sequence()
job_id = scheduler.schedule_at “Sun Oct 07 14:24:01 +0900 2009” do
end
activate_security_system()
log.info “activating security system…”
scheduler.schedule “0 22 * * 1-5” do
# in 3 days from now
# will call the regenerate_monthly_report method
#
end
regenerate_monthly_report()
scheduler.schedule_in(“3d”) do
require ‘rufus/scheduler’
require ‘rubygems’
== Examples
will still run OK with “rufus-scheduler”.
s = OpenWFE::Scheduler.new
require ‘openwfe/util/scheduler’
require ‘rubygems’
names.
“rufus-scheduler”, this new gem has ‘pointers’ for the old class
To ensure that code tapping the previous gem still runs fine with
This scheduler was previously known as the “openwferu-scheduler” gem.
== The gem “openwferu-scheduler”
Ruby way.
params (usually an array or nil), or a block, which is more in the
schedule_at() and schedule() await either a Schedulable instance and
The two main methods are thus schedule_at() and schedule().
execute at specified intervals.
‘at’ jobs to execute once at a given point in time. ‘cron’ jobs
The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs.
# Returns the current count of 'at' jobs scheduled (not 'every').
#
# Only pending jobs whose class is exactly AtJob are counted.
def at_job_count
  @pending_jobs.count { |job| job.instance_of?(AtJob) }
end
# Ensures an 'at' instance is translated to a float
# (to be compared with the float coming from time.to_f).
#
# The conversions are applied in sequence: a String goes through
# Rufus::to_ruby_time, a DateTime through Rufus::to_gm_time and a Time
# is turned into a float. Any other value is returned untouched.
def at_to_f(at)
  moment = at
  moment = Rufus::to_ruby_time(moment) if moment.is_a?(String)
  moment = Rufus::to_gm_time(moment) if moment.is_a?(DateTime)
  moment.is_a?(Time) ? moment.to_f : moment
end
# Returns the number of cron jobs currently active in this scheduler.
def cron_job_count
  @cron_jobs.length
end
# The core method behind schedule_at and schedule_in (and also
# schedule_every). It's protected, don't use it directly.
#
# Builds an EveryJob when params[:every] is set, an AtJob otherwise.
# If the target time is earlier than now + @precision, the job is
# triggered on the spot (unless params[:discard_past] is set) and nil is
# returned. Otherwise the job is pushed onto the schedule queue and its
# job_id is returned.
def do_schedule_at(at, params = {}, &block)
  target = at_to_f(at)
  job_class = params[:every] ? EveryJob : AtJob
  requested_id = params[:job_id]
  callable = to_block(params, &block)
  job = job_class.new(self, target, requested_id, params, &callable)

  if target < (Time.new.to_f + @precision)
    # too late (or too close) to be queued
    job.trigger unless params[:discard_past]
    return nil
  end

  @schedule_queue << job
  job.job_id
end
# Removes the first pending ('at' or 'every') job whose job_id matches
# and returns true. When no pending job matches, falls back to
# do_unschedule_cron_job and returns its result.
#
# Array#index stops at the first match, so the list is not scanned in
# full (which is why delete_if is not used).
def do_unschedule(job_id)
  position = @pending_jobs.index { |job| job.job_id == job_id }

  if position
    @pending_jobs.delete_at(position)
    return true
  end

  do_unschedule_cron_job(job_id)
end
# Removes the cron job registered under job_id.
#
# Returns true when an entry was removed, false when there was none.
def do_unschedule_cron_job(job_id)
  removed = @cron_jobs.delete(job_id)
  !removed.nil?
end
# Ensures that a duration is expressed as a Float instance.
#
#   duration_to_f("10s")
#
# will yield 10.0
#
# A Float is returned as is, a String is handed to
# Rufus::parse_time_string and anything else goes through
# Float(value.to_s).
def duration_to_f(s)
  case s
  when Float then s
  when String then Rufus::parse_time_string(s)
  else Float(s.to_s)
  end
end
# Returns the current count of 'every' jobs scheduled.
#
# Pending jobs that are EveryJob instances (subclasses included) are
# counted.
def every_job_count
  @pending_jobs.count { |job| job.is_a?(EveryJob) }
end
# Returns an array of jobs that have the given tag.
#
# Cron jobs come first in the result, followed by the pending jobs.
def find_jobs(tag)
  candidates = @cron_jobs.values + @pending_jobs
  candidates.select { |job| job.has_tag?(tag) }
end
# Finds the jobs with the given tag and then returns an array of
# the wrapped Schedulable objects.
#
# Jobs that do not respond to #schedulable are left out of the result.
def find_schedulables(tag)
  # the lookup must use the +tag+ parameter (it used to reference an
  # undefined local named +tags+)
  find_jobs(tag).
    select { |job| job.respond_to?(:schedulable) }.
    map { |job| job.schedulable }
end
# Returns the job corresponding to job_id, an instance of AtJob
# or CronJob will be returned.
#
# Cron jobs are looked up first, then the pending jobs. Returns nil when
# no job carries that id.
def get_job(job_id)
  # the block parameter has its own name so it does not shadow a local
  @cron_jobs[job_id] ||
    @pending_jobs.find { |pending| pending.job_id == job_id }
end
# Finds a job (via get_job()) and then returns the wrapped
# schedulable if any.
#
# Returns nil when the job found (or the nil returned by get_job) does
# not respond to #schedulable.
def get_schedulable(job_id)
  job = get_job(job_id)
  job.respond_to?(:schedulable) ? job.schedulable : nil
end
# Sets up a stopped scheduler with no jobs.
#
# params[:scheduler_precision] (anything Float() accepts) sets how many
# seconds the scheduler sleeps between two wake-ups; when it is missing
# or cannot be converted, the default of 0.250 (every 250ms) is kept.
def initialize(params = {})
  super()

  @pending_jobs = []
  @cron_jobs = {}

  # sync between the step() method and the [un]schedule
  # methods is done via these queues, no more mutex
  @schedule_queue = Queue.new
  @unschedule_queue = Queue.new

  @scheduler_thread = nil

  # every 250ms, the scheduler wakes up (default value)
  @precision = 0.250
  begin
    @precision = Float(params[:scheduler_precision])
  rescue StandardError
    # leave precision at its default value; only conversion problems
    # are ignored, interrupts and exit requests still propagate
  end

  @exit_when_no_more_jobs = false
  @dont_reschedule_every = false
  @last_cron_second = -1
  @stopped = true
end
# Joins on the scheduler thread: the caller blocks until that thread
# terminates.
def join
  scheduler_thread = @scheduler_thread
  scheduler_thread.join
end
# Like join() but takes care of setting the 'exit_when_no_more_jobs'
# attribute of this scheduler to true before joining.
# Thus the scheduler will exit (and the join terminates) as soon as
# there are no more 'at' (or 'every') jobs in the scheduler.
#
# Currently used only in unit tests.
def join_until_no_more_jobs
  @exit_when_no_more_jobs = true
  join()
end
# If an error occurs in the job, it will get caught and an error
# message will be displayed to STDOUT.
# If this scheduler provides a lwarn(message) method, it will
# be used instead.
#
# Of course, one can override this method.
def log_exception(e)
  # an exception that was never raised has no backtrace (nil)
  trace = (e.backtrace || []).join("\n")

  message = "trigger() caught exception\n" + e.to_s + "\n" + trace

  if respond_to?(:lwarn)
    lwarn { message }
  else
    puts message
  end
end
# Returns the number of currently pending jobs in this scheduler
# ('at' jobs and 'every' jobs).
def pending_job_count
  @pending_jobs.length
end
# Making sure that params is a Hash.
#
# A Schedulable instance is wrapped as { :schedulable => instance };
# anything else is returned untouched.
def prepare_params(params)
  params.is_a?(Schedulable) ? { :schedulable => params } : params
end
# Pushes an 'at' job into the pending job list, keeping the list sorted
# by ascending 'at'.
#
# A pending job carrying the same job_id is removed first (the new job
# overrides it). Always returns nil.
def push_pending_job(job)
  # override previous job with same id
  previous = @pending_jobs.find { |pending| pending.job_id == job.job_id }
  @pending_jobs.delete(previous) if previous

  # fast path: the job belongs at the end of the list
  if @pending_jobs.empty? || job.at >= @pending_jobs.last.at
    @pending_jobs << job
    return
  end

  # otherwise insert right before the first job that is not earlier
  slot = @pending_jobs.index { |pending| job.at <= pending.at }
  @pending_jobs.insert(slot, job) if slot
  nil
end
# Schedules a cron job, the 'cron_line' is a string
# following the Unix cron standard (see "man 5 crontab" in your command
# line, or http://www.google.com/search?q=man%205%20crontab).
#
# For example :
#
#   scheduler.schedule("5 0 * * *", s)
#     # will trigger the schedulable s every day
#     # five minutes after midnight
#
#   scheduler.schedule("15 14 1 * *", s)
#     # will trigger s at 14:15 on the first of every month
#
#   scheduler.schedule("0 22 * * 1-5") do
#     puts "it's break time..."
#   end
#     # outputs a message every weekday at 10pm
#
# Returns the job id attributed to this 'cron job', this id can
# be used to unschedule the job.
def schedule(cron_line, params = {}, &block)
  params = prepare_params(params)

  # a job with the same id may already be scheduled: queue its removal
  # first (:cron_id wins over :job_id)
  cron_id = params[:cron_id] || params[:job_id]
  @unschedule_queue << [ :cron, cron_id ]

  # then queue the new job
  callable = to_block(params, &block)
  cron_job = CronJob.new(self, cron_id, cron_line, params, &callable)
  @schedule_queue << cron_job

  cron_job.job_id
end
# Schedules a job by specifying at which time it should trigger.
# Returns a job_id that can be used to unschedule() the job.
#
# If the job is specified in the past, it will be triggered immediately
# but not scheduled.
# To avoid the triggering, the parameter :discard_past may be set to
# true :
#
#   jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
#     puts "you'll never read this message"
#   end
#
# And 'jobid' will hold a nil (not scheduled).
def schedule_at(at, params = {}, &block)
  options = prepare_params(params)
  do_schedule_at(at, options, &block)
end
# Schedules a job in a loop. After an execution, it will not execute
# before the time specified in 'freq'.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
#
# In case of exception in the job, it will be rescheduled. If you don't
# want the job to be rescheduled, set the parameter :try_again to false.
#
#   scheduler.schedule_every "500", :try_again => false do
#     do_some_prone_to_error_stuff()
#       # won't get rescheduled in case of exception
#   end
#
# Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
# accepted.
#
#   scheduler.schedule_every "2d", :first_in => "5h" do
#     # schedule something every two days, start in 5 hours...
#   end
def schedule_every(freq, params = {}, &block)
  interval = duration_to_f(freq)

  params = prepare_params(params)
  schedulable = params[:schedulable]
  params[:every] = freq

  # :first_at / :first_in only apply to the first run, hence the delete
  first_at = params.delete(:first_at)
  first_in = params.delete(:first_in)
  previous_at = params[:previous_at]

  next_at =
    if first_at
      first_at
    elsif first_in
      Time.now.to_f + duration_to_f(first_in)
    elsif previous_at
      previous_at + interval
    else
      Time.now.to_f + interval
    end

  do_schedule_at(next_at, params) do |job_id, at|
    # trigger ...
    failed = false
    begin
      if schedulable
        schedulable.trigger(params)
      else
        block.call(job_id, at, params)
      end
    rescue Exception => e
      # whatever the job raises gets logged
      log_exception(e)
      failed = true
    end

    # cannot use a return here !!! (block)
    halted =
      @dont_reschedule_every ||
      (params[:dont_reschedule] == true) ||
      (failed && params[:try_again] == false)

    unless halted
      # ok, reschedule ... yes, this is a kind of recursion; note that
      # params[:every] might have been changed by the block/schedulable
      # code
      params[:job_id] = job_id
      params[:previous_at] = at
      schedule_every(params[:every], params, &block)
    end

    job_id
  end
end
# Schedules a job by stating in how much time it should trigger.
# Returns a job_id that can be used to unschedule() the job.
#
# The duration is converted with duration_to_f and added to the current
# time.
def schedule_in(duration, params = {}, &block)
  trigger_at = Time.new.to_f + duration_to_f(duration)
  options = prepare_params(params)
  do_schedule_at(trigger_at, options, &block)
end
# Starts this scheduler (or restart it if it was previously stopped).
#
# Spawns the scheduler thread, which calls step() and then sleeps
# @precision seconds, over and over, until @stopped becomes true.
# Returns the new thread.
def start
  @stopped = false

  @scheduler_thread = Thread.new do
    if defined?(JRUBY_VERSION)
      # name the underlying Java thread
      require 'java'
      java_thread = java.lang.Thread.current_thread
      java_thread.name = "openwferu scheduler (Ruby Thread)"
    end

    loop do
      break if @stopped
      step()
      sleep(@precision)
        # TODO : adjust precision
    end
  end
end
# This is the method called each time the scheduler wakes up
# (by default 4 times per second). It's meant to quickly
# determine if there are jobs to trigger else to get back to sleep.
# 'cron' jobs get executed if necessary then 'at' jobs.
def step
  # unschedules any job in the unschedule queue before
  # they have a chance to get triggered.
  step_unschedule()

  # triggers eligible jobs
  step_trigger()

  # schedule new jobs
  step_schedule()
end
# Adds every job waiting in the @schedule_queue to
# either @pending_jobs or @cron_jobs.
def step_schedule
  until @schedule_queue.empty?
    queued = @schedule_queue.pop

    if queued.is_a?(CronJob)
      @cron_jobs[queued.job_id] = queued
    else
      # it's an 'at' job
      push_pending_job(queued)
    end
  end
end
# Triggers every eligible cron job, then every eligible pending job.
#
# When @exit_when_no_more_jobs is set, the scheduler is flagged as
# stopped as soon as no pending job is left, and 'every' jobs stop being
# rescheduled once no 'at' job remains.
def step_trigger
  current = Time.new

  if @exit_when_no_more_jobs
    if @pending_jobs.size < 1
      @stopped = true
      return
    end

    @dont_reschedule_every = true if at_job_count < 1
  end

  # TODO : eventually consider running cron / pending
  # job triggering in two different threads
  #
  # but well... there's the synchronization issue...

  # cron jobs (checked at most once per second value)
  if current.sec != @last_cron_second
    @last_cron_second = current.sec

    @cron_jobs.each do |_cron_id, cron_job|
      trigger(cron_job) if cron_job.matches?(current)
    end
  end

  # pending jobs; floats are what 'at' jobs do understand
  timestamp = current.to_f

  until @pending_jobs.empty?
    due = @pending_jobs.first
    break if due.at > timestamp

    trigger(due)
    @pending_jobs.shift
  end
end
# Unschedules jobs in the unschedule_queue.
#
# Each queued entry is a [ type, job_id ] pair: :cron entries go to
# do_unschedule_cron_job, any other type goes to do_unschedule.
def step_unschedule
  until @unschedule_queue.empty?
    kind, job_id = @unschedule_queue.pop

    if kind == :cron
      do_unschedule_cron_job(job_id)
    else
      do_unschedule(job_id)
    end
  end
end
# The scheduler is stoppable via stop().
#
# Only raises the @stopped flag; the loop in the scheduler thread
# checks it before each step.
def stop
  @stopped = true
end
# Returns a block. If a block is passed, will return it, else,
# if a :schedulable is set in the params, will return a block
# wrapping a call to it.
#
# Returns nil when there is neither a block nor a :schedulable.
# The :schedulable entry is removed from params when it gets wrapped,
# and the returned lambda exposes it through a #schedulable accessor.
def to_block(params, &block)
  return block if block

  schedulable = params[:schedulable]
  return nil unless schedulable

  params.delete(:schedulable)

  # the wrapper accepts (and ignores) any arguments: job blocks elsewhere
  # in this scheduler are invoked with arguments such as job_id and at,
  # and a zero-arity lambda would raise ArgumentError on Ruby >= 1.9
  wrapper = lambda { |*_ignored| schedulable.trigger(params) }

  class << wrapper
    attr_accessor :schedulable
  end
  wrapper.schedulable = schedulable

  wrapper
end
# Triggers the job (in a dedicated thread).
#
# Anything the job raises is handed to log_exception. Returns the
# spawned thread.
def trigger(job)
  Thread.new(job) do |triggered|
    begin
      triggered.trigger
    rescue Exception => e
      log_exception(e)
    end
  end
end
# Unschedules an 'at' or a 'cron' job identified by the id
# it was given at schedule time.
#
# The request is only queued here, as an [ :at, job_id ] pair on
# @unschedule_queue.
def unschedule(job_id)
  request = [ :at, job_id ]
  @unschedule_queue << request
end
# Unschedules a cron job.
#
# The request is only queued here, as a [ :cron, job_id ] pair on
# @unschedule_queue.
def unschedule_cron_job(job_id)
  request = [ :cron, job_id ]
  @unschedule_queue << request
end