require 'date' if RUBY_VERSION < '1.9.0'
require 'thread'
require 'fugit'
module Rufus; end
class Rufus::Scheduler
VERSION = '3.9.2'
EoTime = ::EtOrbi::EoTime
require 'rufus/scheduler/util'
require 'rufus/scheduler/jobs_core'
require 'rufus/scheduler/jobs_one_time'
require 'rufus/scheduler/jobs_repeat'
require 'rufus/scheduler/job_array'
require 'rufus/scheduler/locks'
# Base class for the errors defined by rufus-scheduler.
class Error < StandardError
end

# Raised in a work thread when the :timeout attribute of its job triggers.
class TimeoutError < Error
end

# Raised when the scheduler is not running
# (it got shut down, or it did not start because of a lock).
class NotRunningError < Error
end
#MIN_WORK_THREADS = 3

# Default for #max_work_threads when neither :max_work_threads nor
# :max_worker_threads is passed to #initialize.
MAX_WORK_THREADS = 28

# Settings that can be changed after instantiation:
# the sleep time of the scheduler loop, the discard_past policy,
# the work thread cap, and the IO that #on_error writes to.
attr_accessor :frequency, :discard_past
#attr_accessor :min_work_threads
attr_accessor :max_work_threads, :stderr

# Read-only state: start and pause timestamps (nil when not started or
# not paused), the scheduler thread and the thread-local key that tags
# it, the mutexes hash, and the work queue.
attr_reader :started_at, :paused_at
attr_reader :thread, :thread_key
attr_reader :mutexes, :work_queue
# Sets up the scheduler state from +opts+ and, provided the scheduler
# lock can be acquired, starts the scheduler thread.
#
# Options read here: :frequency (default 0.300), :discard_past (default
# true), :max_work_threads or :max_worker_threads (default
# MAX_WORK_THREADS), :lockfile, :scheduler_lock and :trigger_lock.
def initialize(opts={})
  @opts = opts

  @started_at = nil
  @paused_at = nil

  @jobs = JobArray.new
  @mutexes = {}
  @work_queue = Queue.new
  @join_queue = Queue.new

  @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)

  # an explicit false or nil must be kept, hence fetch rather than ||
  @discard_past = opts.fetch(:discard_past, true)

  #@min_work_threads =
  #  opts[:min_work_threads] || opts[:min_worker_threads] ||
  #  MIN_WORK_THREADS
  @max_work_threads =
    opts[:max_work_threads] || opts[:max_worker_threads] || MAX_WORK_THREADS

  @stderr = $stderr
  @thread_key = "rufus_scheduler_#{self.object_id}"

  # a :lockfile takes precedence over a custom :scheduler_lock
  lockfile = opts[:lockfile]
  @scheduler_lock =
    if lockfile
      Rufus::Scheduler::FileLock.new(lockfile)
    else
      opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new
    end

  @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new

  # If we can't grab the @scheduler_lock, don't run.
  start if lock
end
# Returns a singleton Rufus::Scheduler instance
#
def self.singleton(opts={})
  # the instance is built on the first call only, +opts+ given to later
  # calls are ignored
  return @singleton if @singleton

  @singleton = Rufus::Scheduler.new(opts)
end
# Alias for Rufus::Scheduler.singleton
#
def self.s(opts={})
  singleton(opts)
end
# Releasing the gem would probably require redirecting .start_new to
# .new and emitting a simple deprecation message.
#
# For now, let's assume the people pointing at rufus-scheduler/master
# on GitHub know what they are doing...
#
def self.start_new
  # .start_new is the 2.x entry point, always refuse it
  raise 'this is rufus-scheduler 3.x, use .new instead of .start_new'
end
# Returns the time elapsed since the scheduler was started (now minus
# started_at), or nil when started_at is not set.
def uptime
  return nil unless @started_at

  EoTime.now - @started_at
end
# Hook wrapped around a trigger: this default implementation ignores
# +job+ and simply yields to the given block.
#
# NOTE(review): presumably called by the job code with the job about to
# trigger, so that it can be overridden; the caller is not visible here.
def around_trigger(job)
yield
end
# Returns #uptime rendered by the class-level .to_duration helper, or an
# empty string when there is no uptime (scheduler not started).
def uptime_s
  return '' unless uptime

  self.class.to_duration(uptime)
