class Rufus::Scheduler::Job

def [](key)

# Reads the job-local value stored under +key+.
#
# The lookup happens while holding @local_mutex.
def [](key)
  @local_mutex.synchronize do
    @locals[key]
  end
end

def []=(key, value)

# Stores +value+ under +key+ in the job-local hash.
#
# The write happens while holding @local_mutex.
def []=(key, value)
  @local_mutex.synchronize do
    @locals[key] = value
  end
end

def call(do_rescue=false)


Warning: error rescuing is the responsibility of the caller.

Calls the callable (usually a block) wrapped in this Job instance.
# Invokes the job's callable immediately, using the current time as the
# trigger time.
#
# do_rescue - forwarded as-is to do_call; defaults to false.
def call(do_rescue=false)
  now = Time.now
  do_call(now, do_rescue)
end

def callback(meth, time)

# Invokes the scheduler hook named +meth+, if the scheduler responds to it.
#
# The hook receives as many of [ self, time ] as its arity asks for; a hook
# with a negative arity (optional or splat parameters) receives both.
#
# Returns true when the scheduler has no such hook, else the hook's result.
def callback(meth, time)
  if @scheduler.respond_to?(meth)
    declared = @scheduler.method(meth).arity
    wanted = declared < 0 ? 2 : declared
    args = [ self, time ].first(wanted)
    @scheduler.send(meth, *args)
  else
    true
  end
end

def compute_timeout

# Returns the job's :timeout option run through Rufus::Scheduler.parse,
# or nil when the option is absent (nil or false).
def compute_timeout
  raw = @opts[:timeout]
  return nil unless raw
  Rufus::Scheduler.parse(raw)
end

def do_call(time, do_rescue)

# Invokes the wrapped callable, handing it as many of [ self, time ] as it
# declares.
#
# A non-negative arity is used directly. A negative arity (the callable has
# optional or splat parameters) encodes -(required + 1); the required
# parameters are supplied so that e.g. `def call(job, *rest)` receives the
# job instead of being called with no arguments at all.
#
# time      - the trigger time offered to the callable as second argument.
# do_rescue - when false, a StandardError is re-raised to the caller; when
#             true, a KillSignal is discarded (nil is returned) and any
#             other StandardError is handed to @scheduler.on_error.
#
# Returns the callable's result, or the result of on_error when rescuing.
def do_call(time, do_rescue)
  arity = @callable.arity
  wanted = arity < 0 ? -arity - 1 : arity
  args = [ self, time ][0, wanted]
  @callable.call(*args)
rescue StandardError => se
  raise se unless do_rescue
  return if se.is_a?(KillSignal) # discard
  @scheduler.on_error(self, se)
# exceptions above StandardError do pass through
end

def do_trigger(time)

# Runs the job once in the current thread and updates its timing statistics.
#
# Tags the current thread with this job, the start time and the computed
# timeout (thread-local variables), records the start time in @last_time,
# then invokes the callable through do_call with rescuing enabled.
#
# time - the trigger time received from the caller; passed on to do_call
#        and post_trigger.
def do_trigger(time)
  t = Time.now
    # if there are mutexes, t might be really bigger than time
  Thread.current[:rufus_scheduler_job] = self
  Thread.current[:rufus_scheduler_time] = t
  Thread.current[:rufus_scheduler_timeout] = compute_timeout
  @last_time = t
  do_call(time, true)
ensure
  # runs whether do_call returned normally or an exception escaped it
  @last_work_time =
    Time.now - Thread.current[:rufus_scheduler_time]
  # running mean over @count runs; #trigger increments @count before
  # the job gets here
  @mean_work_time =
    ((@count - 1) * @mean_work_time + @last_work_time) / @count
  post_trigger(time)
  # un-tag the thread so it no longer shows up in #threads for this job
  Thread.current[:rufus_scheduler_job] = nil
  Thread.current[:rufus_scheduler_time] = nil
  Thread.current[:rufus_scheduler_timeout] = nil
end

def do_trigger_in_thread(time)

# Queues this job for execution by a work thread.
#
# A new work thread is started first when the idle work threads (those not
# tagged with a job) do not outnumber the entries already waiting in the
# work queue, and the scheduler's max_work_threads has not been reached.
#
# Returns the result of pushing [ self, time ] onto the work queue.
def do_trigger_in_thread(time)
  workers = @scheduler.work_threads
  total = workers.size
  idle = workers.count { |w| w[:rufus_scheduler_job] == nil }
  limit = @scheduler.max_work_threads
  backlog = @scheduler.work_queue.size
  needs_worker = (idle - backlog < 1) && (total < limit)
  start_work_thread if needs_worker
  @scheduler.work_queue << [ self, time ]
end

def initialize(scheduler, original, opts, block)

