class ActiveJob::QueueAdapters::ResqueExt::ResqueJobs

def all

# Returns the deserialized jobs for the relation, memoized after the first call.
#
# Entries that are not hashes are skipped, but each remaining job keeps the
# index it had in the fetched list so its position stays aligned with Redis.
def all
  @all ||= begin
    indexed_hashes = fetch_resque_jobs.each_with_index.select { |entry, _index| entry.is_a?(Hash) }
    indexed_hashes.map { |entry, index| deserialize_resque_job(entry, index) }.compact
  end
end

def clear_failed_queue

# Clears the Resque failure list named "failed" in one call.
def clear_failed_queue
  failed_queue = "failed"
  Resque::Failure.clear(failed_queue)
end

def count

# Returns the number of jobs in the relation.
#
# Non-paginated relations are counted directly; paginated ones are counted by
# fetching the page and measuring it.
def count
  return direct_jobs_count unless paginated?

  count_fetched_jobs # no direct way of counting jobs
end

def count_fetched_jobs

# Counts jobs by loading them through +all+ and returning the collection size.
def count_fetched_jobs
  fetched_jobs = all
  fetched_jobs.size
end

def deserialize_resque_job(resque_job_hash, index)

# Builds an ActiveJob::JobProxy from a raw Resque entry.
#
# The serialized job is the first element of the entry's args, read from
# "payload" => "args" when present and from top-level "args" otherwise.
#
# resque_job_hash - Hash fetched from Resque.
# index           - Integer index of the entry within the fetched list; it is
#                   added to the relation's offset to get the job position.
#
# Returns the populated job proxy.
def deserialize_resque_job(resque_job_hash, index)
  serialized_args = resque_job_hash.dig("payload", "args") || resque_job_hash["args"]

  job = ActiveJob::JobProxy.new(serialized_args&.first)
  job.last_execution_error = execution_error_from_resque_job(resque_job_hash)
  job.raw_data = resque_job_hash
  job.position = jobs_relation.offset_value + index
  job.failed_at = resque_job_hash["failed_at"]&.to_datetime
  # A job counts as failed only when it carries a failure timestamp.
  job.status = if job.failed_at.present?
    :failed
  else
    :pending
  end
  job
end

def direct_jobs_count

# Counts jobs without fetching them: failed relations use the failed-jobs
# counter, all other relations use the pending queue sizes.
def direct_jobs_count
  if jobs_relation.failed?
    failed_jobs_count
  else
    pending_jobs_count
  end
end

def discard(job)

# Removes a single job from its Redis list.
#
# Inside one MULTI block the entry at the job's position is overwritten with
# SENTINEL, and then one occurrence of SENTINEL is removed from the list.
#
# job - the job to remove; its +position+ is used as the list index.
#
# Redis::CommandError is passed to +handle_resque_job_error+.
def discard(job)
  redis.multi do |transaction|
    list_key = queue_redis_key
    transaction.lset(list_key, job.position, SENTINEL)
    transaction.lrem(list_key, 1, SENTINEL)
  end
rescue Redis::CommandError => command_error
  handle_resque_job_error(job, command_error)
end

def discard_all

# Discards every job in the relation.
#
# When the relation covers the whole failed list, the list is cleared in one
# call; any other relation is discarded job by job.
def discard_all
  return clear_failed_queue if jobs_relation.failed? && targeting_all_jobs?

  discard_all_one_by_one
end

def discard_all_in_batches

# Walks the relation in batches, in descending order, and calls +discard_all+
# on each batch.
def discard_all_in_batches
  jobs_relation.in_batches(order: :desc) { |batch| batch.discard_all }
end

def discard_all_one_by_one

# Discards the relation's jobs individually.
#
# Large relations go through batches; otherwise the jobs are loaded and
# discarded in reverse order (last job first).
def discard_all_one_by_one
  return discard_all_in_batches if use_batches?

  discard_jobs(jobs_relation.to_a.reverse)
end

def discard_jobs(jobs)

# Discards the given jobs, grouped into transactional batches.
#
# jobs - collection of jobs, processed in the order given.
def discard_jobs(jobs)
  in_transactional_jobs_batches(jobs) do |batch|
    batch.each do |job|
      discard(job)
    end
  end
end

def execution_error_from_resque_job(resque_job_hash)

# Builds an ActiveJob::ExecutionError from the failure fields of a raw Resque
# entry ("exception", "error", "backtrace").
#
# Returns nil when the entry has no "exception" value.
def execution_error_from_resque_job(resque_job_hash)
  exception_name = resque_job_hash["exception"]
  return unless exception_name.present?

  ActiveJob::ExecutionError.new(
    error_class: exception_name,
    message: resque_job_hash["error"],
    backtrace: resque_job_hash["backtrace"]
  )
end

def failed_jobs_count

# Returns the failed-jobs count reported by Resque's data store.
def failed_jobs_count
  data_store = Resque.data_store
  data_store.num_failed
end

def fetch_failed_resque_jobs

# Fetches the raw failed entries for the relation's offset and limit.
#
# The result is passed through Array.wrap, so the method always returns an Array.
def fetch_failed_resque_jobs
  offset = jobs_relation.offset_value
  limit = jobs_relation.limit_value
  Array.wrap(Resque::Failure.all(offset, limit))
end

def fetch_queue_resque_jobs

# Fetches the raw pending entries from the relation's queue, using the
# relation's offset and limit.
#
# Returns an Array (the peeked result passed through Array.wrap).
# Raises ActiveJob::Errors::QueryError when the relation has no queue name.
def fetch_queue_resque_jobs
  queue_name = jobs_relation.queue_name
  missing_queue_message = "This adapter requires a queue name unless fetching failed jobs"
  raise ActiveJob::Errors::QueryError, missing_queue_message unless queue_name.present?

  Array.wrap(Resque.peek(queue_name, jobs_relation.offset_value, jobs_relation.limit_value))
