#--
# Copyright (c) 2006-2014, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Made in Japan.
#++
require 'date' if RUBY_VERSION < '1.9.0'
require 'time'
require 'thread'
require 'tzinfo'
require 'fileutils'
module Rufus
class Scheduler
require 'rufus/scheduler/util'
require 'rufus/scheduler/jobs'
require 'rufus/scheduler/cronline'
require 'rufus/scheduler/job_array'
VERSION = '3.0.8'
#
# A common error class for rufus-scheduler
#
class Error < StandardError; end
#
# This error is thrown when the :timeout attribute triggers
#
class TimeoutError < Error; end
#
# For when the scheduler is not running
# (it got shut down or didn't start because of a lock)
#
class NotRunningError < Error; end
#MIN_WORK_THREADS = 3
MAX_WORK_THREADS = 28
attr_accessor :frequency
attr_reader :started_at
attr_reader :thread
attr_reader :thread_key
attr_reader :mutexes
#attr_accessor :min_work_threads
attr_accessor :max_work_threads
attr_accessor :stderr
attr_reader :work_queue
def initialize(opts={})
@opts = opts
@started_at = nil
@paused = false
@jobs = JobArray.new
@frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
@mutexes = {}
@work_queue = Queue.new
#@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS
@max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS
@stderr = $stderr
@thread_key = "rufus_scheduler_#{self.object_id}"
lock || return
start
end
# Returns a singleton Rufus::Scheduler instance
#
def self.singleton(opts={})
@singleton ||= Rufus::Scheduler.new(opts)
end
# Alias for Rufus::Scheduler.singleton
#
def self.s(opts={}); singleton(opts); end
# Releasing the gem would probably require redirecting .start_new to
# .new and emit a simple deprecation message.
#
# For now, let's assume the people pointing at rufus-scheduler/master
# on GitHub know what they do...
#
def self.start_new
fail "this is rufus-scheduler 3.0, use .new instead of .start_new"
end
def shutdown(opt=nil)
@started_at = nil
#jobs.each { |j| j.unschedule }
# provokes https://github.com/jmettraux/rufus-scheduler/issue/98
@jobs.array.each { |j| j.unschedule }
@work_queue.clear
if opt == :wait
join_all_work_threads
elsif opt == :kill
kill_all_work_threads
end
unlock
end
alias stop shutdown
def uptime
@started_at ? Time.now - @started_at : nil
end
def uptime_s
self.class.to_duration(uptime)
end
def join
fail NotRunningError.new(
'cannot join scheduler that is not running'
) unless @thread
@thread.join
end
def down?
! @started_at
end
def up?
!! @started_at
end
def paused?
@paused
end
def pause
@paused = true
end
def resume
@paused = false
end
#--
# scheduling methods
#++
def at(time, callable=nil, opts={}, &block)
do_schedule(:once, time, callable, opts, opts[:job], block)
end
def schedule_at(time, callable=nil, opts={}, &block)
do_schedule(:once, time, callable, opts, true, block)
end
def in(duration, callable=nil, opts={}, &block)
do_schedule(:once, duration, callable, opts, opts[:job], block)
end
def schedule_in(duration, callable=nil, opts={}, &block)
do_schedule(:once, duration, callable, opts, true, block)
end
def every(duration, callable=nil, opts={}, &block)
do_schedule(:every, duration, callable, opts, opts[:job], block)
end
def schedule_every(duration, callable=nil, opts={}, &block)
do_schedule(:every, duration, callable, opts, true, block)
end
def interval(duration, callable=nil, opts={}, &block)
do_schedule(:interval, duration, callable, opts, opts[:job], block)
end
def schedule_interval(duration, callable=nil, opts={}, &block)
do_schedule(:interval, duration, callable, opts, true, block)
end
def cron(cronline, callable=nil, opts={}, &block)
do_schedule(:cron, cronline, callable, opts, opts[:job], block)
end
def schedule_cron(cronline, callable=nil, opts={}, &block)
do_schedule(:cron, cronline, callable, opts, true, block)
end
def schedule(arg, callable=nil, opts={}, &block)
opts[:_t] = Scheduler.parse(arg, opts)
case opts[:_t]
when CronLine then schedule_cron(arg, callable, opts, &block)
when Time then schedule_at(arg, callable, opts, &block)
else schedule_in(arg, callable, opts, &block)
end
end
def repeat(arg, callable=nil, opts={}, &block)
opts[:_t] = Scheduler.parse(arg, opts)
case opts[:_t]
when CronLine then schedule_cron(arg, callable, opts, &block)
else schedule_every(arg, callable, opts, &block)
end
end
def unschedule(job_or_job_id)
job, job_id = fetch(job_or_job_id)
fail ArgumentError.new("no job found with id '#{job_id}'") unless job
job.unschedule if job
end
#--
# jobs methods
#++
# Returns all the scheduled jobs
# (even those right before re-schedule).
#
def jobs(opts={})
opts = { opts => true } if opts.is_a?(Symbol)
jobs = @jobs.to_a
if opts[:running]
jobs = jobs.select { |j| j.running? }
elsif ! opts[:all]
jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at }
end
tags = Array(opts[:tag] || opts[:tags]).collect { |t| t.to_s }
jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } }
jobs
end
def at_jobs(opts={})
jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) }
end
def in_jobs(opts={})
jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) }
end
def every_jobs(opts={})
jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) }
end
def interval_jobs(opts={})
jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) }
end
def cron_jobs(opts={})
jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) }
end
def job(job_id)
@jobs[job_id]
end
# Returns true if this job is currently scheduled.
#
# Takes extra care to answer true if the job is a repeat job
# currently firing.
#
def scheduled?(job_or_job_id)
job, job_id = fetch(job_or_job_id)
!! (job && job.next_time != nil)
end
# Lists all the threads associated with this scheduler.
#
def threads
Thread.list.select { |t| t[thread_key] }
end
# Lists all the work threads (the ones actually running the scheduled
# block code)
#
# Accepts a query option, which can be set to:
# * :all (default), returns all the threads that are work threads
# or are currently running a job
# * :active, returns all threads that are currenly running a job
# * :vacant, returns the threads that are not running a job
#
# If, thanks to :blocking => true, a job is scheduled to monopolize the
# main scheduler thread, that thread will get returned when :active or
# :all.
#
def work_threads(query=:all)
ts =
threads.select { |t|
t[:rufus_scheduler_job] || t[:rufus_scheduler_work_thread]
}
case query
when :active then ts.select { |t| t[:rufus_scheduler_job] }
when :vacant then ts.reject { |t| t[:rufus_scheduler_job] }
else ts
end
end
def running_jobs(opts={})
jobs(opts.merge(:running => true))
end
def occurrences(time0, time1, format=:per_job)
h = {}
jobs.each do |j|
os = j.occurrences(time0, time1)
h[j] = os if os.any?
end
if format == :timeline
a = []
h.each { |j, ts| ts.each { |t| a << [ t, j ] } }
a.sort_by { |(t, j)| t }
else
h
end
end
def timeline(time0, time1)
occurrences(time0, time1, :timeline)
end
def on_error(job, err)
pre = err.object_id.to_s
ms = {}; mutexes.each { |k, v| ms[k] = v.locked? }
stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
stderr.puts(" #{pre} job:")
stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}")
# TODO: eventually use a Job#detail or something like that
stderr.puts(" #{pre} error:")
stderr.puts(" #{pre} #{err.object_id}")
stderr.puts(" #{pre} #{err.class}")
stderr.puts(" #{pre} #{err}")
err.backtrace.each do |l|
stderr.puts(" #{pre} #{l}")
end
stderr.puts(" #{pre} tz:")
stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}")
stderr.puts(" #{pre} Time.now: #{Time.now}")
stderr.puts(" #{pre} scheduler:")
stderr.puts(" #{pre} object_id: #{object_id}")
stderr.puts(" #{pre} opts:")
stderr.puts(" #{pre} #{@opts.inspect}")
stderr.puts(" #{pre} frequency: #{self.frequency}")
stderr.puts(" #{pre} lockfile: #{@lockfile.inspect}")
stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
stderr.puts(" #{pre} down?: #{down?}")
stderr.puts(" #{pre} threads: #{self.threads.size}")
stderr.puts(" #{pre} thread: #{self.thread}")
stderr.puts(" #{pre} thread_key: #{self.thread_key}")
stderr.puts(" #{pre} work_threads: #{work_threads.size}")
stderr.puts(" #{pre} active: #{work_threads(:active).size}")
stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}")
stderr.puts(" #{pre} max_work_threads: #{max_work_threads}")
stderr.puts(" #{pre} mutexes: #{ms.inspect}")
stderr.puts(" #{pre} jobs: #{jobs.size}")
stderr.puts(" #{pre} at_jobs: #{at_jobs.size}")
stderr.puts(" #{pre} in_jobs: #{in_jobs.size}")
stderr.puts(" #{pre} every_jobs: #{every_jobs.size}")
stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}")
stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}")
stderr.puts(" #{pre} running_jobs: #{running_jobs.size}")
stderr.puts(" #{pre} work_queue: #{work_queue.size}")
stderr.puts("} #{pre} .")
rescue => e
stderr.puts("failure in #on_error itself:")
stderr.puts(e.inspect)
stderr.puts(e.backtrace)
ensure
stderr.flush
end
protected
# Returns [ job, job_id ]
#
def fetch(job_or_job_id)
if job_or_job_id.respond_to?(:job_id)
[ job_or_job_id, job_or_job_id.job_id ]
else
[ job(job_or_job_id), job_or_job_id ]
end
end
# Returns true if the scheduler has acquired the [exclusive] lock and
# thus may run.
#
# Most of the time, a scheduler is run alone and this method should
# return true. It is useful in cases where among a group of applications
# only one of them should run the scheduler. For schedulers that should
# not run, the method should return false.
#
# Out of the box, rufus-scheduler proposes the
# :lockfile => 'path/to/lock/file' scheduler start option. It makes
# it easy for schedulers on the same machine to determine which should
# run (to first to write the lockfile and lock it). It uses "man 2 flock"
# so it probably won't work reliably on distributed file systems.
#
# If one needs to use a special/different locking mechanism, providing
# overriding implementation for this #lock and the #unlock complement is
# easy.
#
def lock
@lockfile = nil
return true unless f = @opts[:lockfile]
raise ArgumentError.new(
":lockfile argument must be a string, not a #{f.class}"
) unless f.is_a?(String)
FileUtils.mkdir_p(File.dirname(f))
f = File.new(f, File::RDWR | File::CREAT)
locked = f.flock(File::LOCK_NB | File::LOCK_EX)
return false unless locked
now = Time.now
f.print("pid: #{$$}, ")
f.print("scheduler.object_id: #{self.object_id}, ")
f.print("time: #{now}, ")
f.print("timestamp: #{now.to_f}")
f.flush
@lockfile = f
true
end
# Sister method to #lock, is called when the scheduler shuts down.
#
def unlock
@lockfile.flock(File::LOCK_UN) if @lockfile
end
def terminate_all_jobs
jobs.each { |j| j.unschedule }
sleep 0.01 while running_jobs.size > 0
end
def join_all_work_threads
work_threads.size.times { @work_queue << :sayonara }
work_threads.each { |t| t.join }
@work_queue.clear
end
def kill_all_work_threads
work_threads.each { |t| t.kill }
end
#def free_all_work_threads
#
# work_threads.each { |t| t.raise(KillSignal) }
#end
def start
@started_at = Time.now
@thread =
Thread.new do
while @started_at do
unschedule_jobs
trigger_jobs unless @paused
timeout_jobs
sleep(@frequency)
end
end
@thread[@thread_key] = true
@thread[:rufus_scheduler] = self
@thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
end
def unschedule_jobs
@jobs.delete_unscheduled
end
def trigger_jobs
now = Time.now
@jobs.each(now) do |job|
job.trigger(now)
end
end
def timeout_jobs
work_threads(:active).each do |t|
job = t[:rufus_scheduler_job]
to = t[:rufus_scheduler_timeout]
next unless job && to
# thread might just have become inactive (job -> nil)
ts = t[:rufus_scheduler_time]
to = to.is_a?(Time) ? to : ts + to
next if to > Time.now
t.raise(Rufus::Scheduler::TimeoutError)
end
end
def do_schedule(job_type, t, callable, opts, return_job_instance, block)
fail NotRunningError.new(
'cannot schedule, scheduler is down or shutting down'
) if @started_at == nil
callable, opts = nil, callable if callable.is_a?(Hash)
return_job_instance ||= opts[:job]
job_class =
case job_type
when :once
opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
opts[:_t].is_a?(Time) ? AtJob : InJob
when :every
EveryJob
when :interval
IntervalJob
when :cron
CronJob
end
job = job_class.new(self, t, opts, block || callable)
raise ArgumentError.new(
"job frequency (#{job.frequency}) is higher than " +
"scheduler frequency (#{@frequency})"
) if job.respond_to?(:frequency) && job.frequency < @frequency
@jobs.push(job)
return_job_instance ? job : job.job_id
end
end
end