lib/litejob/processor.rb
# frozen_string_literal: true require "json" require "litequeue" module Litejob # Litejob::Processor is responsible for processing job payloads class Processor def initialize(queue, id, serialized_job) @queue = queue @id = id @serialized_job = serialized_job @job_hash = JSON.parse(@serialized_job) @litequeue = Litequeue.instance set_log_context!(queue: @queue, class: @job_hash["class"], job: @id) end def repush(id, job, delay = 0, queue = nil) @litequeue.repush(id, JSON.dump(job), queue: queue, delay: delay) end def process! log(:deq) klass = Object.const_get(@job_hash["class"]) instance = klass.new begin instance.perform(*@job_hash["params"]) log(:end) rescue => e if @job_hash["retries_left"] == 0 err(e, "retries exhausted, moving to _dead queue") repush(@id, @job_hash, 0, "_dead") else @job_hash["retries_left"] ||= @job_hash["attempts"] @job_hash["retries_left"] -= 1 retry_delay = (@job_hash["attempts"] - @job_hash["retries_left"]) * 0.1 err(e, "retrying in #{retry_delay} seconds") repush(@id, @job_hash, retry_delay, @job_hash["queue"]) end end rescue => e # this is an error in the extraction of job info, retrying here will not be useful err(e, "while processing job=#{@serialized_job}") raise e end private def set_log_context!(**attributes) @log_context = attributes.map { |k, v| [k, v].join("=") }.join(" ") end def log(event, msg: nil) prefix = "[litejob]:[#{event.to_s.upcase}]" Litejob.logger.info [prefix, @log_context, msg].compact.join(" ") end def err(exception, msg = nil) prefix = "[litejob]:[ERR]" error_context = if exception.message == exception.class.name "failed with #<#{exception.class.name}>" else "failed with #{exception.inspect}" end Litejob.logger.error [prefix, @log_context, error_context, msg].compact.join(" ") end end end