module SidekiqUniqueJobs::Orphans::Manager

def current_timestamp

Returns:
  • (Integer) -
# Current time as an integer, obtained from Time.now.to_i.
#
# @return [Integer]
def current_timestamp
  now = Time.now
  now.to_i
end

def default_task

Returns:
  • (SidekiqUniqueJobs::TimerTask) -
# Builds a SidekiqUniqueJobs::TimerTask configured with timer_task_options.
#
# The block handed to the task runs inside with_logging_context and a redis
# block; it first calls refresh_reaper_mutex and then Orphans::Reaper.call
# with the yielded connection.
#
# @return [SidekiqUniqueJobs::TimerTask]
def default_task
  options = timer_task_options

  SidekiqUniqueJobs::TimerTask.new(options) do
    with_logging_context do
      redis do |conn|
        refresh_reaper_mutex
        Orphans::Reaper.call(conn)
      end
    end
  end
end

def disabled?

Returns:
  • (true, false) -

Other tags:
    See: enabled? -
# Logical negation of enabled?.
#
# @return [true, false]
# @see enabled?
def disabled?
  return false if enabled?

  true
end

def drift_reaper_interval

Returns:
  • (Integer) -
# The reaper interval plus a drift allowance.
#
# The allowance is reaper_interval multiplied by DRIFT_FACTOR, truncated to
# an integer with to_i.
#
# @return [Integer]
def drift_reaper_interval
  drift = (reaper_interval * DRIFT_FACTOR).to_i
  reaper_interval + drift
end

def enabled?

Returns:
  • (true, false) -
# Whether the configured reaper is one of the values listed in REAPERS.
#
# @return [true, false]
def enabled?
  configured = reaper
  REAPERS.include?(configured)
end

def logging_context

Returns:
  • (String) - when logger does not respond to `:with_context`
  • (Hash) - when logger responds to `:with_context`
# The logging context, in the shape selected by logger_context_hash?.
#
# @return [Hash] when logger_context_hash? is truthy
# @return [String] otherwise
def logging_context
  return "uniquejobs=orphan-reaper" unless logger_context_hash?

  { "uniquejobs" => "reaper" }
end

def reaper

Other tags:
    See: SidekiqUniqueJobs::Config#reaper -
# The reaper setting read from SidekiqUniqueJobs.config.
#
# @see SidekiqUniqueJobs::Config#reaper
def reaper
  config = SidekiqUniqueJobs.config
  config.reaper
end

def reaper_interval

Other tags:
    See: SidekiqUniqueJobs::Config#reaper_interval -
# The reaper interval read from SidekiqUniqueJobs.config.
#
# @see SidekiqUniqueJobs::Config#reaper_interval
def reaper_interval
  config = SidekiqUniqueJobs.config
  config.reaper_interval
end

def refresh_reaper_mutex

Returns:
  • (void) -
# Writes the current timestamp to the UNIQUE_REAPER key, passing "ex" with
# drift_reaper_interval (in Redis SET terms, an expiry in seconds).
#
# @return [void]
def refresh_reaper_mutex
  redis do |conn|
    stamp = current_timestamp
    ttl = drift_reaper_interval
    conn.set(UNIQUE_REAPER, stamp, "ex", ttl)
  end
end

def register_reaper_process

Returns:
  • (void) -
# Writes the current timestamp to the UNIQUE_REAPER key, passing "nx" and
# "ex" with drift_reaper_interval (in Redis SET terms: only set when the key
# is absent, with an expiry in seconds).
#
# The value of the redis block is what {#start} uses to decide whether to
# launch the task.
#
# @return [void]
def register_reaper_process
  redis do |conn|
    stamp = current_timestamp
    ttl = drift_reaper_interval
    conn.set(UNIQUE_REAPER, stamp, "nx", "ex", ttl)
  end
end

def registered?

Returns:
  • (true, false) -
# Whether the timestamp stored under UNIQUE_REAPER is still within the drift
# interval.
#
# The stored value is converted with to_i (so a missing key counts as 0),
# drift_reaper_interval is added, and the sum must be strictly greater than
# current_timestamp.
#
# @return [true, false]
def registered?
  redis do |conn|
    valid_until = conn.get(UNIQUE_REAPER).to_i + drift_reaper_interval
    valid_until > current_timestamp
  end
end

def start(test_task = nil)

Returns:
  • (SidekiqUniqueJobs::TimerTask, nil) - the task that was started, or nil when the reaper is disabled or already registered
# Starts the reaper task unless the reaper is disabled or already registered.
#
# The task (test_task when given, otherwise default_task) is assigned before
# registration is attempted. Only when register_reaper_process returns a
# truthy value is an Observer added and the task executed.
#
# @param test_task [SidekiqUniqueJobs::TimerTask, nil] task to use instead of
#   default_task
# @return [SidekiqUniqueJobs::TimerTask, nil] nil on the early-exit guards;
#   otherwise whatever with_logging_context returns for the block
#   (presumably the block's value: the task, or nil when registration fails)
def start(test_task = nil)
  return if disabled? || registered?

  self.task = test_task || default_task

  with_logging_context do
    next unless register_reaper_process

    log_info("Starting Reaper")
    task.add_observer(Observer.new)
    task.execute
    task
  end
end

def stop

Returns:
  • (Boolean, nil) - nil when the reaper is disabled or not registered
# Stops the reaper task unless the reaper is disabled or not registered.
#
# Inside the logging context it logs "Stopping Reaper", removes the
# registration via unregister_reaper_process and then shuts the task down.
#
# @return [Object, nil] nil on the early-exit guards; otherwise whatever
#   with_logging_context returns for the block (presumably the result of
#   task.shutdown)
def stop
  return if disabled? || unregistered?

  with_logging_context do
    log_info("Stopping Reaper")
    unregister_reaper_process
    task.shutdown
  end
end

def task

Returns:
  • (SidekiqUniqueJobs::TimerTask) - the memoized task, built with default_task when none has been assigned
# The memoized task: returns @task when it is already set, otherwise builds
# one with default_task, stores it and returns it.
#
# @return [SidekiqUniqueJobs::TimerTask]
def task
  # rubocop:disable ThreadSafety/ClassInstanceVariable
  return @task if @task

  @task = default_task
  # rubocop:enable ThreadSafety/ClassInstanceVariable
end

def task=(task)

Returns:
  • (void) -

Parameters:
  • task (SidekiqUniqueJobs::TimerTask) -- the task to use
# Stores the given task in the @task instance variable, replacing any
# previously stored value.
#
# @param task [SidekiqUniqueJobs::TimerTask] the task to use
# @return [void]
def task=(task)
  @task = task # rubocop:disable ThreadSafety/ClassInstanceVariable
end

def timer_task_options

Returns:
  • (Hash) -
# Options passed to the timer task: run_now is true and execution_interval
# is the configured reaper_interval.
#
# @return [Hash]
def timer_task_options
  interval = reaper_interval
  { run_now: true, execution_interval: interval }
end

def unregister_reaper_process

Returns:
  • (void) -
# Deletes the UNIQUE_REAPER key through the yielded connection.
#
# @return [void]
def unregister_reaper_process
  redis do |conn|
    conn.del(UNIQUE_REAPER)
  end
end

def unregistered?

Returns:
  • (true, false) -

Other tags:
    See: registered? -
# Logical negation of registered?.
#
# @return [true, false]
# @see registered?
def unregistered?
  return false if registered?

  true
end