class Rufus::Scheduler::Job
def [](key)
def [](key) @local_mutex.synchronize { @locals[key] } end
def []=(key, value)
def []=(key, value) @local_mutex.synchronize { @locals[key] = value } end
def callback(meth, time)
def callback(meth, time) return true unless @scheduler.respond_to?(meth) arity = @scheduler.method(meth).arity args = [ self, time ][0, (arity < 0 ? 2 : arity)] @scheduler.send(meth, *args) end
def compute_timeout
def compute_timeout if to = @opts[:timeout] Rufus::Scheduler.parse(to) else nil end end
def do_trigger(time)
def do_trigger(time) t = Time.now # if there are mutexes, t might be really bigger than time Thread.current[:rufus_scheduler_job] = self Thread.current[:rufus_scheduler_time] = t Thread.current[:rufus_scheduler_timeout] = compute_timeout @last_time = t args = [ self, time ][0, @callable.arity] @callable.call(*args) rescue KillSignal # discard rescue StandardError => se @scheduler.on_error(self, se) ensure post_trigger(time) Thread.current[:rufus_scheduler_job] = nil Thread.current[:rufus_scheduler_time] = nil Thread.current[:rufus_scheduler_timeout] = nil end
def do_trigger_in_thread(time)
def do_trigger_in_thread(time) threads = @scheduler.work_threads cur = threads.size vac = threads.select { |t| t[:rufus_scheduler_job] == nil }.size #min = @scheduler.min_work_threads max = @scheduler.max_work_threads que = @scheduler.work_queue.size start_work_thread if vac - que < 1 && cur < max @scheduler.work_queue << [ self, time ] end
def initialize(scheduler, original, opts, block)
def initialize(scheduler, original, opts, block) @scheduler = scheduler @original = original @opts = opts @handler = block @callable = if block.respond_to?(:arity) block elsif block.respond_to?(:call) block.method(:call) elsif block.is_a?(Class) @handler = block.new @handler.method(:call) rescue nil else nil end @scheduled_at = Time.now @unscheduled_at = nil @last_time = nil @locals = {} @local_mutex = Mutex.new @id = determine_id raise( ArgumentError, 'missing block or callable to schedule', caller[2..-1] ) unless @callable @tags = Array(opts[:tag] || opts[:tags]).collect { |t| t.to_s } # tidy up options if @opts[:allow_overlap] == false || @opts[:allow_overlapping] == false @opts[:overlap] = false end if m = @opts[:mutex] @opts[:mutex] = Array(m) end end
def key?(key)
def key?(key) @local_mutex.synchronize { @locals.key?(key) } end
def keys
def keys @local_mutex.synchronize { @locals.keys } end
def kill
Kills all the threads this Job currently has going on.
def kill threads.each { |t| t.raise(KillSignal) } end
def mutex(m)
def mutex(m) m.is_a?(Mutex) ? m : (@scheduler.mutexes[m.to_s] ||= Mutex.new) end
def post_trigger(time)
def post_trigger(time) set_next_time(true, time) callback(:on_post_trigger, time) end
def running?
def running? threads.any? end
def scheduled?
def scheduled? @scheduler.scheduled?(self) end
def start_work_thread
def start_work_thread thread = Thread.new do Thread.current[@scheduler.thread_key] = true Thread.current[:rufus_scheduler_job_thread] = true loop do job, time = @scheduler.work_queue.pop break if @scheduler.started_at == nil next if job.unscheduled_at begin (job.opts[:mutex] || []).reduce( lambda { job.do_trigger(time) } ) do |b, m| lambda { mutex(m).synchronize { b.call } } end.call rescue KillSignal # simply go on looping end end end thread[@scheduler.thread_key] = true thread[:rufus_scheduler_work_thread] = true # # same as above (in the thead block), # but since it has to be done as quickly as possible. # So, whoever is running first (scheduler thread vs job thread) # sets this information end
def threads
def threads Thread.list.select { |t| t[:rufus_scheduler_job] == self } end
def trigger(time)
def trigger(time) set_next_time(false, time) return if opts[:overlap] == false && running? r = callback(:confirm_lock, time) && callback(:on_pre_trigger, time) return if r == false if opts[:blocking] do_trigger(time) else do_trigger_in_thread(time) end end
def unschedule
def unschedule @unscheduled_at = Time.now end