end

def fetch_resque_jobs

# Fetches the raw Resque entries backing the relation: the failed list when
# the relation targets failed jobs, the relation's queue otherwise.
#
# Only +failed?+ selects the source. A pending relation without a queue name
# is routed to +fetch_queue_resque_jobs+, which rejects it with
# ActiveJob::Errors::QueryError; it must never be answered with failed jobs.
def fetch_resque_jobs
  if jobs_relation.failed?
    fetch_failed_resque_jobs
  else
    fetch_queue_resque_jobs
  end
end

def find_job(job_id)

# Looks up a job by id among the relation's fetched jobs.
#
# Returns whatever the +jobs_by_id+ index holds for +job_id+.
def find_job(job_id)
  index = jobs_by_id
  index[job_id]
end

def handle_resque_job_error(job, error)

# Translates a Redis error raised while manipulating a job.
#
# An error whose message contains "no such key" (case-insensitive) becomes an
# ActiveJob::Errors::JobNotFoundError for the job and relation; every other
# error is re-raised unchanged. This method never returns normally.
def handle_resque_job_error(job, error)
  raise error unless /no such key/i.match?(error.message)

  raise ActiveJob::Errors::JobNotFoundError.new(job, jobs_relation)
end

def in_transactional_jobs_batches(jobs)

# Yields the jobs in slices of at most MAX_REDIS_TRANSACTION_SIZE, each slice
# inside its own +redis.multi+ block.
#
# jobs - collection of jobs to slice.
#
# Yields each slice (an Array of jobs) to the caller's block.
def in_transactional_jobs_batches(jobs)
  jobs.each_slice(MAX_REDIS_TRANSACTION_SIZE) do |batch|
    # The transaction handle is not used here, but the block keeps one parameter.
    redis.multi { |_transaction| yield batch }
  end
end

def initialize(jobs_relation, redis:)

# jobs_relation - the relation describing which jobs to operate on.
# redis         - the Redis client used for list manipulation.
def initialize(jobs_relation, redis:)
  @redis = redis
  @jobs_relation = jobs_relation
end

def jobs_by_id

# Returns the fetched jobs indexed by their +job_id+, memoized after the first call.
def jobs_by_id
  @jobs_by_id ||= all.index_by { |job| job.job_id }
end

def pending_jobs_count

# Adds up the sizes reported by +Resque.queue_sizes+.
#
# With a queue name on the relation only that queue is counted; with a blank
# queue name every queue is counted.
def pending_jobs_count
  target_queue = jobs_relation.queue_name

  Resque.queue_sizes.sum do |queue_name, queue_size|
    (target_queue.blank? || target_queue == queue_name) ? queue_size : 0
  end
end

def queue_redis_key

# Returns the Redis list key for the relation: "failed" for failed jobs,
# "queue:<queue name>" otherwise.
def queue_redis_key
  return "failed" if jobs_relation.failed?

  "queue:#{jobs_relation.queue_name}"
end

def requeue(job)

# Re-enqueues a job from its raw Resque data.
#
# The raw entry is stamped with "retried_at", written back to the job's list
# position, and then a new Resque job is created from the entry's queue,
# payload class and payload args. Note that +job.raw_data+ is mutated.
#
# Redis::CommandError is passed to +handle_resque_job_error+.
def requeue(job)
  raw_job = job.raw_data
  raw_job["retried_at"] = Time.now.strftime("%Y/%m/%d %H:%M:%S")
  redis.lset(queue_redis_key, job.position, Resque.encode(raw_job))

  payload = raw_job["payload"]
  Resque::Job.create(raw_job["queue"], payload["class"], *payload["args"])
rescue Redis::CommandError => command_error
  handle_resque_job_error(job, command_error)
end

def resque_requeue_and_discard(job)

# Retries a job in two steps: re-enqueue it, then discard the entry it was
# retried from. The order matters: the entry is only discarded after the
# requeue call has returned.
#
# Returns the result of +discard+.
def resque_requeue_and_discard(job)
  requeue(job)
  discard(job)
end

def retry_all

# Retries every job in the relation.
#
# Large relations go through batches; otherwise the jobs are loaded and
# retried in reverse order (last job first).
def retry_all
  return retry_all_in_batches if use_batches?

  retry_jobs(jobs_relation.to_a.reverse)
end

def retry_all_in_batches

# Walks the relation in batches, in descending order, and calls +retry_all+
# on each batch.
def retry_all_in_batches
  jobs_relation.in_batches(order: :desc) { |batch| batch.retry_all }
end

def retry_job(job)

# Retries a single job by delegating to +resque_requeue_and_discard+.
#
# job - the job to retry.
def retry_job(job)
  # Not named just +retry+ because it collides with the reserved Ruby keyword.
  resque_requeue_and_discard(job)
end

def retry_jobs(jobs)

# Retries the given jobs, grouped into transactional batches.
#
# jobs - collection of jobs, processed in the order given.
def retry_jobs(jobs)
  in_transactional_jobs_batches(jobs) do |batch|
    batch.each do |job|
      retry_job(job)
    end
  end
end

def targeting_all_jobs?

# True when the relation is neither paginated nor in need of filtering.
def targeting_all_jobs?
  !(paginated? || jobs_relation.filtering_needed?)
end

def use_batches?

# True when no explicit limit was given and the relation holds more jobs than
# one default page.
def use_batches?
  return false if jobs_relation.limit_value_provided?

  jobs_relation.count > default_page_size
end