class Litejob::Processor
Experimental RBS support (using type sampling data from the type_fusion
project).
# sig/litejob/processor.rbs class Litejob::Processor def initialize: (String queue, String id, String serialized_job) -> void def log: (Symbol event, msg: nil) -> true? def process!: () -> true? def set_log_context!: (**Hash attributes) -> String end
Litejob::Processor is responsible for processing job payloads
def err(exception, msg = nil)
def err(exception, msg = nil) prefix = "[litejob]:[ERR]" error_context = if exception.message == exception.class.name "failed with #<#{exception.class.name}>" else "failed with #{exception.inspect}" end Litejob.logger.error [prefix, @log_context, error_context, msg].compact.join(" ") end
def initialize(queue, id, serialized_job)
Experimental RBS support (using type sampling data from the type_fusion
project).
def initialize: (String queue, String id, String serialized_job) -> void
This signature was generated using 42 samples from 2 applications.
def initialize(queue, id, serialized_job) @queue = queue @id = id @serialized_job = serialized_job @job_hash = JSON.parse(@serialized_job) @litequeue = Litequeue.instance set_log_context!(queue: @queue, class: @job_hash["class"], job: @id) end
def log(event, msg: nil)
Experimental RBS support (using type sampling data from the type_fusion
project).
def log: (Symbol event, msg: nil) -> true?
This signature was generated using 70 samples from 4 applications.
def log(event, msg: nil) prefix = "[litejob]:[#{event.to_s.upcase}]" Litejob.logger.info [prefix, @log_context, msg].compact.join(" ") end
def process!
Experimental RBS support (using type sampling data from the type_fusion
project).
def process!: () -> true?
This signature was generated using 26 samples from 1 application.
def process! log(:deq) klass = Object.const_get(@job_hash["class"]) instance = klass.new begin instance.perform(*@job_hash["params"]) log(:end) rescue => e if @job_hash["retries_left"] == 0 err(e, "retries exhausted, moving to _dead queue") repush(@id, @job_hash, 0, "_dead") else @job_hash["retries_left"] ||= @job_hash["attempts"] @job_hash["retries_left"] -= 1 retry_delay = (@job_hash["attempts"] - @job_hash["retries_left"]) * 0.1 err(e, "retrying in #{retry_delay} seconds") repush(@id, @job_hash, retry_delay, @job_hash["queue"]) end end rescue => e # this is an error in the extraction of job info, retrying here will not be useful err(e, "while processing job=#{@serialized_job}") raise e end
def repush(id, job, delay = 0, queue = nil)
def repush(id, job, delay = 0, queue = nil) @litequeue.repush(id, JSON.dump(job), queue: queue, delay: delay) end
def set_log_context!(**attributes)
Experimental RBS support (using type sampling data from the type_fusion
project).
def set_log_context!: (**queue | String | class | String | job | String attributes) -> String
This signature was generated using 29 samples from 2 applications.
def set_log_context!(**attributes) @log_context = attributes.map { |k, v| [k, v].join("=") }.join(" ") end