class Rufus::Scheduler::SchedulerCore
directly usable stuff.
Rufus::Scheduler::PlainScheduler and Rufus::Scheduler::EmScheduler for
The core of a rufus-scheduler. See implementations like
def self.start_new(opts={})
Instantiates and starts a new Rufus::Scheduler.
def self.start_new(opts={}) s = self.new(opts) s.start s end
def add_cron_job(job)
def add_cron_job(job) complain_if_blocking_and_timeout(job) @cron_jobs << job job end
def add_job(job)
def add_job(job) complain_if_blocking_and_timeout(job) return nil if job.params[:discard_past] && Time.now.to_f >= job.at @jobs << job job end
def all_jobs
Returns a map job_id => job of all the jobs currently in the scheduler
def all_jobs jobs.merge(cron_jobs) end
def at(t, s=nil, opts={}, &block)
pizza is for Thursday at 2000 (if the shop brochure is right).
end
puts 'order pizza'
scheduler.at 'Thu Mar 26 19:30:00 2009' do
Schedules a job at a given point in time.
def at(t, s=nil, opts={}, &block) add_job(AtJob.new(self, t, combine_opts(s, opts), &block)) end
def combine_opts(schedulable, opts)
def combine_opts(schedulable, opts) if schedulable.respond_to?(:trigger) || schedulable.respond_to?(:call) opts[:schedulable] = schedulable elsif schedulable != nil opts = schedulable.merge(opts) end opts end
def complain_if_blocking_and_timeout(job)
Raises an error if the job has the params :blocking and :timeout set
def complain_if_blocking_and_timeout(job) raise( ArgumentError.new('cannot set a :timeout on a :blocking job') ) if job.params[:blocking] and job.params[:timeout] end
def cron(cronstring, s=nil, opts={}, &block)
end
puts 'activate security system'
# every day of the week at 00:22
scheduler.cron '0 22 * * 1-5' do
Schedules a job given a cron string.
def cron(cronstring, s=nil, opts={}, &block) add_cron_job(CronJob.new(self, cronstring, combine_opts(s, opts), &block)) end
def cron_jobs
Returns a map job_id => job for cron jobs
def cron_jobs @cron_jobs.to_h end
def do_handle_exception(job, exception)
details to $stderr.
method. If yes, hands the exception to it, else defaults to outputting
Determines if there is #log_exception, #handle_exception or #on_exception
def do_handle_exception(job, exception) begin [ :log_exception, :handle_exception, :on_exception ].each do |m| next unless self.respond_to?(m) if method(m).arity == 1 self.send(m, exception) else self.send(m, job, exception) end return # exception was handled successfully end rescue Exception => e $stderr.puts '*' * 80 $stderr.puts 'the exception handling method itself had an issue:' $stderr.puts e $stderr.puts *e.backtrace $stderr.puts '*' * 80 end $stderr.puts '=' * 80 $stderr.puts 'scheduler caught exception:' $stderr.puts exception $stderr.puts *exception.backtrace $stderr.puts '=' * 80 end
def every(t, s=nil, opts={}, &block)
checking blood pressure every 5 months and 1 week.
end
puts 'check blood pressure'
scheduler.every '5m1w' do
Schedules a recurring job every t.
def every(t, s=nil, opts={}, &block) add_job(EveryJob.new(self, t, combine_opts(s, opts), &block)) end
def find(job_or_id)
raise an ArgumentError.
If the argument is an id, and no job with that id is found, it will
simply return it.
Mostly used to find a job given its id. If the argument is a job, will
def find(job_or_id) return job_or_id if job_or_id.respond_to?(:job_id) job = all_jobs[job_or_id] raise ArgumentError.new( "couldn't find job #{job_or_id.inspect}" ) unless job job end
def find_by_tag(tag)
Returns a list of jobs with the given tag
def find_by_tag(tag) all_jobs.values.select { |j| j.tags.include?(tag) } end
def get_queue(type, opts)
(made it into a method for easy override)
Returns a job queue instance.
def get_queue(type, opts) q = if type == :cron opts[:cron_job_queue] || Rufus::Scheduler::CronJobQueue.new else opts[:job_queue] || Rufus::Scheduler::JobQueue.new end q.scheduler = self if q.respond_to?(:scheduler=) q end
def in(t, s=nil, opts={}, &block)
will order an espresso (well sort of) in 20 minutes.
end
puts "order ristretto"
scheduler.in '20m' do
Schedules a job in a given amount of time.
def in(t, s=nil, opts={}, &block) add_job(InJob.new(self, t, combine_opts(s, opts), &block)) end
def initialize(opts={})
Instantiates a Rufus::Scheduler.
def initialize(opts={}) @options = opts @jobs = get_queue(:at, opts) @cron_jobs = get_queue(:cron, opts) @frequency = @options[:frequency] || 0.330 @mutexes = {} end
def jobs
Returns a map job_id => job for at/in/every jobs
def jobs @jobs.to_h end
def pause(job_or_id)
corresponding job cannot be found, an ArgumentError will get raised.
Pauses a given job. If the argument is an id (String) and the
def pause(job_or_id) find(job_or_id).pause end
def resume(job_or_id)
corresponding job cannot be found, an ArgumentError will get raised.
Resumes a given job. If the argument is an id (String) and the
def resume(job_or_id) find(job_or_id).resume end
def running_jobs
triggered and are executing).
Returns the list of the currently running jobs (jobs that just got
def running_jobs Thread.list.collect { |t| t["rufus_scheduler__trigger_thread__#{self.object_id}"] }.compact end
def step
triggered.
The method that does the "wake up and trigger any job that should get
def step @cron_jobs.trigger_matching_jobs @jobs.trigger_matching_jobs end
def trigger_job(params, &block)
EmScheduler blocking triggers for the next tick. Not the same thing ...
TODO : clarify, the blocking here blocks the whole scheduler, while
Else, it will call the block in a dedicated thread.
call the block and return when the block is done.
The default, plain, implementation. If 'blocking' is true, will simply
def trigger_job(params, &block) if params[:blocking] block.call elsif m = params[:mutex] m = (@mutexes[m.to_s] ||= Mutex.new) unless m.is_a?(Mutex) Thread.new { m.synchronize { block.call } } else Thread.new { block.call } end end
def trigger_threads
the execution of jobs.
Returns the current list of trigger threads (threads) dedicated to
def trigger_threads Thread.list.select { |t| t["rufus_scheduler__trigger_thread__#{self.object_id}"] } end
def unschedule(job_or_id)
Returns the job that got unscheduled.
Unschedules a job (cron or at/every/in job).
def unschedule(job_or_id) job_id = job_or_id.respond_to?(:job_id) ? job_or_id.job_id : job_or_id @jobs.unschedule(job_id) || @cron_jobs.unschedule(job_id) end
def unschedule_by_tag(tag)
Given a tag, unschedules all the jobs that bear that tag.
def unschedule_by_tag(tag) jobs = find_by_tag(tag) jobs.each { |job| unschedule(job.job_id) } jobs end