class ActiveJob::QueueAdapters::ResqueExt::ResqueJobs
# Materializes the relation: fetches the raw Resque payloads and turns each
# Hash entry into a deserialized job proxy, skipping anything that isn't a
# Hash. Memoized per instance.
def all
  @all ||= begin
    deserialized = fetch_resque_jobs.each_with_index.map do |resque_job, index|
      deserialize_resque_job(resque_job, index) if resque_job.is_a?(Hash)
    end
    deserialized.compact
  end
end
# Wipes the entire "failed" queue in a single Resque call.
def clear_failed_queue
  Resque::Failure.clear("failed")
end
# Number of jobs in the relation. Paginated relations have no direct counter
# in Resque, so those are counted by loading the page.
def count
  return count_fetched_jobs if paginated? # no direct way of counting jobs

  direct_jobs_count
end
# Counts by loading the jobs themselves; used when pagination prevents a
# direct count.
def count_fetched_jobs
  all.length
end
# Builds an ActiveJob proxy out of a raw Resque job hash. +index+ is the
# job's offset within the currently fetched page and is combined with the
# relation's offset to compute the absolute position in the queue.
def deserialize_resque_job(resque_job_hash, index)
  # Failed-job payloads nest the args under "payload"; live queue entries
  # carry them at the top level.
  serialized_args = resque_job_hash.dig("payload", "args") || resque_job_hash["args"]

  job = ActiveJob::JobProxy.new(serialized_args&.first)
  job.last_execution_error = execution_error_from_resque_job(resque_job_hash)
  job.raw_data = resque_job_hash
  job.position = jobs_relation.offset_value + index
  job.failed_at = resque_job_hash["failed_at"]&.to_datetime
  job.status = job.failed_at.present? ? :failed : :pending
  job
end
# Counts without fetching, using Resque's own counters.
def direct_jobs_count
  if jobs_relation.failed?
    failed_jobs_count
  else
    pending_jobs_count
  end
end
# Deletes +job+ from its Redis list. Redis lists can't be deleted by index,
# so the entry is first overwritten with a sentinel value and the sentinel
# is then removed — both inside a MULTI so the pair applies atomically.
def discard(job)
  redis.multi do |multi|
    multi.lset(queue_redis_key, job.position, SENTINEL)
    multi.lrem(queue_redis_key, 1, SENTINEL)
  end
rescue Redis::CommandError => error
  handle_resque_job_error(job, error)
end
# Discards every job in the relation. When the relation targets the whole
# failed queue it can be cleared in one call; otherwise jobs are removed
# individually.
def discard_all
  if jobs_relation.failed? && targeting_all_jobs?
    clear_failed_queue
  else
    discard_all_one_by_one
  end
end
# Discards in descending-order batches so removals don't shift the
# positions of jobs still waiting to be processed.
def discard_all_in_batches
  jobs_relation.in_batches(order: :desc) { |batch| batch.discard_all }
end
# Discards jobs individually, switching to batch mode for large relations.
# The list is reversed so that deleting a job never invalidates the
# positions of the jobs still to be deleted.
def discard_all_one_by_one
  if use_batches?
    discard_all_in_batches
  else
    discard_jobs(jobs_relation.to_a.reverse)
  end
end
# Discards the given jobs, grouping the Redis commands into MULTI
# transactions.
def discard_jobs(jobs)
  in_transactional_jobs_batches(jobs) do |batch|
    batch.each { |job| discard(job) }
  end
end
# Extracts the recorded failure, if any, as an ActiveJob::ExecutionError.
# Returns nil for jobs that carry no exception data.
def execution_error_from_resque_job(resque_job_hash)
  return if resque_job_hash["exception"].blank?

  ActiveJob::ExecutionError.new(
    error_class: resque_job_hash["exception"],
    message: resque_job_hash["error"],
    backtrace: resque_job_hash["backtrace"]
  )
end
# Total number of failed jobs, straight from Resque's data store counter.
def failed_jobs_count
  Resque.data_store.num_failed
end
# Pages through the failed queue honoring the relation's offset and limit.
# Array.wrap is needed because Resque::Failure.all returns a bare Hash when
# exactly one job matches.
def fetch_failed_resque_jobs
  failed_jobs = Resque::Failure.all(jobs_relation.offset_value, jobs_relation.limit_value)
  Array.wrap(failed_jobs)
end
# Pages through a regular (pending) queue. A queue name is mandatory here:
# Resque can only peek into one concrete queue at a time.
def fetch_queue_resque_jobs
  if jobs_relation.queue_name.blank?
    raise ActiveJob::Errors::QueryError, "This adapter requires a queue name unless fetching failed jobs"
  end

  # Resque.peek returns a bare Hash when a single job matches.
  Array.wrap(Resque.peek(jobs_relation.queue_name, jobs_relation.offset_value, jobs_relation.limit_value))
