class Litejob::Processor

Experimental RBS support (using type sampling data from the type_fusion project).

# sig/litejob/processor.rbs

class Litejob::Processor
  def initialize: (String queue, String id, String serialized_job) -> void
  def log: (Symbol event, msg: nil) -> true?
  def process!: () -> true?
  def set_log_context!: (**Hash attributes) -> String
end

Litejob::Processor is responsible for processing job payloads

def err(exception, msg = nil)

def err(exception, msg = nil)
  prefix = "[litejob]:[ERR]"
  error_context = if exception.message == exception.class.name
    "failed with #<#{exception.class.name}>"
  else
    "failed with #{exception.inspect}"
  end
  Litejob.logger.error [prefix, @log_context, error_context, msg].compact.join(" ")
end

def initialize(queue, id, serialized_job)

Experimental RBS support (using type sampling data from the type_fusion project).

def initialize: (String queue, String id, String serialized_job) -> void

This signature was generated using 42 samples from 2 applications.

def initialize(queue, id, serialized_job)
  @queue = queue
  @id = id
  @serialized_job = serialized_job
  @job_hash = JSON.parse(@serialized_job)
  @litequeue = Litequeue.instance
  set_log_context!(queue: @queue, class: @job_hash["class"], job: @id)
end

def log(event, msg: nil)

Experimental RBS support (using type sampling data from the type_fusion project).

def log: (Symbol event, msg: nil) -> true?

This signature was generated using 70 samples from 4 applications.

def log(event, msg: nil)
  prefix = "[litejob]:[#{event.to_s.upcase}]"
  Litejob.logger.info [prefix, @log_context, msg].compact.join(" ")
end

def process!

Experimental RBS support (using type sampling data from the type_fusion project).

def process!: () -> true?

This signature was generated using 26 samples from 1 application.

def process!
  log(:deq)
  klass = Object.const_get(@job_hash["class"])
  instance = klass.new
  begin
    instance.perform(*@job_hash["params"])
    log(:end)
  rescue => e
    if @job_hash["retries_left"] == 0
      err(e, "retries exhausted, moving to _dead queue")
      repush(@id, @job_hash, 0, "_dead")
    else
      @job_hash["retries_left"] ||= @job_hash["attempts"]
      @job_hash["retries_left"] -= 1
      retry_delay = (@job_hash["attempts"] - @job_hash["retries_left"]) * 0.1
      err(e, "retrying in #{retry_delay} seconds")
      repush(@id, @job_hash, retry_delay, @job_hash["queue"])
    end
  end
rescue => e
  # this is an error in the extraction of job info, retrying here will not be useful
  err(e, "while processing job=#{@serialized_job}")
  raise e
end

def repush(id, job, delay = 0, queue = nil)

def repush(id, job, delay = 0, queue = nil)
  @litequeue.repush(id, JSON.dump(job), queue: queue, delay: delay)
end

def set_log_context!(**attributes)

Experimental RBS support (using type sampling data from the type_fusion project).

def set_log_context!: (**queue | String | class | String | job | String attributes) -> String

This signature was generated using 29 samples from 2 applications.

def set_log_context!(**attributes)
  @log_context = attributes.map { |k, v| [k, v].join("=") }.join(" ")
end