end
# Blocks the calling thread until the scheduler thread signals, via the
# join queue, that its loop is over.
#
# With a +time_limit+ (a Numeric > 0) the wait is bounded, see
# #time_limit_join; else it is unbounded, see #no_time_limit_join.
#
# Raises NotRunningError when there is no scheduler thread and
# ThreadError when called from the scheduler thread itself.
def join(time_limit=nil)
  unless @thread
    raise NotRunningError.new('cannot join scheduler that is not running')
  end
  if @thread == Thread.current
    raise ThreadError.new('scheduler thread cannot join itself')
  end

  return time_limit_join(time_limit) if time_limit

  no_time_limit_join
end
# Returns true when the scheduler has no start time (never started or
# shut down).
def down?
  @started_at ? false : true
end
# Returns true when the scheduler has a start time, false otherwise.
def up?
  @started_at ? true : false
end
# Returns true when a pause time is set, false otherwise.
def paused?
  @paused_at ? true : false
end
# Pauses the scheduler by recording the current time in @paused_at.
# While @paused_at is set, the scheduler loop (see #start) skips
# trigger_jobs; it keeps unscheduling and timing out jobs.
def pause
@paused_at = EoTime.now
end
# Leaves the paused state.
#
# Before clearing @paused_at, hands opts[:discard_past] (nil when not
# given) to each job returned by #jobs via #resume_discard_past=.
def resume(opts={})
  discard = opts[:discard_past]

  jobs.each do |j|
    j.resume_discard_past = discard
  end

  @paused_at = nil
end
#--
# scheduling methods
#++
# Schedules a one-time job (:once) for +time+, to run +callable+ or the
# block. Returns what #do_schedule returns: the job instance when
# opts[:job] is truthy, else the job id (nil when the job is in the past
# and discarded).
def at(time, callable=nil, opts={}, &block)
do_schedule(:once, time, callable, opts, opts[:job], block)
end
# Same as #at, but always asks #do_schedule for the job instance rather
# than the job id.
def schedule_at(time, callable=nil, opts={}, &block)
do_schedule(:once, time, callable, opts, true, block)
end
# Schedules a one-time job (:once) to run after +duration+. Returns the
# job instance when opts[:job] is truthy, else the job id (nil when the
# job is in the past and discarded).
def in(duration, callable=nil, opts={}, &block)
do_schedule(:once, duration, callable, opts, opts[:job], block)
end
# Same as #in, but always asks #do_schedule for the job instance rather
# than the job id.
def schedule_in(duration, callable=nil, opts={}, &block)
do_schedule(:once, duration, callable, opts, true, block)
end
# Schedules an :every job (EveryJob) with +duration+. Returns the job
# instance when opts[:job] is truthy, else the job id.
def every(duration, callable=nil, opts={}, &block)
do_schedule(:every, duration, callable, opts, opts[:job], block)
end
# Same as #every, but always asks #do_schedule for the job instance
# rather than the job id.
def schedule_every(duration, callable=nil, opts={}, &block)
do_schedule(:every, duration, callable, opts, true, block)
end
# Schedules an :interval job (IntervalJob) with +duration+. Returns the
# job instance when opts[:job] is truthy, else the job id.
def interval(duration, callable=nil, opts={}, &block)
do_schedule(:interval, duration, callable, opts, opts[:job], block)
end
# Same as #interval, but always asks #do_schedule for the job instance
# rather than the job id.
def schedule_interval(duration, callable=nil, opts={}, &block)
do_schedule(:interval, duration, callable, opts, true, block)
end
# Schedules a :cron job (CronJob) for +cronline+. Returns the job
# instance when opts[:job] is truthy, else the job id.
def cron(cronline, callable=nil, opts={}, &block)
do_schedule(:cron, cronline, callable, opts, opts[:job], block)
end
# Same as #cron, but always asks #do_schedule for the job instance
# rather than the job id.
def schedule_cron(cronline, callable=nil, opts={}, &block)
do_schedule(:cron, cronline, callable, opts, true, block)
end
# Schedules a job, picking the flavour from what +arg+ parses to:
# a Fugit::Cron goes to #schedule_cron, a point in time (EoTime or Time)
# to #schedule_at, anything else to #schedule_in.
#
# The parsed value travels in opts[:_t] (on a copy of +opts+, the
# caller's hash is left untouched). Returns the job instance.
def schedule(arg, callable=nil, opts={}, &block)
  # allow schedule(arg, opts) { ... }
  callable, opts = nil, callable if callable.is_a?(Hash)

  opts = opts.dup
  parsed = opts[:_t] = Rufus::Scheduler.parse(arg, opts)

  if parsed.is_a?(::Fugit::Cron)
    schedule_cron(arg, callable, opts, &block)
  elsif parsed.is_a?(::EtOrbi::EoTime) || parsed.is_a?(Time)
    schedule_at(arg, callable, opts, &block)
  else
    schedule_in(arg, callable, opts, &block)
  end