end
# Fetches the raw Resque payloads backing the relation.
#
# Fixed: the previous version also routed relations with a blank queue name
# to the failed queue (`failed? || queue_name.blank?`), silently returning
# FAILED jobs for a pending-jobs query and making the queue-name guard in
# +fetch_queue_resque_jobs+ unreachable. Only relations explicitly targeting
# failed jobs should read the failed queue; everything else goes through the
# queue fetcher, which raises the intended QueryError when no queue name was
# provided.
def fetch_resque_jobs
  if jobs_relation.failed?
    fetch_failed_resque_jobs
  else
    fetch_queue_resque_jobs
  end
end
# Looks a job up by its ActiveJob id within the current relation.
# Returns nil when the id is not present.
def find_job(job_id)
  jobs_by_id.fetch(job_id, nil)
end
# Translates low-level Redis errors into the adapter's error types.
#
# A "no such key" command error means the backing list entry is gone (the
# job was already processed or deleted), which callers understand as a
# JobNotFoundError; any other error is re-raised untouched.
#
# Idiom fix: the original used a cramped `=~/no such key/i` match, which
# allocates a MatchData and sets regexp globals; `Regexp#match?` is the
# allocation-free predicate form.
def handle_resque_job_error(job, error)
  if /no such key/i.match?(error.message)
    raise ActiveJob::Errors::JobNotFoundError.new(job, jobs_relation)
  else
    raise error
  end
end
# Yields the given jobs in fixed-size slices, wrapping each slice in a Redis
# MULTI transaction so the per-job commands issued by the block are applied
# atomically per batch.
def in_transactional_jobs_batches(jobs)
  jobs.each_slice(MAX_REDIS_TRANSACTION_SIZE) do |jobs_batch|
    redis.multi do |_transaction|
      yield jobs_batch
    end
  end
end
# @param jobs_relation [ActiveJob::JobsRelation] the relation this instance operates on
# @param redis [Redis] connection used for low-level queue manipulation
def initialize(jobs_relation, redis:)
  @jobs_relation = jobs_relation
  @redis = redis
end
# Memoized index of the fetched jobs, keyed by ActiveJob job id.
def jobs_by_id
  @jobs_by_id ||= all.each_with_object({}) do |job, index|
    index[job.job_id] = job
  end
end
# Sums the sizes of all queues, or of just the targeted queue when the
# relation names one.
def pending_jobs_count
  Resque.queue_sizes.sum do |queue_name, queue_size|
    if jobs_relation.queue_name.blank? || jobs_relation.queue_name == queue_name
      queue_size
    else
      0
    end
  end
end
# Redis key of the list backing the relation's queue: the shared "failed"
# list for failed relations, "queue:<name>" otherwise.
def queue_redis_key
  if jobs_relation.failed?
    "failed"
  else
    "queue:#{jobs_relation.queue_name}"
  end
end
# Re-enqueues +job+ for execution: stamps the raw payload with a retry
# timestamp (mirroring Resque's own retry bookkeeping), writes the updated
# payload back to its list entry, then pushes a fresh copy onto its
# original queue.
def requeue(job)
  resque_job = job.raw_data
  resque_job["retried_at"] = Time.now.strftime("%Y/%m/%d %H:%M:%S")

  redis.lset(queue_redis_key, job.position, Resque.encode(resque_job))
  payload = resque_job["payload"]
  Resque::Job.create(resque_job["queue"], payload["class"], *payload["args"])
rescue Redis::CommandError => error
  handle_resque_job_error(job, error)
end
# Retries +job+ by re-enqueuing a fresh copy and then removing the original
# entry from its list.
def resque_requeue_and_discard(job)
  requeue(job)
  discard(job)
end
# Retries every job in the relation, batching for large relations. The list
# is reversed so removals never invalidate the positions of jobs still
# pending a retry.
def retry_all
  if use_batches?
    retry_all_in_batches
  else
    retry_jobs(jobs_relation.to_a.reverse)
  end
end
# Retries in descending-order batches so positions stay valid as entries
# are removed.
def retry_all_in_batches
  jobs_relation.in_batches(order: :desc) { |batch| batch.retry_all }
end
# Retries a single job. Not named just +retry+ because that collides with
# the reserved Ruby keyword.
def retry_job(job)
  resque_requeue_and_discard(job)
end
# Retries the given jobs, grouping the Redis commands into MULTI
# transactions.
def retry_jobs(jobs)
  in_transactional_jobs_batches(jobs) do |batch|
    batch.each { |job| retry_job(job) }
  end
end
# Whether the relation covers its entire queue: no pagination and no
# filtering applied.
def targeting_all_jobs?
  !(paginated? || jobs_relation.filtering_needed?)
end
# Whether bulk operations should run in batches: only when the caller did
# not cap the relation with an explicit limit and the relation is larger
# than a single page.
def use_batches?
  return false if jobs_relation.limit_value_provided?

  jobs_relation.count > default_page_size
end