module SidekiqUniqueJobs::Orphans::Manager

def current_timestamp

# Current wall-clock time as whole seconds since the Unix epoch.
#
# @return [Integer] the integer part of `Time.now`
def current_timestamp
  now = Time.now
  now.to_i
end

def disabled?

# Tells whether the configured reaper is the `:none` setting.
#
# @return [true, false] true only when {#reaper} equals the symbol `:none`
def disabled?
  configured = reaper
  configured == :none
end

def drift_reaper_interval

# The reaper interval padded with a drift allowance.
#
# The allowance is `reaper_interval * DRIFT_FACTOR`, truncated to an integer
# before it is added to the interval.
#
# @return [Numeric] reaper interval plus the truncated drift allowance
def drift_reaper_interval
  base = reaper_interval
  allowance = (base * DRIFT_FACTOR).to_i
  base + allowance
end

def logging_context

Returns:
  • (String) - when logger does not respond to `:with_context`
  • (Hash) - when logger responds to `:with_context`
# Picks the logging context in the shape the logger can take.
#
# @return [Hash] `{ "uniquejobs" => "reaper" }` when `logger_context_hash?` is truthy
# @return [String] `"uniquejobs=orphan-reaper"` otherwise
def logging_context
  return "uniquejobs=orphan-reaper" unless logger_context_hash?

  { "uniquejobs" => "reaper" }
end

def reaper

Other tags:
    See: SidekiqUniqueJobs::Config#reaper -
# Reads the `reaper` setting from the gem configuration on every call.
#
# @see SidekiqUniqueJobs::Config#reaper
# @return [Object] whatever `SidekiqUniqueJobs.config.reaper` holds
def reaper
  config = SidekiqUniqueJobs.config
  config.reaper
end

def reaper_interval

Other tags:
    See: SidekiqUniqueJobs::Config#reaper_interval -
# Reads the `reaper_interval` setting from the gem configuration on every call.
#
# @see SidekiqUniqueJobs::Config#reaper_interval
# @return [Object] whatever `SidekiqUniqueJobs.config.reaper_interval` holds
def reaper_interval
  config = SidekiqUniqueJobs.config
  config.reaper_interval
end

def reaper_timeout

Other tags:
    See: SidekiqUniqueJobs::Config#reaper_timeout -
# Reads the `reaper_timeout` setting from the gem configuration on every call.
#
# @see SidekiqUniqueJobs::Config#reaper_timeout
# @return [Object] whatever `SidekiqUniqueJobs.config.reaper_timeout` holds
def reaper_timeout
  config = SidekiqUniqueJobs.config
  config.reaper_timeout
end

def refresh_reaper_mutex

Returns:
  • (void) -
# Unconditionally rewrites the UNIQUE_REAPER key with the current timestamp.
#
# The key is written with `ex: drift_reaper_interval`, so every refresh also
# restarts its expiry.
#
# @return [Object] whatever the `redis` helper hands back for the SET call
def refresh_reaper_mutex
  redis do |conn|
    stamp = current_timestamp
    ttl = drift_reaper_interval
    conn.set(UNIQUE_REAPER, stamp, ex: ttl)
  end
end

def register_reaper_process

Returns:
  • (void) -
# Tries to claim the UNIQUE_REAPER key for this process.
#
# The current timestamp is written with `nx: true` (only set when the key is
# absent) and `ex: drift_reaper_interval` as expiry.
#
# @return [Object] whatever the `redis` helper hands back for the SET call
def register_reaper_process
  redis do |conn|
    stamp = current_timestamp
    ttl = drift_reaper_interval
    conn.set(UNIQUE_REAPER, stamp, nx: true, ex: ttl)
  end
end

def registered?

Returns:
  • (true, false) -
# Checks whether a reaper registration is still considered current.
#
# The value stored under UNIQUE_REAPER is coerced with `to_i` (a missing key
# therefore counts as 0), `drift_reaper_interval` is added, and the sum must
# be strictly greater than the current timestamp.
#
# @return [true, false]
def registered?
  redis do |conn|
    registered_at = conn.get(UNIQUE_REAPER).to_i
    valid_until = registered_at + drift_reaper_interval
    valid_until > current_timestamp
  end
end

def start # rubocop:disable

Returns:
  • (Concurrent::TimerTask) - the task that was started
# Starts the orphan reaper in this process, unless it is disabled or another
# registration is already in place.
#
# The configuration check runs first so that a disabled reaper never causes
# the Redis lookup done by {#registered?}. The task is only started when
# {#register_reaper_process} reports success: that call writes the key with
# `nx: true`, so a process that loses the race between the `registered?`
# check and the write must not start a second reaper.
#
# @return [Concurrent::TimerTask] the task that was started
# @return [nil] when disabled, already registered, or registration was not won
def start
  return if disabled?
  return if registered?

  with_logging_context do
    # A falsy result means the key already existed, i.e. someone else owns it.
    next unless register_reaper_process

    log_info("Starting Reaper")
    task.add_observer(Observer.new)
    task.execute
    task
  end
end

def stop

Returns:
  • (Boolean) -
# Stops the reaper: logs, removes the registration key, then shuts the
# timer task down — all inside the logging context.
#
# @return [Boolean] the result of `task.shutdown`, presumably passed through
#   by `with_logging_context` (its return behaviour is not visible here)
def stop
  with_logging_context do
    log_info("Stopping Reaper")
    unregister_reaper_process

    reaper_task = task
    reaper_task.shutdown
  end
end

def task

Returns:
  • (Concurrent::TimerTask) - the memoized timer task
# The timer task that drives the reaper, built once and memoized in `@task`.
#
# Each run of the task, inside the logging context and a `redis` block,
# first calls {#refresh_reaper_mutex} and then hands the yielded connection
# to `Orphans::Reaper.call`.
#
# @return [Concurrent::TimerTask] the memoized task
def task
  @task ||= begin
    # A proc (not a lambda) keeps the lenient arity of a literal block, so any
    # arguments the timer passes to the job are ignored.
    reap_orphans = proc do
      with_logging_context do
        redis do |conn|
          refresh_reaper_mutex
          Orphans::Reaper.call(conn)
        end
      end
    end

    Concurrent::TimerTask.new(timer_task_options, &reap_orphans)
  end
end

def timer_task_options

Returns:
  • (Hash) -
# Options handed to `Concurrent::TimerTask.new` by {#task}.
#
# @return [Hash] `:run_now` (always true), `:execution_interval` taken from
#   {#reaper_interval} and `:timeout_interval` taken from {#reaper_timeout}
def timer_task_options
  interval = reaper_interval
  timeout = reaper_timeout

  {
    run_now: true,
    execution_interval: interval,
    timeout_interval: timeout
  }
end

def unregister_reaper_process

Returns:
  • (void) -
# Deletes the UNIQUE_REAPER key, dropping any reaper registration.
#
# @return [Object] whatever the `redis` helper hands back for the DEL call
def unregister_reaper_process
  redis do |conn|
    conn.del(UNIQUE_REAPER)
  end
end