end
# Schedules a repeating job: when +arg+ parses to a Fugit::Cron the job
# goes to #schedule_cron, else to #schedule_every.
#
# The parsed value travels in opts[:_t] (on a copy of +opts+, the
# caller's hash is left untouched). Returns the job instance.
def repeat(arg, callable=nil, opts={}, &block)
  # allow repeat(arg, opts) { ... }
  callable, opts = nil, callable if callable.is_a?(Hash)

  opts = opts.dup
  parsed = opts[:_t] = Rufus::Scheduler.parse(arg, opts)

  if parsed.is_a?(::Fugit::Cron)
    schedule_cron(arg, callable, opts, &block)
  else
    schedule_every(arg, callable, opts, &block)
  end
end
# Unschedules the given job, or the job with the given id.
#
# Raises an ArgumentError when no such job can be found.
def unschedule(job_or_job_id)
  found, id = fetch(job_or_job_id)

  raise ArgumentError.new("no job found with id '#{id}'") unless found

  found.unschedule
end
#--
# jobs methods
#++
# Returns all the scheduled jobs
# (even those right before re-schedule).
#
def jobs(opts={})
  # jobs(:running) is a shortcut for jobs(:running => true)
  opts = { opts => true } if opts.is_a?(Symbol)

  found = @jobs.to_a

  found =
    if opts[:running]
      found.select { |j| j.running? }
    elsif opts[:all]
      found
    else
      # by default, leave out jobs without a next time or unscheduled ones
      found.reject { |j| j.next_time.nil? || j.unscheduled_at }
    end

  # a job is kept only if it carries each of the requested tags
  wanted = Array(opts[:tag] || opts[:tags]).map { |t| t.to_s }

  found.select { |j| wanted.all? { |t| j.tags.include?(t) } }