# Sets up a job.
#
# scheduler - kept in @scheduler.
# original  - kept in @original.
# opts      - options hash, kept in @opts and normalized in place:
#             :allow_overlap / :allow_overlapping set to false become
#             :overlap => false, and :mutex is wrapped with Array().
# block     - what to run. Either something responding to #arity (used
#             directly), something responding to #call (its :call method
#             is used), or a Class (instantiated, the instance's :call
#             method is used when there is one).
#
# Raises ArgumentError when no callable could be derived from +block+.
def initialize(scheduler, original, opts, block)
  @scheduler = scheduler
  @original = original
  @opts = opts
  @handler = block

  @callable = nil
  if block.respond_to?(:arity)
    @callable = block
  elsif block.respond_to?(:call)
    @callable = block.method(:call)
  elsif block.is_a?(Class)
    @handler = block.new
    @callable =
      begin
        @handler.method(:call)
      rescue StandardError
        nil # instance has no #call; reported as a missing callable below
      end
  end

  @scheduled_at = Time.now
  @unscheduled_at = nil
  @last_time = nil

  @locals = {}
  @local_mutex = Mutex.new

  @id = determine_id

  unless @callable
    raise(
      ArgumentError,
      'missing block or callable to schedule',
      caller[2..-1]
    )
  end

  @tags = Array(opts[:tag] || opts[:tags]).map { |tag| tag.to_s }

  @count = 0
  @last_work_time = 0.0
  @mean_work_time = 0.0

  # tidy up options
  overlap_refused =
    [ :allow_overlap, :allow_overlapping ].any? { |k| @opts[k] == false }
  @opts[:overlap] = false if overlap_refused

  mutexes = @opts[:mutex]
  @opts[:mutex] = Array(mutexes) if mutexes
end

def key?(key)

# Tells whether a job-local value is stored under +key+.
#
# The check happens while holding @local_mutex.
def key?(key)
  @local_mutex.synchronize do
    @locals.key?(key)
  end
end

def keys

# Lists the keys of the job-local hash.
#
# The keys are read while holding @local_mutex.
def keys
  @local_mutex.synchronize do
    @locals.keys
  end
end

def kill


Kills all the threads this Job currently has going on.
# Raises KillSignal in every thread returned by #threads, i.e. every
# thread currently tagged with this job.
def kill
  threads.each do |thread|
    thread.raise(KillSignal)
  end
end

def mutex(m)

# Resolves +m+ to a Mutex.
#
# A Mutex instance is returned as-is. Anything else is turned into a
# string and looked up in the scheduler's mutexes table, where a new
# Mutex is created and stored on first use.
def mutex(m)
  return m if m.is_a?(Mutex)
  @scheduler.mutexes[m.to_s] ||= Mutex.new
end

def post_trigger(time)

# Bookkeeping done after a run: recomputes the next time, then fires the
# scheduler's :on_post_trigger hook.
#
# Returns the result of the hook callback.
def post_trigger(time)
  # second argument presumably flags the post-trigger pass;
  # confirm against set_next_time
  after_run = true
  set_next_time(time, after_run)
  callback(:on_post_trigger, time)
end

def running?

# True when at least one thread is currently tagged with this job
# (see #threads).
def running?
  active = threads
  active.any?
end

def scheduled?

# Asks the owning scheduler whether this job is scheduled and returns
# its answer.
def scheduled?
  owner = @scheduler
  owner.scheduled?(self)
end

def start_work_thread

# Spawns a work thread that pops [ job, time ] pairs off the scheduler's
# work queue and triggers each job.
#
# The loop stops when the scheduler's started_at is nil; popped jobs whose
# unscheduled_at is set are skipped. A KillSignal raised while a job runs
# is swallowed so the thread keeps serving the queue.
def start_work_thread
  thread =
    Thread.new do
      Thread.current[@scheduler.thread_key] = true
      Thread.current[:rufus_scheduler_work_thread] = true
      loop do
        job, time = @scheduler.work_queue.pop
        break if @scheduler.started_at == nil
        next if job.unscheduled_at
        begin
          # wrap the trigger in one synchronize per entry of opts[:mutex];
          # the first entry ends up innermost, the last one outermost.
          # mutex(m) is resolved on the job that spawned this thread, via
          # the scheduler-wide mutexes table.
          (job.opts[:mutex] || []).reduce(
            lambda { job.do_trigger(time) }
          ) do |b, m|
            lambda { mutex(m).synchronize { b.call } }
          end.call
        rescue KillSignal
          # simply go on looping
        end
      end
    end
  thread[@scheduler.thread_key] = true
  thread[:rufus_scheduler_work_thread] = true
    #
    # same as above (in the thread block),
    # but since it has to be done as quickly as possible.
    # So, whoever is running first (scheduler thread vs job thread)
    # sets this information
end

def threads

# Lists the threads whose :rufus_scheduler_job thread-local equals this
# job, i.e. the threads currently running it.
def threads
  Thread.list.select do |thread|
    thread[:rufus_scheduler_job] == self
  end
end

def trigger(time)

# Triggers the job for +time+.
#
# The next time is always recomputed first. The run is then skipped when
# - overlap is disabled (opts[:overlap] == false) and the job is running, or
# - the :confirm_lock / :on_pre_trigger callbacks yield exactly false
#   (a nil result does not veto the run).
#
# Otherwise @count is incremented and the job runs either in the calling
# thread (opts[:blocking]) or via a work thread.
def trigger(time)
  set_next_time(time)

  overlap_forbidden = opts[:overlap] == false
  return if overlap_forbidden && running?

  go_ahead =
    callback(:confirm_lock, time) &&
    callback(:on_pre_trigger, time)
  return if go_ahead == false

  @count += 1

  opts[:blocking] ? do_trigger(time) : do_trigger_in_thread(time)
end

def unschedule

# Marks the job as unscheduled by stamping @unscheduled_at with the
# current time. Returns that time.
def unschedule
  now = Time.now
  @unscheduled_at = now
end