end
# Returns the jobs, as filtered by #jobs with +opts+, that are AtJob
# instances.
def at_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::AtJob)
end
# Returns the jobs, as filtered by #jobs with +opts+, that are InJob
# instances.
def in_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::InJob)
end
# Returns the jobs, as filtered by #jobs with +opts+, that are EveryJob
# instances.
def every_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::EveryJob)
end
# Returns the jobs, as filtered by #jobs with +opts+, that are
# IntervalJob instances.
def interval_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::IntervalJob)
end
# Returns the jobs, as filtered by #jobs with +opts+, that are CronJob
# instances.
def cron_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::CronJob)
end
# Looks up a job by id in the scheduler's job array.
#
# NOTE(review): #unschedule and #fetch rely on a falsy result for an
# unknown id; confirm JobArray#[] returns nil in that case.
def job(job_id)
@jobs[job_id]
end
# Returns true if the scheduler has acquired the [exclusive] lock and
# thus may run.
#
# Most of the time, a scheduler is run alone and this method should
# return true. It is useful in cases where among a group of applications
# only one of them should run the scheduler. For schedulers that should
# not run, the method should return false.
#
# Out of the box, rufus-scheduler proposes the
# :lockfile => 'path/to/lock/file' scheduler start option. It makes
# it easy for schedulers on the same machine to determine which should
# run (the first to write the lockfile and lock it). It uses "man 2 flock"
# so it probably won't work reliably on distributed file systems.
#
# If one needs to use a special/different locking mechanism, the scheduler
# accepts :scheduler_lock => lock_object. lock_object only needs to respond
# to #lock and #unlock, and both of these methods should be idempotent.
#
# Look at rufus/scheduler/locks.rb for an example.
#
def lock
# delegates to the lock chosen in #initialize: a FileLock for :lockfile,
# else the :scheduler_lock object, else a NullLock
@scheduler_lock.lock
end
# Sister method to #lock, is called when the scheduler shuts down.
#
def unlock
# releases the trigger lock first, then the scheduler lock
@trigger_lock.unlock
@scheduler_lock.unlock
end
# Callback called when a job is triggered. If the lock cannot be acquired,
# the job won't run (though it'll still be scheduled to run again if
# necessary).
#
def confirm_lock
# delegates to the :trigger_lock given at initialization (a NullLock
# when none was given) and returns its answer
@trigger_lock.lock
end
# Returns true if this job is currently scheduled.
#
# Takes extra care to answer true if the job is a repeat job
# currently firing.
#
def scheduled?(job_or_job_id)
  found, _id = fetch(job_or_job_id)

  # unknown job, not scheduled
  return false unless found

  # scheduled means: not unscheduled and still having a next time
  (found.unscheduled_at.nil? && found.next_time != nil) ? true : false
end
# Lists all the threads associated with this scheduler.
#
def threads
# a thread is associated with this scheduler when it carries a truthy
# thread-local under thread_key (set on the scheduler thread in #start)
Thread.list.select { |t| t[thread_key] }
end
# Lists all the work threads (the ones actually running the scheduled
# block code)
#
# Accepts a query option, which can be set to:
# * :all (default), returns all the threads that are work threads
# or are currently running a job
# * :active, returns all threads that are currently running a job
# * :vacant, returns the threads that are not running a job
#
# If, thanks to :blocking => true, a job is scheduled to monopolize the
# main scheduler thread, that thread will get returned when :active or
# :all.
#
def work_threads(query=:all)
  # work threads are flagged with the :rufus_scheduler_work_thread
  # thread-local, the job they currently run sits in :rufus_scheduler_job
  workers = threads.select { |th| th[:rufus_scheduler_work_thread] }

  if query == :active
    workers.select { |th| th[:rufus_scheduler_job] }
  elsif query == :vacant
    workers.reject { |th| th[:rufus_scheduler_job] }
  else
    workers
  end
end
# Returns the jobs that are currently running: #jobs called with
# :running => true merged into +opts+ (tag filters in +opts+ still
# apply).
def running_jobs(opts={})
jobs(opts.merge(:running => true))
end
# Asks each job returned by #jobs for its occurrences between +time0+
# and +time1+.
#
# By default returns a hash { job => [ time, ... ] } holding only the
# jobs that do have occurrences. When +format+ is :timeline, returns an
# array of [ time, job ] pairs sorted by time instead.
def occurrences(time0, time1, format=:per_job)
  per_job =
    jobs.each_with_object({}) do |j, acc|
      times = j.occurrences(time0, time1)
      acc[j] = times if times.any?
    end

  return per_job unless format == :timeline

  per_job
    .flat_map { |j, times| times.map { |t| [ t, j ] } }
    .sort_by { |pair| pair.first }
end
# Shortcut for occurrences(time0, time1, :timeline): returns an array of
# [ time, job ] pairs sorted by time.
def timeline(time0, time1)
occurrences(time0, time1, :timeline)
end
# Writes a diagnostic report about +err+ to #stderr: the job it occurred
# in (+job+ is nil when the error did not occur in a job), the error and
# its backtrace, timezone information and the state of the scheduler.
#
# Each line of the report carries err.object_id as a prefix.
#
# An error without a backtrace (an exception instance that was never
# raised) is reported as well, with an empty backtrace section.
#
# A failure while reporting is not raised, it is written to #stderr.
# #stderr is flushed in all cases.
def on_error(job, err)
  pre = err.object_id.to_s

  ms = {}; mutexes.each { |k, v| ms[k] = v.locked? }

  stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
  if job
    stderr.puts(" #{pre} job:")
    stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}")
    stderr.puts(" #{pre} #{job.source_location.inspect}")
    # TODO: eventually use a Job#detail or something like that
  else
    stderr.puts(" #{pre} job: (error did not occur in a job)")
  end
  stderr.puts(" #{pre} error:")
  stderr.puts(" #{pre} #{err.object_id}")
  stderr.puts(" #{pre} #{err.class}")
  stderr.puts(" #{pre} #{err}")
  # Exception#backtrace is nil until the exception gets raised
  Array(err.backtrace).each do |l|
    stderr.puts(" #{pre} #{l}")
  end
  stderr.puts(" #{pre} tz:")
  stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}")
  stderr.puts(" #{pre} Time.now: #{Time.now}")
  stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}")
  stderr.puts(" #{pre} et-orbi:")
  stderr.puts(" #{pre} #{EoTime.platform_info}")
  stderr.puts(" #{pre} scheduler:")
  stderr.puts(" #{pre} object_id: #{object_id}")
  stderr.puts(" #{pre} opts:")
  stderr.puts(" #{pre} #{@opts.inspect}")
  stderr.puts(" #{pre} frequency: #{self.frequency}")
  stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}")
  stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}")
  stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
  stderr.puts(" #{pre} down?: #{down?}")
  stderr.puts(" #{pre} frequency: #{frequency.inspect}")
  stderr.puts(" #{pre} discard_past: #{discard_past.inspect}")
  stderr.puts(" #{pre} started_at: #{started_at.inspect}")
  stderr.puts(" #{pre} paused_at: #{paused_at.inspect}")
  stderr.puts(" #{pre} threads: #{self.threads.size}")
  stderr.puts(" #{pre} thread: #{self.thread}")
  stderr.puts(" #{pre} thread_key: #{self.thread_key}")
  stderr.puts(" #{pre} work_threads: #{work_threads.size}")
  stderr.puts(" #{pre} active: #{work_threads(:active).size}")
  stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}")
  stderr.puts(" #{pre} max_work_threads: #{max_work_threads}")
  stderr.puts(" #{pre} mutexes: #{ms.inspect}")
  stderr.puts(" #{pre} jobs: #{jobs.size}")
  stderr.puts(" #{pre} at_jobs: #{at_jobs.size}")
  stderr.puts(" #{pre} in_jobs: #{in_jobs.size}")
  stderr.puts(" #{pre} every_jobs: #{every_jobs.size}")
  stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}")
  stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}")
  stderr.puts(" #{pre} running_jobs: #{running_jobs.size}")
  stderr.puts(" #{pre} work_queue:")
  stderr.puts(" #{pre} size: #{@work_queue.size}")
  stderr.puts(" #{pre} num_waiting: #{@work_queue.num_waiting}")
  stderr.puts(" #{pre} join_queue:")
  stderr.puts(" #{pre} size: #{@join_queue.size}")
  stderr.puts(" #{pre} num_waiting: #{@join_queue.num_waiting}")
  stderr.puts("} #{pre} .")

rescue => e

  # the error reporter must never raise by itself
  stderr.puts("failure in #on_error itself:")
  stderr.puts(e.inspect)
  stderr.puts(e.backtrace)

ensure

  stderr.flush
end
# Shuts the scheduler down: unschedules all the jobs, stops the
# scheduler according to +opt+, clears the work queue, releases the
# locks and finally waits for the scheduler thread to exit.
#
# +opt+ may be nil, a Symbol (shutdown(:wait) is a shortcut for
# shutdown(:wait => true)) or a Hash:
#
# * :wait or :join, waits for the work threads, see #join_shutdown;
#   a Numeric value is used as the join limit
# * :kill, kills the work threads right away, see #kill_shutdown
# * else, the scheduler is merely flagged as stopped, see
#   #regular_shutdown
#
# It is safe to call on a scheduler that never started (because it could
# not acquire its scheduler lock): there is no scheduler thread to wait
# for in that case.
def shutdown(opt=nil)
  opts =
    case opt
    when Symbol then { opt => true }
    when Hash then opt
    else {}
    end

  @jobs.unschedule_all

  if opts[:wait] || opts[:join]
    join_shutdown(opts)
  elsif opts[:kill]
    kill_shutdown(opts)
  else
    regular_shutdown(opts)
  end

  @work_queue.clear

  unlock

  # @thread is nil when #start was never called, and a thread cannot
  # join itself
  @thread.join if @thread && @thread != Thread.current
end
alias stop shutdown
protected
# Shutdown flavour used for :wait / :join: the work threads are joined
# before the scheduler thread is allowed to exit.
#
# When opts[:wait] (or opts[:join]) is a Numeric, it is used as the limit
# of each work thread join; any other truthy value means no limit.
def join_shutdown(opts)
limit = opts[:wait] || opts[:join]
limit = limit.is_a?(Numeric) ? limit : nil
#@started_at = nil
#
# when @started_at is nil, the scheduler thread exits, here
# we want it to exit when all the work threads have been joined
# hence it's set to nil later on
#
# with @paused_at set, the scheduler loop stops triggering jobs
@paused_at = EoTime.now
# NOTE(review): presumably a work thread stops when it pops :shutdown
# off the work queue; the consuming side is not visible here
(work_threads.size * 2 + 1).times { @work_queue << :shutdown }
# one helper thread per work thread: join it (up to limit) then kill it;
# the current thread is skipped so that it never joins or kills itself
work_threads
.collect { |wt|
wt == Thread.current ? nil : Thread.new { wt.join(limit); wt.kill } }
.each { |st|
st.join if st }
# all the work threads are done, let the scheduler loop exit
@started_at = nil
end
# Shutdown flavour used for :kill: flags the scheduler as stopped, then
# kills each work thread without waiting for it. +opts+ is not used.
def kill_shutdown(opts)
  @started_at = nil

  work_threads.each { |wt| wt.kill }
end
# Default shutdown flavour: only clears @started_at, which makes the
# scheduler loop (see #start) exit. +opts+ is not used and the work
# threads are left alone.
def regular_shutdown(opts)
@started_at = nil
end
# Polls the join queue for at most +limit+ seconds (measured with
# #monow).
#
# Returns what was popped off the queue, or nil when the limit is reached
# first. Raises an ArgumentError unless +limit+ is a Numeric > 0.
def time_limit_join(limit)
  unless limit.is_a?(Numeric) && limit > 0
    raise ArgumentError.new("limit #{limit.inspect} should be > 0")
  end

  started = monow

  # sleep between two polls: a 20th of the limit, capped at 100ms
  pause = [ limit.to_f / 20, 0.100 ].min

  until (monow - started) >= limit

    popped =
      begin
        @join_queue.pop(true)
      rescue ThreadError
        # non-blocking pop on an empty queue, #<ThreadError: queue empty>
        false
      end

    return popped if popped

    sleep(pause)
  end

  nil
end
# Blocks until something can be popped off the join queue (fed by
# #rejoin once the scheduler loop is over) and returns it.
def no_time_limit_join
@join_queue.pop
end
# Returns [ job, job_id ]
#
def fetch(job_or_job_id)
  # anything without a #job_id is taken for an id and looked up
  unless job_or_job_id.respond_to?(:job_id)
    return [ job(job_or_job_id), job_or_job_id ]
  end

  [ job_or_job_id, job_or_job_id.job_id ]
end
# Unschedules each job returned by #jobs, then polls (every 10ms) until
# no job is reported as running anymore.
def terminate_all_jobs
  jobs.each(&:unschedule)

  while running_jobs.size > 0
    sleep(0.01)
  end
end
#def free_all_work_threads
#
# work_threads.each { |t| t.raise(KillSignal) }
#end
# Starts the scheduler: records the start time and spawns the scheduler
# thread, which loops for as long as @started_at is set.
def start
@started_at = EoTime.now
@thread =
Thread.new do
while @started_at do
begin
# one tick: drop the unscheduled jobs, trigger jobs (unless paused),
# enforce the job timeouts, then sleep for @frequency
unschedule_jobs
trigger_jobs unless @paused_at
timeout_jobs
sleep(@frequency)
rescue => err
#
# for `blocking: true` jobs mostly
#
# the error is reported and the loop goes on
on_error(nil, err)
end
end
# the loop is over, release the threads waiting in #join
rejoin
end
# thread-locals: the first one is what #threads looks for
@thread[@thread_key] = true
@thread[:rufus_scheduler] = self
@thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
end
# Called at each tick of the scheduler loop, delegates to
# JobArray#delete_unscheduled.
#
# NOTE(review): presumably removes the jobs flagged as unscheduled from
# @jobs; JobArray is not visible here.
def unschedule_jobs
@jobs.delete_unscheduled
end
# Called at each tick of the scheduler loop, unless the scheduler is
# paused. Takes a single "now" and triggers, with that same time, each
# job yielded by JobArray#each(now).
#
# NOTE(review): presumably JobArray#each(now) only yields the jobs due
# at that time; JobArray is not visible here.
def trigger_jobs
now = EoTime.now
@jobs.each(now) do |job|
job.trigger(now)
end
end
# Called at each tick of the scheduler loop: raises a
# Rufus::Scheduler::TimeoutError in each active work thread whose job has
# reached its timeout.
#
# The timeout is read from the :rufus_scheduler_timeout thread-local, it
# is either an EoTime (a point in time) or a value added to the time
# found in :rufus_scheduler_time.
def timeout_jobs
  work_threads(:active).each do |wt|

    running = wt[:rufus_scheduler_job]
    timeout = wt[:rufus_scheduler_timeout]
    since = wt[:rufus_scheduler_time]

    # thread might just have become inactive (job -> nil)
    next unless running && timeout && since

    deadline = timeout.is_a?(EoTime) ? timeout : since + timeout

    wt.raise(Rufus::Scheduler::TimeoutError) unless deadline > EoTime.now
  end
end
# Called by the scheduler thread once its loop is over: pushes the
# scheduler thread onto the join queue so that threads blocked in #join
# get something to pop.
#
# NOTE(review): it pushes twice the number of waiting threads plus one,
# presumably as a margin for threads that start waiting meanwhile.
def rejoin
(@join_queue.num_waiting * 2 + 1).times { @join_queue << @thread }
end
# Workhorse behind the scheduling methods: builds the job and adds it to
# the job array.
#
# +job_type+ is one of :once, :every, :interval or :cron. For :once, the
# job class depends on what +t+ parses to: InJob for a Numeric, AtJob
# otherwise. +callable+ may be the opts hash, when the caller passed opts
# in place of the callable. The block wins over the callable.
#
# Returns the job when +return_job_instance+ (or opts[:job]) is truthy,
# else the job id. Returns nil, without scheduling anything, when the job
# is in the past and gets discarded.
#
# Raises NotRunningError when the scheduler is not started, and
# ArgumentError when the job is in the past and discard_past is :fail.
def do_schedule(job_type, t, callable, opts, return_job_instance, block)
  if @started_at.nil?
    raise NotRunningError.new(
      'cannot schedule, scheduler is down or shutting down')
  end

  callable, opts = nil, callable if callable.is_a?(Hash)

  # opts carrying :_t were already copied by #schedule or #repeat
  opts = opts.dup unless opts.has_key?(:_t)

  return_job_instance ||= opts[:job]

  klass =
    case job_type
    when :once
      opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
      opts[:_t].is_a?(Numeric) ? InJob : AtJob
    when :every then EveryJob
    when :interval then IntervalJob
    when :cron then CronJob
    end

  job = klass.new(self, t, opts, block || callable)

  if job.past? && (discard = job.send(:discard_past?))
    return unless discard == :fail
    raise ArgumentError.new(
      "scheduling in the past and discard_past set to :fail")
  end

  job.check_frequency

  @jobs.push(job)

  return_job_instance ? job : job.job_id
end
# Instance-level shortcut to the class-level .monow (defined outside of
# this file section); used by #time_limit_join to measure elapsed time.
def monow; self.class.monow; end
# Instance-level shortcut to the class-level .ltstamp (defined outside of
# this file section).
def ltstamp; self.class.ltstamp; end
end